Read to Spark DataFrame
The TigerGraph Spark Connector is used with Apache Spark to read data from TigerGraph.
Supported reader types:
- Read Vertices
- Read Edges
- Run Pre-installed Query
- Run Interpreted Query
The legacy Spark Connection Via JDBC Driver will be deprecated. Please migrate to this new connector.
Prerequisite
Compatibility
- TigerGraph 3.6.0 or higher.
- Spark 3.2 or higher with Scala 2.12 and Scala 2.13.
- Java 8 or higher.
Download the JARs
This connector can be downloaded from the Maven central repository: Maven Central. It is available in three formats:
- tigergraph-spark-connector-<version>.jar: the JAR file containing only the compiled classes of the connector, without any dependencies.
- tigergraph-spark-connector-<version>-jar-with-dependencies.jar: the JAR file that includes the compiled classes as well as all dependencies.
- tigergraph-spark-connector-<version>.tar.gz: the compressed TAR archive that includes tigergraph-spark-connector-<version>.jar and its dependencies as separate JAR files.
Beginning with version 0.2.0 of the connector, additional connector options are required; see Connector Options below.
To use the TigerGraph Spark connector in a Spark shell, pass the JAR with the --jars option:
spark-shell --jars tigergraph-spark-connector-<version>-jar-with-dependencies.jar
If you want to include the TigerGraph Spark Connector in your Spark installation, add the JAR with dependencies to Spark's jars folder.
Configure Logging
We highly recommend setting the log level to info
to monitor the execution. There are two methods to enable logging for the connector.
Connector Default Logger
The connector has a built-in logger based on java.util.logging.
To activate it, set the option log.level
to 2, which enables info-level logging.
Please refer to Common Options to learn more about log.level
and log.path
options.
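For example, a read job with the default logger enabled at info level might look like the following sketch, using the connection options (tgOptions) defined in the Database Connection section below; the log file path is only an illustration:
val df = spark.read
  .format("tigergraph")
  .options(tgOptions)
  .option("log.level", "2")                                  // 2 = info
  .option("log.path", "/tmp/tigergraph-spark-connector.log") // optional: write logs to a file instead of stderr
  .option("query.vertex", "Person")
  .load()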
Overview
The TigerGraph Spark connector provides four kinds of reader options to read data from TigerGraph:
Option | Description |
---|---|
query.vertex | Read vertex/vertices with their attributes. |
query.edge | Read edge(s) with their attributes. |
query.installed | Run a pre-installed query and read the outputs of its PRINT statements. |
query.interpreted | Run a query in interpreted mode and read the outputs of its PRINT statements. |
Please refer to the following sections for detailed usage and examples. Before running them, make sure the connection options are defined.
Database Connection
You may find it convenient to bundle the options related to making a database connection in a data object (e.g. tgOptions
) and then use .options(tgOptions)
when running the connector.
Using username/password has the advantage that this authentication method automatically refreshes the access token as needed. Placing the credentials in a data object also keeps them separate from the connection commands.
If you choose to use a token, please make sure the lifetime of the token is long enough for the loading job.
val tgOptions = Map(
"url" -> "http(s)://<tg_node_host>:<tg_nginx_port>",
"version" -> "<tg_version>",
"graph" -> "<graph_name>",
"username" -> "<username>",
"password" -> "<password>"
)
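If you do use a fixed token instead, a token-based variant of the same options map could look like the following sketch (the token option is listed under Common Options; the placeholder values are illustrative):
val tgOptions = Map(
  "url" -> "http(s)://<tg_node_host>:<tg_nginx_port>",
  "version" -> "<tg_version>",
  "graph" -> "<graph_name>",
  "token" -> "<restpp_token>"
)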
Read Vertices
Read all vertices by type
Option | Value |
---|---|
query.vertex | <vertex_type> |
Example: read all vertices of type Person:
val df = spark.read
.format("tigergraph")
.options(tgOptions)
.option("query.vertex", "Person")
.load()
df.show()
+--------+--------+---+------+-----+
| v_id| name|age|gender|state|
+--------+--------+---+------+-----+
|person42|person42| 42|female| ny|
|person12|person12| 12| male| ny|
+--------+--------+---+------+-----+
Partitioned Vertex Query
To fully utilize the parallel and distributed processing capabilities of Spark and TigerGraph, it is recommended to use partitioned queries. This approach involves splitting a query into multiple range queries that are executed in parallel.
Option | Value |
---|---|
query.partition.key | The name of the attribute that has been indexed. |
query.partition.num | The expected partition number. |
query.partition.upper.bound | The upper bound used to calculate the partition stride. |
query.partition.lower.bound | The lower bound used to calculate the partition stride. |
Notice that query.partition.upper.bound and query.partition.lower.bound only determine the partition stride; they do not filter the results, so all vertices of the type are still read (see the generated range queries below).
val df = spark.read
.format("tigergraph")
.options(tgOptions)
.option("query.vertex", "Person")
.option("query.partition.key", "age")
.option("query.partition.num", "5")
.option("query.partition.upper.bound", "50")
.option("query.partition.lower.bound", "20")
.load()
df.show()
This query will generate 5 separate range queries:
- age < 20
- 20 ≤ age < 30
- 30 ≤ age < 40
- 40 ≤ age < 50
- 50 ≤ age
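If only the [20, 50) range is actually wanted, the extra rows can be dropped with a Spark-side filter on the resulting DataFrame, for example:
// Keep only the rows whose age falls inside the partition bounds
val bounded = df.filter("age >= 20 AND age < 50")
bounded.show()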
Read a vertex by primary ID
Option | Value |
---|---|
query.vertex | <vertex_type>.<vertex_id> |
Example: read the vertex of type Person with primary ID person2:
val df = spark.read
.format("tigergraph")
.options(tgOptions)
.option("query.vertex", "Person.person2")
.load()
df.show()
+--------+--------+---+------+-----+
| v_id| name|age|gender|state|
+--------+--------+---+------+-----+
| person2| person2| 3| male| tx|
+--------+--------+---+------+-----+
Read Edges
Read all edges of a vertex
Option | Value |
---|---|
query.edge | <src_vertex_type>.<src_vertex_id> |
Example: read all edges of the vertex with type Person and ID person1:
val df = spark.read
.format("tigergraph")
.options(tgOptions)
.option("query.edge", "Person.person1")
.load()
df.show()
+---------+-------+-------+-------+-------+--------------------+
|from_type|from_id|to_type| to_id| e_type| attributes|
+---------+-------+-------+-------+-------+--------------------+
| Person|person1| Photo|photo16| Likes|{"click_time":"20...|
| Person|person1| Short| short1| Likes|{"click_time":"20...|
| Person|person1| Short| short1|Watches| {"share":true}|
| Person|person1| Short|short19|Watches| {"share":true}|
+---------+-------+-------+-------+-------+--------------------+
The results can contain different types of edges, so the edge attributes won’t be flattened.
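Because the attributes column is a JSON string, individual fields can be pulled out with Spark's built-in JSON functions. A minimal sketch that extracts click_time from the Likes edges of the result above:
import org.apache.spark.sql.functions.{col, get_json_object}

// Filter to one edge type and extract a field from its JSON attributes
val likes = df
  .filter(col("e_type") === "Likes")
  .withColumn("click_time", get_json_object(col("attributes"), "$.click_time"))
likes.show()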
Read edges of a vertex by edge type
Option | Value |
---|---|
query.edge | <src_vertex_type>.<src_vertex_id>.<edge_type> |
Example: read all edges of type Likes from the vertex with type Person and ID person1:
val df = spark.read
.format("tigergraph")
.options(tgOptions)
.option("query.edge", "Person.person1.Likes")
.load()
df.show()
+---------+-------+-------+-------+-------------------+
|from_type|from_id|to_type| to_id| click_time|
+---------+-------+-------+-------+-------------------+
| Person|person1| Photo|photo16|2024-01-22 03:16:45|
| Person|person1| Short| short1|2019-07-15 09:23:41|
+---------+-------+-------+-------+-------------------+
Read edges of a vertex by edge type and target vertex type
Option | Value |
---|---|
query.edge | <src_vertex_type>.<src_vertex_id>.<edge_type>.<tgt_vertex_type> |
Example: read all edges of type Likes from the vertex with type Person and ID person1 to target vertices of type Photo:
val df = spark.read
.format("tigergraph")
.options(tgOptions)
.option("query.edge", "Person.person1.Likes.Photo")
.load()
df.show()
+---------+-------+-------+-------+-------------------+
|from_type|from_id|to_type| to_id| click_time|
+---------+-------+-------+-------+-------------------+
| Person|person1| Photo|photo16|2024-01-22 03:16:45|
| Person|person1| Photo| photo7|2021-05-09 17:58:00|
+---------+-------+-------+-------+-------------------+
Read an edge by source vertex, target vertex, and edge type
Option | Value |
---|---|
query.edge | <src_vertex_type>.<src_vertex_id>.<edge_type>.<tgt_vertex_type>.<tgt_vertex_id> |
Example: read the edge of type Likes from the vertex with type Person and ID person1 to the vertex with type Photo and ID photo7:
val df = spark.read
.format("tigergraph")
.options(tgOptions)
.option("query.edge", "Person.person1.Likes.Photo.photo7")
.load()
df.show()
+---------+-------+-------+------+-------------------+
|from_type|from_id|to_type| to_id| click_time|
+---------+-------+-------+------+-------------------+
| Person|person1| Photo|photo7|2021-05-09 17:58:00|
+---------+-------+-------+------+-------------------+
By default, "." is used to split the query fields. If the vertex ID contains ".", you can set a different separator through the corresponding connector option to avoid the conflict.
Query Operators
For the built-in vertex and edge queries mentioned above, there are several available query operators that can be used in combination:
Option | Value | Example |
---|---|---|
query.op.select | Attributes of the selected vertices/edges to return, separated by commas. | timestamp |
query.op.filter | Conditions used to filter the vertices/edges to return. The parameter takes a list of conditions, which is a string of comma-separated values. | ratio>0.1 |
query.op.limit | An integer value that specifies the maximum number of vertices/edges to return. NOTE: When used with partitioned queries, the limit applies to each partition. | 10 |
query.op.sort | Attributes to sort the results by. The parameter takes a list, which is a string of comma-separated values, and sorts all vertices/edges by the listed attributes in order. NOTE: When used with partitioned queries, the sort applies within each partition. | age,-last_name |
Example: read at most 10 Person vertices, sorted by age (ascending) and last_name (descending):
val df = spark.read
.format("tigergraph")
.options(tgOptions)
.option("query.vertex", "Person")
.option("query.op.limit", "10")
.option("query.op.sort", "age,-last_name")
.load()
df.show()
Example: read the edges of type Invest from the vertex Person.person1, whose ratio attribute is greater than 0.1, and select only the timestamp attribute:
val df = spark.read
.format("tigergraph")
.options(tgOptions)
.option("query.edge", "Person.person1.Invest")
.option("query.op.filter", "ratio>0.1")
.option("query.op.select", "timestamp")
.load()
df.show()
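As noted in the table, query.op.limit and query.op.sort apply per partition when combined with a partitioned query, so a global top-N usually needs a final sort and limit on the Spark side. A sketch reusing the partitioned Person read from above:
import org.apache.spark.sql.functions.col

val globalTop10 = spark.read
  .format("tigergraph")
  .options(tgOptions)
  .option("query.vertex", "Person")
  .option("query.partition.key", "age")
  .option("query.partition.num", "5")
  .option("query.partition.upper.bound", "50")
  .option("query.partition.lower.bound", "20")
  .option("query.op.sort", "age")   // sorts within each partition
  .option("query.op.limit", "10")   // at most 10 rows per partition
  .load()
  .orderBy(col("age"))              // global sort across partitions
  .limit(10)                        // global top 10
globalTop10.show()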
Run a GSQL Query
Run a Pre-installed Query
Option | Value |
---|---|
query.installed | Name of the pre-installed query. |
query.params | The query parameters in JSON format. Please refer to Formatting data in JSON. |
Example: run the pre-installed query topK with k=10. The query topK is defined as:
CREATE QUERY topK (INT k) {
vSet = SELECT p FROM Person:p ORDER BY p.age LIMIT k;
PRINT vSet;
}
val df = spark.read
.format("tigergraph")
.options(tgOptions)
.option("query.installed", "topK")
.option("query.params", "{\"k\": 10}")
.load()
df.show()
Run an Interpreted Query
Option | Value |
---|---|
query.interpreted | Query body of an anonymous query. |
Example: run the anonymous topK query in interpreted mode with k=10. Because parameters are not allowed in interpreted mode, k is set within the query body.
val queryBody = """
INTERPRET QUERY () FOR GRAPH gsql_demo {
INT k=10;
vSet = SELECT p FROM Person:p ORDER BY p.age LIMIT k;
PRINT vSet;
}
"""
val df = spark.read
.format("tigergraph")
.options(tgOptions)
.option("query.interpreted", queryBody)
.load()
df.show()
DataFrame Schema
Schema Inference
The output format of a GSQL query is flexible, as you can include multiple PRINT
statements within a single query, with each PRINT
statement capable of printing multiple objects.
By default, the output of each PRINT
statement is converted into a row of a DataFrame:
...
ListAccum<INT> @@list_accum = [1,3,5];
PRINT @@list_accum, @@list_accum.size();
PRINT @@list_accum.get(0);
...
+------------------------------------------------+
|results |
+------------------------------------------------+
|{"@@list_accum":[1,3,5],"@@list_accum.size()":3}|
|{"@@list_accum.get(0)":1} |
+------------------------------------------------+
However, sometimes we want to convert a vertex expression set or an accumulator into a Spark DataFrame, like @@list_accum
, with each element being transformed into a row of the DataFrame.
The connector can automatically expand the vertex set or accumulator to Spark DataFrame under the following conditions:
- It is an installed query rather than an interpreted query;
- It contains only one PRINT statement, which prints a single object;
- The printed object is either a vertex expression set or an accumulator.
You can also explicitly extract an object from multiple PRINT statements with the query.results.extract option:
Option | Value |
---|---|
query.results.extract | The row index and the object key concatenated with a colon, e.g., 0:@@list_accum. |
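For example, assuming the two PRINT statements above belong to an installed query (hypothetically named print_list_accum here), extracting @@list_accum from the first output row would look like the following sketch and produce the DataFrame below:
val df = spark.read
  .format("tigergraph")
  .options(tgOptions)
  .option("query.installed", "print_list_accum")        // hypothetical query containing the PRINT statements above
  .option("query.results.extract", "0:@@list_accum")    // row index 0, object key @@list_accum
  .load()
df.show()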
+-------+
|results|
+-------+
|1 |
|3 |
|5 |
+-------+
Custom Schema
For interpreted queries and some installed queries, the schema cannot be inferred accurately, so you need to provide the schema manually with .schema(DDLString); the column names should match the corresponding JSON keys. See the example below:
val queryBody = """
INTERPRET QUERY () FOR GRAPH gsql_demo {
INT k=3;
vSet = SELECT p FROM Person:p ORDER BY p.age LIMIT k;
PRINT vSet;
}
"""
val df = spark.read
.format("tigergraph")
.options(tgOptions)
.option("query.interpreted", queryBody)
.option("query.results.extract","0:vSet") // explicitly expand the vertex set
.load()
df.show()
+-----------------------------------------------------------------------------------------------------------+
|results |
+-----------------------------------------------------------------------------------------------------------+
|{"v_id":"person1","attributes":{"gender":"male","name":"person1","state":"fl","age":1},"v_type":"Person"} |
|{"v_id":"person2","attributes":{"gender":"female","name":"person2","state":"ca","age":2},"v_type":"Person"}|
|{"v_id":"person3","attributes":{"gender":"male","name":"person3","state":"ny","age":3},"v_type":"Person"} |
+-----------------------------------------------------------------------------------------------------------+
val df = spark.read
.format("tigergraph")
.schema("v_id STRING, gender STRING, name STRING, state STRING, age INT") // user given schema
.options(tgOptions)
.option("query.interpreted", queryBody)
.option("query.results.extract","0:vSet") // explicitly expand the vertex set
.load()
df.show()
+-------+------+-------+-----+---+
|v_id |gender|name |state|age|
+-------+------+-------+-----+---+
|person1|male |person1|fl |1 |
|person2|female|person2|ca |2 |
|person3|male |person3|ny |3 |
+-------+------+-------+-----+---+
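The DDL string is one way to supply the schema; an equivalent StructType built with the Spark API can also be passed to .schema(...), for example:
import org.apache.spark.sql.types._

// Same columns as the DDL string above
val vertexSchema = StructType(Seq(
  StructField("v_id", StringType),
  StructField("gender", StringType),
  StructField("name", StringType),
  StructField("state", StringType),
  StructField("age", IntegerType)
))

val df = spark.read
  .format("tigergraph")
  .schema(vertexSchema)
  .options(tgOptions)
  .option("query.interpreted", queryBody)
  .option("query.results.extract", "0:vSet")
  .load()
df.show()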
Connector Options
Common Options
Key | Default Value | Description | Group |
---|---|---|---|
url | (none) | The connection URL to the TigerGraph cluster. It can be a comma-separated list of URLs for load balancing. The port number can be retrieved by gadmin config get Nginx.Port. Example: http(s)://192.168.1.1:14240, http(s)://192.168.1.2:14240, http(s)://192.168.1.3:14240 | General |
version | (none) | TigerGraph version, e.g., "3.10.1", "4.1.0". | General |
graph | (none) | The graph name. | General |
username | (none) | The GSQL username. | Authentication (username/password or secret based authentication is preferred over a fixed token, as the access token can be generated and refreshed automatically.) |
password | (none) | The GSQL password. | Authentication |
secret | (none) | The GSQL secret, which is recommended for TGCloud users. | Authentication |
token | (none) | The classic Bearer token or JWT (since 3.10.1) for RESTPP authentication. | Authentication |
ssl.mode | basic | The SSL mode. When setting it to an SSL-enabled value, provide the truststore file via the ssl.truststore option. | SSL |
ssl.truststore | (none) | Filename of the truststore which stores the SSL certificate chains. | SSL |
ssl.truststore.type | JKS | Truststore type, e.g., JKS, PKCS12. | SSL |
ssl.truststore.password | (none) | Password of the truststore. | SSL |
io.connect.timeout.ms | 30000 | Connect timeout in ms. | Transport Timeout |
io.read.timeout.ms | 60000 | Socket read timeout in ms. | Transport Timeout |
io.retry.interval.ms | 5000 | The initial retry interval for transport timeout. | Transport Timeout |
io.max.retry.interval.ms | 10000 | The maximum retry interval for transport timeout. | Transport Timeout |
io.max.retry.attempts | 5 | The maximum retry attempts for transport timeout. | Transport Timeout |
log.level | (none) | The log level of the default logger. Available values: "0", "1", "2" and "3", which represent "ERROR", "WARN", "INFO" and "DEBUG". The Spark logging configuration (log4j) will be ignored if this option is set. The log will be printed to stderr unless log.path is set. | Logging |
log.path | (none) | The log file name pattern, e.g., "/tmp/tigergraph-spark-connector.log". It requires log.level to be set as well. | Logging |
Query Options
Key | Default Value | Description | Group |
---|---|---|---|
 | 16000 | By default, an HTTP request in the TigerGraph system times out after 16 seconds. You can customize this timeout limit for a particular query instance with this option. Equivalent to the RESTPP request header GSQL-TIMEOUT. | Query |
 | (none) | Specify the response size limit of a query request. Equivalent to the RESTPP request header RESPONSE-LIMIT. | Query |