GSQL Data Streaming Connector

The TigerGraph Data Streaming Connector is an integrated Kafka connector that provides fast and scalable data streaming between TigerGraph and external data sources.

For data coming from a separate Kafka instance, you need to perform a few more steps to set up data streaming. This process has its own separate instructions.

Architecture overview

The connector streams data from a source data system into TigerGraph’s internal Kafka cluster in the form of Kafka messages in specified topics. The messages are then ingested by a Kafka loading job and loaded into the database.

Figure 1. Data streaming connector architecture: the connector streams data from the source into TigerGraph’s internal Kafka cluster.

Multiple connectors can run at the same time, streaming data from various sources into the TigerGraph system concurrently.

TigerGraph automatically sets up the streaming connector and Kafka job when an external data source is specified during loading job creation.

Create the loading job

Loading jobs for the Data Streaming Connector are created and run in GSQL. Refer to the full Creating a Loading Job documentation for more detail about loading jobs in general.

Sample loading job syntax

This example uses the LDBC_SNB dataset in CSV format.

USE GRAPH test_graph
CREATE DATA_SOURCE s1 = "ds_config.json" FOR GRAPH test_graph

CREATE LOADING JOB stream_csv FOR GRAPH test_graph {
    DEFINE FILENAME file_Comment = "$s1:s3://s3-loading-test/tg_ldbc_snb/sf0.1_csv/dynamic/Comment";
    DEFINE FILENAME file_Person = "$s1:s3://s3-loading-test/tg_ldbc_snb/sf0.1_csv/dynamic/Person";
    DEFINE FILENAME file_Comment_hasCreator_Person = "$s1:s3://s3-loading-test/tg_ldbc_snb/sf0.1_csv/dynamic/Comment_hasCreator_Person";
    LOAD file_Comment
    TO VERTEX Comment VALUES ($1, $0, $2, $3, $4, $5) USING header="true", separator="|";
    LOAD file_Person
    TO VERTEX Person VALUES ($1, $2, $3, $4, $5, $0, $6, $7, SPLIT($8,";"), SPLIT($9,";")) USING header="true", separator="|";
    LOAD file_Comment_hasCreator_Person
    TO EDGE HAS_CREATOR VALUES ($1, $2) USING header="true", separator="|";
}

In short, the steps are:

  1. Specify a graph, in this case test_graph.

  2. Create a data source for GSQL to use, here named s1, and associate it with a JSON configuration file.

  3. Create the loading job.

  4. Define the filenames. In this example there are three files: one for Comment vertices, one for Person vertices, and one for the edges recording that each comment has a Person creator. Each file is referenced through the same data source (s1) but stored under a different path in the S3 bucket. Credentials for accessing the data source come from the configuration file in step 2.

  5. Load each file to the appropriate data type in the graph (vertices or edges). As part of the LOAD command, specify which columns of the CSV file to use.

The following sections give more details about each of these steps.

Specify a graph and config file

USE GRAPH test_graph
CREATE DATA_SOURCE s1 = "ds_config.json" FOR GRAPH test_graph

In this example, we create a data source and name it s1.

Older versions of TigerGraph required a keyword after DATA_SOURCE such as STREAM or KAFKA. This is still supported, but no longer required. Therefore, CREATE DATA_SOURCE STREAM s1 is equally valid.
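For illustration, either of the following statements creates the same data source, reusing the s1 name and ds_config.json file from this example:

CREATE DATA_SOURCE s1 = "ds_config.json" FOR GRAPH test_graph
CREATE DATA_SOURCE STREAM s1 = "ds_config.json" FOR GRAPH test_graph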

We associate our data source with the ds_config.json file.

The contents of the configuration file depend on the type of data source: Amazon S3, Google Cloud Storage, Azure Blob Storage, or Google BigQuery. Example configurations for each are shown below.

Amazon S3

AWS uses the standard IAM credential provider and authenticates with your access key.

Access keys can be used for an individual user or for an IAM role. See Using IAM Roles for Amazon EC2 for more information.

{
    "type": "s3",
    "access.key": "<access key>",
    "secret.key": "<secret key>"
}
Google Cloud Storage

{
  "type": "gcs",
  "project_id": "<project id>",
  "private_key_id": "<private key id>",
  "private_key": "<private key>",
  "client_email": "<email address>"
}

The service account key JSON file can be created and downloaded by following the Google Cloud tutorial. After downloading the file, change its type entry to gcs and use it as the configuration file.

Azure Blob Storage

{
    "type" : "abs",
    "account.key" : "<account key>"
}
Google BigQuery

{
    "type":"bigquery",
    "ProjectId":"tigergraph-dev",
    "OAuthType":2,
    "parameters" : {
        "OAuthRefreshToken":"<refresh token>",
        "OAuthClientId":"<client ID>.apps.googleusercontent.com",
        "OAuthClientSecret":"<client secret>"
    }
}

For more information on authorization types for Google BigQuery, see this PDF guide: Magnitude Simba Google BigQuery JDBC Data Connector.

JSON data can also be sent inline. Inline syntax is required for JSON configurations on TigerGraph Cloud instances.

Inline JSON data format when creating a data source
CREATE DATA_SOURCE s1 = "{
    type: s3,
    access.key: <access key>,
    secret.key: <secret key>
}" FOR GRAPH test_graph

String literals can be enclosed with a double quote ", triple double quotes """, or triple single quotes '''. Double quotes " in the JSON can be omitted if the key name does not contain a colon : or comma ,.

Alternate quote syntax for inline JSON data
CREATE DATA_SOURCE s1 = """{
    "type": "s3",
    "access.key": "<access key>",
    "secret.key": "<secret key>"
}""" FOR GRAPH test_graph

Key names accept a separator of either a period . or underscore _, so for example, a key could also be named access_key.
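For example, the inline configuration shown earlier could equivalently be written with underscores in the key names (a sketch reusing the s1 data source from above):

CREATE DATA_SOURCE s1 = """{
    "type": "s3",
    "access_key": "<access key>",
    "secret_key": "<secret key>"
}""" FOR GRAPH test_graph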

Define the filename

Filenames can be defined with a data file URI, a separate JSON configuration file, or inline JSON content supplied in the statement. Use the following templates to write the DEFINE FILENAME statement for your data source type.

AWS/GCP/ABS:

DEFINE FILENAME file_name = "$[data source name]:[URI]";
DEFINE FILENAME file_name = "$[data source name]:[json config file]";
DEFINE FILENAME file_name = "$[data source name]:[inline json content]";

BigQuery:

DEFINE FILENAME file_name = "$[data source name]:[SQL]";
DEFINE FILENAME file_name = "$[data source name]:[json config file]";
DEFINE FILENAME file_name = "$[data source name]:[inline json content]";

Here are some examples with Google BigQuery getting data from a SQL query.

DEFINE FILENAME bq_sql = "$s1:SELECT id, firstName, lastName, gender, birthday, creationDate, locationIP, browserUsed, language, email FROM `tigergraph-ldbc-benchmark.snb_bi_sf01.Person`";

DEFINE FILENAME bq_inline_json = """$s1:{
	"query":"SELECT id, creationDate, locationIP, browserUsed, content, length, CreatorPersonId FROM `tigergraph-ldbc-benchmark.snb_bi_sf01.Comment`",
	"partition":4
}""";

DEFINE FILENAME bq_json_file = """$s1:myfile.json""";

If you use a separate JSON file, it must follow the same format as the inline example shown.
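For example, a separate file such as myfile.json (the hypothetical filename referenced in the statement above) would contain just the JSON portion, mirroring the inline example:

{
    "query": "SELECT id, creationDate, locationIP, browserUsed, content, length, CreatorPersonId FROM `tigergraph-ldbc-benchmark.snb_bi_sf01.Comment`",
    "partition": 4
}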

Here are some examples of different DEFINE FILENAME statements. If the filename is in URI format and refers to a folder or prefix, all files in that folder or with that prefix are loaded.

The filename can be used as a parameter when running loading jobs.

DEFINE FILENAME uri_s3 = "$s1:s3://s3-loading-test/tg_ldbc_snb/sf0.1_csv/dynamic/Comment";
DEFINE FILENAME uri_gcs = "$s1:gs://tg_ldbc_snb/sf0.1_csv/dynamic/Person";
DEFINE FILENAME uri_abs = "$s1:abfss://person@yandblobstorage.dfs.core.windows.net/persondata.csv";

DEFINE FILENAME parquet_s3 = """$s1:{"file.uris":"s3://s3-loading-test/tg_ldbc_snb/sf0.1_parquet/dynamic/Comment", "file.type":"parquet"}""";

DEFINE FILENAME csv_gcs="""$s1:{
    "file.uris": "gs://tg_ldbc_snb/sf0.1_csv/dynamic/Person",
    "file.type": "text",
    "partition": 6
  }""";

DEFINE FILENAME json_config_s3 = "$s1:myfile.json";

Define the parameters

These are the parameters that can be set in the JSON-formatted configuration.

  • file.uris (required) — The URI or URIs, separated by commas. No default value.

  • file.type (optional) — The file type. Use text for CSV and JSON files and parquet for Parquet files. Default: parquet if the file extension is parquet, otherwise text.

  • partition (optional) — The number of partitions to use. When loading data, the partitions are distributed evenly across the nodes. If one filename contains much more data than the others, consider using a larger partition number. Default: ceiling(number of nodes / number of filenames).

  • batch.size (optional) — The batch size of the loading job, i.e., the number of CSV lines or JSON objects loaded per batch. Default: 10000.

  • recursive (optional) — When a directory of files is loaded as input, whether the data loader loads files recursively from subdirectories. Default: true.

  • regexp (optional) — A regular expression (Java pattern syntax) used to filter the filenames to be loaded. Default: .*, which permits all filenames.

  • default (optional) — The default value for any field left empty. Default: "", an empty string.

  • archive.type (optional) — The file type for archive files. Accepted values: auto (use the file extension as the file type), tar, zip, gzip, and none (load from an uncompressed file). Default: auto.

  • tasks.max (optional) — The number of threads used to download data. Default: 1.
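As an illustration, the following statement combines several of these parameters in one inline JSON configuration. This is a sketch, not a tested configuration; the filename variable tuned_csv and the parameter values are illustrative, and it reuses the s1 data source and S3 paths from the earlier examples:

DEFINE FILENAME tuned_csv = """$s1:{
    "file.uris": "s3://s3-loading-test/tg_ldbc_snb/sf0.1_csv/dynamic/Comment,s3://s3-loading-test/tg_ldbc_snb/sf0.1_csv/dynamic/Person",
    "file.type": "text",
    "partition": 4,
    "batch.size": 5000,
    "recursive": true,
    "tasks.max": 2
}""";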

Define the attributes where data will be loaded

In this stage, we define which attributes of vertices and edges will receive data from the external data source.

When loading a CSV file, columns are referenced by their positional index ($0 for the first column, $1 for the second, and so on) rather than by header name.

In this example, the first and second columns in the CSV are loaded to vertices of Comment type and the third and fourth columns are loaded to edges of HAS_CREATOR type.

LOAD file_Comment
    TO VERTEX Comment VALUES ($0, $1),
    TO EDGE HAS_CREATOR VALUES ($2, $3);

In contrast, when using JSON or Parquet files, values are referenced by their key names. Parquet files require USING JSON_FILE="TRUE".

LOAD file_Comment
    TO VERTEX Comment VALUES ($"id", $"content"),
    TO EDGE HAS_CREATOR VALUES ($"id", $"CreatorPersonId")
        USING JSON_FILE="TRUE";

For Google BigQuery, SQL results are joined by a specified separator to form CSV-formatted content.

LOAD bq_sql TO VERTEX Comment VALUES ($1, $0, $2, $3, $4, $5) USING header="true", separator="|";

Run the loading job

Use the command RUN LOADING JOB to run the loading job.

RUN LOADING JOB stream_csv
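As noted earlier, the filename variables defined in the job can also be supplied as parameters at run time. A minimal sketch, reusing the stream_csv job and the Comment path from the job definition:

RUN LOADING JOB stream_csv USING file_Comment="$s1:s3://s3-loading-test/tg_ldbc_snb/sf0.1_csv/dynamic/Comment"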

Continuous file loading

By default, after a loading job stops, changes to files in an external data source are not automatically loaded into TigerGraph.

The data streaming connector also supports continuous loading in stream mode, controlled by the EOF option of the RUN LOADING JOB command. If EOF is set to "true", the loading job stops once it reaches the end of the currently available data.

If you run this command with the EOF flag set to false, the loading job is kept active and any new data in the external data source will be loaded automatically. The connector can detect both new lines in existing files and new files added to the designated source folder.

RUN LOADING JOB stream_csv USING EOF="false"

Continuous loading works only on an incremental basis: only new lines in existing files and newly added files are loaded. If existing lines are changed or deleted, those changes are not picked up by the loading job.

For example, consider a file data.txt in cloud storage that is part of a loading job.

data.txt
line-1

The line of data is loaded successfully into the loading job for ingestion to TigerGraph. If a user edits the file and adds a new line, the stream loader notices the new modification and loads new lines, starting from where it previously left off. The actual data on each line is not compared to what was already loaded.

data.txt after a new line is added to the end
line-1
line-2

In this case, the new line line-2 is successfully loaded into the loading job for ingestion to TigerGraph.

If a user instead edits the file and inserts a line before the end, like so, the loader still resumes from its saved position, which can cause new data to be skipped and existing data to be loaded again.

data.txt after a new line is added before the end
line-1
added-line
line-2

The data loaded into TigerGraph thus looks like this. Because two lines had already been loaded, the first two lines are skipped, even though the second contains new data. The third line from the file is then loaded, resulting in a repeat of what was already loaded in the last pass.

Data in TigerGraph
line-1
line-2
line-2

To avoid this, only use stream loading jobs when there is no chance of data being altered or added to the middle of a file.

Known issues

Automatic message removal is an Alpha feature and may be subject to change.

Messages in TigerGraph’s internal Kafka cluster are automatically removed from the topics at regular intervals. There are several known issues with this process:

  • Messages are only removed while the loading job is actively running. If the loading job finishes before the removal interval is reached, the messages are not removed.

  • If the loading job runs in EOF mode, meaning it terminates as soon as it finishes loading, some partial data is likely to be left in the topic.

  • If a topic is deleted and recreated while a loading job on the topic is running, the data in the topic may get removed.

  • Deleting the connector does not delete the connect offsets for topics that are mapped to a folder URI.

NULL is not a valid input value.

TigerGraph does not store NULL values. Therefore, your input data should not contain any NULLs.