Querying and Streaming from Google BigQuery

The Data Streaming Connector allows you to invoke SQL queries to your Google BigQuery dataset and stream the query results to TigerGraph’s internal Kafka server with a specified topic. You can then create and run a Kafka loading job to load data from Kafka into your graphs.

1. Prerequisites

  • You should have one of the following authentication credentials:

    • Google Service Account credentials

    • Access and refresh tokens

2. Procedure

2.1. Specify connector configurations

The connector configurations provide the following information:

  • Connector class

  • Connection URL

  • Value converter

  • SQL query statement

  • Connector properties

2.1.1. Specify connector class

The connector class indicates what type of connector the configuration file is used to create. Connector class is specified by the connector.class key.

For streaming from Google BigQuery, the class is io.confluent.connect.jdbc.JdbcSourceConnector.

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

2.1.2. Specify connection URL

To stream data from BigQuery, Data Streaming Connector uses the BigQuery JDBC Driver. The connection URL requires the domain name or IP address of the BigQuery server (https://www.googleapis.com/bigquery/v2), the port number, the ID of your BigQuery project, and your credentials for authentication. You can also supply other optional properties to the connection that are supported by the BigQuery JDBC connector:

connection.url=jdbc:bigquery://https://www.googleapis.com/bigquery/v2:<port_number>;<property1>=<value1>;<property2>=<value2>;... (1)
1 Each property-value pair is separated by a semicolon`;`.

The following is a list of required or frequently used properties:

Property Description

ProjectId

ID of your Google Cloud Project for you BigQuery server. Required.

OAuthType

Required. A number that specifies the type of authentication used by the connector:

  • 0: Authentication with Google Service Account. This requires that you supply two additional parameters:

    • OAuthServiceAcctEmail

    • OAuthPvtKeyPath

  • 2: Authentication with access token or refresh token. This requires that you supply one of the two following parameters:

    • OAuthAccessToken

    • OAuthRefreshToken

OAuthServiceAcctEmail

Google Service Account Email used to authenticate the connection. Required if you set AuthValue to 0.

OAuthPvtKeyPath

Path to the key file that is used to authenticate the service account email address. This parameter supports keys in .p12 or .json format. Required if you set AuthValue to 0.

OAuthAccessToken

Access token of your Google Service Account. See Google Cloud documentation to generate an access token. Required if you set AuthValue to 2.

OAuthRefreshToken

Refresh token of your Google Service Account. See Google Cloud documentation to generate a refresh token. Required if you set AuthValue to 2.

2.1.3. Value converter

BigQuery stores data in a proprietary columnar format. In order for TigerGraph to ingest the data, Data Streaming Connector uses a value converter to convert the data into the CSV format. To do this, add a line in the configuration file that specifies the value converter:

value.converter=com.tigergraph.kafka.connect.converters.TigerGraphCsvConverter

By default, CSV values are separated by commas. If you want to specify the separator, use the value.converter.csv.separator configuration to specify the separator. For example, the following converter configurations use | as the separator. If you change the separator, make sure to use the same separator when you create the loading job.

value.converter=com.tigergraph.kafka.connect.converters.TigerGraphCsvConverter
value.converter.csv.separator=|

2.1.4. Subsection configurations

Subsection configurations are specified after the overall connector configurations. Each set of subsection configurations creates an instance of a connector by specifying a connector name and a Kafka topic that the connector streams messages to.

When streaming from BigQuery, each subsection also must have a SQL query statement used to query your BigQuery warehouse. The results of the query are streamed to the corresponding topic.

If you are querying STRUCT data or arrays, see Querying STRUCT data and arrays. BYTES type values are automatically converted to UTF-8 strings.

For example, the following subsection configurations create two instances of the connector, each streaming the results of their corresponding queries to topic_0 and topic_1:

[bq_query_0]
name="bq_query_0"
topic="topic_0"
query="SELECT * FROM `bigquery-public-data.breathe.arxiv` LIMIT 1000"

[bq_query_1]
name="bq_query_1"
topic="topic_1"
query="SELECT doi,abstract,authors FROM `bigquery-public-data.breathe.nature`"

include::partial$create-connector.adoc[

2.2. Create data source

Now that the connector has started loading data into TigerGraph’s internal Kafka cluster, you can create a data source and point it to the Kafka cluster:

  1. Create a data source configuration file. The only required field is the address of the broker. The broker’s IP and hostname is the private IP of your TigerGraph server and port of TigerGraph’s internal Kafka server:

    • If you don’t know your private IP, you can retrieve it by running gmyip in the bash terminal.

    • To retrieve the port of your Kafka cluster, run gadmin config get Kafka.Port to retrieve the port number. The default port is 30002.

    In the kafka.config field, you can provide additional configurations from librdkafka Global Configurations. For example, you can specify group.id to be tigergraph to specify the group that this consumer belongs to:

    {
        "broker":"10.128.0.240:30002", (1)
        "kafka_config":
            {
                "session.timeout.ms":"20000"
            }
    }
    1 Make sure to use the internal ID of the machine instead of localhost for the hostname of the broker. localhost only works for single-server instances.
  2. Run CREATE DATA SOURCE to create the data source and assign the configuration file to the data source you just created:

    CREATE DATA_SOURCE KAFKA <datasource_name> FOR GRAPH <graph_name>
    SET <datasource_name> = <path_to_datasource_config>

    For example:

    CREATE DATA_SOURCE KAFKA k1 FOR GRAPH Social
    SET k1 = "/home/tigergraph/social/k1.config"

2.3. Create loading job

Create a loading job to load data from the data source:

  1. Create a topic-partition configuration for each topic.

    {
      "topic": <topic_name>, (1)
      "partition_list": [ (2)
        {
          "start_offset": <offset_value>, (3)
          "partition": <partition_number> (4)
        },
        {
          "start_offset": <offset_value>, (3)
          "partition": <partition_number> (4)
        }
        ...
      ]
    }
    1 Replace <topic_name> with the name of the topic this configuration applies to. This must be one of the topics you configured in the connector configuration step.
    2 List of partitions you want to stream from. For each partition, you can set a start offset. If this list is empty, or partition_list isn’t included, all partitions are used with the default offset.
    3 Replace <offset_value> with the offset value you want. The default offset for loading is -1, which means you will load data from the most recent message in the topic. If you want to load from the beginning of a topic, the start_offset value should be -2.
    4 Replace <partition_number> with the partition number if you want to configure.
  2. Create a loading job and map data to graph. See Kafka loader guide for how to map data from a Kafka data source to the graph. See Loading JSON data on how to create a loading job for JSON data.

Known bug: to use the -1 value for offset, delete the start_offset key instead of setting it to -1.

2.4. Run loading job

Run the loading job created in the last step will load the streamed data into the graph. If you make changes to the topic-partition configuration file, you can overwrite the values for the filename variables with the USING clause.

GSQL > RUN LOADING JOB load_person

By default, loading jobs that use Kafka data sources run in streaming mode and do not stop until manually aborted. As data is streamed from the data source, the running loading job will continuously ingest the streamed data into the graph store.

3. Querying STRUCT data and arrays

If the record being queried in your BigQuery warehouse contains STRUCT data or arrays, conversion functions need to be applied to the SQL statement:

3.1. STRUCT data

When retrieving struct data, it is recommended to retrieve the fields of the data directly. For example:

SELECT basic.age, basic.gender FROM `project.dataset.table`

If you want to retrieve the entire STRUCT, you need to first use TO_JSON_STRING() to convert the STRUCT data into JSON strings in the converted CSV streams.

For example, SELECT TO_JSON_STRING(col) FROM table. After converting the data into string format, flatten the JSON strings when loading the data into the graph.

3.2. Arrays

To load array values, apply the function ARRAY_TO_STRING to the columns of ARRAY type. For example, SELECT ARRAY_TO_STRING(col_arr,separator) FROM table. It is important to ensure that the separator used here is distinct from the separator in your CSV streams.

After converting the array to strings, the string representation of the arrays will be in the CSV streams. You can then load the data in CSV as a list.

4. Example

For example, suppose we have the following source table in BigQuery and the following graph schema in TigerGraph:

  • Source table

  • Graph schema

name: String basic: Struct tags: Array<String> state: String

Tom

{“age“:40, ”gender”:”male”}

[“tall“,”strong”]

ca

Dan

{“age“:35, ”gender”:”female”}

[“smart“,”blonde”]

ny

CREATE VERTEX person (PRIMARY_ID name STRING, name STRING, age INT, gender STRING, state STRING, tags LIST<STRING>)
CREATE GRAPH social (person)

The following configuration invokes the provided query on the source table and convert the result to CSV format:

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url="jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;OAuthType=2;ProjectId=tigergraph;OAuthAccessToken=xxx;OAuthRefreshToken=yyy;OAuthClientId=zzz;OAuthClientSecret=sss;LargeResultDataset=target_dataset;LargeResultTable=target_table;"
mode=bulk
value.converter=com.tigergraph.kafka.connect.converters.TigerGraphCsvConverter


[bq_query_0]
name="bq_query_0"
topic="person-demo"
query="SELECT name, basic.age, basic.gender, state, ARRAY_TO_STRING(tags,'#') FROM `project.dataset.table`"

The following is the converted CSV stream:

Tom,40,male,ca,tall#strong
Dan,35,female,ny,smart#blonde

The next step is to create the data source. The following data source configuration file can be used to create the data source:

{
    "broker":"10.128.0.240:30002", (1)
}
1 The IP address is the internal network IP address of the server running TigerGraph.

Run the following command to create the data source k1:

CREATE DATA_SOURCE KAFKA k1 FOR GRAPH social

The next step is to create the topic-partition configurations to use in the loading job. In this case, the data source is the person-demo topic in TigerGraph’s internal Kafka cluster.

  • Topic-partition configurations

  • Loading job

{
  "topic": "person-demo",
  "partition_list": [
    {
      "start_offset": -2,
      "partition": 0
    }
  ]
}
CREATE LOADING JOB load_person FOR GRAPH social {
    DEFINE FILENAME f1 = "$k1:/home/mydata/topic_person.json";
    LOAD f1 TO VERTEX person VALUES ($0, $0, $1, $2, $3,SPLIT($4,"#")) USING separator="|";
}

After the loading job has been defined, run the loading job to ingest the data into the graph.