Read to Spark DataFrame

The TigerGraph Spark Connector employs Apache Spark to read data from TigerGraph.

Supported reader types:

  • Read Vertices

  • Read Edges

  • Run Pre-installed Query

  • Run Interpreted Query

The same connector can also act as an intermediary for writing: Spark reads data from a variety of sources, and the connector then streams that data from Spark to TigerGraph.

The legacy Spark Connection Via JDBC Driver is deprecated. Please migrate to this new connector.

Prerequisites

Compatibility

  • TigerGraph 3.6.0 or higher. Job-level loading statistics are available in v3.10+.

  • Spark 3.2 or higher, with Scala 2.12 or Scala 2.13.

  • Java 8 or higher.

Download the JARs

The connector can be downloaded from Maven Central. It is available in three formats:

  • tigergraph-spark-connector-<version>.jar: The JAR file containing only the compiled classes of the connector, which does not include any dependencies.

  • tigergraph-spark-connector-<version>-jar-with-dependencies.jar: The JAR file that includes compiled classes, as well as all the dependencies.

  • tigergraph-spark-connector-<version>.tar.gz: The compressed TAR archive that includes tigergraph-spark-connector-<version>.jar and dependencies in separate JAR files.

To use the TigerGraph Spark connector in a Spark shell, add the JAR with the --jars option:
spark-shell --jars tigergraph-spark-connector-<version>-jar-with-dependencies.jar

If you want to include the TigerGraph Spark Connector in your Spark installation, add the JAR with dependencies to Spark’s jars folder.

Configure Logging

We highly recommend setting the log level to info to monitor the execution. There are two methods to enable logging for the connector.

Connector Default Logger

The connector has a built-in logger based on java.util.logging. To activate it, set the option log.level to 2, which enables info-level logging.

Please refer to Common Options to learn more about log.level and log.file options.
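As a minimal sketch, assuming you keep the logging settings in a separate map and merge them into the connection options, the two options could be set like this. The names LoggingOptions, logOptions, and withLogging, and the log file path, are illustrative and not part of the connector.

```scala
object LoggingOptions {
  // Connector logger settings: "2" = INFO (see log.level under Common Options).
  // The file path is only an example pattern; without log.file, logs go to stderr.
  val logOptions: Map[String, String] = Map(
    "log.level" -> "2",
    "log.file"  -> "/tmp/tigergraph-spark-connector.log"
  )

  // Merge into an existing options map; with ++ the right-hand map wins,
  // so these values override any log settings already present in `base`.
  def withLogging(base: Map[String, String]): Map[String, String] =
    base ++ logOptions
}
```

You could then pass LoggingOptions.withLogging(tgOptions) to .options(...), where tgOptions is the connection map shown under Database Connection.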

Spark Logger

Configure Spark's Log4j 2 logging in $SPARK_HOME/conf/log4j2.properties as follows (or configure the equivalent in another SLF4J binding, if you use one):

logger.tg.name = com.tigergraph
logger.tg.level = info

Overview

The TigerGraph Spark connector provides four reader options for reading data from TigerGraph:

  • query.vertex: Read one or more vertices with their attributes.

  • query.edge: Read one or more edges with their attributes.

  • query.installed: Run a pre-installed query and read the output of its PRINT statements.

  • query.interpreted: Run a query in interpreted mode and read the output of its PRINT statements.

The following sections give detailed usage and examples. Before running any of them, define the connection options as described in Database Connection.

Database Connection

Connection options object

You may find it convenient to bundle the options related to making a database connection in a data object (e.g., tgOptions) and then use .options(tgOptions) when running the connector. Moreover, placing the user credentials in a data object keeps them separate from the connection commands.

Predefined connection options:
val tgOptions = Map(
    "url" -> "http(s)://<tg_node_host>:<tg_nginx_port>",
    "version" -> "<tg_version>",
    "graph" -> "<graph_name>",
    "username" -> "<username>",
    "password" -> "<password>"
)
Authentication methods

With username/password authentication, the connector automatically refreshes the access token as needed.

If you use a token instead, make sure its lifetime is long enough to cover the entire Spark job.
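The Common Options table also lists secret and token as alternatives to username/password. A sketch of the two variants follows, assuming the placeholders are replaced with real values; the object and value names (AuthOptions, baseOptions, secretAuth, tokenAuth) are hypothetical.

```scala
object AuthOptions {
  // Options shared by every authentication method (placeholders).
  val baseOptions: Map[String, String] = Map(
    "url"     -> "https://<tg_node_host>:<tg_nginx_port>",
    "version" -> "<tg_version>",
    "graph"   -> "<graph_name>"
  )

  // Secret-based authentication: the connector can generate and refresh tokens itself.
  val secretAuth: Map[String, String] = baseOptions + ("secret" -> "<gsql_secret>")

  // Fixed token: its lifetime must cover the whole Spark job.
  val tokenAuth: Map[String, String] = baseOptions + ("token" -> "<restpp_token>")
}
```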

Read Vertices

Read all vertices by type

  • query.vertex: <vertex_type>

Example: read all vertices of type Person
val df = spark.read
    .format("tigergraph")
    .options(tgOptions)
    .option("query.vertex", "Person")
    .load()

df.show()
Results
+--------+--------+---+------+-----+
|    v_id|    name|age|gender|state|
+--------+--------+---+------+-----+
|person42|person42| 42|female|   ny|
|person12|person12| 12|  male|   ny|
+--------+--------+---+------+-----+

Partitioned Vertex Query

To fully utilize the parallel and distributed processing capabilities of Spark and TigerGraph, it is recommended to use partitioned queries. This approach involves splitting a query into multiple range queries that are executed in parallel.

  • query.partition.key: The name of an indexed attribute to partition on.

  • query.partition.num: The expected number of partitions.

  • query.partition.upper.bound: The upper bound used to calculate the partition stride.

  • query.partition.lower.bound: The lower bound used to calculate the partition stride.

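Note that query.partition.upper.bound and query.partition.lower.bound are used only to compute the partition stride; they do not filter vertices. Values below the lower bound or at or above the upper bound still fall into the first or last partition.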

Example
val df = spark.read
    .format("tigergraph")
    .options(tgOptions)
    .option("query.vertex", "Person")
    .option("query.partition.key", "age")
    .option("query.partition.num", "5")
    .option("query.partition.upper.bound", "50")
    .option("query.partition.lower.bound", "20")
    .load()

df.show()

This query will generate 5 separate range queries:

  • age < 20

  • 20 ≤ age < 30

  • 30 ≤ age < 40

  • 40 ≤ age < 50

  • 50 ≤ age
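The five ranges above are consistent with two open-ended partitions plus (num - 2) equal-width ranges of stride (upper - lower) / (num - 2), here 30 / 3 = 10. The sketch below reproduces that split so you can preview the ranges for your own bounds. It is a reconstruction from this example under that assumption, not the connector's implementation; PartitionRanges.ranges is a hypothetical name, and for simplicity it requires at least three partitions.

```scala
object PartitionRanges {
  // Hypothetical reconstruction of the documented example: one range below
  // the lower bound, (num - 2) equal-width ranges, one range from the upper bound up.
  def ranges(key: String, num: Int, lower: Long, upper: Long): Seq[String] = {
    require(num >= 3, "this sketch needs at least 3 partitions")
    require(upper > lower, "upper bound must exceed lower bound")
    val stride = (upper - lower) / (num - 2) // integer division: 30 / 3 = 10 above
    val inner = (0 until num - 2).map { i =>
      val lo = lower + i * stride
      // The last inner range is stretched to end exactly at the upper bound.
      val hi = if (i == num - 3) upper else lo + stride
      s"$lo <= $key < $hi"
    }
    (s"$key < $lower" +: inner) :+ s"$upper <= $key"
  }
}
```

For example, PartitionRanges.ranges("age", 5, 20, 50) yields the same five ranges listed above.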

Read a vertex by primary ID

  • query.vertex: <vertex_type>.<vertex_id>

Example: read a vertex of type Person and primary ID person2
val df = spark.read
    .format("tigergraph")
    .options(tgOptions)
    .option("query.vertex", "Person.person2")
    .load()

df.show()
Results
+--------+--------+---+------+-----+
|    v_id|    name|age|gender|state|
+--------+--------+---+------+-----+
| person2| person2|  3|  male|   tx|
+--------+--------+---+------+-----+

Read Edges

Read all edges of a vertex

  • query.edge: <src_vertex_type>.<src_vertex_id>

Example: read all edges of vertex type Person, ID person1
val df = spark.read
    .format("tigergraph")
    .options(tgOptions)
    .option("query.edge", "Person.person1")
    .load()

df.show()
Results
+---------+-------+-------+-------+-------+--------------------+
|from_type|from_id|to_type|  to_id| e_type|          attributes|
+---------+-------+-------+-------+-------+--------------------+
|   Person|person1|  Photo|photo16|  Likes|{"click_time":"20...|
|   Person|person1|  Short| short1|  Likes|{"click_time":"20...|
|   Person|person1|  Short| short1|Watches|      {"share":true}|
|   Person|person1|  Short|short19|Watches|      {"share":true}|
+---------+-------+-------+-------+-------+--------------------+

Because the results can contain edges of different types, the edge attributes are not flattened into separate columns; they are returned together as JSON in the attributes column.

Read edges of a vertex by edge type

  • query.edge: <src_vertex_type>.<src_vertex_id>.<edge_type>

Example: read all edges of type Likes from vertex type Person, ID person1
val df = spark.read
    .format("tigergraph")
    .options(tgOptions)
    .option("query.edge", "Person.person1.Likes")
    .load()

df.show()
Results
+---------+-------+-------+-------+-------------------+
|from_type|from_id|to_type|  to_id|         click_time|
+---------+-------+-------+-------+-------------------+
|   Person|person1|  Photo|photo16|2024-01-22 03:16:45|
|   Person|person1|  Short| short1|2019-07-15 09:23:41|
+---------+-------+-------+-------+-------------------+

Read edges of a vertex by edge type and target vertex type

  • query.edge: <src_vertex_type>.<src_vertex_id>.<edge_type>.<tgt_vertex_type>

Example: read all edges of type Likes from vertex type Person, ID person1 to vertex type Photo
val df = spark.read
    .format("tigergraph")
    .options(tgOptions)
    .option("query.edge", "Person.person1.Likes.Photo")
    .load()

df.show()
Results
+---------+-------+-------+-------+-------------------+
|from_type|from_id|to_type|  to_id|         click_time|
+---------+-------+-------+-------+-------------------+
|   Person|person1|  Photo|photo16|2024-01-22 03:16:45|
|   Person|person1|  Photo| photo7|2021-05-09 17:58:00|
+---------+-------+-------+-------+-------------------+

Read an edge by source vertex, target vertex, and edge type

  • query.edge: <src_vertex_type>.<src_vertex_id>.<edge_type>.<tgt_vertex_type>.<tgt_vertex_id>

Example: read an edge of type Likes from vertex type Person and ID person1, to vertex type Photo and ID photo7
val df = spark.read
    .format("tigergraph")
    .options(tgOptions)
    .option("query.edge", "Person.person1.Likes.Photo.photo7")
    .load()

df.show()
Results
+---------+-------+-------+------+-------------------+
|from_type|from_id|to_type| to_id|         click_time|
+---------+-------+-------+------+-------------------+
|   Person|person1|  Photo|photo7|2021-05-09 17:58:00|
+---------+-------+-------+------+-------------------+

By default, the query fields are split on ".". If a vertex ID contains ".", set a different separator with query.field.separator to avoid the conflict:

.option("query.edge", "Person|jack.smith|Likes|Photo")
.option("query.field.separator", "|")
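If you build query.edge strings programmatically, you can pick the separator from the data. The helper below is a hypothetical sketch (EdgeQuery.build is not a connector API): it joins the fields with the first candidate separator, "." or "|", that does not occur in any field, and returns both the query string and the separator to pass as query.field.separator.

```scala
object EdgeQuery {
  // Candidate separators: "." is the default; "|" is the alternative
  // used in the example above.
  val candidates: Seq[String] = Seq(".", "|")

  // Hypothetical helper: returns (query.edge value, separator to use).
  def build(fields: Seq[String]): (String, String) = {
    val sep = candidates
      .find(s => fields.forall(f => !f.contains(s)))
      .getOrElse(throw new IllegalArgumentException("every candidate separator occurs in a field"))
    (fields.mkString(sep), sep)
  }
}
```

For example, EdgeQuery.build(Seq("Person", "jack.smith", "Likes", "Photo")) returns Person|jack.smith|Likes|Photo with the separator |. When the returned separator is the default ".", the query.field.separator option can simply be omitted.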

Query Operators

The built-in vertex and edge queries above support the following query operators, which can be combined:

  • query.op.select: Comma-separated attributes of the selected vertices/edges to return. Example: name,age

  • query.op.filter: Conditions used to filter the vertices/edges to return, given as a comma-separated list. Example: age>20,gender=male

  • query.op.limit: An integer specifying the maximum total number of vertices/edges to return. When used with partitioned queries, the limit applies to each partition. Example: 50

  • query.op.sort: Attributes to sort the results by, given as a comma-separated list; the vertices/edges are sorted by the listed attributes in order. Prefix an attribute with - to sort it in descending order. When used with partitioned queries, the sort applies within each partition. Example: age,-salary

Example: read the top 10 vertices of type Person, sorted by age (ascending) and last_name (descending)
val df = spark.read
    .format("tigergraph")
    .options(tgOptions)
    .option("query.vertex", "Person")
    .option("query.op.limit", "10")
    .option("query.op.sort", "age,-last_name")
    .load()

df.show()
Example: read edges of type Invest from the Person vertex person1 whose ratio attribute is greater than 0.1, and return only the timestamp attribute:
val df = spark.read
    .format("tigergraph")
    .options(tgOptions)
    .option("query.edge", "Person.person1.Invest")
    .option("query.op.filter", "ratio>0.1")
    .option("query.op.select", "timestamp")
    .load()

df.show()
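To make the query.op.sort format concrete, the hypothetical helper below parses a sort specification into (attribute, ascending) pairs, reading a leading - as descending order, as in age,-last_name. It only illustrates how to read the format and is not part of the connector API.

```scala
object SortSpec {
  // Parse "age,-last_name" into Seq(("age", true), ("last_name", false)).
  // A leading '-' marks descending order; blank entries are ignored.
  def parse(spec: String): Seq[(String, Boolean)] =
    spec.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map { attr =>
      if (attr.startsWith("-")) (attr.drop(1), false) else (attr, true)
    }
}
```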

Run a GSQL Query

Run a Pre-installed Query

  • query.installed: The name of the pre-installed query.

  • query.params: The query parameters in JSON format. Please refer to Formatting data in JSON.

Example: run the query topK with k=10

GSQL Query topK
CREATE QUERY topK (INT k) {
    vSet = SELECT p FROM Person:p ORDER BY p.age LIMIT k;
    PRINT vSet;
}
Reader Options
val df = spark.read
    .format("tigergraph")
    .options(tgOptions)
    .option("query.installed", "topK")
    .option("query.params", "{\"k\": 10}")
    .load()

df.show()
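Writing the JSON for query.params by hand means escaping quotes, as in the string "{\"k\": 10}" above. Below is a minimal sketch of a hypothetical encoder for flat parameters (Int, Long, Double, Boolean, and String). It does not cover sets, vertex parameters, or the other types described in Formatting data in JSON; for those, a real JSON library is the safer choice.

```scala
object QueryParams {
  // Quote and escape a string as a JSON string literal (minimal escaping).
  private def quote(s: String): String = {
    val sb = new StringBuilder("\"")
    s.foreach {
      case '"'  => sb.append("\\\"")
      case '\\' => sb.append("\\\\")
      case '\n' => sb.append("\\n")
      case c    => sb.append(c)
    }
    sb.append('"').toString
  }

  // Encode flat (name -> value) pairs as a JSON object, keeping their order.
  def toJson(params: Seq[(String, Any)]): String =
    params.map { case (k, v) =>
      val value = v match {
        case s: String => quote(s)
        case n @ (_: Int | _: Long | _: Double | _: Boolean) => n.toString
        case other => throw new IllegalArgumentException(s"unsupported parameter value: $other")
      }
      quote(k) + ":" + value
    }.mkString("{", ",", "}")
}
```

With this helper, .option("query.params", QueryParams.toJson(Seq("k" -> 10))) passes {"k":10}, equivalent to the hand-written value above.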

Run an Interpreted Query

  • query.interpreted: The query body of an anonymous query.

Example: run the anonymous topK query in interpreted mode with k=10. Because parameters are not allowed here, k is set within the query body.

Define the interpreted query body
val queryBody = """
    INTERPRET QUERY () FOR GRAPH gsql_demo {
        INT k=10;
        vSet = SELECT p FROM Person:p ORDER BY p.age LIMIT k;
        PRINT vSet;
    }
"""
Reader Options
val df = spark.read
    .format("tigergraph")
    .options(tgOptions)
    .option("query.interpreted", queryBody)
    .load()

df.show()
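Because the value of k has to be written into the query text, it can help to generate the body from a function. This is a hypothetical sketch (InterpretedQuery.topK is not a connector API) that interpolates the graph name and k into the same topK body. Only pass trusted values, since the resulting text is sent to TigerGraph as GSQL.

```scala
object InterpretedQuery {
  // Build the interpreted topK body with k baked in, since the
  // interpreted query itself takes no parameters.
  def topK(graph: String, k: Int): String = {
    require(k > 0, "k must be positive")
    s"""
       |INTERPRET QUERY () FOR GRAPH $graph {
       |    INT k=$k;
       |    vSet = SELECT p FROM Person:p ORDER BY p.age LIMIT k;
       |    PRINT vSet;
       |}
       |""".stripMargin
  }
}
```

The result can be passed directly as the value of query.interpreted, for example .option("query.interpreted", InterpretedQuery.topK("gsql_demo", 10)).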

DataFrame Schema

Schema Inference

The output format of a GSQL query is flexible, as you can include multiple PRINT statements within a single query, with each PRINT statement capable of printing multiple objects.

By default, the output of each PRINT statement is converted into a row of a DataFrame:

A portion of a GSQL query with multiple PRINT statements
...
ListAccum<INT> @@list_accum = [1,3,5];
PRINT @@list_accum, @@list_accum.size();
PRINT @@list_accum.get(0);
...
The resulting DataFrame includes two rows corresponding to the outputs of two PRINT statements.
+------------------------------------------------+
|results                                         |
+------------------------------------------------+
|{"@@list_accum":[1,3,5],"@@list_accum.size()":3}|
|{"@@list_accum.get(0)":1}                       |
+------------------------------------------------+

However, you may want to convert a vertex expression set or an accumulator, such as @@list_accum, into a Spark DataFrame in which each element becomes a row.

The connector automatically expands a vertex set or accumulator into DataFrame rows when all of the following conditions hold:

  • The query is an installed query, not an interpreted query.

  • The query contains only one PRINT statement, which prints a single object.

  • The printed object is either a vertex expression set or an accumulator.

You can also explicitly extract and expand one object from the output of several PRINT statements with the query.results.extract option:

  • query.results.extract: The zero-based row index (that is, the index of the PRINT statement) and the object key, joined by a colon. For example, 1:vSet extracts vSet from the output of the second PRINT statement.

Example: setting query.results.extract to 0:@@list_accum extracts and expands the list accumulator above:
+-------+
|results|
+-------+
|1      |
|3      |
|5      |
+-------+

Custom Schema

For interpreted queries and some installed queries, the schema cannot be inferred accurately, so you must provide it manually with .schema(DDLString). The column names must match the corresponding JSON keys. For example:

Define the topK query in interpreted mode
val queryBody = """
    INTERPRET QUERY () FOR GRAPH gsql_demo {
        INT k=3;
        vSet = SELECT p FROM Person:p ORDER BY p.age LIMIT k;
        PRINT vSet;
    }
"""
1. Without a user-provided schema
val df = spark.read
    .format("tigergraph")
    .options(tgOptions)
    .option("query.interpreted", queryBody)
    .option("query.results.extract","0:vSet") // explicitly expand the vertex set
    .load()

df.show()
+-----------------------------------------------------------------------------------------------------------+
|results                                                                                                    |
+-----------------------------------------------------------------------------------------------------------+
|{"v_id":"person1","attributes":{"gender":"male","name":"person1","state":"fl","age":1},"v_type":"Person"}  |
|{"v_id":"person2","attributes":{"gender":"female","name":"person2","state":"ca","age":2},"v_type":"Person"}|
|{"v_id":"person3","attributes":{"gender":"male","name":"person3","state":"ny","age":3},"v_type":"Person"}  |
+-----------------------------------------------------------------------------------------------------------+
2. With a user-provided schema
val df = spark.read
    .format("tigergraph")
    .schema("v_id STRING, gender STRING, name STRING, state STRING, age INT") // user-provided schema
    .options(tgOptions)
    .option("query.interpreted", queryBody)
    .option("query.results.extract","0:vSet")  // explicitly expand the vertex set
    .load()

df.show()
+-------+------+-------+-----+---+
|v_id   |gender|name   |state|age|
+-------+------+-------+-----+---+
|person1|male  |person1|fl   |1  |
|person2|female|person2|ca   |2  |
|person3|male  |person3|ny   |3  |
+-------+------+-------+-----+---+

Connector Options

Common Options

Beginning with version 0.2.0 of the connector, the following connector options are required: version, log.level, and log.file.

General options

  • url (default: none): The connection URL of the TigerGraph cluster. For load balancing, it can be a comma-separated list of URLs. The port number can be retrieved with gadmin config get Nginx.Port; for TGCloud users, the default port is 443. Example: http(s)://192.168.1.1:14240, http(s)://192.168.1.2:14240, http(s)://192.168.1.3:14240

  • version (default: none): The TigerGraph version, e.g., "3.10.1" or "4.1.0".

  • graph (default: none): The graph name.

Authentication options

Username/password or secret-based authentication is preferred over a fixed token, because the connector can then generate and refresh tokens automatically.

  • username (default: none): The GSQL username.

  • password (default: none): The GSQL password.

  • secret (default: none): The GSQL secret, which is recommended for TGCloud users.

  • token (default: none): The classic Bearer token, or a JWT (since 3.10.1), for RESTPP authentication.

SSL options

  • ssl.mode (default: basic): The SSL mode. Supported values: basic, verifyCA, and verifyHostname. When set to verifyCA or verifyHostname, the truststore file must also be given.

  • ssl.truststore (default: none): Filename of the truststore that stores the SSL certificate chains. Add --files /path/to/trust.jks when submitting the Spark job.

  • ssl.truststore.type (default: JKS): The truststore type, e.g., JKS or PKCS12.

  • ssl.truststore.password (default: none): The password of the truststore.

Transport timeout options

  • io.connect.timeout.ms (default: 30000): Connection timeout in milliseconds.

  • io.read.timeout.ms (default: 60000): Socket read timeout in milliseconds.

  • io.retry.interval.ms (default: 5000): The initial retry interval after a transport timeout.

  • io.max.retry.interval.ms (default: 10000): The maximum retry interval after a transport timeout.

  • io.max.retry.attempts (default: 5): The maximum number of retry attempts after a transport timeout.

Logging options

  • log.level (default: none): The log level of the default logger. Available values are "0", "1", "2", and "3", which represent ERROR, WARN, INFO, and DEBUG. If this option is set, the Spark logging configuration (Log4j) is ignored. Logs are printed to stderr unless log.file is set.

  • log.file (default: none): The log file name pattern, e.g., "/tmp/tigergraph-spark-connector.log". It requires log.level to be set.
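As a sketch of the SSL rule above (verifyCA or verifyHostname requires a truststore), the options might be grouped as follows. The truststore name, the password placeholder, and the SslOptions.validate check are illustrative assumptions; validate is a local pre-flight check, not something the connector exposes.

```scala
object SslOptions {
  // Example SSL settings. "trust.jks" is the file name as shipped with
  // spark-submit --files /path/to/trust.jks (an assumed example path).
  val sslOptions: Map[String, String] = Map(
    "ssl.mode"                -> "verifyCA",
    "ssl.truststore"          -> "trust.jks",
    "ssl.truststore.type"     -> "JKS",
    "ssl.truststore.password" -> "<truststore_password>"
  )

  // Hypothetical pre-flight check mirroring the documented rule:
  // verifyCA and verifyHostname need ssl.truststore; basic is the default mode.
  def validate(opts: Map[String, String]): Either[String, Map[String, String]] = {
    val mode = opts.getOrElse("ssl.mode", "basic")
    if (Set("verifyCA", "verifyHostname").contains(mode) && !opts.contains("ssl.truststore"))
      Left(s"ssl.mode=$mode requires ssl.truststore")
    else Right(opts)
  }
}
```

The validated map can then be merged into the connection options, for example tgOptions ++ SslOptions.sslOptions.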

Query Options

  • query.timeout.ms (default: 16000): By default, an HTTP request in the TigerGraph system times out after 16 seconds. Use this option to customize the timeout for a particular query. Equivalent to the RESTPP request header GSQL-TIMEOUT.

  • query.max.response.bytes (default: none): The response size limit of a query request. Equivalent to the RESTPP request header RESPONSE-LIMIT.
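A small sketch of overriding the query limits, assuming a query that needs more than the default 16 seconds. The two-minute timeout and 64 MiB response limit are arbitrary example values, and QueryLimits is a hypothetical name. Using scala.concurrent.duration avoids computing milliseconds by hand.

```scala
import scala.concurrent.duration._

object QueryLimits {
  // Option values must be strings; both limits below are illustrative.
  val queryOptions: Map[String, String] = Map(
    "query.timeout.ms"         -> 2.minutes.toMillis.toString,     // 120000 ms
    "query.max.response.bytes" -> (64L * 1024 * 1024).toString     // 64 MiB
  )
}
```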