GSQL Data Streaming Connector
The TigerGraph Data Streaming Connector is an integrated Kafka connector that provides fast and scalable data streaming between TigerGraph and external data sources.
For data coming from a separate Kafka instance, you need to perform a few more steps to set up data streaming. This process has its own separate instructions.
Architecture overview
The connector streams data from a source data system into TigerGraph’s internal Kafka cluster in the form of Kafka messages in specified topics. The messages are then ingested by a Kafka loading job and loaded into the database.

Multiple connectors can run at the same time, streaming data from various sources into the TigerGraph system concurrently.
TigerGraph automatically sets up the streaming connector and Kafka job when an external data source is specified during loading job creation.
Create the loading job
Loading jobs for the Data Streaming Connector are created and run in GSQL. Refer to the full Creating a Loading Job documentation for more detail about loading jobs in general.
Sample loading job syntax
This example uses the LDBC_SNB dataset in CSV format.
USE GRAPH test_graph
CREATE DATA_SOURCE s1 = "ds_config.json" FOR GRAPH test_graph
CREATE LOADING JOB stream_csv FOR GRAPH test_graph {
  DEFINE FILENAME file_Comment = "$s1:s3://s3-loading-test/tg_ldbc_snb/sf0.1_csv/dynamic/Comment";
  DEFINE FILENAME file_Person = "$s1:s3://s3-loading-test/tg_ldbc_snb/sf0.1_csv/dynamic/Person";
  DEFINE FILENAME file_Comment_hasCreator_Person = "$s1:s3://s3-loading-test/tg_ldbc_snb/sf0.1_csv/dynamic/Comment_hasCreator_Person";
  LOAD file_Comment
    TO VERTEX Comment VALUES ($1, $0, $2, $3, $4, $5) USING header="true", separator="|";
  LOAD file_Person
    TO VERTEX Person VALUES ($1, $2, $3, $4, $5, $0, $6, $7, SPLIT($8,";"), SPLIT($9,";")) USING header="true", separator="|";
  LOAD file_Comment_hasCreator_Person
    TO EDGE HAS_CREATOR VALUES ($1, $2) USING header="true", separator="|";
}
In short, the steps are:

1. Specify a graph, in this case test_graph.
2. Create a data source for GSQL to use, here named s1, and associate it with a JSON configuration file.
3. Create the loading job.
4. Set up the files. In this example there are three files: one for Comment vertices, one for Person vertices, and one for the edges showing that the comments have Person creators. Each file is stored at a different location within the S3 data source. Credentials for accessing the data source are found in the configuration file from step 2.
5. Load each file to the appropriate data type in the graph (vertices or edges). As part of the LOAD command, specify which columns of the CSV file to use.
The following sections give more details about each of these steps.
Specify a graph and config file
USE GRAPH test_graph
CREATE DATA_SOURCE s1 = "ds_config.json" FOR GRAPH test_graph
In this example, we create a data source and name it s1.
Older versions of TigerGraph required a keyword after DATA_SOURCE, such as STREAM or KAFKA. This is still supported, but no longer required. Therefore, CREATE DATA_SOURCE STREAM s1 is equally valid.
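For example, the same data source created with the legacy keyword form would be:

CREATE DATA_SOURCE STREAM s1 = "ds_config.json" FOR GRAPH test_graph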
We associate our data source with the ds_config.json file.
For AWS, authentication uses the standard IAM credential provider and your access key. Access keys can be used for an individual user or for an IAM role. See Using IAM Roles for Amazon EC2 for more information.
{
"type": "s3",
"access.key": "<access key>",
"secret.key": "<secret key>"
}
{
"type": "gcs",
"project_id": "<project id>",
"private_key_id": "<private key id>",
"private_key": "<private key>",
"client_email": "<email address>"
}
The service key JSON file can be created and downloaded by following the Google Cloud tutorial. After changing the type entry to gcs, the downloaded file can be used as the configuration file.
{
"type" : "abs",
"account.key" : "<account key>"
}
{
  "type": "bigquery",
  "ProjectId": "tigergraph-dev",
  "OAuthType": 2,
  "parameters": {
    "OAuthRefreshToken": "<refresh token>",
    "OAuthClientId": "<client ID>.apps.googleusercontent.com",
    "OAuthClientSecret": "<client secret>"
  }
}
For more information on authorization types for Google BigQuery, see this PDF guide: Magnitude Simba Google BigQuery JDBC Data Connector.
JSON data can also be sent inline. Inline syntax is required for JSON configurations on TigerGraph Cloud instances.
CREATE DATA_SOURCE s1 = "{
type: s3,
access.key: <access key>,
secret.key: <secret key>
}" FOR GRAPH test_graph
String literals can be enclosed with a double quote ", triple double quotes """, or triple single quotes '''.
Double quotes " in the JSON can be omitted if the key name does not contain a colon : or a comma ,.
CREATE DATA_SOURCE s1 = """{
"type": "s3",
"access.key": "<access key>",
"secret.key": "<secret key>"
}""" FOR GRAPH test_graph
Key names accept a separator of either a period . or underscore _, so, for example, a key could also be named access_key.
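For instance, applying that rule, the S3 configuration above could equivalently be written with underscores:

CREATE DATA_SOURCE s1 = """{
  "type": "s3",
  "access_key": "<access key>",
  "secret_key": "<secret key>"
}""" FOR GRAPH test_graph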
Define the filename
Filenames can be defined with a URI, a separate JSON-formatted configuration file, or inline JSON content supplied in the command.
Use the following examples to create the DEFINE FILENAME command based on your data source type.
For Amazon S3, Google Cloud Storage, and Azure Blob Storage data sources:

DEFINE FILENAME file_name = "$[data source name]:[URI]";
DEFINE FILENAME file_name = "$[data source name]:[json config file]";
DEFINE FILENAME file_name = "$[data source name]:[inline json content]";

For Google BigQuery data sources:

DEFINE FILENAME file_name = "$[data source name]:[SQL]";
DEFINE FILENAME file_name = "$[data source name]:[json config file]";
DEFINE FILENAME file_name = "$[data source name]:[inline json content]";
Here are some examples with Google BigQuery getting data from a SQL query.
DEFINE FILENAME bq_sql = "$s1:SELECT id, firstName, lastName, gender, birthday, creationDate, locationIP, browserUsed, language, email FROM `tigergraph-ldbc-benchmark.snb_bi_sf01.Person`";
DEFINE FILENAME bq_inline_json = """$s1:{
  "query": "SELECT id, creationDate, locationIP, browserUsed, content, length, CreatorPersonId FROM `tigergraph-ldbc-benchmark.snb_bi_sf01.Comment`",
  "partition": 4
}""";
DEFINE FILENAME bq_inline_json = """$s1:myfile.json""";
If you use a separate JSON file, it must follow the same format as the inline example shown.
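For example, a separate file such as the myfile.json referenced above might contain the same fields as the inline configuration:

{
  "query": "SELECT id, creationDate, locationIP, browserUsed, content, length, CreatorPersonId FROM `tigergraph-ldbc-benchmark.snb_bi_sf01.Comment`",
  "partition": 4
}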
Here are some examples of different DEFINE FILENAME statements.
If the filename is in URI format and refers to a folder or prefix, all files in that folder or with that prefix are loaded.
The filename can be used as a parameter when running loading jobs.
DEFINE FILENAME uri_s3 = "$s1:s3://s3-loading-test/tg_ldbc_snb/sf0.1_csv/dynamic/Comment";
DEFINE FILENAME uri_gcs = "$s1:gs://tg_ldbc_snb/sf0.1_csv/dynamic/Person";
DEFINE FILENAME uri_abs = "$s1:abfss://person@yandblobstorage.dfs.core.windows.net/persondata.csv";
DEFINE FILENAME parquet_s3 = """$s1:{"file.uris":"s3://s3-loading-test/tg_ldbc_snb/sf0.1_parquet/dynamic/Comment", "file.type":"parquet"}""";
DEFINE FILENAME csv_gcs = """$s1:{
  "file.uris": "gs://tg_ldbc_snb/sf0.1_csv/dynamic/Person",
  "file.type": "text",
  "partition": 6
}""";
DEFINE FILENAME uri_s3 = "$s1:myfile.json";
Define the parameters
These are the parameters that can be specified in the JSON-formatted configuration.
| Parameter | Description | Required | Default value |
|---|---|---|---|
| file.uris | The URI or URIs of the files to load, separated by commas. | Required | N/A |
| file.type | The file type, such as text (for CSV) or parquet. | Optional | If the file extension is … |
| partition | The number of partitions to use. When loading data, each partition is distributed evenly across each node. If one filename contains much more data than others, consider using a larger partition number. | Optional | The default value is calculated by … |
| | The batch size of the loading job, referring to the number of CSV lines or JSON objects that will be loaded. | Optional | |
| | If a directory of files is loaded as an input, this parameter determines whether the data loader will load files recursively from subdirectories. | Optional | |
| | Whether to interpret filenames as containing regular expressions to filter the filenames to be loaded. Uses Java regular expression patterns. | Optional | |
| | The default value for any field left empty. | Optional | |
| | The file type for archive files. Accepted values: … | Optional | |
| | The number of threads used to download data. | Optional | |
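For example, a filename definition that combines several of these parameters might look like the following sketch (the bucket and paths are hypothetical):

DEFINE FILENAME multi_csv = """$s1:{
  "file.uris": "s3://example-bucket/dir_a/,s3://example-bucket/dir_b/data.csv",
  "file.type": "text",
  "partition": 4
}""";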
Define the attributes where data will be loaded
In this stage, we define which attributes of vertices and edges will receive data from the external data source.
When loading a CSV file, columns are referenced by their numerical indices rather than by header text.
In this example, the first and second columns in the CSV are loaded to vertices of the Comment type, and the third and fourth columns are loaded to edges of the HAS_CREATOR type.
LOAD file_Comment
TO VERTEX Comment VALUES ($0, $1),
TO EDGE HAS_CREATOR VALUES ($2, $3);
In contrast, when using JSON or Parquet files, values are referenced by their key names. Parquet files also require JSON_FILE set to TRUE in the USING clause.
LOAD file_Comment
TO VERTEX Comment VALUES ($"id", $"content"),
TO EDGE HAS_CREATOR VALUES ($"id", $"CreatorPersonId")
USING JSON_FILE="TRUE";
For Google BigQuery, SQL results are joined by a specified separator to form CSV-formatted content.
LOAD bq_sql TO VERTEX Comment VALUES ($1, $0, $2, $3, $4, $5) USING header="true", separator="|";
Run the loading job
Use the RUN LOADING JOB command to run the loading job.
RUN LOADING JOB stream_csv
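As noted in the filename section, a file variable can also be given a value when the job is run. A sketch of this, pointing file_Comment at the same S3 prefix used in the job definition:

RUN LOADING JOB stream_csv USING file_Comment="$s1:s3://s3-loading-test/tg_ldbc_snb/sf0.1_csv/dynamic/Comment"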
Continuous file loading
By default, after a loading job stops, changes to files in an external data source are not automatically loaded into TigerGraph.
The data streaming connector also supports continuous loading in stream mode.
This is controlled with the EOF flag of the RUN LOADING JOB command.
If the EOF flag is set to "true", loading stops when the loader reaches the end of the data (EOF).
If you run the command with the EOF flag set to "false", the loading job stays active and any new data in the external data source is loaded automatically. The connector can detect both new lines in existing files and new files added to the designated source folder.
RUN LOADING JOB stream_csv USING EOF="false"
Continuous loading is strictly incremental: only new lines in existing files and new files are loaded. If any existing lines are changed or deleted, those changes will not be part of the loading job.
For example, consider a file data.txt in cloud storage that is part of a loading job.
line-1
The line of data is loaded successfully into the loading job for ingestion to TigerGraph. If a user edits the file and adds a new line, the stream loader notices the new modification and loads new lines, starting from where it previously left off. The actual data on each line is not compared to what was already loaded.
line-1
line-2
In this case, the new line line-2 is successfully loaded into the loading job for ingestion to TigerGraph.
If a user instead edits the file and adds a line before the end, like so, the loader's record of how much was already loaded no longer matches the file's contents, potentially causing repeated or missing data.
line-1
added-line
line-2
The data loaded into TigerGraph thus looks like this. Because two lines had already been loaded, the first two lines are skipped, even though the second contains new data. The third line from the file is then loaded, resulting in a repeat of what was already loaded in the last pass.
line-1
line-2
line-2
To avoid this, only use stream loading jobs when there is no chance of data being altered or added to the middle of a file.
Known issues
Automatic message removal is an Alpha feature and may be subject to change.
Messages in TigerGraph’s internal Kafka cluster are automatically removed from the topics at regular intervals. There are several known issues with this process:
- Messages are only removed if the loading job is actively running. If the loading job finishes well before the removal interval is reached, the messages are not removed.
- If the loading job uses EOF mode, meaning it terminates as soon as it finishes loading, it is likely that some partial data will be left in the topic.
- If a topic is deleted and recreated while a loading job on that topic is running, the data in the topic may be removed.
- Deleting the connector does not delete the connect offsets for topics that are mapped to a folder URI.
NULL is not a valid input value. TigerGraph does not store NULL values, so your input data should not contain any NULLs.