# Data Streaming Connector

TigerGraph Data Streaming Connector is a Kafka connector that provides fast and scalable data streaming between TigerGraph and other data systems.

## 1. Supported data systems

The data streaming connector supports the following data systems:

## 2. Stream data from Google Cloud Storage

You can create a data connector between TigerGraph’s internal Kafka server and your GCS service with a specified topic. The connector will stream data from the data source in your GCS buckets to TigerGraph’s internal Kafka cluster. You can then create and run a loading job to load data from Kafka into the graph store using the Kafka loader.

### 2.1. Prerequisites

• Your source data files are stored in Google Cloud Storage buckets.

### 2.2. Procedure

#### 2.2.1. Specify connector configurations

The connector configurations provide the following information:

• Connector class

• Your GCP service account credentials

• Information on how to parse the source data

• Mapping between connector and source file

#### Specify connector class

The connector class indicates what type of connector the configuration file is used to create. Connector class is specified by the `connector.class` key. For connecting to GCS, its value is set to `com.tigergraph.kafka.connect.filesystem.FsSourceConnector`.

#### Provide GCP account credentials

You need to provide following information for the connector to access the files in your GCP Filestore bucket. You should be able to find all items on this list in your service account key:

• `file.reader.settings.fs.gs.auth.service.account.email`: The email address associated with the service account

• `file.reader.settings.fs.gs.auth.service.account.private.key.id`: The private key ID associated with the service account used for GCS access.

• `file.reader.settings.fs.gs.auth.service.account.private.key`: The private key associated with the service account used for GCS access.

• `file.reader.settings.client_email`: The service account email.

• `file.reader.settings.fs.gs.project.id`: Google Cloud Project ID with access to GCS buckets.

• `file.reader.settings.fs.gs.auth.service.account.enable`: Whether to create objects for the parent directories of objects with `/` in their path. For example, creating `gs://bucket/foo/` upon deleting or renaming `gs://bucket/foo/bar`.

• `file.reader.settings.fs.gs.impl`: The FileSystem for gs: (GCS) uris. The value for this configuration should be `com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem`.

• `file.reader.settings.fs.AbstractFileSystem.gs.impl`: The AbstractFileSystem for `gs:` (GCS) URIs. Only necessary for use with Hadoop 2. The value for this configuration should be `com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS`.

#### Specify parsing rules

The streaming connector supports the following file types:

• CSV files

• JSON files. Each JSON object must be on a separate line.

• directories

• tar files

• zip files

For URIs that point to directories and compressed files, the connector looks for all files inside the directory or the compressed file that match the `file.regexp` parameter. By default, this includes both CSV and JSON files.

If you set `file.recursive` to true, the connector looks for files recursively.

The following parsing options are available:

Name Description Default

`file.regexp`

The regular expression to filter which files to read. The default value matches all files.

`.*`

`file.recursive`

Whether to retrieve files recursively if the URI points to a directory.

`true`

`file.reader.type`

The type of file reader to use. The only supported value is `text`.

`text`

`file.reader.delimited.separator`

The character that separates columns. This parameter does not affect JSON files.

`,`

`file.reader.delimited.quote`

The explicit boundary markers for string tokens, either single or double quotation marks. This parameter does not affect JSON files.

The parser will not treat separator characters found within a pair of quotation marks as a separator.

Accepted values:

• `single`: single quotes `'` are boundary markers.

• `double`: double quotes `"` are boundary markers.

• Empty string `""`: Quotation marks are treated as ordinary characters.

Empty string `""`

`file.reader.delimited.value.default`

The default value for a column when its value is null. This parameter does not affect JSON files.

Empty string `""`

`file.reader.batch.size`

The maximum number of lines to include in a single batch.

`1000`

`file.reader.text.eol`

End of line character.

To use double-quote`"` as the end-of-line character, it needs to be escaped with a backslash: `file.reader.text.eol=\"`.

Other characters such as `'`, `/`, `\t`, and `\n`, do not need to be espaced.

You cannot use a backslash character as the end-of-line character, since it cannot be escaped.

`\n`

`file.reader.text.header`

Whether the first line of the files is a header line. If you are using JSON files, set this parameter to false.

`false`

`file.reader.text.archive.type`

File type for archive files. Setting the value of this configuration to `auto` will allow the connector to decide file types automatically based on the file extensions. Accepted values:

• `tar`

• `zip`

• `gzip`

• The file reader only supports gzip files containing tar files. Standard gzip files are not supported.

• `none`

• `auto`

`auto`

`file.reader.text.archive.extensions.tar`

If a file has this extension, treat it as a tar file.

`tar`

`file.reader.text.archive.extensions.zip`

If a file has this extension, treat it as a zip file

`zip`

`file.reader.text.archive.extensions.gzip`

If a file has this extension, treat it as a gzip file. The file reader only supports gzip files containing tar files. Standard gzip files are not supported.

`gz`, `tgz`

#### Map source file to connector

The below configurations are required:

Name Description Default

`name`

Name of the connector.

None. Must be provided by the user.

`topic`

Name of the topic to create in Kafka.

None. Must be provided by the user.

`tasks.max`

The maximum number of tasks which can run in parallel.

1

`file.uris`

The path(s) to the data files on Google Cloud Filestore. The URI may point to a CSV file, a zip file, a gzip file, or a directory

None. Must be provided by the user.

Below is an example configuration:

``````connector.class=com.tigergraph.kafka.connect.filesystem.FsSourceConnector
...

mode=eof
file.regexp=".*"
file.recursive=true

[connector_person]
name = fs-person-demo-104
topic=person-demo-104
file.uris=gs://example/p.csv

[connector_friend]
name = fs-friend-demo-104
topic=friend-demo-104
file.uris=gs://example/f.csv``````

#### 2.2.2. Create connector

Run command `gadmin connector create` and specify the configuration file to create the connector:

`gadmin connector create --c <config_file>`

The connectors start streaming from the data source immediately after creation if the configurations are valid. You can run `gadmin connector status` to verify the status of the connectors. If the configurations are valid, the connectors should be in `RUNNING` status.

Data streamed from the source stays in TigerGraph’s internal Kafka topics until they are ingested by a loading job.

#### 2.2.3. Create data source

Now that the connector has started loading data into TigerGraph’s internal Kafka cluster, you can create a data source and point it to the Kafka cluster:

1. Create a data source configuration file. The broker’s IP and hostname should be `localhost:30002`, which points to the port for TigerGraph’s internal Kafka cluster. In the `kafka.config` field, set `group.id` to `tigergraph`:

``````{
"broker":"localhost:30002",
"kafka_config":
{
"group.id": "tigergraph"
}
}``````
2. Run `CREATE DATA SOURCE` to create the data source:

``CREATE DATA_SOURCE KAFKA k1 FOR GRAPH social``

1. Create a topic-partition configuration for each topic.

``````{
"topic": <topic_name>, (1)
"partition_list": [ (2)
{
"start_offset": <offset_value>, (3)
"partition": <partition_number> (4)
},
{
"start_offset": <offset_value>, (3)
"partition": <partition_number> (4)
}
...
]
}``````
 1 Replace `` with the name of the topic this configuration applies to. 2 List of partitions you want to stream from. For each partition, you can set a start offset. If this list is empty, or `partition_list` isn’t included, all partitions are used with the default offset. 3 Replace `` with the offset value you want. The default offset for loading is `-1`, which means you will load data from the most recent message in the topic. If you want to load from the beginning of a topic, the `start_offset` value should be `-2`. 4 Replace `` with the partition number if you want to configure.
2. Create a loading job and map data to graph. See Kafka loader guide for how to map data from a Kafka data source to the graph. See Loading JSON data on how to create a loading job for JSON data.

 Known bug: to use the `-1` value for offset, delete the `start_offset` key instead of setting it to `-1`.

For example, suppose we have the following two CSV files and schema:

• Schema

• p.csv

• f.csv

``````CREATE VERTEX person (PRIMARY_ID name STRING, name STRING, age INT, gender STRING, state STRING)
CREATE UNDIRECTED EDGE friendship (FROM person, TO person, connect_day DATETIME)
CREATE GRAPH social (person, friendship)``````
``````name,gender,age,state
Tom,male,40,ca
Dan,male,34,ny
Jenny,female,25,tx
Kevin,male,28,az
Amily,female,22,ca
Nancy,female,20,ky
Jack,male,26,fl
A,male,29,ca``````
``````person1,person2,date
Tom,Dan,2017-06-03
Tom,Jenny,2015-01-01
Dan,Jenny,2016-08-03
Jenny,Amily,2015-06-08
Dan,Nancy,2016-01-03
Nancy,Jack,2017-03-02
Dan,Kevin,2015-12-30
Amily,Dan,1990-1-1``````

• topic_person.json

• topic_friend.json

``````{
"topic": "person-demo-104",
"partition_list": [
{
"start_offset": -2,
"partition": 0
}
]
}``````
``````{
"topic": "friend-demo-104",
"partition_list": [
{
"start_offset": -2,
"partition": 0
}
]
}``````
``````CREATE LOADING JOB load_person FOR GRAPH social {
DEFINE FILENAME f1 = "$k1:/home/mydata/topic_person.json"; DEFINE FILENAME f2 = "$k1:/home/mydata/topic_friend.json";
LOAD f1 TO VERTEX person VALUES ($0,$0, $2,$1, $3) USING separator=","; LOAD f2 TO EDGE friendship VALUES ($0, $1,$2)  USING separator=",";
}``````

Run the loading job created in the last step will load the streamed data into the graph. If you make changes to the topic-partition configuration file, you can overwrite the values for the filename variables with the `USING` clause.

``GSQL > RUN LOADING JOB load_person``

By default, loading jobs that use Kafka data sources run in streaming mode and do not stop until manually aborted. As data is streamed from the data source, the running loading job will continuously ingest the streamed data into the graph store.

#### 2.2.6. Stream data source updates

To load updates to the data source after you’ve created the connectors, you need to delete the connector and recreate another connector.

In the future, the streaming data connector will support automatically scanning for updates and stream data to TigerGraph.

## 3. Manage connectors

After creating a connector, you can choose to delete it or pause it. You can also use `gadmin` commands to view status of all existing connectors.

### 3.1. List connectors

You can list all running connectors or view detailed configuration of a specific connector.

To view a list of all connectors, run `gadmin connector list` from your bash terminal as the TigerGraph Linux user.

To view configuration details of a specific connector, run `gadmin connector list <name_of_connector>` and replace `<name_of_connector>` with the name of the connector you want to inspect.

### 3.2. Pause a connector

Pausing a connector stops the connector from streaming data from the data source. The data that has been streamed to TigerGraph’s internal Kafka can still be loaded into the graph store via a loading job.

To pause a connector, run the below command and replace `<connector_name>` with the name of the connector:

### 3.4. Delete a connector

Deleting a connector removes a connector. It stops the connector from streaming, but the data that has been streamed to Kafka can still be ingested by a loading job. This action cannot be undone and a removed connector cannot be resumed.

To delete a connector, run the below command and replace `<connector_name>` with the name of the connector:

``\$ gadmin connector delete <connector_name>``