Kafka is a popular pub-sub system in enterprise IT, offering a distributed and fault-tolerant real-time data pipeline. TigerGraph's Kafka Loader feature lets you integrate with a Kafka cluster and speed up your real-time data ingestion. It is easily extensible using the many plugins available in the Kafka ecosystem.
The Kafka Loader consumes data from a Kafka cluster and loads it into the TigerGraph system.
From a high level, a user provides instructions to the TigerGraph system through GSQL, and the external Kafka cluster loads data into TigerGraph's RestPP server. The following diagram demonstrates the Kafka Loader data architecture.
You should have a Kafka cluster configured and set up in your environment.
Once you have set up the external Kafka cluster, prepare the following two configuration files and place them in your desired location in the TigerGraph system:
Kafka data source configuration file: This file contains the external Kafka broker's domain name and port, which tell the TigerGraph system where to reach the broker. See the example in Step 1, "Define the Data Source".
Kafka topic and partition configuration file: This file contains the Kafka topic, the partition list, and the start offset of the messages to load. See the example in Step 2, "Create a Loading Job".
There are three basic steps:
The GSQL syntax for the Kafka Loader is designed to be consistent with the existing GSQL loading syntax.
Before starting a Kafka data loading job, you need to define the Kafka server as a data source. The CREATE DATA_SOURCE statement defines a data_source variable with a subtype of KAFKA:
Kafka Data Source Configuration File
After the data source is created, use the SET command to specify the path to a configuration file for that data source.
This SET command reads, validates, and applies the configuration file, integrating its settings into TigerGraph's dictionary. The data source configuration file's content, structured as a JSON object, describes the Kafka server's global settings, including the data source IP address and port. A sample kafka.conf is shown in the following example:
The "broker" key is required. Additional Kafka configuration parameters may be provided (see Kafka documentation) by using the optional "kafka_config" key. For its value, provide a list of key-value pairs. For example:
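As an illustration, the following is a minimal Python sketch that generates such a configuration file's content. The helper name build_data_source_config, the broker address localhost:9092, and the group.id setting are hypothetical values chosen for this example. The sketch also assumes that the key-value pairs under "kafka_config" are written as a JSON object.

```python
import json


def build_data_source_config(broker, kafka_config=None):
    """Return the data source configuration as a dict.

    "broker" is required; "kafka_config" is optional.
    """
    if not broker:
        raise ValueError('the "broker" key is required')
    config = {"broker": broker}
    if kafka_config:
        # Assumption: the key-value pairs are expressed as a JSON object.
        config["kafka_config"] = dict(kafka_config)
    return config


# Hypothetical broker address and consumer setting, for illustration only.
config = build_data_source_config("localhost:9092", {"group.id": "tigergraph"})
print(json.dumps(config, indent=2))
```

The printed JSON can be saved to a file such as kafka.conf and referenced by the SET command described above.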
For simplicity, you can merge the CREATE DATA_SOURCE and SET statements:
ADVANCED: MultiGraph Support
The Kafka Loader supports the TigerGraph MultiGraph feature. In the MultiGraph context, a data source can be either global or local:
A global data source can only be created by a superuser, who can grant it to any graph.
An admin user can only create a local data source, which cannot be accessed by other graphs.
The following are examples of permitted DATA_SOURCE operations.
1. A superuser may create a global level data source without assigning it to a particular graph:
2. A superuser may grant/revoke a data source to/from one or more graphs:
3. An admin user may create a local data source for a specified graph which they administer:
A data_source variable can be dropped by a user who has the required privilege. A global data_source can only be dropped by a superuser. A local data_source can only be dropped by an admin for the relevant graph or by a superuser. The syntax for the DROP command is as follows:
Below is an example of several legal Kafka data_source create and drop commands.
The SHOW DATA_SOURCE command will display a summary of all existing data_sources for which the user has privilege:
The Kafka Loader uses the same basic CREATE LOADING JOB syntax used for standard GSQL loading jobs. A DEFINE FILENAME statement should be used to assign a loader FILENAME variable to a Kafka data source name and the path to its config file.
In addition, the filename can be specified in the RUN LOADING JOB statement with the USING clause. The filename value set by a RUN statement overrides the value set in the CREATE LOADING JOB.
Below is the syntax for DEFINE FILENAME for use with the Kafka Loader. In the syntax, $DATA_SOURCE_NAME is the Kafka data source name, and the path points to a configuration file with topic and partition information of the Kafka server. The Kafka configuration file must be in JSON format.
Example: Load a Kafka Data Source k1, where the path to the topic-partition configuration file is "~/topic_partition1.conf":
Kafka Topic-Partition Configuration File
The topic-partition configuration file tells the TigerGraph system exactly which Kafka records to read. Similar to the data source configuration file described above, the contents are in JSON object format. An example file is shown below:
The "topic" key is required. Optionally, a "partition_list" array can be included to specify which topic partitions to read and what start offsets to use. If the "partition_list" key is missing or empty, all partitions in this topic will be used for loading. The default offset for loading is "-1", which means you will load data from the most recent message in the topic, i.e., the end of the topic. If you want to load from the beginning of a topic, the "start_offset" value should be "-2".
You can also override the default offset by setting "default_start_offset" in the Kafka topic configuration file. For example:
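The following Python sketch assembles a topic-partition configuration from the keys described above: "topic", "partition_list", "start_offset", and "default_start_offset". The helper name, the topic name topicName1, and the use of a "partition" key to identify each entry in "partition_list" are assumptions made for illustration.

```python
import json

LATEST = -1      # start from the most recent message (end of the topic)
BEGINNING = -2   # start from the beginning of the topic


def build_topic_partition_config(topic, partitions=None, default_start_offset=None):
    """Return the topic-partition configuration as a dict.

    partitions maps a partition number to its start offset. If it is
    omitted or empty, "partition_list" is left out, so all partitions
    of the topic are used.
    """
    if not topic:
        raise ValueError('the "topic" key is required')
    config = {"topic": topic}
    if default_start_offset is not None:
        config["default_start_offset"] = default_start_offset
    if partitions:
        # Assumption: each entry names its partition with a "partition" key.
        config["partition_list"] = [
            {"partition": number, "start_offset": offset}
            for number, offset in sorted(partitions.items())
        ]
    return config


# Hypothetical topic: read partition 0 from the beginning, partition 1 from the end.
cfg = build_topic_partition_config("topicName1", {0: BEGINNING, 1: LATEST})
print(json.dumps(cfg, indent=2))
```

Calling the helper with only a topic and default_start_offset=BEGINNING would load every partition from its first message.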
Instead of specifying the config file path, you can also directly provide the topic-partition configuration as a string argument, as shown below:
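To make the inline form concrete, the Python sketch below builds the string value by joining the data source name and a compact JSON configuration in the "$DATA_SOURCE_NAME:configuration" form. The helper names, the data source k1, and the topic topicName1 are illustrative. The quoting helper assumes that GSQL string literals accept backslash-escaped double quotes; verify this against your GSQL version.

```python
import json


def inline_filename_value(data_source, topic_config):
    """Return "$<data_source>:<compact JSON>" for use as a filename value."""
    compact = json.dumps(topic_config, separators=(",", ":"))
    return f"${data_source}:{compact}"


def as_quoted_literal(value):
    """Wrap the value in double quotes, escaping the inner quotes.

    Assumption: the target statement accepts JSON-style backslash escapes.
    """
    return json.dumps(value)


# Hypothetical data source and topic names, for illustration only.
value = inline_filename_value("k1", {"topic": "topicName1", "default_start_offset": -2})
print(value)
print(as_quoted_literal(value))
```

The first printed line is the raw value; the second is the same value with its inner double quotes escaped so that it can sit inside a quoted string.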
The Kafka Loader uses the same RUN LOADING JOB statement that is used for GSQL loading from files. Each filename variable can be assigned a string of the form "$DATA_SOURCE_NAME:<topic-partition configuration>", which overrides the value defined in the loading job. In the example below, the config files for f3 and f4 are set by the RUN command, whereas f1 uses the config that was specified in the CREATE LOADING JOB statement.
All filename variables in one loading job statement must refer to the same DATA_SOURCE variable.
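This constraint can be checked before a job is submitted. The Python sketch below is a hypothetical pre-flight check, not part of TigerGraph: it assumes each filename value has the "$DATA_SOURCE_NAME:configuration" form and reports an error when the values name more than one data source. The function names and the second file path are illustrative.

```python
def data_source_of(filename_value):
    """Extract the data source name from a "$name:config" value."""
    if not filename_value.startswith("$") or ":" not in filename_value:
        raise ValueError(f"not a Kafka filename value: {filename_value!r}")
    return filename_value[1:].split(":", 1)[0]


def check_single_data_source(filename_values):
    """Return the shared data source name, or raise if there are several."""
    sources = {data_source_of(value) for value in filename_values.values()}
    if len(sources) > 1:
        raise ValueError(
            f"filename variables refer to several data sources: {sorted(sources)}"
        )
    return sources.pop() if sources else None


# Hypothetical filename variables that all use data source k1.
job_files = {
    "f1": "$k1:~/topic_partition1.conf",
    "f3": "$k1:~/topic_part3_config.json",
}
print(check_single_data_source(job_files))  # prints k1
```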
There are two modes for the Kafka Loader: streaming mode and EOF mode. The default is streaming mode, in which loading continues until the job is aborted. In EOF mode, loading stops after consuming the current Kafka message.
To set EOF mode, an optional parameter is added to the RUN LOADING JOB syntax:
Kafka Loader loading jobs are managed the same way as native loader jobs. The three key commands are:
SHOW LOADING STATUS
ABORT LOADING JOB
RESUME LOADING JOB
For example, the syntax for the SHOW LOADING STATUS command is as follows:
To refer to a specific job instance, use the job_id that is provided when RUN LOADING JOB is executed. For each loading job, the above command reports the following information:
current loaded offset for each partition
average loading speed
See Inspecting and Managing Loading Jobs for more details.
Here is example code for loading data through the Kafka Loader: