Data Stream Sources
Connect Validio to your streaming data infrastructure to validate messages in real time. Validio consumes from your streams, validates each message against declared schemas, and detects data quality issues as they flow through your pipelines.
Supported Streams
Schema Considerations
When validating stream data, keep the following in mind:
- Schema inference requires data: Validio infers schemas from existing messages. If a stream is empty (for example, all events expired due to retention settings), schema inference cannot run until new messages arrive.
- Fields must match the declared schema: Fields added after the source is created are ignored. Missing fields are treated as empty, which affects any validators using those fields.
- Timestamp fields: Validio infers fields with `Timestamp` data types. For Pub/Sub, Validio also infers a `validio_publish_time` field containing the message publish timestamp.
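The first two considerations can be illustrated with a small sketch. This is not Validio's implementation; the function names `infer_schema` and `conform` are hypothetical, and the flat field-to-type mapping is a simplifying assumption made only to show the behavior described above.

```python
from typing import Any


def infer_schema(messages: list[dict[str, Any]]) -> dict[str, str]:
    """Infer a flat field -> type-name mapping from sample messages (illustrative only)."""
    if not messages:
        # Mirrors the documented constraint: an empty stream gives nothing to infer from.
        raise ValueError("cannot infer a schema from an empty stream")
    schema: dict[str, str] = {}
    for message in messages:
        for field, value in message.items():
            # The first non-null observation of a field fixes its type in this sketch.
            if value is not None:
                schema.setdefault(field, type(value).__name__)
    return schema


def conform(message: dict[str, Any], schema: dict[str, str]) -> dict[str, Any]:
    """Drop fields that are not in the schema; fill missing schema fields with None."""
    return {field: message.get(field) for field in schema}


schema = infer_schema([{"id": 1, "name": "a"}])
# "extra" arrived after the schema was fixed, so it is ignored; "name" is missing, so it is empty.
print(conform({"id": 2, "extra": True}, schema))  # {'id': 2, 'name': None}
```

A validator that reads `name` would see an empty value for the second message, which is why missing fields affect any validators that use them.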
Cost and Performance
Each Validio source creates one consumer on your stream. If your stream and Validio are in different cloud regions, cross-region network costs may apply.
Message Formats
You must specify the message format when configuring a data stream source:
| Format | Schema requirement |
|---|---|
| JSON | Validio automatically infers the schema |
| AVRO | Upload an Apache Avro schema |
| PROTOBUF | Upload a Protocol Buffer schema (must be self-contained; imports from custom files are not supported) |
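For the AVRO format, the uploaded schema is a JSON document written in Apache Avro's schema language. The sketch below builds one in Python and serializes it to text that could be saved as an `.avsc` file. The record name `OrderEvent`, its namespace, and its fields are all hypothetical, and whether Validio maps the `timestamp-millis` logical type to a timestamp field is an assumption, not something this page states.

```python
import json

# Hypothetical record describing one message on the stream.
order_event_schema = {
    "type": "record",
    "name": "OrderEvent",
    "namespace": "com.example.orders",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "status", "type": "string"},
        # Nullable field: a union with "null" listed first and a null default.
        {"name": "note", "type": ["null", "string"], "default": None},
        # Avro logical type for a millisecond-precision timestamp.
        {"name": "created", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    ],
}

# Serialize to the JSON text that an .avsc file would contain.
avsc_text = json.dumps(order_event_schema, indent=2)
print(avsc_text)
```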
Protobuf Message Schema
The Protobuf schema must describe the format of the messages in the topic and must not import custom files. Validio does not support references, that is, Protobuf schemas that reference other Protobuf schemas. Define any nested messages inline so that the uploaded schema is self-contained.
The following is an example of a valid Protobuf schema, which also demonstrates how to include a nested message:
```protobuf
syntax = "proto3";

// Well-known type shipped with Protobuf, not a custom file.
import "google/protobuf/timestamp.proto";

message MyMessage {
  // Nested messages are defined inline so the schema stays self-contained.
  message MyInnerNestedMessage {
    int32 val1 = 1;
    optional string val2 = 2;
  }

  message MyNestedMessage {
    repeated int32 numvec = 1;
    MyInnerNestedMessage inner_nested = 2;
  }

  int32 id = 1;
  string values = 2;
  MyNestedMessage nested = 3;
  google.protobuf.Timestamp created = 4;
}
```
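Before uploading, it can help to check that a schema does not import custom files. The following is a minimal sketch of such a check, not a Validio tool: the function name `custom_imports` is hypothetical, and treating everything under `google/protobuf/` as acceptable is an assumption based on the example above, which imports `google/protobuf/timestamp.proto`. The regular expression is deliberately simple and would also match an import that has been commented out.

```python
import re

# Assumption: imports of Protobuf well-known types are acceptable, as in the example schema.
WELL_KNOWN_PREFIX = "google/protobuf/"

# Matches lines such as: import "path/to/file.proto"; (optionally "public" or "weak").
IMPORT_RE = re.compile(
    r'^\s*import\s+(?:public\s+|weak\s+)?"([^"]+)"\s*;',
    re.MULTILINE,
)


def custom_imports(proto_source: str) -> list[str]:
    """Return imported paths that do not point at Protobuf well-known types."""
    return [
        path
        for path in IMPORT_RE.findall(proto_source)
        if not path.startswith(WELL_KNOWN_PREFIX)
    ]


source = '''
syntax = "proto3";
import "google/protobuf/timestamp.proto";
import "common/types.proto";
message MyMessage { int32 id = 1; }
'''

print(custom_imports(source))  # ['common/types.proto']
```

An empty result suggests the schema is self-contained in the sense described above. Any path it returns points at a message definition that should be moved inline.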