Avro Data Validation with KafkaConnect
Overview
When users load Avro-formatted data into TigerGraph DB through an external Kafka connector, such as MirrorMakerConnector, malformed data can cause loading failures accompanied by vague error messages.
The KafkaConnect feature flag ErrorTolerance and the new transformations enable data loading services to handle malformed data and report errors effectively.
The KafkaConnect feature flag was introduced in 3.10.0.
ErrorTolerance
ErrorTolerance can now be set to none or all.
In errorTolerance=none mode, an exception is thrown and data loading stops immediately.
In errorTolerance=all mode, the loading process continues even when malformed data is present.
New converters, implemented as KafkaConnect transformations, handle these errors by recording the error stack messages in the record headers.
The KafkaLoader reports these errors to KafkaStrm-LL and proceeds to process the subsequent records.
When errorTolerance=all mode is active, users can query the loading job status to view the error messages for failed records.
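The difference between the two modes can be sketched as a small Python model. This is not TigerGraph's implementation: decode() is a hypothetical stand-in for Avro decoding (it simply rejects non-UTF-8 bytes), and the Record class, the load() function, and the header name error.stack are illustrative assumptions.

```python
import traceback
from dataclasses import dataclass, field


class MalformedRecordError(Exception):
    """Hypothetical error raised when a record value cannot be decoded."""


@dataclass
class Record:
    value: bytes
    headers: dict = field(default_factory=dict)


def decode(value: bytes) -> str:
    # Hypothetical stand-in for Avro decoding: non-UTF-8 bytes count as malformed.
    try:
        return value.decode("utf-8")
    except UnicodeDecodeError as exc:
        raise MalformedRecordError(f"cannot decode {value!r}") from exc


def load(records, error_tolerance="none"):
    """Decode records; return (values, error reports) under the chosen mode."""
    if error_tolerance not in ("none", "all"):
        raise ValueError("error_tolerance must be 'none' or 'all'")
    loaded, errors = [], []
    for offset, record in enumerate(records):
        try:
            loaded.append(decode(record.value))
        except MalformedRecordError:
            if error_tolerance == "none":
                raise  # none: stop loading at the first malformed record
            # all: keep the error stack in a header (header name is an assumption),
            # report it, and continue with the next record
            record.headers["error.stack"] = traceback.format_exc()
            errors.append((offset, record.headers["error.stack"]))
    return loaded, errors
```

For example, calling load() with "all" on the values b"ok1", b"\xff\xfe", b"ok2" returns the two decodable values plus one error report for offset 1, while the same input with "none" raises on the second record.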
Types of Avro Transformation
Two new Avro transformations are available:
| Transformation | Notes |
|---|---|
| com.tigergraph.kafka.connect.transformations.TigergraphAvroWithSchemaRegistryTransformation | A schema registry (URL) is required. See note below. |
| | A schema registry is not required. |
For The Avro message will contain a
How to Enable Avro Data Validation
To enable Avro data validation, configure the Kafka connector with ErrorTolerance and one of the new transformations.
Add the following settings to the connector config:
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
...
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
header.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
transforms=TigerGraphAvroTransform
transforms.TigerGraphAvroTransform.type=com.tigergraph.kafka.connect.transformations.TigergraphAvroWithSchemaRegistryTransformation
transforms.TigerGraphAvroTransform.schema.registry.url=<URL to schema registry>
transforms.TigerGraphAvroTransform.errors.tolerance=all
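If the connector runs on a Kafka Connect cluster with its REST API enabled, the same settings can be applied through the standard PUT /connectors/<name>/config endpoint. The Python sketch below is a hypothetical helper, not part of TigerGraph: the function names build_config() and put_config(), the connector name you pass in, and the base dict (standing in for the elided "..." settings) are assumptions, and the schema registry URL must be supplied by you.

```python
import json
import urllib.request

# Settings from the connector config above; the registry URL is added separately.
AVRO_SETTINGS = {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "transforms": "TigerGraphAvroTransform",
    "transforms.TigerGraphAvroTransform.type": (
        "com.tigergraph.kafka.connect.transformations."
        "TigergraphAvroWithSchemaRegistryTransformation"
    ),
    "transforms.TigerGraphAvroTransform.errors.tolerance": "all",
}


def build_config(base: dict, registry_url: str) -> dict:
    """Hypothetical helper: merge the Avro validation settings into a base config."""
    config = {**base, **AVRO_SETTINGS}
    config["transforms.TigerGraphAvroTransform.schema.registry.url"] = registry_url
    return config


def put_config(connect_url: str, name: str, config: dict) -> urllib.request.Request:
    """Hypothetical helper: build a Kafka Connect PUT /connectors/<name>/config request."""
    return urllib.request.Request(
        f"{connect_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
```

To send the request, pass it to urllib.request.urlopen(). Kafka Connect creates the connector if it does not exist and updates its configuration otherwise, so the same call works for both cases.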
Old Schema vs. New Schema
Below are examples of the old schema and the new schema, each followed by a sample record.
Old schema:
{"type":"record", "name":"companyInfo", "fields": [{"name": "company", "type": "string"}]}
Sample record:
{"company":"TigerGraph"}
New schema (adds the employeeNum field):
{"type":"record","name":"companyInfo","fields":[{"name":"company","type":"string"},{"name":"employeeNum","type":"int"}]}
Sample record:
{"company":"Tigergraph","employeeNum":23456}
KafkaConnect, which currently employs mm2 (MirrorMaker 2), encounters an issue with this schema change: with the new schema and transformation in place, converting older data that does not include the expected employeeNum field fails.
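One likely explanation is standard Avro schema resolution: when the reader's schema has a field that the writer's schema lacks, the reader must use that field's default, and if there is no default an error is signalled. The minimal Python model below implements only that record-field rule (no type promotion, unions, or nested records). It is not how the TigerGraph transformation is implemented; resolve_record() and NEW_SCHEMA_WITH_DEFAULT are hypothetical.

```python
import json

# Schemas from the example above.
OLD_SCHEMA = json.loads(
    '{"type":"record","name":"companyInfo",'
    '"fields":[{"name":"company","type":"string"}]}'
)
NEW_SCHEMA = json.loads(
    '{"type":"record","name":"companyInfo","fields":[{"name":"company","type":"string"},'
    '{"name":"employeeNum","type":"int"}]}'
)
# Hypothetical variant of the new schema that gives employeeNum a default.
NEW_SCHEMA_WITH_DEFAULT = json.loads(
    '{"type":"record","name":"companyInfo","fields":[{"name":"company","type":"string"},'
    '{"name":"employeeNum","type":"int","default":0}]}'
)


def resolve_record(datum: dict, writer: dict, reader: dict) -> dict:
    """Resolve a record written with `writer` against `reader` (field rules only)."""
    writer_names = {f["name"] for f in writer["fields"]}
    result = {}
    # Writer fields absent from the reader are ignored by iterating reader fields only.
    for reader_field in reader["fields"]:
        name = reader_field["name"]
        if name in writer_names:
            result[name] = datum[name]  # present in both: take the written value
        elif "default" in reader_field:
            result[name] = reader_field["default"]  # missing: fall back to the default
        else:
            # Missing with no default: Avro resolution signals an error.
            raise ValueError(f"field {name!r} is not in the writer schema and has no default")
    return result
```

Resolving the old record {"company":"TigerGraph"} against NEW_SCHEMA raises an error, while resolving it against NEW_SCHEMA_WITH_DEFAULT yields employeeNum 0. Whether the TigerGraph transformations honor schema defaults is not stated in this section, so treat this as an Avro-level explanation rather than a documented fix.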
How to Show Data Malformation Errors
Here is an example command to query loading status:
curl -s http://$(gmyip):$(gadmin config get KafkaStreamLL.Port)/log-aggregation/local/loading-progress
If malformed data is present, the result includes error details that help identify the problematic files or records, including:
- errorCode: 55
- Other error messages containing error stacks.
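To scan the response programmatically, a Python sketch can fetch the same endpoint as the curl command and walk the returned JSON for errorCode 55. It assumes the endpoint returns JSON; because the response layout is not documented in this section, find_errors() searches every nested object instead of assuming a structure. Pass the values printed by gmyip and gadmin config get KafkaStreamLL.Port as host and port. Both function names are hypothetical.

```python
import json
import urllib.request


def fetch_loading_progress(host: str, port: int, timeout: float = 10.0):
    """GET the loading-progress endpoint from the curl example (assumes a JSON body)."""
    url = f"http://{host}:{port}/log-aggregation/local/loading-progress"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def find_errors(node, code=55, path="$"):
    """Yield (path, object) for every nested object whose errorCode equals `code`.

    The response layout is an unknown here, so the whole JSON tree is walked,
    and both numeric and string error codes are matched.
    """
    if isinstance(node, dict):
        if str(node.get("errorCode")) == str(code):
            yield path, node
        for key, value in node.items():
            yield from find_errors(value, code, f"{path}.{key}")
    elif isinstance(node, list):
        for index, item in enumerate(node):
            yield from find_errors(item, code, f"{path}[{index}]")
```

For example, list(find_errors(fetch_loading_progress(host, port))) returns the JSON path and the enclosing object of every entry that reports errorCode 55, which is where the accompanying error stack messages would be found.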