Data Streaming Connector

TigerGraph Data Streaming Connector is a Kafka connector that provides fast and scalable data streaming between TigerGraph and other data systems.

1. Supported data systems

The data streaming connector supports the following data systems:

  • Google Cloud Storage (GCS)

  • Apache Kafka

2. Stream data from Google Cloud Storage

You can create a data connector between TigerGraph’s internal Kafka server and your GCS service with a specified topic. The connector will stream data from the data source in your GCS buckets to TigerGraph’s internal Kafka cluster. You can then create and run a loading job to load data from Kafka into the graph store using the Kafka loader.

Before you begin, make sure the following prerequisites are met:

2.1. Specify connector configurations

The connector configurations provide the following information:

  • Connector class

  • Your GCP service account credentials

  • Information on how to parse the source data

  • Mapping between connector and source file

2.1.1. Specify connector class

The connector class indicates what type of connector the configuration file is used to create. Connector class is specified by the connector.class key. For connecting to GCS, its value is set to com.tigergraph.kafka.connect.filesystem.FsSourceConnector.

2.1.2. Provide GCP account credentials

You need to provide the following information so that the connector can access the files in your GCS bucket. You can find the account-specific values in your service account key:

  • file.reader.settings.fs.gs.auth.service.account.email: The email address associated with the service account.

  • file.reader.settings.fs.gs.auth.service.account.private.key.id: The private key ID associated with the service account used for GCS access.

  • file.reader.settings.fs.gs.auth.service.account.private.key: The private key associated with the service account used for GCS access.

  • file.reader.settings.client_email: The service account email.

  • file.reader.settings.fs.gs.project.id: Google Cloud Project ID with access to GCS buckets.

  • file.reader.settings.fs.gs.auth.service.account.enable: Whether to use a service account for GCS authorization.

  • file.reader.settings.fs.gs.impl: The FileSystem for gs: (GCS) URIs. The value for this configuration should be com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.

  • file.reader.settings.fs.AbstractFileSystem.gs.impl: The AbstractFileSystem for gs: (GCS) URIs. Only necessary for use with Hadoop 2. The value for this configuration should be com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS.
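As a minimal sketch of where the account-specific values come from, the following Python snippet derives them from a parsed service account key. It assumes the usual key fields client_email, private_key_id, private_key, and project_id. The helper names are hypothetical, and the quoting and \n escaping only mirror the example configuration in this guide; they are not a documented requirement.

```python
PREFIX = "file.reader.settings."


def gcs_credential_properties(key: dict) -> dict:
    """Map fields of a parsed service account key to connector properties."""
    # Escape real newlines so the private key fits on one properties line.
    private_key = key["private_key"].replace("\n", "\\n")
    return {
        PREFIX + "fs.gs.auth.service.account.email": key["client_email"],
        PREFIX + "fs.gs.auth.service.account.private.key.id": key["private_key_id"],
        PREFIX + "fs.gs.auth.service.account.private.key": f'"{private_key}"',
        PREFIX + "client_email": f'"{key["client_email"]}"',
        PREFIX + "fs.gs.project.id": f'"{key["project_id"]}"',
    }


def to_properties_text(props: dict) -> str:
    """Render the properties as key=value lines for the configuration file."""
    return "\n".join(f"{name}={value}" for name, value in props.items())
```

To use it, load your key file with json.load and pass the resulting dictionary to gcs_credential_properties, then paste the output of to_properties_text into the connector configuration file.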

2.1.3. Specify parsing rules

The streaming connector supports the following file types:

  • CSV files

  • JSON files. Each JSON object must be on a separate line.

  • directories

  • tar files

  • zip files

For URIs that point to directories and compressed files, the connector looks for all files inside the directory or the compressed file that match the file.regexp parameter. The default value of file.regexp matches all files, including CSV and JSON files.

If you set file.recursive to true, the connector looks for files recursively.

The following parsing options are available:

file.regexp
  Description: The regular expression to filter which files to read. The default value matches all files.
  Default: .*

file.recursive
  Description: Whether to retrieve files recursively if the URI points to a directory.
  Default: true

file.reader.type
  Description: The type of file reader to use. The only supported value is text.
  Default: text

file.reader.delimited.separator
  Description: The character that separates columns. This parameter does not affect JSON files.
  Default: ,

file.reader.delimited.quote
  Description: The explicit boundary markers for string tokens, either single or double quotation marks. The parser will not treat separator characters found within a pair of quotation marks as a separator. This parameter does not affect JSON files.
  Accepted values:
    • single: single quotes ' are boundary markers.
    • double: double quotes " are boundary markers.
    • Empty string "": Quotation marks are treated as ordinary characters.
  Default: Empty string ""

file.reader.delimited.value.default
  Description: The default value for a column when its value is null. This parameter does not affect JSON files.
  Default: Empty string ""

file.reader.batch.size
  Description: The maximum number of lines to include in a single batch.
  Default: 1000

file.reader.text.eol
  Description: End of line character.
  Default: \n

file.reader.text.header
  Description: Whether the first line of the files is a header line. If you are using JSON files, set this parameter to false.
  Default: false

file.reader.text.archive.type
  Description: File type for archive files. Setting the value of this configuration to auto will allow the connector to decide file types automatically based on the file extensions.
  Accepted values:
    • tar
    • zip
    • gzip
    • none
    • auto
  Default: auto

file.reader.text.archive.extensions.tar
  Description: If a file has this extension, treat it as a tar file.
  Default: tar

file.reader.text.archive.extensions.zip
  Description: If a file has this extension, treat it as a zip file.
  Default: zip

file.reader.text.archive.extensions.gzip
  Description: If a file has this extension, treat it as a gzip file.
  Default: gz, tgz
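To illustrate how the auto setting of file.reader.text.archive.type relates to the three extension options, here is a small Python sketch. It only models the behavior described above using the default extension values; the function name detect_archive_type is hypothetical, and the connector's real matching logic (for example, case sensitivity) may differ.

```python
# Defaults of the file.reader.text.archive.extensions.* options.
DEFAULT_EXTENSIONS = {
    "tar": ["tar"],
    "zip": ["zip"],
    "gzip": ["gz", "tgz"],
}


def detect_archive_type(file_name: str, archive_type: str = "auto",
                        extensions: dict = DEFAULT_EXTENSIONS) -> str:
    """Return "tar", "zip", "gzip", or "none" for a file name."""
    if archive_type != "auto":
        # An explicit setting applies to every file, regardless of extension.
        return archive_type
    for kind, suffixes in extensions.items():
        if any(file_name.endswith("." + suffix) for suffix in suffixes):
            return kind
    # No configured extension matched: treat the file as a plain text file.
    return "none"
```

Under these assumptions, people.tgz is treated as a gzip file and people.csv is read as a regular file.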

2.1.4. Map source file to connector

The following configurations are required:

name
  Description: Name of the connector.
  Default: None. Must be provided by the user.

topic
  Description: Name of the topic to create in Kafka.
  Default: None. Must be provided by the user.

tasks.max
  Description: The maximum number of tasks that can run in parallel.
  Default: 1

file.uris
  Description: The path(s) to the data files on Google Cloud Storage. The URI may point to a CSV file, a zip file, a gzip file, or a directory.
  Default: None. Must be provided by the user.

Below is an example configuration:

connector.class=com.tigergraph.kafka.connect.filesystem.FsSourceConnector
file.reader.settings.fs.gs.auth.service.account.email=gcsconnect@example.iam.gserviceaccount.com
file.reader.settings.fs.gs.auth.service.account.private.key.id=55c1d79a46c1f3f598ec38
file.reader.settings.fs.gs.auth.service.account.private.key="-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDSqbYRwD68FvA7\nLkC1HpjrJ9QIJ+iOyQPFeSoI+3pjmVTrX2B2aYIMByNubV6Js+n1x5ro/XW0nt3y\nk/BAn8BgaYita8\nIAhLLOf/kFmFmmlM2k02iAZPIBjFvAs7ChGEHsXecQKBgQDq9AKPXaMOiy43EzHPf5JiDeMAMeltz
...
\nokX4AaXPZj5lOmhEii9V8oIa1msPE5AmGrRmQhhI82xVIdnrbVItZcOIUd+Tbs2K\nJZzA2Spvo3yxi2nFptqRk/xi2/8sVXQ8XllQs6UbAoGAdqnrlEAIlCb5hdVNrLXT\nToqdq54G9g82L8/Y+WraqJSNOFKXCGQvC2N16ava4sZ65DCjT6FnCR/UhYS7Z6Vf\nR5EtMRYAyAcyn3g9tcfzINmEbpvwpHBqsr1xPcrfx/WRurIC6EBgLPgX+lALBI0G\n+Uu87tgHhcGFJfmQMQNeQWM=\n-----END PRIVATE KEY-----\n"
file.reader.settings.client_email="gcsconnect@example.iam.gserviceaccount.com"
file.reader.settings.fs.gs.project.id="example"
file.reader.settings.fs.gs.auth.service.account.enable=true
file.reader.settings.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
file.reader.settings.fs.AbstractFileSystem.gs.impl="com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"

mode=eof
file.regexp=".*"
file.recursive=true
file.reader.type=text
file.reader.batch.size=10000
file.reader.text.eol="\\n"
file.reader.text.header=true
file.reader.text.archive.type=auto
file.reader.text.archive.extensions.tar=tar
file.reader.text.archive.extensions.zip=zip
file.reader.text.archive.extensions.gzip=tar.gz,tgz

[connector_person]
name = fs-person-demo-104
tasks.max=10
topic=person-demo-104
file.uris=gs://example/p.csv

[connector_friend]
name = fs-friend-demo-104
tasks.max=10
topic=friend-demo-104
file.uris=gs://example/f.csv
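The layout of this file, shared keys first and one bracketed subsection per connector, can be sketched in Python as follows. This is an illustration only: parse_connector_config is a hypothetical helper, not part of gadmin, and it assumes that each subsection is combined with all keys that precede the first subsection.

```python
def parse_connector_config(text: str) -> dict:
    """Return {section_title: merged_settings} for each [section] in the file."""
    shared = {}
    sections = {}
    current = None  # None while reading the shared top-level keys
    for raw in text.splitlines():
        line = raw.strip()
        if not line:
            continue
        if line.startswith("[") and line.endswith("]"):
            current = line[1:-1]
            if current in sections:
                raise ValueError(f"duplicate section title: {current}")
            sections[current] = {}
            continue
        key, _, value = line.partition("=")
        target = shared if current is None else sections[current]
        target[key.strip()] = value.strip()
    # Each connector gets the shared keys plus the keys of its own subsection.
    return {title: {**shared, **own} for title, own in sections.items()}
```

Applied to the example above, the result has two entries, connector_person and connector_friend, that differ only in name, topic, and file.uris.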

2.2. Create connector

Once you have prepared the configuration file, run the gadmin connector create command and specify the configuration file to create the connector:

gadmin connector create --c <config_file>

The connectors start streaming from the data source immediately after creation if the configurations are valid. You can run gadmin connector status to verify the status of the connectors. If the configurations are valid, the connectors should be in RUNNING status. Data streamed from the source stays in the destination cluster Kafka topics.

If the destination cluster is TigerGraph’s internal Kafka cluster, the messages are removed at regular intervals as they are loaded into the graph during the course of the loading job.

Automatic removal of loaded Kafka messages is an alpha feature and may be subject to change.

2.3. Create data source

Now that the connector has started loading data into TigerGraph’s internal Kafka cluster, you can create a data source and point it to the Kafka cluster:

  1. Create a data source configuration file. The only required field is the address of the broker. The broker address consists of the private IP of your TigerGraph server and the port of TigerGraph’s internal Kafka server:

    • If you don’t know your private IP, you can retrieve it by running gmyip in the bash terminal.

    • To retrieve the port of your Kafka cluster, run gadmin config get Kafka.Port. The default port is 30002.

    In the kafka_config field, you can provide additional configurations from Kafka’s consumer configurations. For example, you can set group.id to tigergraph to specify the group that this consumer belongs to. The following example sets session.timeout.ms:

    {
        "broker":"10.128.0.240:30002", (1)
        "kafka_config":
            {
                "session.timeout.ms":"20000"
            }
    }
    1 Make sure to use the internal IP of the machine instead of localhost for the hostname of the broker. localhost only works for single-server instances.
  2. Run CREATE DATA_SOURCE to create the data source:

    CREATE DATA_SOURCE KAFKA datasource_name FOR GRAPH Graph_Name
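If you generate the data source configuration file programmatically, a minimal Python sketch could look like the following. The function name build_data_source_config is hypothetical; the field names broker and kafka_config and the default port 30002 are taken from the steps above.

```python
import json
from typing import Optional


def build_data_source_config(broker_ip: str, port: int = 30002,
                             kafka_config: Optional[dict] = None) -> str:
    """Return the JSON text of a Kafka data source configuration file."""
    config = {"broker": f"{broker_ip}:{port}"}
    if kafka_config:
        # Optional Kafka consumer settings, for example session.timeout.ms.
        config["kafka_config"] = dict(kafka_config)
    return json.dumps(config, indent=4)
```

For example, build_data_source_config("10.128.0.240", kafka_config={"session.timeout.ms": "20000"}) produces the same content as the file shown in step 1.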

2.4. Create loading job

Create a loading job to load data from the data source:

  1. Create a topic-partition configuration for each topic.

    {
      "topic": <topic_name>, (1)
      "partition_list": [ (2)
        {
          "start_offset": <offset_value>, (3)
          "partition": <partition_number> (4)
        },
        {
          "start_offset": <offset_value>, (3)
          "partition": <partition_number> (4)
        }
        ...
      ]
    }
    1 Replace <topic_name> with the name of the topic this configuration applies to.
    2 List of partitions you want to stream from. For each partition, you can set a start offset. If this list is empty, or partition_list isn’t included, all partitions are used with the default offset.
    3 Replace <offset_value> with the offset value you want. The default offset for loading is -1, which means you will load data from the most recent message in the topic. If you want to load from the beginning of a topic, the start_offset value should be -2.
    4 Replace <partition_number> with the number of the partition you want to configure.
  2. Create a loading job and map the data to the graph. See Kafka loader guide for how to map data from a Kafka data source to the graph. See Loading JSON data on how to create a loading job for JSON data.

Known bug: to use the -1 value for offset, delete the start_offset key instead of setting it to -1.
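The offset rules above, including the workaround for the known bug, can be captured in a short Python sketch. The function name build_topic_config is hypothetical; the field names and the meaning of -1 and -2 come from the callouts above.

```python
import json

LATEST = -1    # default: start from the most recent message in the topic
EARLIEST = -2  # start from the beginning of the topic


def build_topic_config(topic: str, partition_offsets: dict) -> str:
    """Build a topic-partition configuration.

    partition_offsets maps a partition number to its start offset.
    """
    partition_list = []
    for partition, offset in sorted(partition_offsets.items()):
        entry = {"partition": partition}
        # Known bug workaround: express -1 by leaving out start_offset.
        if offset != LATEST:
            entry["start_offset"] = offset
        partition_list.append(entry)
    return json.dumps({"topic": topic, "partition_list": partition_list}, indent=2)
```

Passing an empty dictionary yields an empty partition_list, which, as described above, means all partitions are used with the default offset.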

For example, suppose we have the following two CSV files and schema:

  • Schema

CREATE VERTEX person (PRIMARY_ID name STRING, name STRING, age INT, gender STRING, state STRING)
CREATE UNDIRECTED EDGE friendship (FROM person, TO person, connect_day DATETIME)
CREATE GRAPH social (person, friendship)

  • p.csv

name,gender,age,state
Tom,male,40,ca
Dan,male,34,ny
Jenny,female,25,tx
Kevin,male,28,az
Amily,female,22,ca
Nancy,female,20,ky
Jack,male,26,fl
A,male,29,ca

  • f.csv

person1,person2,date
Tom,Dan,2017-06-03
Tom,Jenny,2015-01-01
Dan,Jenny,2016-08-03
Jenny,Amily,2015-06-08
Dan,Nancy,2016-01-03
Nancy,Jack,2017-03-02
Dan,Kevin,2015-12-30
Amily,Dan,1990-1-1

The following topic-partition configurations and loading job will load the two CSV files into the graph:

  • topic_person.json

{
  "topic": "person-demo-104",
  "partition_list": [
    {
      "start_offset": -2,
      "partition": 0
    }
  ]
}

  • topic_friend.json

{
  "topic": "friend-demo-104",
  "partition_list": [
    {
      "start_offset": -2,
      "partition": 0
    }
  ]
}

  • Loading job

CREATE LOADING JOB load_person FOR GRAPH social {
    DEFINE FILENAME f1 = "$k1:/home/mydata/topic_person.json";
    DEFINE FILENAME f2 = "$k1:/home/mydata/topic_friend.json";
    LOAD f1 TO VERTEX person VALUES ($0, $0, $2, $1, $3) USING separator=",";
    LOAD f2 TO EDGE friendship VALUES ($0, $1, $2)  USING separator=",";
}
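To see why the person LOAD statement uses the order ($0, $0, $2, $1, $3), the following Python sketch applies the same positional mapping to one line of p.csv. It is only an illustration of the column reordering, with a hypothetical function name, and it does not model type conversion or how the Kafka loader is implemented.

```python
def map_person_row(line: str, separator: str = ",") -> tuple:
    """Apply VALUES ($0, $0, $2, $1, $3) to one line of p.csv.

    p.csv columns:     name, gender, age, state
    person attributes: primary ID, name, age, gender, state
    """
    tokens = line.split(separator)
    # $N refers to the N-th column of the source line, counting from 0.
    return tuple(tokens[i] for i in (0, 0, 2, 1, 3))


print(map_person_row("Tom,male,40,ca"))
```

The name column is used twice, once as the primary ID and once as the name attribute, and the gender and age columns are swapped to match the attribute order in the schema.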

2.4.1. Run loading job

Running the loading job created in the last step loads the streamed data into the graph. If you make changes to the topic-partition configuration file, you can overwrite the values for the filename variables with the USING clause.

GSQL > RUN LOADING JOB load_person

By default, loading jobs that use Kafka data sources run in streaming mode and do not stop until manually aborted. As data is streamed from the data source, the running loading job will continuously ingest the streamed data into the graph store.

2.5. Stream data source updates

To load updates to the data source after you’ve created the connectors, you need to delete the connector and create a new one.

In the future, the streaming data connector will support automatically scanning for updates and streaming them to TigerGraph.

3. Stream data from an external Kafka cluster

You can create a data connector to stream data from an external Kafka cluster (source cluster) to a destination cluster, which is often TigerGraph’s internal Kafka cluster. If the data from the source cluster is in Avro format, the connector can transform the data into JSON format. You can then create and run a loading job to load data from the destination cluster into graphs in your database using the Kafka loader.

Before you begin, make sure the following prerequisites are met:

  • Your TigerGraph instance has access to your external Kafka cluster

3.1. Specify connector configurations

Before creating the connector, you must provide the required connector configurations. This includes basic configurations and subsection configurations.

The data streaming connector for external Kafka data is based on Apache Kafka MirrorMaker and shares many configurations with MirrorMaker.

3.1.1. Basic configurations

The configurations in this subsection specify basic characteristics of the connector, such as the connector class, the addresses of the source and destination clusters, and the converters used to convert between Kafka Connect format and the serialized form that is written to Kafka.

The following configurations are required:

connector.class
  Description: Must be set to org.apache.kafka.connect.mirror.MirrorSourceConnector. This class of connector connects a source Kafka cluster to a destination Kafka cluster.
  Example: connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector

source.cluster.alias
  Description: Alias of the source Kafka cluster in string format.
  Example: source.cluster.alias=source-cluster

target.cluster.alias
  Description: Alias of the destination Kafka cluster in string format.
  Example: target.cluster.alias=tg-cluster

source.cluster.bootstrap.servers
  Description: Address of the external source Kafka cluster.
  Example: source.cluster.bootstrap.servers=localhost:9092

target.cluster.bootstrap.servers
  Description: Address of the destination Kafka cluster, which is the Kafka component inside TigerGraph. The IP address can be fetched via gmyip, and the default port is 30002.
  Example: target.cluster.bootstrap.servers=localhost:30002

source->target.enabled
  Description: Enables source-to-target mirroring. This field must be set to true.
  Example: source->target.enabled=true

topics
  Description: Names of the topics to be loaded into TigerGraph, separated by commas.
  Example: topics=test-topic

key.converter
  Description: Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka. Must be set to org.apache.kafka.connect.converters.ByteArrayConverter.
  Example: key.converter=org.apache.kafka.connect.converters.ByteArrayConverter

header.converter
  Description: Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the headers in messages written to or read from Kafka. Must be set to org.apache.kafka.connect.converters.ByteArrayConverter.
  Example: header.converter=org.apache.kafka.connect.converters.ByteArrayConverter

value.converter
  Description: Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka.
    • If records are in Avro format with Schema Registry service, this field should be com.tigergraph.kafka.connect.converters.TigerGraphAvroConverter.
    • If records are in Avro format without using Schema Registry service, this field should be com.tigergraph.kafka.connect.converters.TigerGraphAvroConverterWithoutSchemaRegistry.
    • If records are in plaintext or JSON format, this field should be org.apache.kafka.connect.converters.ByteArrayConverter.
  Example: value.converter=com.tigergraph.kafka.connect.converters.TigerGraphAvroConverter

Other configuration options are available to fine-tune the connector. You can find all configuration options in Kafka MirrorMaker Configurations.
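The three value.converter cases listed for the basic configurations can be summarized as a small decision function. The Python sketch below is only a restatement of those rules; the function name and the format labels "avro", "json", and "plaintext" are hypothetical.

```python
def choose_value_converter(record_format: str, uses_schema_registry: bool = False) -> str:
    """Return the value.converter class for a given source record format."""
    fmt = record_format.lower()
    if fmt == "avro":
        if uses_schema_registry:
            return "com.tigergraph.kafka.connect.converters.TigerGraphAvroConverter"
        return ("com.tigergraph.kafka.connect.converters."
                "TigerGraphAvroConverterWithoutSchemaRegistry")
    if fmt in ("json", "plaintext"):
        # Plaintext and JSON records are passed through as raw bytes.
        return "org.apache.kafka.connect.converters.ByteArrayConverter"
    raise ValueError(f"unsupported record format: {record_format}")
```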

3.1.2. SSL/SASL configurations (optional)

If the source cluster has configured SSL or SASL protocols, you need to provide SSL/SASL configurations to the connector in order to communicate with the source cluster.

You need to upload the following credential files to your TigerGraph cluster:

  • If the source cluster uses SASL, you need to upload the keytab of each Kerberos principal to every node of your TigerGraph cluster at the same absolute path.

  • If the source cluster uses SSL, you need to upload the trust store and key store to every node of your TigerGraph cluster. Each file must be at the same absolute path on all nodes.

  • If the source cluster uses SASL and SSL, you need to upload the keytab of each Kerberos principal, as well as the key store and trust store to every node of your TigerGraph cluster. Each file must be at the same absolute path on all nodes.

The following configurations are required for admin, producer and consumer. To supply the configuration for the corresponding component, replace <prefix> with source.admin, producer, or consumer. For example, to specify GSSAPI as the SASL mechanism for the producer, set producer.sasl.mechanism=GSSAPI:

Field Description

<prefix>.security.protocol

Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.

<prefix>.sasl.mechanism

SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. GSSAPI is the default mechanism.

<prefix>.sasl.kerberos.service.name

The Kerberos principal name that Kafka runs as. This can be defined either in Kafka’s JAAS config or in Kafka’s config.

<prefix>.sasl.jaas.config

JAAS login context parameters for SASL connections in the format used by JAAS configuration files. See JAAS Login Configuration File.

<prefix>.ssl.endpoint.identification.algorithm

The endpoint identification algorithm to validate the server hostname using the server certificate. Default is https. Disable server hostname verification by setting <prefix>.ssl.endpoint.identification.algorithm to an empty string.

<prefix>.ssl.keystore.location

The location of the key store file.

<prefix>.ssl.keystore.password

The password of the key store file.

<prefix>.ssl.key.password

The password of the private key in the key store file or the PEM key specified in ssl.keystore.key.

<prefix>.ssl.truststore.location

The location of the trust store file.

<prefix>.ssl.truststore.password

The password for the trust store file.
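Because the same settings have to be repeated for source.admin, producer, and consumer, it can help to generate the lines instead of writing them by hand. The following Python sketch expands one set of settings across the three prefixes; expand_security_settings and its overrides parameter are hypothetical conveniences, not part of the connector.

```python
from typing import Optional

PREFIXES = ("source.admin", "producer", "consumer")


def expand_security_settings(settings: dict,
                             overrides: Optional[dict] = None) -> list:
    """Return key=value lines with each setting repeated under every prefix.

    overrides maps a prefix to settings that differ for that component,
    for example a component-specific sasl.jaas.config.
    """
    overrides = overrides or {}
    lines = []
    for prefix in PREFIXES:
        merged = {**settings, **overrides.get(prefix, {})}
        for key, value in merged.items():
            lines.append(f"{prefix}.{key}={value}")
    return lines


for line in expand_security_settings(
        {"security.protocol": "SASL_SSL", "sasl.mechanism": "GSSAPI"}):
    print(line)
```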

3.1.3. Subsection configurations

The subsection configurations come after all other configurations. Each subsection specifies the name of a connector to create from the configurations that precede the subsections, as well as the maximum number of tasks that can be created for that connector.

You can create one or more connectors from one configuration file by providing a subsection for each connector you want to create. In the case of the MirrorMaker connector class used in this procedure, we suggest you only create one connector from each file, due to limited available configurations in the subsection. If you create multiple connectors, they will have almost identical properties.

Start a subsection with a section title enclosed in square brackets. This title has no effect on the connector to be created, but the titles need to be unique in the configuration file.

The following subsection configurations are available:

Field Description

name

Name of the connector.

tasks.max

The maximum number of tasks to be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism. The value for this configuration should not be larger than the number of partitions of the topic(s) in the source Kafka cluster. The default value is 1.

3.1.4. Example

The following is an example configuration file:

connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
source.cluster.alias=hello
target.cluster.alias=world
source.cluster.bootstrap.servers=source.kafka.server:9092
target.cluster.bootstrap.servers=localhost:30002
source->target.enabled=true
topics=avro-without-registry-topic
replication.factor=1
sync.topic.acls.enabled=false
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
emit.heartbeats.interval.seconds=5
world.scheduled.rebalance.max.delay.ms=35000
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=com.tigergraph.kafka.connect.converters.TigerGraphAvroConverterWithoutSchemaRegistry

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=GSSAPI
producer.sasl.kerberos.service.name=kafka
producer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/path/to/kafka-producer.keytab\" principal=\"kafka-producer@TIGERGRAPH.COM\";
producer.ssl.endpoint.identification.algorithm=
producer.ssl.keystore.location=/path/to/client.keystore.jks
producer.ssl.keystore.password=******
producer.ssl.key.password=******
producer.ssl.truststore.location=/path/to/client.truststore.jks
producer.ssl.truststore.password=******

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=GSSAPI
consumer.sasl.kerberos.service.name=kafka
consumer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/path/to/kafka-consumer.keytab\" principal=\"kafka-consumer@TIGERGRAPH.COM\";
consumer.ssl.endpoint.identification.algorithm=
consumer.ssl.keystore.location=/path/to/client.keystore.jks
consumer.ssl.keystore.password=******
consumer.ssl.key.password=******
consumer.ssl.truststore.location=/path/to/client.truststore.jks
consumer.ssl.truststore.password=******

source.admin.security.protocol=SASL_SSL
source.admin.sasl.mechanism=GSSAPI
source.admin.sasl.kerberos.service.name=kafka
source.admin.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/path/to/kafka-admin.keytab\" principal=\"kafka-admin@TIGERGRAPH.COM\";
source.admin.ssl.endpoint.identification.algorithm=
source.admin.ssl.keystore.location=/path/to/client.keystore.jks
source.admin.ssl.keystore.password=******
source.admin.ssl.key.password=******
source.admin.ssl.truststore.location=/path/to/client.truststore.jks
source.admin.ssl.truststore.password=******

[connector_1]
name=avro-test-without-registry
tasks.max=10

The file in the example creates a connector named avro-test-without-registry with all configurations specified in the file.

3.2. Create connector

Once you have prepared the configuration file, run the gadmin connector create command and specify the configuration file to create the connector:

gadmin connector create --c <config_file>

The connectors start streaming from the data source immediately after creation if the configurations are valid. You can run gadmin connector status to verify the status of the connectors. If the configurations are valid, the connectors should be in RUNNING status. Data streamed from the source stays in the destination cluster Kafka topics.

If the destination cluster is TigerGraph’s internal Kafka cluster, the messages are removed at regular intervals as they are loaded into the graph during the course of the loading job.

Automatic removal of loaded Kafka messages is an alpha feature and may be subject to change.

3.3. Create data source

Now that the connector has started loading data into TigerGraph’s internal Kafka cluster, you can create a data source and point it to the Kafka cluster:

  1. Create a data source configuration file. The only required field is the address of the broker. The broker address consists of the private IP of your TigerGraph server and the port of TigerGraph’s internal Kafka server:

    • If you don’t know your private IP, you can retrieve it by running gmyip in the bash terminal.

    • To retrieve the port of your Kafka cluster, run gadmin config get Kafka.Port. The default port is 30002.

    In the kafka_config field, you can provide additional configurations from Kafka’s consumer configurations. For example, you can set group.id to tigergraph to specify the group that this consumer belongs to. The following example sets session.timeout.ms:

    {
        "broker":"10.128.0.240:30002", (1)
        "kafka_config":
            {
                "session.timeout.ms":"20000"
            }
    }
    1 Make sure to use the internal IP of the machine instead of localhost for the hostname of the broker. localhost only works for single-server instances.
  2. Run CREATE DATA_SOURCE to create the data source:

    CREATE DATA_SOURCE KAFKA datasource_name FOR GRAPH Graph_Name

3.4. Create loading job

Create a loading job to load data from the data source:

  1. Create a topic-partition configuration for each topic.

    {
      "topic": <topic_name>, (1)
      "partition_list": [ (2)
        {
          "start_offset": <offset_value>, (3)
          "partition": <partition_number> (4)
        },
        {
          "start_offset": <offset_value>, (3)
          "partition": <partition_number> (4)
        }
        ...
      ]
    }
    1 Replace <topic_name> with the name of the topic this configuration applies to.
    2 List of partitions you want to stream from. For each partition, you can set a start offset. If this list is empty, or partition_list isn’t included, all partitions are used with the default offset.
    3 Replace <offset_value> with the offset value you want. The default offset for loading is -1, which means you will load data from the most recent message in the topic. If you want to load from the beginning of a topic, the start_offset value should be -2.
    4 Replace <partition_number> with the number of the partition you want to configure.
  2. Create a loading job and map the data to the graph. See Kafka loader guide for how to map data from a Kafka data source to the graph. See Loading JSON data on how to create a loading job for JSON data.

Known bug: to use the -1 value for offset, delete the start_offset key instead of setting it to -1.

For example, suppose we have the following graph schema with only one vertex type and the following JSON line data in the destination Kafka:

  • Schema

CREATE VERTEX Company (primary_id name STRING, employee_num INT)
CREATE GRAPH Company_Graph (Company)

  • Data

{"company":"TigerGraph","employee_num":346}
{"company":"Apple","employee_num":373704}
{"company":"Google","employee_num":277618}

To create the loading job, we start by creating a data source configuration file that points to the Kafka cluster, and then create the data source:

  • Data source configuration

    {
        "broker":"localhost:30002",
        "kafka_config":
            {
                "group.id": "tigergraph"
            }
    }

  • Create data source

Run the following command in GSQL and replace <path_to_datasource_file> with the absolute path to your data source file to create the data source.

CREATE DATA_SOURCE KAFKA k1=<path_to_datasource_file>

Then we create a topic-partition configuration file, and then write the loading job. In the LOAD statement, map the JSON line fields to the corresponding attributes of the vertex types:

  • Topic-partition configuration file

{
    "topic": "Hello.avro-with-registry-topic",
    "default_start_offset": -2
}

  • Loading job

CREATE LOADING JOB load_company FOR GRAPH Company_Graph {
    DEFINE FILENAME f1 =
      "$k1:<path_to_topic_partition_configuration>";
    LOAD f1 TO VERTEX Company VALUES ($"company", $"employee_num")
      USING JSON_FILE="true";
}

Once the loading job is created, run the loading job to load data into the graph:

RUN LOADING JOB -noprint load_company USING EOF="true"

There are two modes for the Kafka Loader: streaming mode and EOF mode. The default mode is streaming mode. In streaming mode, loading never stops until the job is aborted. In EOF mode, loading stops after consuming the current Kafka messages.
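To make the field mapping in the load_company job concrete, the following Python sketch mimics what VALUES ($"company", $"employee_num") selects from each JSON line. It only illustrates the mapping by key name; the function name map_company_line is hypothetical, and this is not how the Kafka loader is implemented.

```python
import json


def map_company_line(line: str) -> tuple:
    """Pick the fields named in the LOAD statement from one JSON line."""
    record = json.loads(line)
    # $"company" becomes the primary ID, $"employee_num" the attribute.
    return (record["company"], record["employee_num"])


print(map_company_line('{"company":"TigerGraph","employee_num":346}'))
```

Unlike the positional $0, $1 references used for CSV data, JSON fields are addressed by key, so the order of keys within a line does not matter.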

4. Manage connectors

After creating a connector, you can pause, resume, or delete it. You can also use gadmin commands to view the status of all existing connectors.

4.1. List connectors

You can list all running connectors or view detailed configuration of a specific connector.

To view a list of all connectors, run gadmin connector list from your bash terminal as the TigerGraph Linux user.

To view configuration details of a specific connector, run gadmin connector list <name_of_connector> and replace <name_of_connector> with the name of the connector you want to inspect.

4.2. Pause a connector

Pausing a connector stops the connector from streaming data from the data source. The data that has been streamed to TigerGraph’s internal Kafka can still be loaded into the graph store via a loading job.

To pause a connector, run the following command and replace <connector_name> with the name of the connector:

$ gadmin connector pause <connector_name>

4.3. Resume a connector

Resuming a connector resumes streaming for a paused connector.

To resume a connector, run the following command and replace <connector_name> with the name of the connector:

$ gadmin connector resume <connector_name>

4.4. Delete a connector

Deleting a connector removes a connector. It stops the connector from streaming, but the data that has been streamed to Kafka can still be ingested by a loading job. This action cannot be undone and a removed connector cannot be resumed.

To delete a connector, run the following command and replace <connector_name> with the name of the connector:

$ gadmin connector delete <connector_name>
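If you manage connectors from a script, the commands in this section can be wrapped as in the following Python sketch. The helper names are hypothetical; the only gadmin subcommands used are the ones shown in this section (list, pause, resume, delete), and the script is assumed to run as the TigerGraph Linux user on a machine where gadmin is available.

```python
import subprocess

ACTIONS = ("list", "pause", "resume", "delete")


def connector_command(action: str, connector_name: str = "") -> list:
    """Build the argument list for a gadmin connector command."""
    if action not in ACTIONS:
        raise ValueError(f"unsupported action: {action}")
    if action != "list" and not connector_name:
        # Only list can be run without naming a connector.
        raise ValueError(f"{action} requires a connector name")
    command = ["gadmin", "connector", action]
    if connector_name:
        command.append(connector_name)
    return command


def run_connector_command(action: str, connector_name: str = "") -> str:
    """Run the command and return its standard output."""
    result = subprocess.run(connector_command(action, connector_name),
                            capture_output=True, text=True, check=True)
    return result.stdout
```

Keeping command construction separate from execution makes it possible to log or review a command, especially delete, before running it.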

5. Known issues

Messages in TigerGraph’s internal Kafka cluster are automatically removed from the topics at regular intervals. There are several known issues with this process:

  • Messages are only removed if the loading job is actively running. If the loading job finishes before the interval is reached, the messages are not removed.

  • If the loading job uses EOF mode, meaning the loading job terminates as soon as it finishes, it is likely that some partial data will be left in the topic.

  • If a topic is deleted and recreated while a loading job on the topic is running, the data in the topic may get removed.

Automatic message removal is an alpha feature and may be subject to change.