This document walks you through how to configure and use the Kafka Loader to load data into graphs in your TigerGraph instance.

1. Prerequisites

• You should have access to a running Kafka cluster.

2. Configure Kafka loader

Once the external Kafka cluster is set up, prepare the following two configuration files and place them in your desired location in the TigerGraph system:

2.1. Kafka data source configuration file

The data source configuration file’s content, structured as a JSON object, describes the Kafka server’s global settings, including the broker address and port. There may be one or more brokers. Multiple broker addresses are separated by commas.

A sample `kafka.conf` is shown in the following example:

``````{
"broker": "192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"
}``````

The `broker` key is required, and its value is a fully qualified domain name (or IP address) and port. Additional Kafka configuration parameters (see the Kafka documentation) may be provided by using the optional `"kafka_config"` key. Its value is a JSON object of key-value pairs. For example:

``````{
"broker": "localhost:9092",
"kafka_config": {"session.timeout.ms":"20000"}
}``````
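
If the broker list is maintained elsewhere, the data source configuration can be generated instead of written by hand. The following is a minimal Python sketch under that assumption. The helper name `build_data_source_config` and its arguments are hypothetical; only the `broker` and `kafka_config` keys come from this document.

```python
import json


def build_data_source_config(brokers, kafka_config=None):
    """Return the data source configuration as a dict.

    `brokers` is a list of "host:port" strings. `kafka_config` is an
    optional dict of extra Kafka settings. The helper name and signature
    are illustrative and not part of TigerGraph.
    """
    if not brokers:
        raise ValueError("at least one broker is required")
    # Multiple brokers go into a single comma-separated string.
    config = {"broker": ",".join(brokers)}
    if kafka_config:
        config["kafka_config"] = dict(kafka_config)
    return config


if __name__ == "__main__":
    conf = build_data_source_config(
        ["192.168.1.11:9092", "192.168.1.12:9092"],
        {"session.timeout.ms": "20000"},
    )
    # Redirect this output to wherever you keep kafka.conf.
    print(json.dumps(conf, indent=2))
```

Serializing with `json.dumps` keeps the file valid JSON even when setting values contain quotes or other special characters.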

2.1.1. SSL/SASL configuration

If you want to configure the Kafka loader to use SSL or SASL for encryption and authentication, you need to specify the configurations in the data source configuration files as well. For details, see SSL/SASL Authentication for Kafka Loader.

2.2. Kafka topic and partition configuration file

The topic-partition configuration file tells the TigerGraph system exactly which Kafka records to read. Similar to the data source configuration file described above, the contents are in JSON object format.

There are two root-level fields in the configuration file:

`topic`

Required. Specifies the Kafka topic to read from.

`partition_list`

Optional. Specifies which topic partitions to read and what start offsets to use. If the `partition_list` key is missing or empty, all partitions in this topic will be used for loading. The default offset for loading is `-1`, which means you will load data from the most recent message in the topic, that is, the end of the topic. If you want to load from the beginning of a topic, the `start_offset` value should be `-2`.

An example file is shown below:

Example Kafka topic and partition configuration file
``````{
"topic": "topicName1",
"partition_list": [
{
"start_offset": -1,
"partition": 0
},
{
"start_offset": -1,
"partition": 1
},
{
"start_offset": -1,
"partition": 2
}
]
}``````

You can also override the default offset by setting `"default_start_offset"` in the Kafka topic configuration file. For example:

``````{
"topic": "topicName1" (1)
}

{
"topic": "topicName1",
"partition_list": [] (1)
}

{
"topic": "topicName1",
"default_start_offset": 0 (2)
}``````
 1 All partitions will be used if the config file doesn’t provide `partition_list` or if the partition list is empty.
 2 This sets the default start offset to 0, overriding the built-in default of `-1`.
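
The rules above (optional `partition_list`, per-partition `start_offset`, optional `default_start_offset`) can be captured in a small generator. This is a minimal Python sketch. The helper `build_topic_config` and the constant names are hypothetical; the JSON keys and the meaning of `-1` and `-2` are taken from this section.

```python
import json

LATEST = -1    # start from the end of the topic (the default)
EARLIEST = -2  # start from the beginning of the topic


def build_topic_config(topic, partition_offsets=None, default_start_offset=None):
    """Build a topic-partition configuration dict.

    `partition_offsets` maps partition number -> start offset. When it is
    omitted or empty, `partition_list` is left out, so all partitions are
    read. Helper name and arguments are illustrative only.
    """
    config = {"topic": topic}
    if default_start_offset is not None:
        config["default_start_offset"] = default_start_offset
    if partition_offsets:
        config["partition_list"] = [
            {"start_offset": offset, "partition": partition}
            for partition, offset in sorted(partition_offsets.items())
        ]
    return config


if __name__ == "__main__":
    # Partitions 0 and 1 from the beginning, partition 2 from the end.
    explicit = build_topic_config("topicName1", {0: EARLIEST, 1: EARLIEST, 2: LATEST})
    print(json.dumps(explicit, indent=2))
    # All partitions, with the default start offset set to 0.
    print(json.dumps(build_topic_config("topicName1", default_start_offset=0)))
```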

3. Use Kafka loader

There are three basic steps to use the Kafka Loader to load data:

• Define the data source
• Create a loading job
• Run the loading job

The GSQL syntax for the Kafka Loader is designed to be consistent with the existing GSQL loading syntax.

3.1. Define the data source

Before starting a Kafka data loading job, you need to define the Kafka server as a data source. The `CREATE DATA_SOURCE` statement defines a data source variable with a subtype of `KAFKA`:

``GSQL > CREATE DATA_SOURCE KAFKA example_data_source``

After the data source is created, use the `SET` command to specify the path to a configuration file for that data source. Each time the config file is updated, you must run `SET example_data_source` again to update the data source details in the dictionary.

``GSQL > SET example_data_source = "/path/to/kafka.conf" (1)``
 1 If you have a TigerGraph cluster, the configuration file must be on machine m1, where the GSQL server and GSQL client both reside, and it must be in JSON format. If the configuration file uses a relative path, the path should be relative to the GSQL client working directory.

For simplicity, you can merge the `CREATE DATA_SOURCE` and `SET` statements:

``GSQL > CREATE DATA_SOURCE KAFKA data_source_name = "/path/to/kafka.conf"``

To further simplify, instead of specifying the Kafka data source config file path, you can also directly provide the Kafka data source configuration as a string argument, as shown below:

``GSQL > CREATE DATA_SOURCE KAFKA data_source_name = "{\"broker\":\"broker.full.domain.name:9092\"}"``
 The simplified statement above is useful when using the Kafka Loader in TigerGraph Cloud. In TigerGraph Cloud (tgcloud.io), you can use the GSQL web shell to define and create Kafka data sources without creating the Kafka data source configuration file in the filesystem.
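
Escaping the inline JSON by hand is error-prone, so the statement can be produced programmatically. The sketch below is one way to do it in Python. The helper name `inline_data_source_statement` is hypothetical. It also assumes that GSQL string literals use the backslash as the escape character, as the `\"` sequences in the inline statement suggest.

```python
import json


def inline_data_source_statement(name, config):
    """Return a CREATE DATA_SOURCE statement with the config inlined.

    The config is serialized as compact JSON, then backslashes and double
    quotes are escaped so the text can sit inside a double-quoted string.
    The helper name is illustrative and not a TigerGraph API.
    """
    compact = json.dumps(config, separators=(",", ":"))
    # Escape backslashes first so the escapes added for quotes survive.
    escaped = compact.replace("\\", "\\\\").replace('"', '\\"')
    return f'CREATE DATA_SOURCE KAFKA {name} = "{escaped}"'


if __name__ == "__main__":
    print(inline_data_source_statement(
        "data_source_name", {"broker": "broker.full.domain.name:9092"}
    ))
```

For the broker shown in this section, the printed statement is the same text as the inline `CREATE DATA_SOURCE` statement, without the `GSQL >` prompt.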

3.2. Create a loading job

The Kafka Loader uses the same basic `CREATE LOADING JOB` syntax used for standard GSQL loading jobs. Use a `DEFINE FILENAME` statement to assign a Kafka data source name and the path to its topic-partition config file to a loader `FILENAME` variable.

In addition, the filename can be specified in the `RUN LOADING JOB` statement with the `USING` clause. The filename value set by a `RUN` statement overrides the value set in the `CREATE LOADING JOB`.

 If you are loading JSON data, the data needs to be in the JSON lines format instead of regular JSON.
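
In the JSON lines format, every record is a complete JSON object on its own line, rather than an element of one enclosing array. The following is a minimal Python sketch of the conversion. The function name `to_json_lines` and the sample records are hypothetical.

```python
import json


def to_json_lines(records):
    """Serialize an iterable of dicts as JSON Lines: one object per line."""
    # json.dumps without `indent` never emits raw newlines, so each record
    # stays on a single line even if a string value contains "\n".
    return "\n".join(json.dumps(record) for record in records)


if __name__ == "__main__":
    regular_json = '[{"userid": "u1", "gender": "F"}, {"userid": "u2", "gender": "M"}]'
    print(to_json_lines(json.loads(regular_json)))
```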

3.2.1. Syntax

In the syntax, `$DATA_SOURCE_NAME` is the Kafka data source name, and the path points to a configuration file with topic and partition information of the Kafka server. The Kafka configuration file must be in JSON format.

``````DEFINE FILENAME filevar "=" [filepath_string | data_source_string];
data_source_string = $DATA_SOURCE_NAME":"<path_to_configfile>``````

Example: Load a Kafka data source `k1`, where the path to the topic-partition configuration file is `"~/topic_partition_conf.json"`:

``DEFINE FILENAME f1 = "$k1:~/topic_partition_conf.json";``

Instead of specifying the config file path, you can also provide the topic-partition configuration as a string argument, as shown below:

``````DEFINE FILENAME f1 = "$k1:~/topic_partition_config.json";
DEFINE FILENAME f1 = "$k1:{\"topic\":\"zzz\",\"default_start_offset\":2,\"partition_list\":[]}";``````

3.3. Run loading job

The Kafka Loader uses the same `RUN LOADING JOB` statement that is used for GSQL loading from files. Each filename variable can be assigned a string `$<data_source_name>:<topic_partition_configuration_filepath>`, which overrides the value defined in the loading job. In the example below, the config files for f3 and f4 are set by the `RUN` command, whereas f1 uses the config specified in the `CREATE LOADING JOB` statement.

``RUN LOADING JOB job1 USING f1, f3="$k1:~/topic_part3_config.json", f4="$k1:~/topic_part4_config.json", EOF="true";``

 A `RUN LOADING JOB` command may only use one type of data source. For example, you may not mix both Kafka data sources and regular file data sources in one loading job. All filename variables in one loading job statement must refer to the same `DATA_SOURCE` variable.

There are two modes for the Kafka Loader: streaming mode and EOF mode. The default mode is streaming mode. In streaming mode, loading never stops until the job is aborted. In EOF mode, loading stops after consuming the current Kafka message. To set EOF mode, an optional parameter is added to the `RUN LOADING JOB` syntax:

``````RUN LOADING JOB [USING (<using-param> | <using-param> [, <using-param>]*)]
<using-param> ::= filevar [="filepath_string"] | CONCURRENCY="cnum" | BATCH_SIZE="bnum" | EOF=("true"|"false")``````

To learn about each option and parameter of the `RUN LOADING JOB` command, see Loading job options.

4. Examples

Here is example code for loading data through the Kafka Loader:

4.1. Loading CSV data

``````// Create data source k1 from "kafka.conf" for graph test_graph
CREATE DATA_SOURCE KAFKA k1 FOR GRAPH test_graph
SET k1 = "kafka.conf"

// Define the loading jobs
CREATE LOADING JOB load_person FOR GRAPH test_graph {
  DEFINE FILENAME f1 = "$k1:topic_partition_config.json";
  LOAD f1
    TO VERTEX Person VALUES ($2, $0, $1),
    TO EDGE Person2Comp VALUES ($0, $1, $2)
    USING SEPARATOR=",";
}

// Load the data
RUN LOADING JOB load_person``````

The following is the data source configuration file `kafka.conf`:

kafka.conf
```{
"broker": "localhost:9092",
"kafka_config": {"session.timeout.ms":"20000"}
}```

The following is the topic-partition configuration file `topic_partition_config.json`:

topic_partition_config.json
```{
"topic": "topicName1",
"partition_list": [
{
"start_offset": -1,
"partition": 0
},
{
"start_offset": -1,
"partition": 1
},
{
"start_offset": -1,
"partition": 2
}
]
}```

4.2. Loading JSON data

``````CREATE LOADING JOB realTimeLoader FOR GRAPH storeGraph {
  DEFINE FILENAME users = "$kafkaLoader:/home/tigergraph/config/topic-users.config";
  DEFINE FILENAME transactions = "$kafkaLoader:/home/tigergraph/config/topic-transactions.config";
  LOAD users TO VERTEX user VALUES ($"payload":"userid", $"payload":"regionid", $"payload":"gender") USING JSON_FILE="true";
  LOAD transactions TO EDGE hasTransaction VALUES ($"payload":"user_id", $"payload":"transaction_id") USING JSON_FILE="true";
  LOAD transactions TO VERTEX transaction VALUES ($"payload":"transaction_id", $"payload":"card_id", $"payload":"user_id", $"payload":"purchase_id", $"payload":"store_id") USING JSON_FILE="true";