Load from Spark Dataframe
The TigerGraph Spark Connector uses Apache Spark to read data from a Spark DataFrame (or a data lake) and write it to TigerGraph. The connector includes multiple optimizations for high performance, scalability, and ease of management.
Compatibility
- TigerGraph 3.6.0 or higher. Job-level loading statistics are only available for v3.10+.
- Spark 3.2 or higher with Scala 2.12 and Scala 2.13.
- Java 8 or higher.
Users can leverage the connector to connect TigerGraph to the Spark ecosystem and load data from any Spark-compatible data source, such as:
- Distributed file systems: HDFS, S3, GCS and ABS
- Streaming sources: Kafka
- Data warehouses: BigQuery, Snowflake, PostgreSQL and Redshift
- Open table formats: Delta Lake, Iceberg and Hudi
The legacy Spark Connection Via JDBC Driver is deprecated. Please migrate to this new connector.
Overview
The first step is to read data into a Spark DataFrame. The connector then sends the data to TigerGraph, where a TigerGraph loading job maps the data fields from the DataFrame onto graph elements.
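For example, if the comment data is available as a CSV file, a DataFrame could be created as follows (the file path, read options, and variable name are illustrative, not part of this guide):

// Read the source data into a Spark DataFrame (hypothetical CSV source)
val commentDF = spark.read
  .option("header", "true")       // first line holds the column names
  .option("inferSchema", "true")  // let Spark infer the column types
  .csv("/path/to/comment.csv")
commentDF.show(5)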
+--------------------+-------------+-------------+-----------+--------------------+------+
|        creationDate|           id|   locationIP|browserUsed|             content|length|
+--------------------+-------------+-------------+-----------+--------------------+------+
|2012-07-04T06:10:...|7696585588755| 46.23.82.182|    Firefox|                 LOL|     3|
|2012-08-22T17:22:...|8246341402699|  27.62.125.4|     Chrome|              roflol|     6|
|2012-05-08T21:02:...|7146829775042|  61.1.50.205|     Chrome|              roflol|     6|
|2012-11-22T01:25:...|9345853030654|190.95.68.192|    Firefox|About Sergei Eise...|    79|
|2012-11-11T08:59:...|9345853030710|166.75.225.76|     Chrome|                good|     4|
+--------------------+-------------+-------------+-----------+--------------------+------+
With the following connector options:
- loading.separator = "|"
- loading.eol = "\n"

the processed data would be:

2012-07-04T06:10:43.489+00:00|7696585588755|46.23.82.182|Firefox|LOL|3
2012-08-22T17:22:20.315+00:00|8246341402699|27.62.125.4|Chrome|roflol|6
2012-05-08T21:02:39.145+00:00|7146829775042|61.1.50.205|Chrome|roflol|6
2012-11-22T01:25:39.670+00:00|9345853030654|190.95.68.192|Firefox|About Sergei Eisenstein, pioneering SAbout Steven Spielberg, makers in thAbout|79
2012-11-11T08:59:21.311+00:00|9345853030710|166.75.225.76|Chrome|good|4
The connector then invokes the GSQL loading job with these options:
- loading.job = "load_Comment"
- loading.filename = "file_Comment"
To let TigerGraph parse the data chunks correctly, make sure loading.separator and loading.eol are set to characters that do not appear in your data fields.
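For instance, a quick sanity check that the chosen separator does not occur in any string column of the DataFrame (an illustrative sketch, not part of the connector API; it reuses the hypothetical commentDF from above):

import org.apache.spark.sql.functions._

val sep = "|" // the value you plan to pass as loading.separator
// Collect the names of all string columns in the DataFrame
val stringCols = commentDF.schema.fields.collect {
  case f if f.dataType.typeName == "string" => f.name
}
// Build a condition that is true when any string column contains the separator
val conflictCond = stringCols.map(c => col(c).contains(sep)).reduceOption(_ || _).getOrElse(lit(false))
val conflicts = commentDF.filter(conflictCond).count()
// If conflicts > 0, choose different loading.separator / loading.eol characters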
Setup
Download the JARs
The connector can be downloaded from the Maven Central Repository: Maven Central. It is available in three release formats:
- tigergraph-spark-connector-<version>.jar: The JAR file containing only the compiled classes of the connector, without any dependencies.
- tigergraph-spark-connector-<version>-jar-with-dependencies.jar: The JAR file that includes the compiled classes as well as all of their dependencies.
- tigergraph-spark-connector-<version>.tar.gz: The compressed TAR archive that includes tigergraph-spark-connector-<version>.jar and its dependencies as separate JAR files.
To use the TigerGraph Spark connector in a Spark shell, use the --jars option:
spark-shell --jars tigergraph-spark-connector-<version>-jar-with-dependencies.jar
If you want to include the TigerGraph Spark Connector in your Spark installation, add the JAR with dependencies to Spark's jars directory.
Create a Loading Job
The loading job should map the DataFrame's columns to the desired graph element attributes.
gsql '
CREATE VERTEX Comment (PRIMARY_ID id UINT, creationDate DATETIME, locationIP STRING, browserUsed STRING, content STRING, length UINT)
  WITH primary_id_as_attribute="TRUE", STATS="outdegree_by_edgetype"
CREATE GRAPH demo_graph (*)
'

gsql -g demo_graph '
CREATE LOADING JOB load_Comment FOR GRAPH demo_graph {
  DEFINE FILENAME file_Comment;
  LOAD file_Comment TO VERTEX Comment VALUES ($1, $0, $2, $3, $4, $5) USING header="true", separator="|";
}
'
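The positional references $0-$5 in the LOAD statement correspond to the column order of the rows the connector sends, which follows the DataFrame's column order. If in doubt, select the columns explicitly in that order before writing (a sketch that reuses the hypothetical commentDF from the Overview):

// Arrange columns in the order the loading job expects ($0 ... $5)
val orderedDF = commentDF.select(
  "creationDate", // $0
  "id",           // $1 -> PRIMARY_ID of Comment
  "locationIP",   // $2
  "browserUsed",  // $3
  "content",      // $4
  "length"        // $5
)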
Configure the Connector
If you are using Spark Connector 0.2.0 or above, the connector accepts the following configuration options:
Key | Default Value | Description | Group
---|---|---|---
url | (none) | The connection URL of the TigerGraph cluster. It can be a comma-separated list of URLs for load balancing, e.g. http://192.168.1.1:14240,http://192.168.1.2:14240,http://192.168.1.3:14240. | General
graph | (none) | The graph name. | General
username | (none) | The GSQL username. | Authentication (You can choose any authentication method for data loading, but a username/password pair is recommended because the connector can then generate and refresh the token automatically.)
password | (none) | The GSQL password. | Authentication
secret | (none) | The GSQL secret. | Authentication
token | (none) | The Bearer token for RESTPP. | Authentication
loading.job | (none) | The GSQL loading job name. | Loading Job
loading.filename | (none) | The filename defined in the loading job. | Loading Job
loading.separator | , | The column separator. | Loading Job
loading.eol | \n | The line separator. | Loading Job
loading.batch.size.bytes | 2097152 | The maximum batch size in bytes. | Loading Job
loading.timeout.ms | (none) | The loading timeout per batch. | Loading Job
loading.max.num.error | (none) | The threshold on the number of error objects. The loading job is aborted when the limit is reached. Only available for TigerGraph 3.10.0+. | Loading Job
loading.max.percentage.error | (none) | The threshold on the percentage of error objects. The loading job is aborted when the limit is reached. Only available for TigerGraph 3.10.0+. | Loading Job
loading.retry.interval.ms | 5000 | The initial retry interval for transient server errors. | Loading Job
loading.max.retry.attempts | 10 | The maximum number of retry attempts for transient server errors. | Loading Job
loading.max.retry.interval.ms | 30000 | The maximum retry interval for transient server errors. | Loading Job
ssl.mode | basic | The SSL mode: basic, verifyCA or verifyHostname. When it is set to verifyCA or verifyHostname, a truststore file must be provided. | SSL
ssl.truststore | (none) | Filename of the truststore which stores the SSL certificate chains. | SSL
ssl.truststore.type | JKS | The truststore type, e.g. JKS or PKCS12. | SSL
ssl.truststore.password | (none) | Password of the truststore. | SSL
io.connect.timeout.ms | 30000 | Connect timeout in ms. | Transport Timeout
io.read.timeout.ms | 60000 | Socket read timeout in ms. | Transport Timeout
io.retry.interval.ms | 5000 | The initial retry interval for transport timeouts. | Transport Timeout
io.max.retry.interval.ms | 10000 | The maximum retry interval for transport timeouts. | Transport Timeout
io.max.retry.attempts | 5 | The maximum number of retry attempts for transport timeouts. | Transport Timeout
oauth2.url | (none) | The URL to which the Client Credentials grant request is sent to retrieve the access token. This URL is typically provided by the Identity Provider (IdP), such as Azure AD or Auth0. | Authentication
oauth2.parameters | (none) | A stringified JSON that carries the parameters for retrieving the access token. The required parameters vary depending on the IdP. | Authentication
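Since the same connection options are reused by every write example below, it can be convenient to collect them once in a Map (a sketch using the sample values from this guide; adjust them to your environment):

// Common connector options shared by the write examples below
val tgOptions = Map(
  "graph"             -> "demo_graph",
  "url"               -> "http(s)://hostname:port",
  "username"          -> "tigergraph",
  "password"          -> "tigergraph",
  "loading.job"       -> "load_Comment",
  "loading.filename"  -> "file_Comment",
  "loading.separator" -> "|"
)
// Usage: df.write.format("tigergraph").mode("append").options(tgOptions).save()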
Use Case Examples
Some Helpful Tips
- High Availability: to ensure fault tolerance, provide the URLs of all nodes in the url option of the connector:
"url" -> "https://m1:14240,https://m2:14240,https://m3:14240,https://m4:14240",
- Authentication Methods
We recommend using OAuth2 for authentication. Our implementation supports automatic token requests and refreshes, and is more secure than username/password, offering convenience and security for both batch and streaming jobs.
The Spark connector supports the following authentication methods:
- oauth2: recommended for both batch and streaming jobs. The token is automatically requested and refreshed using oauth2.url and oauth2.parameters (see the sketch after this list).
- token: suitable for one-time Spark jobs, but lacks support for token refresh.
- username/password: works for both batch and streaming jobs, but is an older, less secure method. The token is automatically requested and refreshed.
- secret: also works for batch and streaming jobs; the token is managed using the GSQL secret, but this is less secure than OAuth2.
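For example, a minimal OAuth2 (Client Credentials) configuration might look like the following sketch; the token endpoint, client ID/secret and scope are placeholders for values issued by your IdP, not values from this guide:

val oauthOptions = Map(
  "graph" -> "demo_graph",
  "url" -> "https://hostname:14240",
  // OAuth2 token endpoint of your IdP (placeholder URL)
  "oauth2.url" -> "https://idp.example.com/oauth2/token",
  // Stringified JSON with the parameters your IdP requires (placeholder values)
  "oauth2.parameters" -> """{"client_id": "<client_id>", "client_secret": "<client_secret>", "scope": "<scope>"}""",
  "loading.job" -> "load_Comment",
  "loading.filename" -> "file_Comment",
  "loading.separator" -> "|"
)
// df.write.format("tigergraph").mode("append").options(oauthOptions).save()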
Batch Write Mode
val GRAPH = "demo_graph"
val URL = "http(s)://hostname:port[,http(s)://hostname:port]*"
val USERNAME = "tigergraph"
val PASSWORD = "tigergraph"
val LOADING_JOB = "load_Comment"
val FILENAME = "file_Comment"
val SEPARATOR = "|"
val VERSION = "3.10.1"
val LOG_LEVEL = "2"
val LOG_PATH = "/tmp/tigergraph-spark-connector.log"
val df = spark.read.json("path/to/comment.json")
df.write
.format("tigergraph")
.mode("append")
.options(
Map(
"graph" -> GRAPH,
"url" -> URL,
"username" -> USERNAME,
"password" -> PASSWORD,
"loading.job" -> LOADING_JOB,
"loading.filename" -> FILENAME,
"loading.separator" -> SEPARATOR,
"version" -> VERSION,
"log.level" -> LOG_LEVEL,
"log.path" -> LOG_PATH
)
)
.save()
Streaming Mode with Spark Structured Streaming API
val GRAPH = "Social_Net"
val URL = "http(s)://hostname:port"
val USERNAME = "tigergraph"
val PASSWORD = "tigergraph"
val LOADING_JOB = "load_person"
val FILENAME = "f1"
val SEPARATOR = "|"
val VERSION = "3.10.1"
val LOG_LEVEL = "2"
val LOG_PATH = "/tmp/tigergraph-spark-connector.log"
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:port") // Kafka broker list (placeholder value)
.option("subscribe", "person")
.load()
.selectExpr("CAST(value AS STRING)").as[String]
df.writeStream
.outputMode("append")
.format("tigergraph")
.option("checkpointLocation", "/path/to/checkpoint")
.options(
Map(
"graph" -> GRAPH,
"url" -> URL,
"username" -> USERNAME,
"password" -> PASSWORD,
"loading.job" -> LOADING_JOB,
"loading.filename" -> FILENAME,
"loading.separator" -> SEPARATOR,
"version" -> VERSION,
"log.level" -> LOG_LEVEL,
"log.path" -> LOG_PATH
)
)
.start()
.awaitTermination()
Load Data from Delta Lake
Batch Write
val df = spark.read.format("delta")
.load("/path/to/delta/table")
.select(
"creationDate",
"id",
"locationIP",
"browserUsed",
"content",
"length"
)
df.write
.format("tigergraph")
.mode("append")
.options(
Map(
"graph" -> GRAPH,
"url" -> URL,
"username" -> USERNAME,
"password" -> PASSWORD,
"loading.job" -> LOADING_JOB,
"loading.filename" -> FILENAME,
"loading.separator" -> SEPARATOR,
"version" -> VERSION,
"log.level" -> LOG_LEVEL,
"log.path" -> LOG_PATH
)
)
.save()
Streaming Write (CDC)
val df = spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.load("/path/to/delta/table")
.filter(
$"_change_type" === "insert" || $"_change_type" === "update_postimage"
)
.select(
"creationDate",
"id",
"locationIP",
"browserUsed",
"content",
"length"
)
df.writeStream
.outputMode("append")
.format("tigergraph")
.option("checkpointLocation", "/path/to/checkpoint")
.options(
Map(
"graph" -> GRAPH,
"url" -> URL,
"username" -> USERNAME,
"password" -> PASSWORD,
"loading.job" -> LOADING_JOB,
"loading.filename" -> FILENAME,
"loading.separator" -> SEPARATOR,
"version" -> VERSION,
"log.level" -> LOG_LEVEL,
"log.path" -> LOG_PATH
)
)
.start()
.awaitTermination()
For more details on Delta Lake, see the Delta Lake documentation.
Load Data from Iceberg
Batch Write
val df = spark.table("catalog.db.table")
.select(
"creationDate",
"id",
"locationIP",
"browserUsed",
"content",
"length"
)
df.write
.format("tigergraph")
.mode("append")
.options(
Map(
"graph" -> GRAPH,
"url" -> URL,
"username" -> USERNAME,
"password" -> PASSWORD,
"loading.job" -> LOADING_JOB,
"loading.filename" -> FILENAME,
"loading.separator" -> SEPARATOR,
"version" -> VERSION,
"log.level" -> LOG_LEVEL,
"log.path" -> LOG_PATH
)
)
.save()
Streaming Write (CDC)
val df = spark.readStream
.format("iceberg")
.option("stream-from-timestamp", 0L)
.load("catalog.db.table")
.select(
"creationDate",
"id",
"locationIP",
"browserUsed",
"content",
"length"
)
df.writeStream
.outputMode("append")
.format("tigergraph")
.option("checkpointLocation", "/path/to/checkpoint")
.options(
Map(
"graph" -> GRAPH,
"url" -> URL,
"username" -> USERNAME,
"password" -> PASSWORD,
"loading.job" -> LOADING_JOB,
"loading.filename" -> FILENAME,
"loading.separator" -> SEPARATOR,
"version" -> VERSION,
"log.level" -> LOG_LEVEL,
"log.path" -> LOG_PATH
)
)
.start()
.awaitTermination()
For more details on Iceberg, see Apache Iceberg: Getting Started.
Load Data from Hudi
Batch Write
val df = spark.read
.format("hudi")
.load("/path/to/hudi/table")
.select(
"creationDate",
"id",
"locationIP",
"browserUsed",
"content",
"length"
)
df.write
.format("tigergraph")
.mode("append")
.options(
Map(
"graph" -> GRAPH,
"url" -> URL,
"username" -> USERNAME,
"password" -> PASSWORD,
"loading.job" -> LOADING_JOB,
"loading.filename" -> FILENAME,
"loading.separator" -> SEPARATOR,
"version" -> VERSION,
"log.level" -> LOG_LEVEL,
"log.path" -> LOG_PATH
)
)
.save()
Streaming Write (CDC)
val df = spark.readStream
.format("hudi")
.load("/path/to/hudi/table")
.select(
"creationDate",
"id",
"locationIP",
"browserUsed",
"content",
"length"
)
df.writeStream
.outputMode("append")
.format("tigergraph")
.option("checkpointLocation", "/path/to/checkpoint")
.options(
Map(
"graph" -> GRAPH,
"url" -> URL,
"username" -> USERNAME,
"password" -> PASSWORD,
"loading.job" -> LOADING_JOB,
"loading.filename" -> FILENAME,
"loading.separator" -> SEPARATOR,
"version" -> VERSION,
"log.level" -> LOG_LEVEL,
"log.path" -> LOG_PATH
)
)
.start()
.awaitTermination()
For more details on Hudi, see the Apache Hudi Spark Guide.
Loading Statistics
When logging is configured properly and the log level is set to info, the loading statistics are logged.
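The examples in this guide enable the connector's own file logging through the log.level and log.path options, for example (the numeric level mirrors the earlier examples; treating 2 as info-level is an assumption to verify for your connector version):

// Connector logging options, as used in the write examples above
val loggingOptions = Map(
  "log.level" -> "2", // assumed to correspond to info-level logging
  "log.path"  -> "/tmp/tigergraph-spark-connector.log"
)
// Merge these into the options map passed to df.write.options(...)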
There are three levels of statistics:
- Batch level: data is loaded into TigerGraph in micro-batches; the counts of malformed or invalid data in each batch are logged.
- Partition level: the data source can contain multiple partitions, and the log shows how many rows of each partition have been sent to TigerGraph.
- Job level (available for TigerGraph 3.10+): the overall loading statistics of the Spark job, aggregated by the TigerGraph service KAFKASTRM-LL. This requires providing a username and password for the query/gsqlserver endpoint.
24/01/22 16:15:45 INFO TigerGraphBatchWrite: Overall loading statistics: [ {
"overall" : {
"duration" : 15792,
"size" : 48675207,
"progress" : 0,
"startTime" : 1706770863875,
"averageSpeed" : 29546,
"id" : "test_graph.load_Comment.spark.all.1706770859889",
"endTime" : 1706770879667,
"currentSpeed" : 29546,
"statistics" : {
"fileLevel" : {
"validLine" : 466594,
"notEnoughToken" : 0,
"tokenExceedsBuffer" : 0,
"rejectLine" : 0
},
"objectLevel" : {
"vertex" : [ {
"validObject" : 466593,
"typeName" : "Comment",
"invalidPrimaryId" : 1
} ]
}
}
},
"workers" : [ {
"tasks" : [ {
"filename" : "file_Comment"
} ]
}, {
"tasks" : [ {
"filename" : "file_Comment"
} ]
} ]
} ]
Row Level Statistics
The row-level (file-level) statistics report how the raw data lines of each batch were parsed:
- Number of valid raw data lines parsed.
- Number of raw data lines rejected by the reject line rule in the loading script.
- Number of raw data lines with fewer tokens than what was specified by the loading script.
- Number of raw data lines containing invalid characters.
- Number of raw data lines containing oversize tokens.
- Number of raw data lines that are empty.
Object Level Statistics
The object-level statistics report how graph objects were generated from the parsed token lists:
- Number of data records created.
- Number of token lists which passed the WHERE predicate filter.
- Number of token lists which failed the WHERE predicate filter.
- Number of token lists where the id token is invalid.
- Number of token lists where the id token is empty.
- Number of token lists where at least one of the attribute tokens is invalid.
- Number of token lists where at least one of the tokens corresponding to a UDT-type attribute is invalid.
- Number of token lists where at least one of the tokens corresponding to an edge type's source/target vertex type is invalid.
- Number of times a user-defined token function threw an exception during data loading.