Load from External Kafka
This method is only for v3.9.3 and higher versions. For earlier versions, please refer to Stream from an External Kafka Cluster (Deprecated).
After you have defined a graph schema, you can create a loading job, specify your data sources, and run the job to load data.
The steps for loading from local files, cloud storage, or any other supported sources are similar. We will call out whether a particular step is common for all loading or specific to a data source or loading mode.
Example Schema
//Vertex Types:
CREATE VERTEX Person(PRIMARY_ID id UINT, firstName STRING, lastName STRING,
gender STRING, birthday DATETIME, creationDate DATETIME, locationIP STRING,
browserUsed STRING, speaks SET<STRING>, email SET<STRING>)
WITH STATS="OUTDEGREE_BY_EDGETYPE", PRIMARY_ID_AS_ATTRIBUTE="true"
CREATE VERTEX Comment(PRIMARY_ID id UINT, creationDate DATETIME,
locationIP STRING, browserUsed STRING, content STRING, length UINT)
WITH STATS="OUTDEGREE_BY_EDGETYPE", PRIMARY_ID_AS_ATTRIBUTE="true"
//Edge Types:
CREATE DIRECTED EDGE HAS_CREATOR(FROM Comment, TO Person)
WITH REVERSE_EDGE="HAS_CREATOR_REVERSE"
Create Data Source Object
A data source object provides a standard interface for all supported data source types, so that loading jobs can be written without regard for the data source.
When you create the object, you specify its details (type, access credentials, etc.) in the form of a JSON object. The JSON object can either be read in from a file or provided inline.
Inline mode is required when creating data sources for TigerGraph Cloud instances.
In the following example, we create a data source named s1 and read its configuration information from a file called ds_config.json.
USE GRAPH ldbc_snb
CREATE DATA_SOURCE s1 = "ds_config.json" FOR GRAPH ldbc_snb
Older versions of TigerGraph required a keyword after DATA_SOURCE to specify the source type; this keyword is no longer required.
Alternatively, the configuration can be provided inline:
CREATE DATA_SOURCE s1 = "{
type: <type>,
key: <value>
}" FOR GRAPH ldbc_snb
String Literals
String literals can be represented in any of the following ways:
- Enclosed in double quotes (").
- Enclosed in triple double quotes (""").
- Enclosed in triple single quotes (''').
If the JSON does not contain a colon (:) or a comma (,), the double quotes (") can be omitted.
CREATE DATA_SOURCE s1 = """{
"type": "<type>",
"key": "<value>"
}""" FOR GRAPH ldbc_snb
Either a period (.) or an underscore (_) can be used as the separator in the specified key names. For example, first.second and first_second are both accepted.
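For example, the following two spellings of the same key (a key used later on this page) are read as the same setting; this is an illustrative fragment, not a complete configuration:
"source.cluster.bootstrap.servers": "<broker_addrs>"
"source_cluster_bootstrap_servers": "<broker_addrs>"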
Configure the Kafka source
The TigerGraph connector to external Kafka sources makes use of Apache Kafka MirrorMaker.
In addition, users can provide configurations for Kafka connectors from external sources, including files, Vault, and environment variables. See Externalizing Kafka Configurations.
To configure the data source object, the minimum requirement is the address of the external source Kafka cluster:
{
"type": "mirrormaker",
"source.cluster.bootstrap.servers": "<broker_addrs>"
}
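For example, this minimal configuration can also be supplied inline when creating the data source (a sketch reusing the example graph; replace <broker_addrs> with your broker address list):
USE GRAPH ldbc_snb
CREATE DATA_SOURCE s1 = """{
    "type": "mirrormaker",
    "source.cluster.bootstrap.servers": "<broker_addrs>"
}""" FOR GRAPH ldbc_snb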
Configuration Settings
Field | Description |
---|---|
<prefix>.security.protocol | Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. |
<prefix>.sasl.mechanism | SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. GSSAPI is the default mechanism. Third-party providers require configuring <prefix>.sasl.client.callback.handler.class. |
<prefix>.sasl.kerberos.service.name | The Kerberos principal name used by your Kafka brokers. This could be defined in either the JAAS configuration or Kafka's configuration. |
<prefix>.sasl.jaas.config | JAAS login context parameters for SASL connections in the format used by JAAS configuration files. See JAAS Login Configuration File for details. |
<prefix>.sasl.client.callback.handler.class | Name of the Java handler class required for third-party SASL mechanisms. |
<prefix>.ssl.endpoint.identification.algorithm | The endpoint identification algorithm used to validate the server hostname in the server certificate. Default is https. |
<prefix>.ssl.keystore.location | The location of the key store file. |
<prefix>.ssl.keystore.password | The password of the key store file. |
<prefix>.ssl.key.password | The password of the private key in the key store file or the PEM key specified in ssl.keystore.key. |
<prefix>.ssl.truststore.location | The location of the trust store file. |
<prefix>.ssl.truststore.password | The password for the trust store file. |
Component Prefix
Replace <prefix> with the appropriate identifier (an example expansion follows this list):
- [admin | producer | consumer]
- [source | target].cluster
- [source | target].cluster.[admin | producer | consumer]
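For instance, a setting documented as <prefix>.security.protocol could be supplied at any of these levels; this is an illustrative fragment, not a complete configuration:
"consumer.security.protocol": "SSL",
"source.cluster.security.protocol": "SSL",
"source.cluster.consumer.security.protocol": "SSL"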
Security Protocols
If the source cluster is configured for SSL or SASL protocols, you need to provide the following SSL/SASL credentials in order to communicate with the source cluster.
- If the source cluster uses SASL, you need to upload the keytab of each Kerberos principal to every node of your TigerGraph cluster at the same absolute path.
- If the source cluster uses SSL, see our documentation Kafka SSL Security Guide.
- If the source cluster uses SASL and SSL, you need to upload the keytab of each Kerberos principal, as well as the key store and trust store, to every node of your TigerGraph cluster. Each file must be at the same absolute path on all nodes.
The following configurations are required for admin, producer and consumer. Kafka allows SSL settings to be overridden, applying them with the following precedence (lowest to highest):
- generic ssl.* settings (no prefix)
- [source | target].cluster.ssl.* settings
- [admin | producer | consumer].ssl.* settings
If the source and target clusters share the same SSL settings, you can set generic settings once for both clusters and all roles (admin/producer/consumer). For example, you can set ssl.keystore.location="/path/to/key/store" instead of:
- source.cluster.ssl.keystore.location="/path/to/key/store"
- admin.ssl.keystore.location="/path/to/key/store"
- source.cluster.admin.ssl.keystore.location="/path/to/key/store"
If the source and target clusters have different SSL settings, you can set cluster-wide SSL configurations. For example, you can set target.cluster.ssl.truststore.password="/password/for/trust/store" instead of:
- target.cluster.producer.ssl.truststore.password="/password/for/trust/store"
Note: SSL is now well supported by TigerGraph; we recommend setting up regular SSL rather than SASL + PLAINTEXT/SSL.
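As a sketch of these precedence rules (illustrative values only; the per-role security.protocol settings, which are still required, are omitted for brevity), the generic ssl.* entries below apply to every role, while the consumer-specific entries override the key store just for the consumer:
{
    "type": "mirrormaker",
    "source.cluster.bootstrap.servers": "<broker_addrs>",
    "ssl.truststore.location": "/path/to/client.truststore.jks",
    "ssl.truststore.password": "******",
    "ssl.keystore.location": "/path/to/default.keystore.jks",
    "ssl.keystore.password": "******",
    "consumer.ssl.keystore.location": "/path/to/consumer.keystore.jks",
    "consumer.ssl.keystore.password": "******"
}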
Supported Configuration Examples
SSL
You need to configure:
- <prefix>.security.protocol
- <prefix>.ssl.<field>
{
"type": "mirrormaker",
"source.cluster.bootstrap.servers": "<KAFKAURL>:<PORT>",
"consumer.security.protocol": "SSL",
"consumer.ssl.endpoint.identification.algorithm": "none",
"consumer.ssl.keystore.location": "/path/to/client.keystore.jks",
"consumer.ssl.keystore.password": "******",
"consumer.ssl.key.password": "******",
"consumer.ssl.truststore.location": "/path/to/client.truststore.jks",
"consumer.ssl.truststore.password": "******",
"source.admin.security.protocol": "SSL",
"source.admin.ssl.endpoint.identification.algorithm": "none",
"source.admin.ssl.keystore.location": "/path/to/client.keystore.jks",
"source.admin.ssl.keystore.password": "******",
"source.admin.ssl.key.password": "******",
"source.admin.ssl.truststore.location": "/path/to/client.truststore.jks",
"source.admin.ssl.truststore.password": "******",
"producer.security.protocol": "SSL",
"producer.ssl.endpoint.identification.algorithm": "none",
"producer.ssl.keystore.location": "/path/to/client.keystore.jks",
"producer.ssl.keystore.password": "******",
"producer.ssl.key.password": "******",
"producer.ssl.truststore.location": "/path/to/client.truststore.jks",
"producer.ssl.truststore.password": "******"
}
SASL_PLAINTEXT
You need to configure:
- <prefix>.security.protocol
- <prefix>.sasl.<field>
{
"type": "mirrormaker",
"source.cluster.bootstrap.servers": "<KAFKAURL>:<PORT>",
"consumer.security.protocol": "SASL_PLAINTEXT",
"consumer.sasl.mechanism": "<GSSAPI | PLAIN | etc>",
"consumer.sasl.jaas.config": "<jaas_conf_content>",
"source.admin.security.protocol": "SASL_PLAINTEXT",
"source.admin.sasl.mechanism": "<GSSAPI | PLAIN | etc>",
"source.admin.sasl.jaas.config": "<jaas_conf_content>",
"producer.security.protocol": "SASL_PLAINTEXT",
"producer.sasl.mechanism": "<GSSAPI | PLAIN | etc>",
"producer.sasl.jaas.config": "<jaas_conf_content>",
}
SASL_SSL
You need to configure:
- <prefix>.security.protocol
- <prefix>.sasl.<field>
- <prefix>.ssl.<field>
{
"type": "mirrormaker",
"source.cluster.bootstrap.servers": "<KAFKAURL>:<PORT>",
"consumer.security.protocol": "SASL_SSL",
"consumer.sasl.mechanism": "<GSSAPI | PLAIN | etc>",
"consumer.sasl.jaas.config": "<jaas_conf_content>",
"consumer.ssl.endpoint.identification.algorithm": "none",
"consumer.ssl.keystore.location": "/path/to/client.keystore.jks",
"consumer.ssl.keystore.password": "******",
"consumer.ssl.key.password": "******",
"consumer.ssl.truststore.location": "/path/to/client.truststore.jks",
"consumer.ssl.truststore.password": "******",
"source.admin.security.protocol": "SASL_PLAINTEXT",
"source.admin.sasl.mechanism": "<GSSAPI | PLAIN | etc>",
"source.admin.sasl.jaas.config": "<jaas_conf_content>",
"source.admin.ssl.endpoint.identification.algorithm": "none",
"source.admin.ssl.keystore.location": "/path/to/client.keystore.jks",
"source.admin.ssl.keystore.password": "******",
"source.admin.ssl.key.password": "******",
"source.admin.ssl.truststore.location": "/path/to/client.truststore.jks",
"source.admin.ssl.truststore.password": "******",
"producer.security.protocol": "SASL_PLAINTEXT",
"producer.sasl.mechanism": "<GSSAPI | PLAIN | etc>",
"producer.sasl.jaas.config": "<jaas_conf_content>",
"producer.ssl.endpoint.identification.algorithm": "none",
"producer.ssl.keystore.location": "/path/to/client.keystore.jks",
"producer.ssl.keystore.password": "******",
"producer.ssl.key.password": "******",
"producer.ssl.truststore.location": "/path/to/client.truststore.jks",
"producer.ssl.truststore.password": "******"
}
Third Party SASL Mechanism
For both SASL_PLAINTEXT and SASL_SSL, when using a third-party SASL mechanism it is necessary to:
- Include the <prefix>.sasl.jaas.config in addition to the <prefix>.sasl.client.callback.handler.class in the configuration.
- Place the third-party JAR under $(gadmin config get System.Approot)/kafka/libs/.
{
"type": "mirrormaker",
"source.cluster.bootstrap.servers": "<KAFKAURL>:<PORT>",
"consumer.security.protocol": "SASL_PLAINTEXT",
"consumer.sasl.mechanism": "<AWS_MSK_IAM | OAUTHBEARER | etc>",
"consumer.sasl.jaas.config": "<jaas_conf_content>",
"consumer.sasl.client.callback.handler.class": "<handler class name for third party>",
"source.admin.security.protocol": "SASL_PLAINTEXT",
"source.admin.sasl.mechanism": "<AWS_MSK_IAM | OAUTHBEARER | etc>",
"source.admin.sasl.jaas.config": "<jaas_conf_content>",
"source.admin.sasl.client.callback.handler.class": "<handler class name for third party>",
"producer.security.protocol": "SASL_PLAINTEXT",
"producer.sasl.mechanism": "<AWS_MSK_IAM | OAUTHBEARER | etc>",
"producer.sasl.jaas.config": "<jaas_conf_content>",
"producer.sasl.client.callback.handler.class": "<handler class name for third party>"
}
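For example, using a placeholder path and file name for the third-party JAR (the actual JAR depends on the SASL mechanism you use):
cp /path/to/<third_party_sasl_mechanism>.jar $(gadmin config get System.Approot)/kafka/libs/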
Schema Registry Service
If there is a schema registry service containing the record schema of the source topic, please add it to the data source configuration:
"value.converter.schema.registry.url": "schema_registry_url"
Currently, only Avro schema is supported.
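For example, a data source configuration that also points at a schema registry could look like the following sketch (the registry URL is a placeholder):
{
    "type": "mirrormaker",
    "source.cluster.bootstrap.servers": "<broker_addrs>",
    "value.converter.schema.registry.url": "http://<schema_registry_host>:<port>"
}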
Create a loading job
A loading job tells the database how to construct vertices and edges from data sources. The loading job body has two parts:
- DEFINE statements create variables to refer to data sources. These can refer to actual files or be placeholder names. The actual data sources can be given when running the loading job.
- LOAD statements specify how to take the data fields from files to construct vertices or edges.
Example loading job from external Kafka
The following is an example loading job from an external Kafka cluster.
USE GRAPH ldbc_snb
CREATE DATA_SOURCE s1 = "ds_config.json" FOR GRAPH ldbc_snb
CREATE LOADING JOB load_data FOR GRAPH ldbc_snb {
DEFINE FILENAME file_Comment = "$s1:topic_Comment";
DEFINE FILENAME file_Person = "$s1:topic_Person";
DEFINE FILENAME file_Comment_hasCreator_Person =
"$s1:topic_Comment_hasCreator_Person";
LOAD file_Comment
TO VERTEX Comment
VALUES ($1, $0, $2, $3, $4, $5) USING separator="|";
LOAD file_Person
TO VERTEX Person
VALUES ($1, $2, $3, $4, $5, $0, $6, $7,
SPLIT($8,";"), SPLIT($9,";")) USING separator="|";
LOAD file_Comment_hasCreator_Person
TO EDGE HAS_CREATOR
VALUES ($1, $2) USING separator="|";
}
Define filenames
First we define filenames, which are local variables referring to data files (or data objects).
The terms FILENAME and filevar are used for legacy reasons, but a filevar can also be an object in a data object store.
DEFINE FILENAME filevar ["=" file_descriptor ];
The file descriptor can be specified at compile-time or at runtime. Runtime settings override compile-time settings:
RUN LOADING JOB job_name USING filevar=file_descriptor_override
While a loading job may have multiple FILENAME variables, they must all refer to the same DATA_SOURCE object.
Kafka file descriptors
The file descriptor has three valid formats. You can simply provide the Kafka topic name and use default settings. Or, you can provide configuration details including the topic, either in a JSON file or as inline JSON content.
DEFINE FILENAME file_name = "$[data source name]:[topic]";
DEFINE FILENAME file_name = "$[data source name]:[json config file]";
DEFINE FILENAME file_name = "$[data source name]:[inline json content]";
For example:
// Format 1: topic only
DEFINE FILENAME file_Person = "$s1:topic_Person";
// Format 2: topic and configuration file
DEFINE FILENAME file_Person = "$s1:myfile.json";
// Format 3: topic and inline configuration
DEFINE FILENAME file_Person="""$s1:{
"topic": "topic_Person",
"tasks.max": "10"
}""";
Filename parameters
These are the required and optional configuration parameters:
Parameter | Description | Required? | Default value |
---|---|---|---|
topic | The source topic name. | Required | N/A |
tasks.max | The maximum number of tasks used to consume the source topic. You can increase this value when the source topic contains multiple partitions. | Optional | 1 |
num.partitions | The number of partitions to use. When loading data, each partition is distributed evenly across each node. If one filename contains much more data than the others, consider using a larger number of partitions. | Optional | 3 |
value.converter | Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka. If records are in Avro format with a Schema Registry service, use io.confluent.connect.avro.AvroConverter. | Optional | N/A |
For Avro data with a schema registry, you must set both value.converter.schema.registry.url when defining the DATA_SOURCE and value.converter when defining a FILENAME.
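For instance, the schema registry URL goes in the DATA_SOURCE configuration (as shown earlier), while the converter is set per topic in the FILENAME definition. A sketch of the FILENAME side, assuming the standard Confluent Avro converter class (io.confluent.connect.avro.AvroConverter):
DEFINE FILENAME file_Person = """$s1:{
    "topic": "topic_Person",
    "value.converter": "io.confluent.connect.avro.AvroConverter"
}""";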
Specify the data mapping
Next, we use LOAD statements to describe how the incoming data will be loaded to attributes of vertices and edges. Each LOAD statement handles the data mapping, and optional data transformation and filtering, from one filename to one or more vertex and edge types.
LOAD [ source_object|filevar|TEMP_TABLE table_name ]
destination_clause [, destination_clause ]*
[ TAGS clause ] (1)
[ USING clause ];
(1) As of v3.9.3, TAGS are deprecated.
Let’s break down one of the LOAD statements in our example:
LOAD file_Person TO VERTEX Person
VALUES($1, $2, $3, $4, $5, $0, $6, $7,
SPLIT($8, ";"), SPLIT($9, ";"))
USING SEPARATOR="|", HEADER="true", EOL="\n";
- $0, $1, … refer to the first, second, … columns in each line of a data file.
- SEPARATOR="|" says the column separator character is the pipe (|). The default is comma (,).
- HEADER="true" says that the first line in the source contains column header names instead of data. These names can be used instead of the column numbers.
- SPLIT is one of GSQL's ETL functions. It says that there is a multi-valued column, which has a separator character to mark the subfields in that column. (See the sketch after this list.)
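For instance, a hypothetical file_Person record might look like the two lines below (a header line plus one pipe-separated data line). With the mapping above, SPLIT($8, ";") loads speaks as the set {"en", "fr"}, and SPLIT($9, ";") loads both email addresses:
creationDate|id|firstName|lastName|gender|birthday|locationIP|browserUsed|speaks|email
2010-02-14T15:32:10.447+0000|1001|Ada|Lovelace|female|1989-12-03|119.235.7.103|Firefox|en;fr|ada@example.org;al@example.net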
Refer to Creating a Loading Job in the GSQL Language Reference for descriptions of all the options for loading jobs.
When loading JSON or Avro data, refer to fields by name rather than by column position, e.g., $"id" instead of $0.
Avro Data Validation
In certain scenarios, loading Avro-format data into TigerGraph DB via an external Kafka connector such as MirrorMakerConnector can result in malformed data errors. See our documentation on Avro Data Validation with KafkaConnect for help.
Run the loading job
Use the command RUN LOADING JOB
to run the loading job.
RUN LOADING JOB [-noprint] job_name [
USING filevar [="file_descriptor"][, filevar [="file_descriptor"]]*
[,EOF="eof_mode"]
]
-noprint
By default, the loading job will run in the foreground and print the loading status and statistics after you submit the job.
If the -noprint
option is specified, the job will run in the background after displaying the job ID and the location of the log file.
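For example, to run the example job in the background:
RUN LOADING JOB -noprint load_data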
filevar list
The optional USING clause may contain a list of file variables. Each file variable may optionally be assigned a file_descriptor, obeying the same format as in CREATE LOADING JOB. This list of file variables determines which parts of a loading job are run and what data files are used.
When a loading job is compiled, it generates one RESTPP endpoint for each filevar and source_object. As a consequence, a loading job can be run in parts. When RUN LOADING JOB is executed, only those endpoints whose filevar or file identifier (GSQL_FILENAME_n) is mentioned in the USING clause will be used. However, if the USING clause is omitted, then the entire loading job will be run.
If a file_descriptor is given, it overrides the file_descriptor defined in the loading job. If a particular filevar is not assigned a file_descriptor either in the loading job or in the RUN LOADING JOB statement, an error is reported and the job exits.
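For example, the following run loads only the Person topic from the example job, overriding its file descriptor at run time:
RUN LOADING JOB load_data USING file_Person="$s1:topic_Person"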
A loading job from an external Kafka cluster runs in streaming mode regardless of the EOF setting, i.e., it continuously reads new records from the source topic.
Manage and monitor your loading job
When a loading job starts, the GSQL server assigns it a job ID and displays it for the user to see. There are four key commands to monitor and manage loading jobs:
SHOW LOADING STATUS job_id|ALL
ABORT LOADING JOB job_id|ALL
RESUME LOADING JOB job_id
SHOW LOADING ERROR job_id
SHOW LOADING STATUS shows the current status of either a specified loading job or all current jobs. This command must be run within the scope of a graph:
GSQL > USE GRAPH graph_name
GSQL > SHOW LOADING STATUS ALL
For each loading job, the above command reports the following information:
- Loading status
- Loaded lines / loaded objects / error lines
- Average loading speed
- Size of loaded data
- Duration
When inspecting all current jobs with SHOW LOADING STATUS ALL, the jobs in the FINISHED state will be omitted as they are considered to have successfully finished. You can use SHOW LOADING STATUS job_id to check the historical information of finished jobs.
If the report for a job contains error data, you can use SHOW LOADING ERROR job_id to see the original data that caused the error.
See Managing and Inspecting a Loading Job for more details.
Manage loading job concurrency
See Loading Job Concurrency for how to manage the concurrency of loading jobs.