Load from Spark Dataframe
The TigerGraph Spark Connector is used with Apache Spark to read data from a Spark DataFrame (or data lake) and write it to TigerGraph.
Users can leverage it to connect TigerGraph to the Spark ecosystem and load data from any Spark data source, e.g.:
- Distributed file systems: HDFS, S3, GCS, and ABS
- Streaming sources: Kafka
- Data warehouses: BigQuery, Snowflake, PostgreSQL, and Redshift
- Open table formats: Delta Lake, Iceberg, and Hudi
This connector has multiple optimizations and is backward compatible with TigerGraph v3.6.0+, although job-level loading statistics are only available for v3.10.0+.
The legacy Spark Connection Via JDBC Driver will be deprecated. Please migrate to this new connector.
Prerequisite
Create a Loading Job
gsql '
CREATE VERTEX Comment (PRIMARY_ID id UINT, creationDate DATETIME, locationIP STRING, browserUsed STRING, content STRING, length UINT) WITH primary_id_as_attribute="TRUE", STATS="outdegree_by_edgetype"
CREATE GRAPH demo_graph (*)
'

gsql -g demo_graph '
CREATE LOADING JOB load_Comment FOR GRAPH demo_graph {
  DEFINE FILENAME file_Comment;
  LOAD file_Comment
    TO VERTEX Comment VALUES ($1, $0, $2, $3, $4, $5) USING header="true", separator="|";
}
'
Compatibility
- TigerGraph 3.6.0 or higher.
- Spark 3.2 or higher with Scala 2.12 and Scala 2.13.
- Java 8 or higher.
Download the JARs
This connector can be downloaded from the Maven Central repository: Maven Central. It is released in three formats:
- `tigergraph-spark-connector-<version>.jar`: The JAR file containing only the compiled classes of the connector, without any dependencies.
- `tigergraph-spark-connector-<version>-jar-with-dependencies.jar`: The JAR file that includes the compiled classes as well as all of their dependencies.
- `tigergraph-spark-connector-<version>.tar.gz`: The compressed TAR archive that includes `tigergraph-spark-connector-<version>.jar` and its dependencies as separate JAR files.

To use the TigerGraph Spark Connector in a Spark shell, pass it with the `--jars` option:

spark-shell --jars tigergraph-spark-connector-<version>-jar-with-dependencies.jar

If you want to include the TigerGraph Spark Connector in your Spark installation, add the JAR with dependencies to Spark's `jars` directory.
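Alternatively, if you build a Spark application with sbt, you can declare the connector as a dependency instead of passing the JAR manually. The sketch below assumes the Maven coordinates `com.tigergraph` / `tigergraph-spark-connector`; verify the exact coordinates and the latest version on the Maven Central page linked above.

```scala
// build.sbt -- a minimal sketch; the group ID is an assumption, check Maven Central.
ThisBuild / scalaVersion := "2.12.18"

libraryDependencies ++= Seq(
  // Spark itself is provided by the cluster at runtime.
  "org.apache.spark" %% "spark-sql" % "3.5.1" % "provided",
  // TigerGraph Spark connector (plain Java artifact, so single %).
  "com.tigergraph" % "tigergraph-spark-connector" % "<version>"
)
```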
Configure Logging
It is highly recommended to set the log level to info so that you can monitor the loading status.
Configure Spark's log4j configuration file conf/log4j2.properties as follows (or another SLF4J binding, if any):

logger.tg.name = com.tigergraph
logger.tg.level = info
Overview
TigerGraph Spark loading leverages the /ddl endpoint and requires a pre-defined loading job. Given the following DataFrame:

+--------------------+-------------+-------------+-----------+--------------------+------+
|        creationDate|           id|   locationIP|browserUsed|             content|length|
+--------------------+-------------+-------------+-----------+--------------------+------+
|2012-07-04T06:10:...|7696585588755| 46.23.82.182|    Firefox|                 LOL|     3|
|2012-08-22T17:22:...|8246341402699|  27.62.125.4|     Chrome|              roflol|     6|
|2012-05-08T21:02:...|7146829775042|  61.1.50.205|     Chrome|              roflol|     6|
|2012-11-22T01:25:...|9345853030654|190.95.68.192|    Firefox|About Sergei Eise...|    79|
|2012-11-11T08:59:...|9345853030710|166.75.225.76|     Chrome|                good|     4|
+--------------------+-------------+-------------+-----------+--------------------+------+

and the loading options

- loading.separator = "|"
- loading.eol = "\n"

the processed data would be:

2012-07-04T06:10:43.489+00:00|7696585588755|46.23.82.182|Firefox|LOL|3
2012-08-22T17:22:20.315+00:00|8246341402699|27.62.125.4|Chrome|roflol|6
2012-05-08T21:02:39.145+00:00|7146829775042|61.1.50.205|Chrome|roflol|6
2012-11-22T01:25:39.670+00:00|9345853030654|190.95.68.192|Firefox|About Sergei Eisenstein, pioneering SAbout Steven Spielberg, makers in thAbout|79
2012-11-11T08:59:21.311+00:00|9345853030710|166.75.225.76|Chrome|good|4

The processed data is then sent to the loading job and filename given by:

- loading.job = "load_Comment"
- loading.filename = "file_Comment"
To let TigerGraph parse the data chunks correctly, make sure `loading.separator` and `loading.eol` are set to characters that do not appear in your data.
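For example, since a text column like content may itself contain "|" or line breaks, non-printable ASCII control characters are safer choices. This is a minimal sketch; the specific characters ("\u001F" and "\u001E") and the connection values are assumptions, and the resulting map is passed to the writer exactly as in the Batch Write example that follows.

```scala
// Sketch: pick separators that cannot appear in the data being loaded.
// "\u001F" (unit separator) and "\u001E" (record separator) are example choices;
// any characters that never occur in your DataFrame values will do.
val loadingOptions = Map(
  "graph"             -> "demo_graph",
  "url"               -> "http://hostname:14240",
  "username"          -> "tigergraph",
  "password"          -> "tigergraph",
  "loading.job"       -> "load_Comment",
  "loading.filename"  -> "file_Comment",
  "loading.separator" -> "\u001F", // column separator absent from the data
  "loading.eol"       -> "\u001E"  // line separator absent from the data
)
```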
Batch Write
val GRAPH = "demo_graph"
val URL = "http(s)://hostname:port[,http(s)://hostname:port]*"
val USERNAME = "tigergraph"
val PASSWORD = "tigergraph"
val LOADING_JOB = "load_Comment"
val FILENAME = "file_Comment"
val SEPARATOR = "|"
val df = spark.read.json("path/to/comment.json")
df.write
.format("tigergraph")
.mode("append")
.options(
Map(
"graph" -> GRAPH,
"url" -> URL,
"username" -> USERNAME,
"password" -> PASSWORD,
"loading.job" -> LOADING_JOB,
"loading.filename" -> FILENAME,
"loading.separator" -> SEPARATOR
)
)
.save()
Write with Spark Structured Streaming API
val GRAPH = "Social_Net"
val URL = "http(s)://hostname:port"
val USERNAME = "tigergraph"
val PASSWORD = "tigergraph"
val LOADING_JOB = "load_person"
val FILENAME = "f1"
val SEPARATOR = "|"
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092") // Kafka bootstrap servers (required by the Kafka source)
  .option("subscribe", "person")
  .load()
  .selectExpr("CAST(value AS STRING)").as[String]
df.writeStream
.outputMode("append")
.format("tigergraph")
.option("checkpointLocation", "/path/to/checkpoint")
.options(
Map(
"graph" -> GRAPH,
"url" -> URL,
"username" -> USERNAME,
"password" -> PASSWORD,
"loading.job" -> LOADING_JOB,
"loading.filename" -> FILENAME,
"loading.separator" -> SEPARATOR
)
)
.start()
.awaitTermination()
Authenticate with a username/password pair or a secret so that the token can be refreshed automatically; otherwise, make sure the lifetime of the token is long enough for your purposes.
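For reference, a minimal sketch of the two token-based alternatives, using the secret and token options listed in the Configuration table below. Here df stands for the DataFrame to load, and the environment variable names are arbitrary placeholders.

```scala
// Sketch: authenticate with a GSQL secret (the connector can then request and
// refresh tokens itself), or with a fixed RESTPP bearer token (no refresh).
// Only one authentication method is needed per write.
val commonOptions = Map(
  "graph"             -> "demo_graph",
  "url"               -> "http://hostname:14240",
  "loading.job"       -> "load_Comment",
  "loading.filename"  -> "file_Comment",
  "loading.separator" -> "|"
)

// Option 1: secret-based authentication (token generated and refreshed automatically).
val secretOptions = commonOptions + ("secret" -> sys.env("TG_SECRET"))

// Option 2: pre-generated bearer token (must stay valid for the whole Spark job).
val tokenOptions = commonOptions + ("token" -> sys.env("TG_TOKEN"))

df.write.format("tigergraph").mode("append").options(secretOptions).save()
```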
Configuration
Key | Default Value | Description | Group |
---|---|---|---|
`url` | (none) | The connection URL to the TigerGraph cluster. It can be a list of URLs separated by commas for load balancing. Example: http://192.168.1.1:14240, http://192.168.1.2:14240, http://192.168.1.3:14240 | General |
`graph` | (none) | The graph name. | General |
`username` | (none) | The GSQL username. | Authentication (You can choose any authentication method for data loading, but a username/password pair is recommended, as the connector can then generate and refresh the token automatically.) |
`password` | (none) | The GSQL password. | Authentication |
`secret` | (none) | The GSQL secret. | Authentication |
`token` | (none) | The Bearer token for RESTPP. | Authentication |
`loading.job` | (none) | The GSQL loading job name. | Loading Job |
`loading.filename` | (none) | The filename defined in the loading job. | Loading Job |
`loading.separator` | `,` | The column separator. | Loading Job |
`loading.eol` | `\n` | The line separator. | Loading Job |
`loading.batch.size.bytes` | 2097152 | The maximum batch size in bytes. | Loading Job |
`loading.timeout.ms` | (none) | The loading timeout per batch. | Loading Job |
`loading.max.num.error` | (none) | The threshold of the error object count. The loading job will be aborted when the limit is reached. Only available for TigerGraph 3.10.0+. | Loading Job |
`loading.max.percentage.error` | (none) | The threshold of the error object percentage. The loading job will be aborted when the limit is reached. Only available for TigerGraph 3.10.0+. | Loading Job |
`loading.retry.interval.ms` | 5000 | The initial retry interval for transient server errors. | Loading Job |
`loading.max.retry.attempts` | 10 | The maximum number of retry attempts for transient server errors. | Loading Job |
`loading.max.retry.interval.ms` | 30000 | The maximum retry interval for transient server errors. | Loading Job |
`ssl.mode` | basic | The SSL mode: basic, verifyCA, or verifyHostname. When it is set to verifyCA or verifyHostname, a truststore file must be given. | SSL |
`ssl.truststore` | (none) | Filename of the truststore which stores the SSL certificate chains. | SSL |
`ssl.truststore.type` | JKS | Truststore type, e.g., JKS or PKCS12. | SSL |
`ssl.truststore.password` | (none) | Password of the truststore. | SSL |
`io.connect.timeout.ms` | 30000 | Connect timeout in ms. | Transport Timeout |
`io.read.timeout.ms` | 60000 | Socket read timeout in ms. | Transport Timeout |
`io.retry.interval.ms` | 5000 | The initial retry interval for transport timeouts. | Transport Timeout |
`io.max.retry.interval.ms` | 10000 | The maximum retry interval for transport timeouts. | Transport Timeout |
`io.max.retry.attempts` | 5 | The maximum number of retry attempts for transport timeouts. | Transport Timeout |
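To illustrate how these groups combine, here is a hedged sketch of a more fully tuned options map. The key names follow the table above, the values are arbitrary examples rather than recommendations, and df again stands for the DataFrame to load; verify the exact keys and value formats against the connector documentation for your version.

```scala
// Sketch: batch sizing, timeouts and SSL combined in one options map.
// Values are illustrative only; tune them for your data volume and cluster.
val tunedOptions = Map(
  "graph"                    -> "demo_graph",
  "url"                      -> "https://hostname:14240",
  "username"                 -> "tigergraph",
  "password"                 -> "tigergraph",
  "loading.job"              -> "load_Comment",
  "loading.filename"         -> "file_Comment",
  "loading.separator"        -> "|",
  "loading.batch.size.bytes" -> "4194304",  // 4 MiB per batch instead of the 2 MiB default
  "loading.timeout.ms"       -> "60000",    // per-batch loading timeout
  "ssl.mode"                 -> "verifyCA", // requires the truststore below
  "ssl.truststore"           -> "trust.jks",
  "ssl.truststore.password"  -> "changeit",
  "io.connect.timeout.ms"    -> "30000",
  "io.read.timeout.ms"       -> "60000"
)

df.write
  .format("tigergraph")
  .mode("append")
  .options(tunedOptions)
  .save()
```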
Use Cases
Load Data from Delta Lake
Batch Write
val df = spark.read.format("delta")
.load("/path/to/delta/table")
.select(
"creationDate",
"id",
"locationIP",
"browserUsed",
"content",
"length"
)
df.write
.format("tigergraph")
.mode("append")
.options(
Map(
"graph" -> GRAPH,
"url" -> URL,
"username" -> USERNAME,
"password" -> PASSWORD,
"loading.job" -> LOADING_JOB,
"loading.filename" -> FILENAME,
"loading.separator" -> SEPARATOR
)
)
.save()
Streaming Write (CDC)
val df = spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.load("/path/to/delta/table")
.filter(
$"_change_type" === "insert" || $"_change_type" === "update_postimage"
)
.select(
"creationDate",
"id",
"locationIP",
"browserUsed",
"content",
"length"
)
df.writeStream
.outputMode("append")
.format("tigergraph")
.option("checkpointLocation", "/path/to/checkpoint")
.options(
Map(
"graph" -> GRAPH,
"url" -> URL,
"username" -> USERNAME,
"password" -> PASSWORD,
"loading.job" -> LOADING_JOB,
"loading.filename" -> FILENAME,
"loading.separator" -> SEPARATOR
)
)
.start()
.awaitTermination()
For more details on Delta Lake, see the Delta Lake documentation.
Load Data from Iceberg
Batch Write
val df = spark.table("catalog.db.table")
.select(
"creationDate",
"id",
"locationIP",
"browserUsed",
"content",
"length"
)
df.write
.format("tigergraph")
.mode("append")
.options(
Map(
"graph" -> GRAPH,
"url" -> URL,
"username" -> USERNAME,
"password" -> PASSWORD,
"loading.job" -> LOADING_JOB,
"loading.filename" -> FILENAME,
"loading.separator" -> SEPARATOR
)
)
.save()
Streaming Write (CDC)
val df = spark.readStream
.format("iceberg")
.option("stream-from-timestamp", 0L)
.load("catalog.db.table")
.select(
"creationDate",
"id",
"locationIP",
"browserUsed",
"content",
"length"
)
df.writeStream
.outputMode("append")
.format("tigergraph")
.option("checkpointLocation", "/path/to/checkpoint")
.options(
Map(
"graph" -> GRAPH,
"url" -> URL,
"username" -> USERNAME,
"password" -> PASSWORD,
"loading.job" -> LOADING_JOB,
"loading.filename" -> FILENAME,
"loading.separator" -> SEPARATOR
)
)
.start()
.awaitTermination()
For more details on Iceberg, see the Apache Iceberg Getting Started guide.
Load Data from Hudi
Batch Write
val df = spark.read
.format("hudi")
.load("/path/to/hudi/table")
.select(
"creationDate",
"id",
"locationIP",
"browserUsed",
"content",
"length"
)
df.write
.format("tigergraph")
.mode("append")
.options(
Map(
"graph" -> GRAPH,
"url" -> URL,
"username" -> USERNAME,
"password" -> PASSWORD,
"loading.job" -> LOADING_JOB,
"loading.filename" -> FILENAME,
"loading.separator" -> SEPARATOR
)
)
.save()
Streaming Write (CDC)
val df = spark.readStream
.format("hudi")
.load("/path/to/hudi/table")
.select(
"creationDate",
"id",
"locationIP",
"browserUsed",
"content",
"length"
)
df.writeStream
.outputMode("append")
.format("tigergraph")
.option("checkpointLocation", "/path/to/checkpoint")
.options(
Map(
"graph" -> GRAPH,
"url" -> URL,
"username" -> USERNAME,
"password" -> PASSWORD,
"loading.job" -> LOADING_JOB,
"loading.filename" -> FILENAME,
"loading.separator" -> SEPARATOR
)
)
.start()
.awaitTermination()
For more details on Hudi, see the Apache Hudi Spark Guide.
Loading Statistics
When logging is configured properly and the log level is set to info, loading statistics are logged.
There are three levels of statistics:
- Batch level: data is loaded into TigerGraph in micro-batches, and the number of malformed or invalid records in each batch is logged.
- Partition level: the data source can contain multiple partitions, and the log shows how many rows of each partition have been sent to TigerGraph.
- Job level (only available for TigerGraph 3.10.0 or higher): the overall loading statistics of the Spark job, aggregated by the TigerGraph service KAFKASTRM-LL. This requires providing a username and password to query the /gsqlserver endpoint.
24/01/22 16:15:45 INFO TigerGraphBatchWrite: Overall loading statistics: [ {
"overall" : {
"duration" : 15792,
"size" : 48675207,
"progress" : 0,
"startTime" : 1706770863875,
"averageSpeed" : 29546,
"id" : "test_graph.load_Comment.spark.all.1706770859889",
"endTime" : 1706770879667,
"currentSpeed" : 29546,
"statistics" : {
"fileLevel" : {
"validLine" : 466594,
"notEnoughToken" : 0,
"tokenExceedsBuffer" : 0,
"rejectLine" : 0
},
"objectLevel" : {
"vertex" : [ {
"validObject" : 466593,
"typeName" : "Comment",
"invalidPrimaryId" : 1
} ]
}
}
},
"workers" : [ {
"tasks" : [ {
"filename" : "file_Comment"
} ]
}, {
"tasks" : [ {
"filename" : "file_Comment"
} ]
} ]
} ]
Level Statistic Reference
Row Level Statistics
Row Level Statistics | Description |
---|---|
`validLine` | Number of valid raw data lines parsed. |
`rejectLine` | Number of raw data lines rejected by the reject line rule in the loading script. |
`notEnoughToken` | Number of raw data lines with fewer tokens than what was specified by the loading script. |
| | Number of raw data lines containing invalid characters. |
`tokenExceedsBuffer` | Number of raw data lines containing oversize tokens. |
| | Number of raw data lines that are empty. |
Object Level Statistics
Object Level Statistics | Description |
---|---|
`validObject` | Number of data records created. |
| | Number of token lists which passed the WHERE predicate filter. |
| | Number of token lists which failed the WHERE predicate filter. |
`invalidPrimaryId` | Number of token lists where the id token is invalid. |
| | Number of token lists where the id token is empty. |
| | Number of token lists where at least one of the attribute tokens is invalid. |
| | Number of token lists where at least one of the tokens corresponding to a UDT-type attribute is invalid. |
| | Number of token lists where at least one of the tokens corresponding to an edge type's source/target vertex type is invalid. |