Data Stream Sources

Connect Validio to your streaming data infrastructure to validate messages in real time. Validio consumes from your streams, validates each message against the declared schema, and detects data quality issues as messages flow through your pipelines.

Supported Streams

  • Kafka — Apache Kafka
  • Kinesis — Amazon Kinesis Data Streams
  • Pub/Sub — Google Cloud Pub/Sub

Schema Considerations

When validating stream data, keep the following in mind:

  • Schema inference requires data: Validio infers schemas from existing messages. If a stream is empty (for example, all events expired due to retention settings), schema inference cannot run until new messages arrive.
  • Fields must match the declared schema: Fields added after the source is created are ignored. Missing fields are treated as empty, which affects any validators using those fields.
  • Timestamp fields: Validio infers fields with Timestamp data types. For Pub/Sub, Validio also infers a validio_publish_time field containing the message publish timestamp.
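The effect of the second point can be illustrated with a minimal Python sketch. It is not Validio's implementation: the field names, the `conform_to_schema` helper, and the use of `None` to stand in for "empty" are all assumptions made for illustration.

```python
from typing import Any

# Hypothetical declared schema: the field names fixed when the source was created.
DECLARED_FIELDS = ["id", "values", "created"]


def conform_to_schema(message: dict[str, Any], declared_fields: list[str]) -> dict[str, Any]:
    """Keep only declared fields; fill declared-but-missing fields with None."""
    return {name: message.get(name) for name in declared_fields}


# "region" was added after the source was created; "created" is missing.
incoming = {"id": 7, "values": "a,b", "region": "eu-west-1"}
conformed = conform_to_schema(incoming, DECLARED_FIELDS)
print(conformed)
```

In this sketch the undeclared `region` field is dropped and the missing `created` field becomes `None`, so any validator reading `created` would see an empty value for this message.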

Cost and Performance

Each Validio source creates one consumer on your stream. If your stream and Validio are in different cloud regions, cross-region network costs may apply.

Message Formats

You must specify the message format when configuring a data stream source:

Format      Schema requirement
JSON        Validio automatically infers the schema
AVRO        Upload an Apache Avro schema
PROTOBUF    Upload a Protocol Buffer schema (must be self-contained; imports from custom files are not supported)
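For the AVRO format, the schema to upload is an ordinary Apache Avro schema, which is a JSON document. The Python sketch below builds a small record schema and serializes it. The record name, namespace, and fields are hypothetical, and whether Validio maps the `timestamp-millis` logical type to its Timestamp data type is an assumption, not something stated here.

```python
import json

# Hypothetical record describing one stream message; all names are illustrative.
avro_schema = {
    "type": "record",
    "name": "MyMessage",
    "namespace": "com.example.events",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "values", "type": "string"},
        # Nullable field: a union with "null" and a null default.
        {"name": "comment", "type": ["null", "string"], "default": None},
        # Timestamp stored as milliseconds since the epoch.
        {"name": "created", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    ],
}

# Serialize to JSON text; this is what would be saved as an .avsc file.
schema_text = json.dumps(avro_schema, indent=2)
print(schema_text)
```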

Protobuf Message Schema

The Protobuf schema must describe the format of the messages in the topic and must not import custom files. Validio does not support references, that is, Protobuf schemas that reference other Protobuf schemas. Define any nested message types inline so that the uploaded schema is self-contained.

The following is an example of a valid Protobuf schema, which also demonstrates how to include a nested message:

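syntax = "proto3";
// Well-known type import, used by the `created` field below.
import "google/protobuf/timestamp.proto";

message MyMessage {
	// Nested message types are declared inline rather than imported.
	message MyInnerNestedMessage {
		int32 val1 = 1;
		optional string val2 = 2;
	}

	message MyNestedMessage {
		repeated int32 numvec = 1;
		// Refers to the inline type declared above.
		MyInnerNestedMessage inner_nested = 2;
	}

	// Top-level fields of the message.
	int32 id = 1;
	string values = 2;
	MyNestedMessage nested = 3;
	google.protobuf.Timestamp created = 4;
}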
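
Before uploading, a schema can be checked for imports of custom files. The Python sketch below is one possible pre-upload check, not part of Validio. The `find_custom_imports` helper is hypothetical, and it assumes that imports under `google/protobuf/` are acceptable because the example above uses one. It relies on a regular expression rather than a full Protobuf parser, so imports inside block comments would also be reported.

```python
import re

# Matches `import "path";`, `import public "path";`, and `import weak "path";`.
IMPORT_RE = re.compile(r'^\s*import\s+(?:public\s+|weak\s+)?"([^"]+)"\s*;', re.MULTILINE)

# Assumption: imports of Google's well-known types are acceptable, as in the example above.
WELL_KNOWN_PREFIX = "google/protobuf/"


def find_custom_imports(proto_text: str) -> list[str]:
    """Return imported paths that are not under google/protobuf/."""
    return [p for p in IMPORT_RE.findall(proto_text) if not p.startswith(WELL_KNOWN_PREFIX)]


# Hypothetical schema with one well-known import and one custom import.
sample = '''
syntax = "proto3";
import "google/protobuf/timestamp.proto";
import "my_company/common.proto";

message MyMessage {
  int32 id = 1;
}
'''

print(find_custom_imports(sample))
```

An empty result suggests the schema has no custom imports; any path it lists points to a message definition that would need to be moved inline.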