Load from Spark Dataframe

The TigerGraph Spark Connector uses Apache Spark to read data from a Spark DataFrame (or a data lake) and write it to TigerGraph. The connector includes multiple optimizations for high performance, scalability, and manageability.

In TigerGraph 3.11+, it can also be used to stream data from TigerGraph into Spark as a DataFrame.

Users can leverage it to connect TigerGraph to the Spark ecosystem and load data from any Spark data source, such as Delta Lake, Apache Iceberg, and Apache Hudi.

The legacy Spark Connection Via JDBC Driver is deprecated. Please migrate to this new connector.

From version 4.1.3, TigerGraph’s Spark Connector supports OAuth2 authentication, enabling it to request JWT tokens from third-party Identity Providers (IdPs) like Azure AD and Auth0, enhancing security and token management for Spark jobs.

Prerequisites

  • TigerGraph 3.6.0 or higher. Job-level loading statistics are available in v3.10+.

  • Spark 3.2 or higher, with Scala 2.12 or Scala 2.13.

  • Java 8 or higher.

Download the TigerGraph Spark Connector

This connector can be downloaded from the Maven central repository: Maven Central. The connector is available in three formats:

  • tigergraph-spark-connector-<version>.jar: The JAR file containing only the compiled classes of the connector, which does not include any dependencies.

  • tigergraph-spark-connector-<version>-jar-with-dependencies.jar: The JAR file that includes compiled classes, as well as all the dependencies.

  • tigergraph-spark-connector-<version>.tar.gz: The compressed TAR archive that includes tigergraph-spark-connector-<version>.jar and dependencies in separate JAR files.

To use the TigerGraph Spark Connector in a Spark shell, pass the JAR with the --jars option:
spark-shell --jars tigergraph-spark-connector-<version>-jar-with-dependencies.jar

If you want to include the TigerGraph Spark Connector in your Spark installation, add the JAR with dependencies to Spark’s jars folder.

Configure Logging

We highly recommend setting the log level to info to monitor the execution. There are two methods to enable logging for the connector.

Connector Default Logger

The connector has a built-in logger based on java.util.logging. To activate it, set the option log.level to 2, which enables info-level logging.

Please refer to Common Options to learn more about log.level and log.file options.

Spark Logger

Configure Spark's Log4j via $SPARK_HOME/conf/log4j2.properties as follows (or through another SLF4J binding, if one is used):

logger.tg.name = com.tigergraph
logger.tg.level = info

Create a Loading Job

The loading job should map the dataframe's columns to the desired graph element attributes.

Example of simple graph schema and loading job
CREATE VERTEX Comment (PRIMARY_ID id UINT, creationDate DATETIME,
  locationIP STRING, browserUsed STRING, content STRING, length UINT)
  WITH primary_id_as_attribute="TRUE", STATS="outdegree_by_edgetype"
CREATE GRAPH demo_graph (*)

USE GRAPH demo_graph
CREATE LOADING JOB load_Comment FOR GRAPH demo_graph {
    DEFINE FILENAME file_Comment;
    LOAD file_Comment
        TO VERTEX Comment VALUES ($1, $0, $2, $3, $4, $5)
        USING header="true", separator="|";
}

Overview

The first step is to read data into a Spark dataframe. Then, using a TigerGraph loading job that maps data fields from the dataframe to graph elements, the connector sends the data from Spark to TigerGraph.

Spark dataframe:
+--------------------+-------------+-------------+-----------+--------------------+------+
|        creationDate|           id|   locationIP|browserUsed|             content|length|
+--------------------+-------------+-------------+-----------+--------------------+------+
|2012-07-04T06:10:...|7696585588755| 46.23.82.182|    Firefox|                 LOL|     3|
|2012-08-22T17:22:...|8246341402699|  27.62.125.4|     Chrome|              roflol|     6|
|2012-05-08T21:02:...|7146829775042|  61.1.50.205|     Chrome|              roflol|     6|
|2012-11-22T01:25:...|9345853030654|190.95.68.192|    Firefox|About Sergei Eise...|    79|
|2012-11-11T08:59:...|9345853030710|166.75.225.76|     Chrome|                good|     4|
+--------------------+-------------+-------------+-----------+--------------------+------+
The connector concatenates each row's columns into delimited text.

If the following options are used,

- loading.separator = "|"
- loading.eol = "\n"

then the processed data would be

2012-07-04T06:10:43.489+00:00|7696585588755|46.23.82.182|Firefox|LOL|3
2012-08-22T17:22:20.315+00:00|8246341402699|27.62.125.4|Chrome|roflol|6
2012-05-08T21:02:39.145+00:00|7146829775042|61.1.50.205|Chrome|roflol|6
2012-11-22T01:25:39.670+00:00|9345853030654|190.95.68.192|Firefox|About Sergei Eisenstein, pioneering SAbout Steven Spielberg, makers in thAbout|79
2012-11-11T08:59:21.311+00:00|9345853030710|166.75.225.76|Chrome|good|4

The processed data will be sent to TigerGraph in batches as CSV file sources for the loading job:

- loading.job = "load_Comment"
- loading.filename = "file_Comment"

To let TigerGraph parse the data correctly, make sure loading.separator and loading.eol are set to characters that do not appear in your data fields, as sketched below.
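
For illustration, here is a minimal sketch, assuming the Comment dataframe shown above: it reorders the columns to match the loading job's positional tokens ($0, $1, ...), which follow the dataframe's column order, and strips the configured separator and EOL characters from the string columns.

import org.apache.spark.sql.functions.{col, regexp_replace}

// Token positions $0..$5 in the loading job follow the dataframe's column order,
// so select the columns in the order the loading job expects.
val ordered = df.select("creationDate", "id", "locationIP", "browserUsed", "content", "length")

// Strip the configured separator "|" and EOL "\n" from the string columns so
// every generated line parses unambiguously.
val sanitized = Seq("locationIP", "browserUsed", "content")
    .foldLeft(ordered)((acc, c) => acc.withColumn(c, regexp_replace(col(c), "[|\n]", " ")))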

Usage

Database Connection

Predefined connection options:
val tgOptions = Map(
    "url" -> "http(s)://<tg_node_host>:<tg_nginx_port>",
    "version" -> "<tg_version>",
    "graph" -> "<graph_name>",
    "username" -> "<username>",
    "password" -> "<password>"
)

Helpful Tips

  • Connection options object

To simplify configuration, you can bundle the database connection options in a data object (e.g., tgOptions) and then pass .options(tgOptions) when running the connector. This approach helps keep user credentials separate from connection commands (see the sketch after these tips).

  • High Availability

To ensure fault tolerance, provide a list of all node URLs in the url option for the connector:

"url" -> "https://m1:14240,https://m2:14240,https://m3:14240,https://m4:14240",

Authentication Methods

We recommend using OAuth2 for authentication. Our implementation supports automatic token requests and refresh, and it is more secure than username/password, offering convenience and security for both batch and streaming jobs.

The Spark connector now supports the following authentication methods; example option maps follow the list:

  • oauth2: Recommended for both batch and streaming jobs. The token is automatically requested and refreshed using oauth2.url and oauth2.parameters.

  • token: Suitable for one-time Spark jobs, but lacks support for refresh tokens.

  • username/password: Works for both batch and streaming jobs, but is considered an older, less secure method. The token is automatically requested and refreshed.

  • secret: Also works for batch and streaming jobs, and the token is managed using the GSQL secret, but is less secure compared to OAuth2.
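
The sketches below illustrate option maps for these methods, building on the tgOptions example above. The OAuth2 values mirror the Azure AD example from the Common Options table; every value in braces or angle brackets is a placeholder.

// OAuth2 (recommended): the connector requests and refreshes the JWT automatically.
val oauth2Options = tgOptions - "username" - "password" ++ Map(
    "oauth2.url" -> "https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token",
    "oauth2.parameters" -> """{ "client_id": "{client_id}", "client_secret": "{client_secret}", "scope": "https://storage.azure.com/.default" }"""
)

// GSQL secret: the token is requested and refreshed from the secret.
val secretOptions = tgOptions - "username" - "password" + ("secret" -> "<gsql_secret>")

// Fixed token: suitable for one-time jobs; no refresh.
val tokenOptions = tgOptions - "username" - "password" + ("token" -> "<bearer_or_jwt_token>")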

Batch Write

Read data from any Spark data sources into the dataframe:
val df = spark.read.json("path/to/Comment.json")
Batch write the data into TigerGraph:
df.write
    .format("tigergraph")
    .mode("append")
    .options(tgOptions)
    .option("loading.job", "load_Comment")
    .option("loading.filename", "file_Comment")
    .option("loading.separator", "|")
    .option("log.level", "2")
    .option("log.file", "/tmp/tigergraph-spark-connector.log")
    .save()

Write with Spark Structured Streaming API

Read data from any Spark streaming data sources into the dataframe:
import spark.implicits._ // needed for the .as[String] encoder below

val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<kafka_broker:port>") // required by the Kafka source
    .option("subscribe", "Comment")
    .load()
    .selectExpr("CAST(value AS STRING)").as[String]
Streaming write data to TigerGraph:
df.writeStream
    .outputMode("append")
    .format("tigergraph")
    .option("checkpointLocation", "/path/to/checkpoint")
    .options(tgOptions)
    .option("loading.job", "load_Comment")
    .option("loading.filename", "file_Comment")
    .option("loading.separator", "|")
    .option("log.level", "2")
    .option("log.file", "/tmp/tigergraph-spark-connector.log")
    .start()
    .awaitTermination()

Connector Options

Common Options

Beginning with version 0.2.0 of the connector, the following connector options are required: version, log.level, and log.file.

| Key | Default Value | Description | Group |
|---|---|---|---|
| url | (none) | The connection URL to the TigerGraph cluster. It can be a comma-separated list of URLs for load balancing. The port number can be retrieved with gadmin config get Nginx.Port; for TGCloud users, the default port is 443. Example: http(s)://192.168.1.1:14240, http(s)://192.168.1.2:14240, http(s)://192.168.1.3:14240 | General |
| version | (none) | The TigerGraph version, e.g., "3.10.1", "4.1.0". | General |
| graph | (none) | The graph name. | General |
| username | (none) | The GSQL username. Username/password or secret-based authentication is preferred over a fixed token, as the connector can generate and refresh the token automatically. | Authentication |
| password | (none) | The GSQL password. | Authentication |
| secret | (none) | The GSQL secret, which is recommended for TGCloud users. | Authentication |
| token | (none) | The classic Bearer token or JWT (since 3.10.1) for RESTPP authentication. | Authentication |
| ssl.mode | basic | The SSL mode. Supported values: basic, verifyCA and verifyHostname. When set to verifyCA or verifyHostname, a truststore file must be provided. | SSL |
| ssl.truststore | (none) | Filename of the truststore which stores the SSL certificate chains. Add --files /path/to/trust.jks when submitting the Spark job. | SSL |
| ssl.truststore.type | JKS | The truststore type, e.g., JKS, PKCS12. | SSL |
| ssl.truststore.password | (none) | Password of the truststore. | SSL |
| io.connect.timeout.ms | 30000 | Connect timeout in ms. | Transport Timeout |
| io.read.timeout.ms | 60000 | Socket read timeout in ms. | Transport Timeout |
| io.retry.interval.ms | 5000 | The initial retry interval for transport timeouts. | Transport Timeout |
| io.max.retry.interval.ms | 10000 | The maximum retry interval for transport timeouts. | Transport Timeout |
| io.max.retry.attempts | 5 | The maximum number of retry attempts for transport timeouts. | Transport Timeout |
| log.level | (none) | The log level of the default logger. Available values: "0", "1", "2" and "3", which represent "ERROR", "WARN", "INFO" and "DEBUG". The Spark logging configuration (log4j) is bypassed if this option is set. The log is printed to stderr unless log.file is set. | Logging |
| log.file | (none) | The log file name pattern, e.g., "/tmp/tigergraph-spark-connector.log". It requires log.level to be set first. | Logging |
| oauth2.url | (none) | The URL to which the Client Credentials grant request is sent to retrieve the access token. This URL is typically provided by the Identity Provider (IdP), such as Azure AD or Auth0. For Azure AD: https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token. For Auth0: https://{yourDomain}/oauth/token. Note: use the gadmin command to set Security.JWT.RSA.PublicKey in advance for JWT verification. | Authentication |
| oauth2.parameters | (none) | A stringified JSON that carries the parameters for retrieving the access token. The required parameters vary by IdP. For Azure AD: { "client_id": "{client_id}", "client_secret": "{client_secret}", "scope": "https://storage.azure.com/.default" }. For Auth0: { "client_id": "{client_id}", "client_secret": "{client_secret}", "audience": "{api_identifier}" } | Authentication |
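
For example, here is a minimal sketch of a batch write with certificate verification enabled, reusing df and tgOptions from the Batch Write example and assuming the CA chain has been packed into trust.jks and shipped to the executors with --files /path/to/trust.jks (TRUSTSTORE_PASSWORD is a placeholder environment variable):

df.write
    .format("tigergraph")
    .mode("append")
    .options(tgOptions)
    .option("ssl.mode", "verifyCA")
    .option("ssl.truststore", "trust.jks")
    .option("ssl.truststore.type", "JKS")
    // Placeholder environment variable for the truststore password.
    .option("ssl.truststore.password", sys.env.getOrElse("TRUSTSTORE_PASSWORD", ""))
    .option("loading.job", "load_Comment")
    .option("loading.filename", "file_Comment")
    .option("loading.separator", "|")
    .save()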

Writer Options

| Key | Default Value | Description | Group |
|---|---|---|---|
| loading.job | (none) | The GSQL loading job name. | Loading Job |
| loading.filename | (none) | The filename defined in the loading job. | Loading Job |
| loading.separator | , | The column separator. | Loading Job |
| loading.eol | \n | The line separator. | Loading Job |
| loading.batch.size.bytes | 2097152 | The maximum batch size in bytes. | Loading Job |
| loading.timeout.ms | (none) | The loading timeout per batch, in ms. | Loading Job |
| loading.max.percent.error | (none) | The threshold of the error object percentage. The loading job is aborted when the limit is reached. Only available for TigerGraph 3.10.0+. | Loading Job |
| loading.max.num.error | (none) | The threshold of the error object count. The loading job is aborted when the limit is reached. Only available for TigerGraph 3.10.0+. | Loading Job |
| loading.retry.interval.ms | 5000 | The initial retry interval for transient server errors. | Loading Job |
| loading.max.retry.interval.ms | 30000 | The maximum retry interval for transient server errors. | Loading Job |
| loading.max.retry.attempts | 10 | The maximum number of retry attempts for transient server errors. | Loading Job |
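
As an illustration, the sketch below combines several of these knobs in a batch write, reusing df and tgOptions from the Batch Write example; the values are examples only, not tuning recommendations:

df.write
    .format("tigergraph")
    .mode("append")
    .options(tgOptions)
    .option("loading.job", "load_Comment")
    .option("loading.filename", "file_Comment")
    .option("loading.separator", "|")
    .option("loading.batch.size.bytes", "4194304")  // 4 MB batches instead of the 2 MB default
    .option("loading.timeout.ms", "60000")          // per-batch loading timeout
    .option("loading.max.num.error", "100")         // abort after 100 invalid objects (TigerGraph 3.10.0+)
    .save()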

Use Cases

Load Data from Delta Lake

Batch Write

Load delta table to Spark dataframe:
val df = spark.read.format("delta")
    .load("/path/to/delta/table")
    .select(
        "creationDate",
        "id",
        "locationIP",
        "browserUsed",
        "content",
        "length"
    )
Batch write the data into TigerGraph:
df.write
    .format("tigergraph")
    .mode("append")
    .options(tgOptions)
    .option("loading.job", "load_Comment")
    .option("loading.filename", "file_Comment")
    .option("loading.separator", "|")
    .option("log.level", "2")
    .option("log.file", "/tmp/tigergraph-spark-connector.log")
    .save()

Streaming Write (CDC)

Streaming read from delta table:
import spark.implicits._ // needed for the $"column" syntax below

val df = spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .load("/path/to/delta/table")
    .filter(
        $"_change_type" === "insert" || $"_change_type" === "update_postimage"
    )
    .select(
        "creationDate",
        "id",
        "locationIP",
        "browserUsed",
        "content",
        "length"
    )
Streaming write data to TigerGraph:
df.writeStream
    .outputMode("append")
    .format("tigergraph")
    .option("checkpointLocation", "/path/to/checkpoint")
    .options(tgOptions)
    .option("loading.job", "load_Comment")
    .option("loading.filename", "file_Comment")
    .option("loading.separator", "|")
    .option("log.level", "2")
    .option("log.file", "/tmp/tigergraph-spark-connector.log")
    .start()
    .awaitTermination()

Load Data from Iceberg

Batch Write

Load Iceberg table to Spark dataframe:
val df = spark.table("catalog.db.table")
    .select(
        "creationDate",
        "id",
        "locationIP",
        "browserUsed",
        "content",
        "length"
    )
Batch write the data into TigerGraph:
df.write
    .format("tigergraph")
    .mode("append")
    .options(tgOptions)
    .option("loading.job", "load_Comment")
    .option("loading.filename", "file_Comment")
    .option("loading.separator", "|")
    .option("log.level", "2")
    .option("log.file", "/tmp/tigergraph-spark-connector.log")
    .save()

Streaming Write (CDC)

Streaming read from Iceberg table:
val df = spark.readStream
    .format("iceberg")
    .option("stream-from-timestamp", 0L)
    .load("catalog.db.table")
    .select(
        "creationDate",
        "id",
        "locationIP",
        "browserUsed",
        "content",
        "length"
    )
Streaming write data to TigerGraph:
df.writeStream
    .outputMode("append")
    .format("tigergraph")
    .option("checkpointLocation", "/path/to/checkpoint")
    .options(tgOptions)
    .option("loading.job", "load_Comment")
    .option("loading.filename", "file_Comment")
    .option("loading.separator", "|")
    .option("log.level", "2")
    .option("log.file", "/tmp/tigergraph-spark-connector.log")
    .start()
    .awaitTermination()

For more details on Iceberg, see Apache Iceberg: Getting Started.

Load Data from Hudi

Batch Write

Load Hudi table to Spark dataframe:
val df = spark.read
    .format("hudi")
    .load("/path/to/hudi/table")
    .select(
        "creationDate",
        "id",
        "locationIP",
        "browserUsed",
        "content",
        "length"
    )
Batch write the data into TigerGraph:
df.write
    .format("tigergraph")
    .mode("append")
    .options(tgOptions)
    .option("loading.job", "load_Comment")
    .option("loading.filename", "file_Comment")
    .option("loading.separator", "|")
    .option("log.level", "2")
    .option("log.file", "/tmp/tigergraph-spark-connector.log")
    .save()

Streaming Write (CDC)

Streaming read from Hudi table:
val df = spark.readStream
    .format("hudi")
    .load("/path/to/hudi/table")
    .select(
        "creationDate",
        "id",
        "locationIP",
        "browserUsed",
        "content",
        "length"
    )
Streaming write data to TigerGraph:
df.writeStream
    .outputMode("append")
    .format("tigergraph")
    .option("checkpointLocation", "/path/to/checkpoint")
    .options(tgOptions)
    .option("loading.job", "load_Comment")
    .option("loading.filename", "file_Comment")
    .option("loading.separator", "|")
    .option("log.level", "2")
    .option("log.file", "/tmp/tigergraph-spark-connector.log")
    .start()
    .awaitTermination()

For more details on Hudi, see Spark Guide | Apache Hudi.

Loading Statistics

When logging is configured properly and the log level is set to info, loading statistics are logged.

There are three levels of statistics:

  • Batch level: Data is loaded into TigerGraph in micro batches. The count of malformed or invalid records in each batch is logged.

  • Partition level: The data source can contain multiple partitions, and the log shows how many rows of each partition have been sent to TigerGraph.

  • Job level (only available for TigerGraph 3.10+): The overall loading statistics of the Spark job, aggregated by the TigerGraph service KAFKASTRM-LL. This requires providing a username and password to query the /gsqlserver endpoint.

Sample loading statistics:
24/01/22 16:15:45 INFO TigerGraphBatchWrite: Overall loading statistics: [ {
  "overall" : {
    "duration" : 15792,
    "size" : 48675207,
    "progress" : 0,
    "startTime" : 1706770863875,
    "averageSpeed" : 29546,
    "id" : "test_graph.load_Comment.spark.all.1706770859889",
    "endTime" : 1706770879667,
    "currentSpeed" : 29546,
    "statistics" : {
      "fileLevel" : {
        "validLine" : 466594,
        "notEnoughToken" : 0,
        "tokenExceedsBuffer" : 0,
        "rejectLine" : 0
      },
      "objectLevel" : {
        "vertex" : [ {
          "validObject" : 466593,
          "typeName" : "Comment",
          "invalidPrimaryId" : 1
        } ]
      }
    }
  },
  "workers" : [ {
    "tasks" : [ {
      "filename" : "file_Comment"
    } ]
  }, {
    "tasks" : [ {
      "filename" : "file_Comment"
    } ]
  } ]
} ]

Row Level Statistics

| Row Level Statistics | Description |
|---|---|
| validLine | Number of valid raw data lines parsed. |
| rejectLine | Number of raw data lines rejected by the reject line rule in the loading script. |
| notEnoughToken | Number of raw data lines with fewer tokens than specified by the loading script. |
| badCharacter | Number of raw data lines containing invalid characters. |
| tokenExceedsBuffer | Number of raw data lines containing oversized tokens (see gadmin config get GSQL.OutputTokenBufferSize). |
| emptyLine | Number of raw data lines that are empty. |

Object Level Statistics

| Object Level Statistics | Description |
|---|---|
| validObject | Number of data records created. |
| passedCondition | Number of token lists that passed the WHERE predicate filter. |
| failedCondition | Number of token lists that failed the WHERE predicate filter. |
| invalidPrimaryId | Number of token lists where the id token is invalid. |
| noIdFound | Number of token lists where the id token is empty. |
| invalidAttribute | Number of token lists where at least one of the attribute tokens is invalid. |
| incorrectFixedBinaryLength | Number of token lists where at least one of the tokens corresponding to a UDT-type attribute is invalid. |
| invalidVertexType | Number of token lists where at least one of the tokens corresponding to an edge type's source/target vertex type is invalid. |