# Stream Data From an External Kafka Cluster

You can create a data connector to stream data from an external Kafka cluster (source cluster) to a destination cluster, which is often TigerGraph’s internal Kafka cluster. If the data from the source cluster is in AVRO format, the connector can transform the data into JSON format. You can then create and run a loading job to load data from the destination cluster into graphs in your database using the Kafka loader.

## 2. Procedure

### 2.1. Specify connector configurations

Before creating the connector, you must provide the required connector configurations. This includes basic configurations and subsection configurations.

The data streaming connector for external Kafka data is based on Apache Kafka MirrorMaker and shares many configurations with MirrorMaker.

#### 2.1.1. Basic configurations

The configurations in this subsection specify basic characteristics of the connector, such as the connector class, the addresses of the source and destination clusters, and the converters used to convert between the Kafka Connect format and the serialized form that is written to Kafka.

The following configurations are required:

| Field | Description | Example value |
|---|---|---|
| `connector.class` | Must be set to `org.apache.kafka.connect.mirror.MirrorSourceConnector`. This class of connector connects a source Kafka cluster to a destination Kafka cluster. | `connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector` |
| `source.cluster.alias` | Alias of the source Kafka cluster in string format. | `source.cluster.alias=source-cluster` |
| `target.cluster.alias` | Alias of the destination Kafka cluster in string format. | `target.cluster.alias=tg-cluster` |
| `source.cluster.bootstrap.servers` | Address of the external source Kafka cluster. | `source.cluster.bootstrap.servers=localhost:9092` |
| `target.cluster.bootstrap.servers` | Address of the destination Kafka cluster, which is the Kafka component inside TigerGraph. The IP address can be fetched by running `gmyip`, and the default port is 30002. | `target.cluster.bootstrap.servers=localhost:30002` |
| `source->target.enabled` | Enables source-to-target mirroring. This field must be set to `true`. | `source->target.enabled=true` |
| `topics` | Names of the topics to be loaded into TigerGraph, separated by commas (`,`). | `topics=test-topic` |
| `key.converter` | Converter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka. Must be set to `org.apache.kafka.connect.converters.ByteArrayConverter`. | `key.converter=org.apache.kafka.connect.converters.ByteArrayConverter` |
| `header.converter` | Converter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the headers in messages written to or read from Kafka. Must be set to `org.apache.kafka.connect.converters.ByteArrayConverter`. | `header.converter=org.apache.kafka.connect.converters.ByteArrayConverter` |
| `value.converter` | Converter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka.<br>• If records are in Avro format with a Schema Registry service, set this field to `com.tigergraph.kafka.connect.converters.TigerGraphAvroConverter`.<br>• If records are in Avro format without a Schema Registry service, set this field to `com.tigergraph.kafka.connect.converters.TigerGraphAvroConverterWithoutSchemaRegistry`.<br>• If records are in plaintext or JSON format, set this field to `org.apache.kafka.connect.converters.ByteArrayConverter`. | `value.converter=com.tigergraph.kafka.connect.converters.TigerGraphAvroConverter` |
| `value.converter.schema.registry.url` | URL of the Schema Registry service. Required if `value.converter` is `com.tigergraph.kafka.connect.converters.TigerGraphAvroConverter`. For details, see the Confluent documentation. | `value.converter.schema.registry.url=http://localhost:8081` |

Other configuration options are available to fine-tune the connector. You can find all configuration options in Kafka MirrorMaker Configurations.
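Putting the required fields together, a minimal basic configuration for Avro records with a Schema Registry might look like the following sketch (the aliases, addresses, topic name, and registry URL are placeholder values to adapt to your environment):

```properties
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
source.cluster.alias=source-cluster
target.cluster.alias=tg-cluster
source.cluster.bootstrap.servers=localhost:9092
target.cluster.bootstrap.servers=localhost:30002
source->target.enabled=true
topics=test-topic
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=com.tigergraph.kafka.connect.converters.TigerGraphAvroConverter
value.converter.schema.registry.url=http://localhost:8081
```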

#### 2.1.2. SSL/SASL configurations (optional)

If the source cluster has configured SSL or SASL protocols, you need to provide SSL/SASL configurations to the connector in order to communicate with the source cluster.

You need to upload the following credential files to your TigerGraph cluster:

• If the source cluster uses SASL, you need to upload the keytab of each Kerberos principal to every node of your TigerGraph cluster at the same absolute path.

• If the source cluster uses SSL, you need to upload the trust store and key store to every node of your TigerGraph cluster. Each file must be at the same absolute path on all nodes.

• If the source cluster uses SASL and SSL, you need to upload the keytab of each Kerberos principal, as well as the key store and truststore to every node of your TigerGraph cluster. Each file must be at the same absolute path on all nodes.

The following configurations are required for the admin, producer, and consumer components. To supply the configuration for the corresponding component, replace `<prefix>` with `source.admin`, `producer`, or `consumer`. For example, to specify `GSSAPI` as the SASL mechanism for the producer, put `producer.sasl.mechanism=GSSAPI`:

| Field | Description |
|---|---|
| `<prefix>.security.protocol` | Protocol used to communicate with brokers. Valid values are: `PLAINTEXT`, `SSL`, `SASL_PLAINTEXT`, `SASL_SSL`. |
| `<prefix>.sasl.mechanism` | SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. `GSSAPI` is the default mechanism. |
| `<prefix>.sasl.kerberos.service.name` | The Kerberos principal name that Kafka runs as. This can be defined either in Kafka’s JAAS config or in Kafka’s config. |
| `<prefix>.sasl.jaas.config` | JAAS login context parameters for SASL connections in the format used by JAAS configuration files. See JAAS Login Configuration File for details. |
| `<prefix>.ssl.endpoint.identification.algorithm` | The endpoint identification algorithm used to validate the server hostname against the server certificate. Default is `https`. Disable server hostname verification by setting this field to an empty string. |
| `<prefix>.ssl.keystore.location` | The location of the key store file. |
| `<prefix>.ssl.keystore.password` | The password of the key store file. |
| `<prefix>.ssl.key.password` | The password of the private key in the key store file or the PEM key specified in `ssl.keystore.key`. |
| `<prefix>.ssl.truststore.location` | The location of the trust store file. |
| `<prefix>.ssl.truststore.password` | The password for the trust store file. |
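As an illustrative sketch, if the source cluster uses SSL only, the settings for the `source.admin` prefix could look like the following (the file paths and password placeholders are assumptions to replace with your own values):

```properties
source.admin.security.protocol=SSL
source.admin.ssl.keystore.location=/path/to/client.keystore.jks
source.admin.ssl.keystore.password=<keystore_password>
source.admin.ssl.key.password=<key_password>
source.admin.ssl.truststore.location=/path/to/client.truststore.jks
source.admin.ssl.truststore.password=<truststore_password>
```

Repeat the same settings with the `producer.` and `consumer.` prefixes so that all three components can reach the source cluster.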

#### 2.1.3. Subsection configurations

The subsection configurations come after the other configurations. They specify the names of the connectors to create from the configurations that precede the subsections, as well as the maximum number of tasks that can be created for each connector.

You can create one or more connectors from one configuration file by providing a subsection for each connector you want to create. For the MirrorMaker connector class used in this procedure, we suggest you only create one connector from each file, because few configurations are available in the subsection: if you create multiple connectors, they will have almost identical properties.

Start a subsection with a section title enclosed in square brackets. The title has no effect on the connector to be created, but titles must be unique within the configuration file.

The following subsection configurations are available:

| Field | Description |
|---|---|
| `name` | Name of the connector. |
| `tasks.max` | The maximum number of tasks to be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism. The value should not be larger than the number of partitions of the topic(s) in the source Kafka cluster. The default value is 1. |
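For example, a subsection that creates a single connector with up to two tasks might look like this (the section title and connector name are arbitrary placeholders):

```properties
[connector_1]
name=my-mirror-connector
tasks.max=2
```

Because section titles must be unique, use a different title for each additional subsection in the same file.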

#### 2.1.4. Example

The following is an example configuration file:

```
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
source.cluster.alias=hello
target.cluster.alias=world
source.cluster.bootstrap.servers=source.kafka.server:9092
target.cluster.bootstrap.servers=localhost:30002
source->target.enabled=true
topics=avro-without-registry-topic
replication.factor=1
sync.topic.acls.enabled=false
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
emit.heartbeats.interval.seconds=5
world.scheduled.rebalance.max.delay.ms=35000
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=com.tigergraph.kafka.connect.converters.TigerGraphAvroConverterWithoutSchemaRegistry

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=GSSAPI
producer.sasl.kerberos.service.name=kafka
producer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/path/to/kafka-producer.keytab\" principal=\"kafka-producer@TIGERGRAPH.COM\";
producer.ssl.endpoint.identification.algorithm=
producer.ssl.keystore.location=/path/to/client.keystore.jks
producer.ssl.truststore.location=/path/to/client.truststore.jks

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=GSSAPI
consumer.sasl.kerberos.service.name=kafka
consumer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/path/to/kafka-consumer.keytab\" principal=\"kafka-consumer@TIGERGRAPH.COM\";
consumer.ssl.endpoint.identification.algorithm=
consumer.ssl.keystore.location=/path/to/client.keystore.jks
consumer.ssl.truststore.location=/path/to/client.truststore.jks

[connector_1]
name=avro-test-without-registry
```

The file in the example creates a connector named `avro-test-without-registry` with all configurations specified in the file.

### 2.2. Create connector

Once you have prepared the configuration file, run the `gadmin connector create` command and specify the configuration file to create the connector:

```
gadmin connector create --c <config_file>
```

The connectors start streaming from the data source immediately after creation if the configurations are valid. You can run `gadmin connector status` to verify the status of the connectors. If the configurations are valid, the connectors should be in `RUNNING` status. Data streamed from the source stays in the destination cluster's Kafka topics.

If the destination cluster is TigerGraph’s internal Kafka cluster, the messages are removed at regular intervals as they are loaded into the graph by the loading job.

Automatic removal of loaded Kafka messages is an alpha feature and may be subject to change.

### 2.3. Create data source

Now that the connector has started loading data into TigerGraph’s internal Kafka cluster, you can create a data source and point it to the Kafka cluster:

1. Create a data source configuration file. The only required field is the address of the broker, which consists of the private IP of your TigerGraph server and the port of TigerGraph’s internal Kafka server:

• If you don’t know your private IP, you can retrieve it by running `gmyip` in the bash terminal.

• To retrieve the port of your Kafka cluster, run `gadmin config get Kafka.Port`. The default port is 30002.

In the `kafka_config` field, you can provide additional configurations from Kafka’s consumer configurations. For example, you can set `group.id` to `tigergraph` to specify the group this consumer belongs to:

```
{
  "broker": "10.128.0.240:30002", (1)
  "kafka_config": {
    "session.timeout.ms": "20000"
  }
}
```
1. Make sure to use the internal IP of the machine instead of `localhost` for the hostname of the broker. `localhost` only works for single-server instances.
2. Run `CREATE DATA_SOURCE` to create the data source:

```
CREATE DATA_SOURCE KAFKA datasource_name FOR GRAPH Graph_Name
```
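For instance, assuming a data source configuration file at the hypothetical path `/home/tigergraph/kafka.config` and a graph named `Company_Graph`, the command might look like:

```
CREATE DATA_SOURCE KAFKA k1 = "/home/tigergraph/kafka.config" FOR GRAPH Company_Graph
```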

3. Create a topic-partition configuration for each topic.

```
{
  "topic": <topic_name>, (1)
  "partition_list": [ (2)
    {
      "start_offset": <offset_value>, (3)
      "partition": <partition_number> (4)
    },
    {
      "start_offset": <offset_value>, (3)
      "partition": <partition_number> (4)
    }
    ...
  ]
}
```
1. Replace `<topic_name>` with the name of the topic this configuration applies to.
2. List of the partitions you want to stream from. For each partition, you can set a start offset. If this list is empty, or `partition_list` isn’t included, all partitions are used with the default offset.
3. Replace `<offset_value>` with the offset value you want. The default offset for loading is `-1`, which means you will load data from the most recent message in the topic. If you want to load from the beginning of a topic, the `start_offset` value should be `-2`.
4. Replace `<partition_number>` with the number of the partition you want to configure.
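Putting these pieces together, a filled-in configuration for a hypothetical topic `test-topic` that loads partitions 0 and 1 from the beginning of the topic could look like:

```
{
  "topic": "test-topic",
  "partition_list": [
    { "start_offset": -2, "partition": 0 },
    { "start_offset": -2, "partition": 1 }
  ]
}
```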
4. Create a loading job and map data to the graph. See the Kafka loader guide for how to map data from a Kafka data source to the graph. See Loading JSON data for how to create a loading job for JSON data.

Known bug: to use the default `-1` offset, delete the `start_offset` key instead of setting it to `-1`.

For example, suppose we have the following graph schema with only one vertex type, and the following JSON line data in the destination Kafka cluster:

• Schema

• Data

```
CREATE VERTEX Company (primary_id name STRING, employee_num INT)
CREATE GRAPH Company_Graph (Company)
```

```
{"company":"TigerGraph","employee_num":346}
{"company":"Apple","employee_num":373704}
```

To create the loading job, we start by creating a data source configuration file that points to the Kafka cluster, and then create the data source:

• Data source configuration

• Create data source

```
{
  "broker": "localhost:30002",
  "kafka_config": {
    "group.id": "tigergraph"
  }
}
```

Run the following command in GSQL, replacing `<path_to_datasource_file>` with the absolute path to your data source file, to create the data source:

```
CREATE DATA_SOURCE KAFKA k1=<path_to_datasource_file>
```

Then we create a topic-partition configuration file and write the loading job. In the `LOAD` statement, map the JSON line fields to the corresponding attributes of the vertex type:

• Topic-partition configuration file

```
{
  "topic": "Hello.avro-with-registry-topic",
  "default_start_offset": -2
}
```

• Loading job

```
CREATE LOADING JOB load_company FOR GRAPH Company_Graph {
  DEFINE FILENAME f1 = "$k1:<path_to_topic_partition_configuration>";
  LOAD f1 TO VERTEX Company VALUES ($"company", $"employee_num")
    USING JSON_FILE="true";
}
```

```
RUN LOADING JOB -noprint load_company USING EOF="true"
```