Load from Spark Dataframe
The TigerGraph Spark Connector is used with Apache Spark to read data from a Spark DataFrame (or data lake) and write it to TigerGraph. As of TigerGraph 4.1, it can also be used to run a TigerGraph query from Spark and stream the results into Spark as a DataFrame.
Users can leverage it to connect TigerGraph to the Spark ecosystem and load data from any Spark data source, e.g.:
- Distributed file system: HDFS, S3, GCS and ABS
- Streaming source: Kafka
- Data warehouse: BigQuery, Snowflake, PostgreSQL, and Redshift
- Open table format: Delta Lake, Iceberg and Hudi
This connector has multiple optimizations and is backward compatible with TigerGraph v3.6.0+:
- Job-level loading statistics are available in v3.10+.
- Support for reading data from TigerGraph into Spark is available in v4.1+.
The legacy Spark Connection Via JDBC Driver will be deprecated. Please migrate to this new connector.
Prerequisites
Compatibility
- TigerGraph 3.6.0 or higher.
- Spark 3.2 or higher with Scala 2.12 or Scala 2.13.
- Java 8 or higher.
Download the JARs
This connector can be downloaded from the Maven Central repository. It is available in three formats:
- tigergraph-spark-connector-<version>.jar: The JAR file containing only the compiled classes of the connector, without any dependencies.
- tigergraph-spark-connector-<version>-jar-with-dependencies.jar: The JAR file that includes the compiled classes as well as all dependencies.
- tigergraph-spark-connector-<version>.tar.gz: The compressed TAR archive that includes tigergraph-spark-connector-<version>.jar and its dependencies as separate JAR files.
Beginning with version 0.2.0 of the connector, the following connector options are required:
To use the TigerGraph Spark connector in a Spark shell, pass the JAR with the --jars option:
spark-shell --jars tigergraph-spark-connector-<version>-jar-with-dependencies.jar
If you want to include the TigerGraph Spark Connector in your Spark installation, add the JAR with dependencies to Spark's jars folder.
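For a packaged Spark application, the same JAR can be supplied through spark-submit. A minimal sketch, where the main class and application JAR names are placeholders:
spark-submit --class <your_main_class> --jars tigergraph-spark-connector-<version>-jar-with-dependencies.jar <your_application>.jar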
Configure Logging
We highly recommend setting the log level to info to monitor the execution. There are two methods to enable logging for the connector.
Connector Default Logger
The connector has a built-in logger based on java.util.logging. To activate it, set the option log.level to 2, which enables info-level logging. Please refer to Common Options to learn more about the log.level and log.path options.
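For example, the two logging options can be bundled into a reusable map and passed alongside the connection options wherever the connector is used (a minimal sketch; tgOptions is defined under Database Connection below, and the log path is illustrative):
// Reusable logging options: log.level 2 = INFO; without log.path the log goes to stderr.
val logOptions = Map(
  "log.level" -> "2",
  "log.path" -> "/tmp/tigergraph-spark-connector.log"
)
// e.g. df.write.format("tigergraph").options(tgOptions ++ logOptions) ...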
Create a Loading Job
CREATE VERTEX Comment (PRIMARY_ID id UINT, creationDate DATETIME,
locationIP STRING, browserUsed STRING, content STRING, length UINT)
WITH primary_id_as_attribute="TRUE", STATS="outdegree_by_edgetype"
CREATE GRAPH demo_graph (*)
USE GRAPH demo_graph
CREATE LOADING JOB load_Comment FOR GRAPH demo_graph {
DEFINE FILENAME file_Comment;
LOAD file_Comment
TO VERTEX Comment VALUES ($1, $0, $2, $3, $4, $5)
USING header="true", separator="|";
}
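One way to create the schema and loading job is to save the statements above to a file and run them with the GSQL shell before launching the Spark job (a sketch; the file name is illustrative):
gsql /path/to/load_comment.gsql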
Overview
TigerGraph Spark loading leverages the loading job's built-in endpoint. It requires a pre-defined loading job; the DataFrame is then processed as follows:
+--------------------+-------------+-------------+-----------+--------------------+------+
|        creationDate|           id|   locationIP|browserUsed|             content|length|
+--------------------+-------------+-------------+-----------+--------------------+------+
|2012-07-04T06:10:...|7696585588755| 46.23.82.182|    Firefox|                 LOL|     3|
|2012-08-22T17:22:...|8246341402699|  27.62.125.4|     Chrome|              roflol|     6|
|2012-05-08T21:02:...|7146829775042|  61.1.50.205|     Chrome|              roflol|     6|
|2012-11-22T01:25:...|9345853030654|190.95.68.192|    Firefox|About Sergei Eise...|    79|
|2012-11-11T08:59:...|9345853030710|166.75.225.76|     Chrome|                good|     4|
+--------------------+-------------+-------------+-----------+--------------------+------+
If the following options are used:
- loading.job = "load_Comment"
- loading.filename = "file_Comment"
- loading.separator = "|"
- loading.eol = "\n"
then the processed data would be:
2012-07-04T06:10:43.489+00:00|7696585588755|46.23.82.182|Firefox|LOL|3
2012-08-22T17:22:20.315+00:00|8246341402699|27.62.125.4|Chrome|roflol|6
2012-05-08T21:02:39.145+00:00|7146829775042|61.1.50.205|Chrome|roflol|6
2012-11-22T01:25:39.670+00:00|9345853030654|190.95.68.192|Firefox|About Sergei Eisenstein, pioneering SAbout Steven Spielberg, makers in thAbout|79
2012-11-11T08:59:21.311+00:00|9345853030710|166.75.225.76|Chrome|good|4
To let TigerGraph parse the data correctly, make sure loading.separator and loading.eol are set to characters that do not appear in your data.
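Because the loading job maps tokens by position ($0, $1, ...), the DataFrame columns must be selected in the order the job expects. A minimal sketch, assuming a DataFrame df with the columns shown above:
// Order the columns so that $0 = creationDate, $1 = id, and so on,
// matching VALUES ($1, $0, $2, $3, $4, $5) in load_Comment.
val ordered = df.select("creationDate", "id", "locationIP", "browserUsed", "content", "length")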
Usage
Database Connection
You may find it convenient to bundle the options related to making a database connection in a data object (e.g. tgOptions) and then use .options(tgOptions) when running the connector.
Using username/password has the advantage that this authentication method automatically refreshes the access token as needed. Placing the credentials in a data object keeps them separate from the connection commands.
If you choose to use a token, please make sure the lifetime of the token is long enough for the loading job.
val tgOptions = Map(
"url" -> "http(s)://<tg_node_host>:<tg_nginx_port>",
"version" -> "<tg_version>",
"graph" -> "<graph_name>",
"username" -> "<username>",
"password" -> "<password>"
)
Authentication
Authenticate with the username/password or secret to automatically maintain a valid token; otherwise, make sure the lifetime of your token is long enough for the loading job.
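If you authenticate with a GSQL secret instead (recommended for TGCloud users, see the option table below), the map is a drop-in alternative; a minimal sketch:
// Alternative: secret-based authentication instead of username/password.
val tgOptions = Map(
  "url" -> "http(s)://<tg_node_host>:<tg_nginx_port>",
  "version" -> "<tg_version>",
  "graph" -> "<graph_name>",
  "secret" -> "<secret>"
)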
Batch Write
val df = spark.read.json("path/to/Comment.json")
df.write
.format("tigergraph")
.mode("append")
.options(tgOptions)
.option("loading.job", "load_Comment")
.option("loading.filename", "file_Comment")
.option("loading.separator", "|")
.option("log.level", "2")
.option("log.path", "/tmp/tigergraph-spark-connector.log")
.save()
Write with Spark Structured Streaming API
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<broker_host>:<broker_port>")
  .option("subscribe", "Comment")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")
df.writeStream
.outputMode("append")
.format("tigergraph")
.option("checkpointLocation", "/path/to/checkpoint")
.options(tgOptions)
.option("loading.job", "load_Comment")
.option("loading.filename", "file_Comment")
.option("loading.separator", "|")
.option("log.level", "2")
.option("log.path", "/tmp/tigergraph-spark-connector.log")
.start()
.awaitTermination()
Connector Options
Common Options
Key | Default Value | Description | Group |
---|---|---|---|
url | (none) | The connection URL to the TigerGraph cluster. It can be a list of URLs separated by commas for load balancing. The port number can be retrieved with gadmin config get Nginx.Port. Example: http(s)://192.168.1.1:14240, http(s)://192.168.1.2:14240, http(s)://192.168.1.3:14240 | General |
version | (none) | TigerGraph version, e.g., "3.10.1", "4.1.0". | General |
graph | (none) | The graph name. | General |
username | (none) | The GSQL username. | Authentication (username/password or secret-based authentication is preferred over a fixed token, as the connector can generate and refresh tokens automatically.) |
password | (none) | The GSQL password. | Authentication |
secret | (none) | The GSQL secret, which is recommended for TGCloud users. | Authentication |
token | (none) | The classic Bearer token or JWT (since 3.10.1) for RESTPP authentication. | Authentication |
ssl.mode | basic | The SSL mode. | SSL |
ssl.truststore | (none) | Filename of the truststore which stores the SSL certificate chains. | SSL |
ssl.truststore.type | JKS | Truststore type, e.g., JKS, PKCS12. | SSL |
ssl.truststore.password | (none) | Password of the truststore. | SSL |
io.connect.timeout.ms | 30000 | Connect timeout in ms. | Transport Timeout |
io.read.timeout.ms | 60000 | Socket read timeout in ms. | Transport Timeout |
io.retry.interval.ms | 5000 | The initial retry interval for transport timeout. | Transport Timeout |
io.max.retry.interval.ms | 10000 | The maximum retry interval for transport timeout. | Transport Timeout |
io.max.retry.attempts | 5 | The maximum retry attempts for transport timeout. | Transport Timeout |
log.level | (none) | The log level of the default logger. Available values: "0", "1", "2" and "3", which represent "ERROR", "WARN", "INFO" and "DEBUG". The Spark logging configuration (log4j) is ignored if this option is set. The log is printed to stderr unless log.path is set. | Logging |
log.path | (none) | The log file name pattern, e.g., "/tmp/tigergraph-spark-connector.log". It requires setting log.level first. | Logging |
Writer Options
Key | Default Value | Description | Group |
---|---|---|---|
loading.job | (none) | The GSQL loading job name. | Loading Job |
loading.filename | (none) | The filename defined in the loading job. | Loading Job |
loading.separator | , | The column separator. | Loading Job |
loading.eol | \n | The line separator. | Loading Job |
loading.batch.size.bytes | 2097152 | The maximum batch size in bytes. | Loading Job |
loading.timeout.ms | (none) | The loading timeout per batch. | Loading Job |
loading.max.num.error | (none) | The threshold of the error objects count. The loading job will be aborted when reaching the limit. Only available for TigerGraph version 3.10.0+. | Loading Job |
loading.max.percent.error | (none) | The threshold of the error objects percentage. The loading job will be aborted when reaching the limit. Only available for TigerGraph version 3.10.0+. | Loading Job |
loading.retry.interval.ms | 5000 | The initial retry interval for transient server errors. | Loading Job |
loading.max.retry.interval.ms | 30000 | The maximum retry interval for transient server errors. | Loading Job |
loading.max.retry.attempts | 10 | The maximum retry attempts for transient server errors. | Loading Job |
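As with the connection options, the writer options can be bundled into a map and reused across writes. A minimal sketch using the loading options from the examples above:
// Reusable writer options for the load_Comment job; values match the examples above.
val writeOptions = Map(
  "loading.job" -> "load_Comment",
  "loading.filename" -> "file_Comment",
  "loading.separator" -> "|",
  "loading.eol" -> "\n"
)
// e.g. df.write.format("tigergraph").mode("append").options(tgOptions ++ writeOptions).save()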
Use Cases
Load Data from Delta Lake
Batch Write
val df = spark.read.format("delta")
.load("/path/to/delta/table")
.select(
"creationDate",
"id",
"locationIP",
"browserUsed",
"content",
"length"
)
df.write
.format("tigergraph")
.mode("append")
.options(tgOptions)
.option("loading.job", "load_Comment")
.option("loading.filename", "file_Comment")
.option("loading.separator", "|")
.option("log.level", "2")
.option("log.path", "/tmp/tigergraph-spark-connector.log")
.save()
Streaming Write (CDC)
val df = spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.load("/path/to/delta/table")
.filter(
$"_change_type" === "insert" || $"_change_type" === "update_postimage"
)
.select(
"creationDate",
"id",
"locationIP",
"browserUsed",
"content",
"length"
)
df.writeStream
.outputMode("append")
.format("tigergraph")
.option("checkpointLocation", "/path/to/checkpoint")
.options(tgOptions)
.option("loading.job", "load_Comment")
.option("loading.filename", "file_Comment")
.option("loading.separator", "|")
  .option("log.level", "2")
  .option("log.path", "/tmp/tigergraph-spark-connector.log")
  .start()
.awaitTermination()
For more details on Delta Lake, see the Delta Lake documentation.
Load Data from Iceberg
Batch Write
val df = spark.table("catalog.db.table")
.select(
"creationDate",
"id",
"locationIP",
"browserUsed",
"content",
"length"
)
df.write
.format("tigergraph")
.mode("append")
.options(tgOptions)
.option("loading.job", "load_Comment")
.option("loading.filename", "file_Comment")
.option("loading.separator", "|")
.option("log.level", "2")
.option("log.path", "/tmp/tigergraph-spark-connector.log")
.save()
Streaming Write (CDC)
val df = spark.readStream
.format("iceberg")
.option("stream-from-timestamp", 0L)
.load("catalog.db.table")
.select(
"creationDate",
"id",
"locationIP",
"browserUsed",
"content",
"length"
)
df.writeStream
.outputMode("append")
.format("tigergraph")
.option("checkpointLocation", "/path/to/checkpoint")
.options(tgOptions)
.option("loading.job", "load_Comment")
.option("loading.filename", "file_Comment")
.option("loading.separator", "|")
.option("log.level", "2")
.option("log.path", "/tmp/tigergraph-spark-connector.log")
.start()
.awaitTermination()
For more details on Iceberg, see Apache Iceberg: Getting Started.
Load Data from Hudi
Batch Write
val df = spark.read
.format("hudi")
.load("/path/to/hudi/table")
.select(
"creationDate",
"id",
"locationIP",
"browserUsed",
"content",
"length"
)
df.write
.format("tigergraph")
.mode("append")
.options(tgOptions)
.option("loading.job", "load_Comment")
.option("loading.filename", "file_Comment")
.option("loading.separator", "|")
.option("log.level", "2")
.option("log.path", "/tmp/tigergraph-spark-connector.log")
.save()
Streaming Write (CDC)
val df = spark.readStream
.format("hudi")
.load("/path/to/hudi/table")
.select(
"creationDate",
"id",
"locationIP",
"browserUsed",
"content",
"length"
)
df.writeStream
.outputMode("append")
.format("tigergraph")
.option("checkpointLocation", "/path/to/checkpoint")
.options(tgOptions)
.option("loading.job", "load_Comment")
.option("loading.filename", "file_Comment")
.option("loading.separator", "|")
.option("log.level", "2")
.option("log.path", "/tmp/tigergraph-spark-connector.log")
.start()
.awaitTermination()
For more details on Hudi, see the Spark Guide in the Apache Hudi documentation.
Loading Statistics
When logging is configured properly and the log level is set to info, loading statistics are logged at three levels:
- Batch level: data is loaded into TigerGraph in micro-batches; the number of malformed or invalid records in each batch is logged.
- Partition level: the data source can contain multiple partitions, and the log shows how many rows of each partition have been sent to TigerGraph.
- Job level (only available for TigerGraph 3.10.0 or higher): the overall loading statistics of the Spark job, aggregated by the TigerGraph service KAFKASTRM-LL. This requires providing a username and password to query the /gsqlserver endpoint.
24/01/22 16:15:45 INFO TigerGraphBatchWrite: Overall loading statistics: [ {
"overall" : {
"duration" : 15792,
"size" : 48675207,
"progress" : 0,
"startTime" : 1706770863875,
"averageSpeed" : 29546,
"id" : "test_graph.load_Comment.spark.all.1706770859889",
"endTime" : 1706770879667,
"currentSpeed" : 29546,
"statistics" : {
"fileLevel" : {
"validLine" : 466594,
"notEnoughToken" : 0,
"tokenExceedsBuffer" : 0,
"rejectLine" : 0
},
"objectLevel" : {
"vertex" : [ {
"validObject" : 466593,
"typeName" : "Comment",
"invalidPrimaryId" : 1
} ]
}
}
},
"workers" : [ {
"tasks" : [ {
"filename" : "file_Comment"
} ]
}, {
"tasks" : [ {
"filename" : "file_Comment"
} ]
} ]
} ]
Level Statistic Reference
Row Level Statistics
Row Level Statistics | Description |
---|---|
validLine | Number of valid raw data lines parsed. |
rejectLine | Number of raw data lines rejected by the reject line rule in the loading script. |
notEnoughToken | Number of raw data lines with fewer tokens than what was specified by the loading script. |
badCharacter | Number of raw data lines containing invalid characters. |
oversizeToken | Number of raw data lines containing oversize tokens (see the GSQL.OutputTokenBufferSize configuration). |
emptyLine | Number of raw data lines that are empty. |
Object Level Statistics
Object Level Statistics | Description |
---|---|
validObject | Number of data records created. |
passedCondition | Number of token lists which passed the WHERE predicate filter. |
failedCondition | Number of token lists which failed the WHERE predicate filter. |
invalidPrimaryId | Number of token lists where the id token is invalid. |
noIdFound | Number of token lists where the id token is empty. |
invalidAttribute | Number of token lists where at least one of the attribute tokens is invalid. |
 | Number of token lists where at least one of the tokens corresponding to a UDT type attribute is invalid. |
invalidVertexType | Number of token lists where at least one of the tokens corresponding to an edge type's source/target vertex type is invalid. |