Avro Data Validation with KafkaConnect

Overview

In certain scenarios, users could load data in Avro format to TigerGraph DB, via an external Kafka connector, such as MirrorMakerConnector and experience malformed data errors during this process. This generated vague error messages and loading failures.

The KafkaConnect feature flag ErrorTolerance and the new transformations enables data loading services to handle malformed data and report errors effectively.

The KafkaConnect feature flag was introduced in 3.10.0.

ErrorTolerance

ErrorTolerance can now be set to none or all.

In errorTolance=none mode, it will throw an exception to stop data loading immediately.

In errorTolerance=all mode, the loading process continues even in the presence of malformed data. New converters, specifically KafkaConnect transformations, are introduced to manage errors, recording error stack messages in headers. The KafkaLoader reports these errors to KafkaStrm-LL and proceeds to process the subsequent records. Users can query the loading job status to view error messages related to failed records when errorTolerance=all mode is active.

Types of Avro Transformation

Two new Avro transformations are available:

Transformation Notes

TigergraphAvroWithSchemaRegistryTransformation

A schema registry (URL) is required. See note below.

TigergraphAvroWithoutSchemaRegistryTransformation

A schema registry is not required.

For TigergraphAvroWithSchemaRegistryTransformation, users need to set up a web service exposing this URL.

The Avro message will contain a scheme ID, the Transformation (or previous converter) will fetch the schema with the schema ID via the URL.

How to Enable Avro Data Validation

To enable Avro data validation, configure Kafka Connector with ErrorTolerance and the new transformation.

Below are the additional settings to be added to the connector config:

connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
...
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
header.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
transforms=TigerGraphAvroTransform
transforms.TigerGraphAvroTransform.type=com.tigergraph.kafka.connect.transformations.TigergraphAvroWithSchemaRegistryTransformation
transforms.TigerGraphAvroTransform.schema.registry.url=<URL to schema registry>
transforms.TigerGraphAvroTransform.errors.tolerance=all
  • In the above settings, MirrorSourceConnector is the connector.

  • The transformation type is set to TigergraphAvroWithSchemaRegistryTransformation, but alternatively, it could be set to TigergraphAvroWithoutSchemaRegistryTransformation.

Old Schema vs. New Schema:

Below are examples of the old schema and the new changes.

Old schema:
{"type":"record", "name":"companyInfo", "fields": [{"name": "company", "type": "string"}]}'
Old data:
{"company":"TigerGraph"}
New Schema:
{"type":"record","name":"companyInfo","fields":[{"name":"company","type":"string"},{"name":"employeeNum","type":"int"}]}
Example data added to Avro file:
{"company":"Tigergraph","employeeNum":23456}

KafkaConnect currently employing mm2 (MirrorMaker 2), encounters an issue when using the TigergraphAvroWithSchemaRegistryTransformation with a new schema.

With the introduction of the new schema and transformation a failure occurs in converting older data that does not include the expected employeeNum field.

How to Show Data Malformation Errors

Here is an example command to query loading status:

curl -s http://$(gmyip):$(gadmin config get KafkaStreamLL.Port)/log-aggregation/local/loading-progress

If data malformation exists, the result will display errors aiding users in identifying problematic files or records, including:

  • errorCode: 55

  • Other error messages containing error stacks.