Skip to content

Commit

Permalink
feat(bigquery): use storage api for query jobs (#6822)
Browse files Browse the repository at this point in the history
Initial work on using the Storage API for fetching results of a query. This is more efficient because it can download data in parallel by splitting the read session and using Arrow as a more efficient format. The API surface for users stay the same, with them being able to transform query results into user defined structs. Under the hood the library will take care of converting data represented in Arrow to the user defined struct.

One thing to note is that this introduces the first external dependency on the Apache Arrow Go library.

Initially we are gonna use it as an experimental feature and explicit ask users to create a `bqStorage.BigQueryReadClient`. 

Proposed by issue https://togithub.com/googleapis/google-cloud-go/issues/3880 and work on the Python library https://medium.com/google-cloud/announcing-google-cloud-bigquery-version-1-17-0-1fc428512171
  • Loading branch information
alvarowolfx authored Jan 23, 2023
1 parent fbe1bd4 commit 26c04f4
Show file tree
Hide file tree
Showing 18 changed files with 1,695 additions and 19 deletions.
221 changes: 221 additions & 0 deletions bigquery/arrow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
"bytes"
"encoding/base64"
"errors"
"fmt"
"math/big"

"cloud.google.com/go/civil"
"github.com/apache/arrow/go/v10/arrow"
"github.com/apache/arrow/go/v10/arrow/array"
"github.com/apache/arrow/go/v10/arrow/ipc"
)

type arrowDecoder struct {
tableSchema Schema
rawArrowSchema []byte
arrowSchema *arrow.Schema
}

func newArrowDecoderFromSession(session *readSession, schema Schema) (*arrowDecoder, error) {
bqSession := session.bqSession
if bqSession == nil {
return nil, errors.New("read session not initialized")
}
arrowSerializedSchema := bqSession.GetArrowSchema().GetSerializedSchema()
buf := bytes.NewBuffer(arrowSerializedSchema)
r, err := ipc.NewReader(buf)
if err != nil {
return nil, err
}
defer r.Release()
p := &arrowDecoder{
tableSchema: schema,
rawArrowSchema: arrowSerializedSchema,
arrowSchema: r.Schema(),
}
return p, nil
}

func (ap *arrowDecoder) createIPCReaderForBatch(serializedArrowRecordBatch []byte) (*ipc.Reader, error) {
buf := bytes.NewBuffer(ap.rawArrowSchema)
buf.Write(serializedArrowRecordBatch)
return ipc.NewReader(buf, ipc.WithSchema(ap.arrowSchema))
}

// decodeArrowRecords decodes BQ ArrowRecordBatch into rows of []Value.
func (ap *arrowDecoder) decodeArrowRecords(serializedArrowRecordBatch []byte) ([][]Value, error) {
r, err := ap.createIPCReaderForBatch(serializedArrowRecordBatch)
if err != nil {
return nil, err
}
defer r.Release()
rs := make([][]Value, 0)
for r.Next() {
rec := r.Record()
values, err := ap.convertArrowRecordValue(rec)
if err != nil {
return nil, err
}
rs = append(rs, values...)
}
return rs, nil
}

// decodeRetainedArrowRecords decodes BQ ArrowRecordBatch into a list of retained arrow.Record.
func (ap *arrowDecoder) decodeRetainedArrowRecords(serializedArrowRecordBatch []byte) ([]arrow.Record, error) {
r, err := ap.createIPCReaderForBatch(serializedArrowRecordBatch)
if err != nil {
return nil, err
}
defer r.Release()
records := []arrow.Record{}
for r.Next() {
rec := r.Record()
rec.Retain()
records = append(records, rec)
}
return records, nil
}

// convertArrowRows converts an arrow.Record into a series of Value slices.
func (ap *arrowDecoder) convertArrowRecordValue(record arrow.Record) ([][]Value, error) {
rs := make([][]Value, record.NumRows())
for i := range rs {
rs[i] = make([]Value, record.NumCols())
}
for j, col := range record.Columns() {
fs := ap.tableSchema[j]
ft := ap.arrowSchema.Field(j).Type
for i := 0; i < col.Len(); i++ {
v, err := convertArrowValue(col, i, ft, fs)
if err != nil {
return nil, fmt.Errorf("found arrow type %s, but could not convert value: %v", ap.arrowSchema.Field(j).Type, err)
}
rs[i][j] = v
}
}
return rs, nil
}

// convertArrow gets row value in the given column and converts to a Value.
// Arrow is a colunar storage, so we navigate first by column and get the row value.
// More details on conversions can be seen here: https://cloud.google.com/bigquery/docs/reference/storage#arrow_schema_details
func convertArrowValue(col arrow.Array, i int, ft arrow.DataType, fs *FieldSchema) (Value, error) {
if !col.IsValid(i) {
return nil, nil
}
switch ft.(type) {
case *arrow.BooleanType:
v := col.(*array.Boolean).Value(i)
return convertBasicType(fmt.Sprintf("%v", v), fs.Type)
case *arrow.Int8Type:
v := col.(*array.Int8).Value(i)
return convertBasicType(fmt.Sprintf("%v", v), fs.Type)
case *arrow.Int16Type:
v := col.(*array.Int16).Value(i)
return convertBasicType(fmt.Sprintf("%v", v), fs.Type)
case *arrow.Int32Type:
v := col.(*array.Int32).Value(i)
return convertBasicType(fmt.Sprintf("%v", v), fs.Type)
case *arrow.Int64Type:
v := col.(*array.Int64).Value(i)
return convertBasicType(fmt.Sprintf("%v", v), fs.Type)
case *arrow.Float16Type:
v := col.(*array.Float16).Value(i)
return convertBasicType(fmt.Sprintf("%v", v.Float32()), fs.Type)
case *arrow.Float32Type:
v := col.(*array.Float32).Value(i)
return convertBasicType(fmt.Sprintf("%v", v), fs.Type)
case *arrow.Float64Type:
v := col.(*array.Float64).Value(i)
return convertBasicType(fmt.Sprintf("%v", v), fs.Type)
case *arrow.BinaryType:
v := col.(*array.Binary).Value(i)
encoded := base64.StdEncoding.EncodeToString(v)
return convertBasicType(encoded, fs.Type)
case *arrow.StringType:
v := col.(*array.String).Value(i)
return convertBasicType(v, fs.Type)
case *arrow.Date32Type:
v := col.(*array.Date32).Value(i)
return convertBasicType(v.FormattedString(), fs.Type)
case *arrow.Date64Type:
v := col.(*array.Date64).Value(i)
return convertBasicType(v.FormattedString(), fs.Type)
case *arrow.TimestampType:
v := col.(*array.Timestamp).Value(i)
dft := ft.(*arrow.TimestampType)
t := v.ToTime(dft.Unit)
if dft.TimeZone == "" { // Datetime
return Value(civil.DateTimeOf(t)), nil
}
return Value(t.UTC()), nil // Timestamp
case *arrow.Time32Type:
v := col.(*array.Time32).Value(i)
return convertBasicType(v.FormattedString(arrow.Microsecond), fs.Type)
case *arrow.Time64Type:
v := col.(*array.Time64).Value(i)
return convertBasicType(v.FormattedString(arrow.Microsecond), fs.Type)
case *arrow.Decimal128Type:
dft := ft.(*arrow.Decimal128Type)
v := col.(*array.Decimal128).Value(i)
rat := big.NewRat(1, 1)
rat.Num().SetBytes(v.BigInt().Bytes())
d := rat.Denom()
d.Exp(big.NewInt(10), big.NewInt(int64(dft.Scale)), nil)
return Value(rat), nil
case *arrow.Decimal256Type:
dft := ft.(*arrow.Decimal256Type)
v := col.(*array.Decimal256).Value(i)
rat := big.NewRat(1, 1)
rat.Num().SetBytes(v.BigInt().Bytes())
d := rat.Denom()
d.Exp(big.NewInt(10), big.NewInt(int64(dft.Scale)), nil)
return Value(rat), nil
case *arrow.ListType:
arr := col.(*array.List)
dft := ft.(*arrow.ListType)
values := []Value{}
start, end := arr.ValueOffsets(i)
slice := array.NewSlice(arr.ListValues(), start, end)
for j := 0; j < slice.Len(); j++ {
v, err := convertArrowValue(slice, j, dft.Elem(), fs)
if err != nil {
return nil, err
}
values = append(values, v)
}
return values, nil
case *arrow.StructType:
arr := col.(*array.Struct)
nestedValues := []Value{}
fields := ft.(*arrow.StructType).Fields()
for fIndex, f := range fields {
v, err := convertArrowValue(arr.Field(fIndex), i, f.Type, fs.Schema[fIndex])
if err != nil {
return nil, err
}
nestedValues = append(nestedValues, v)
}
return nestedValues, nil
default:
return nil, fmt.Errorf("unknown arrow type: %v", ft)
}
}
22 changes: 22 additions & 0 deletions bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Client struct {

projectID string
bqs *bq.Service
rc *readClient
}

// DetectProjectID is a sentinel value that instructs NewClient to detect the
Expand Down Expand Up @@ -97,6 +98,21 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
return c, nil
}

// EnableStorageReadClient sets up Storage API connection to be used when fetching
// large datasets from tables, jobs or queries.
// Calling this method twice will return an error.
func (c *Client) EnableStorageReadClient(ctx context.Context, opts ...option.ClientOption) error {
if c.rc != nil {
return fmt.Errorf("failed: storage read client already set up")
}
rc, err := newReadClient(ctx, c.projectID, opts...)
if err != nil {
return err
}
c.rc = rc
return nil
}

// Project returns the project ID or number for this instance of the client, which may have
// either been explicitly specified or autodetected.
func (c *Client) Project() string {
Expand All @@ -107,6 +123,12 @@ func (c *Client) Project() string {
// Close should be called when the client is no longer needed.
// It need not be called at program exit.
func (c *Client) Close() error {
if c.rc != nil {
err := c.rc.close()
if err != nil {
return err
}
}
return nil
}

Expand Down
23 changes: 23 additions & 0 deletions bigquery/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,29 @@ func ExampleQuery_Read() {
_ = it // TODO: iterate using Next or iterator.Pager.
}

func ExampleQuery_Read_accelerated() {
ctx := context.Background()
client, err := bigquery.NewClient(ctx, "project-id")
if err != nil {
// TODO: Handle error.
}

// Enable Storage API usage for fetching data
err = client.EnableStorageReadClient(ctx)
if err != nil {
// TODO: Handle error.
}

sql := fmt.Sprintf(`SELECT name, number, state FROM %s WHERE state = "CA"`, `bigquery-public-data.usa_names.usa_1910_current`)
q := client.Query(sql)
it, err := q.Read(ctx)
if err != nil {
// TODO: Handle error.
}

_ = it // TODO: iterate using Next or iterator.Pager.
}

func ExampleRowIterator_Next() {
ctx := context.Background()
client, err := bigquery.NewClient(ctx, "project-id")
Expand Down
17 changes: 16 additions & 1 deletion bigquery/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
cloud.google.com/go/datacatalog v1.8.1
cloud.google.com/go/iam v0.8.0
cloud.google.com/go/storage v1.28.1
github.com/apache/arrow/go/v10 v10.0.1
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/googleapis/gax-go/v2 v2.7.0
Expand All @@ -22,13 +23,27 @@ require (
require (
cloud.google.com/go/compute v1.14.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/goccy/go-json v0.9.11 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v2.0.8+incompatible // indirect
github.com/google/martian/v3 v3.2.1 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.1 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.0.0-20221014081412-f15817d10f9b // indirect
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 // indirect
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect
golang.org/x/text v0.5.0 // indirect
golang.org/x/tools v0.1.12 // indirect
google.golang.org/appengine v1.6.7 // indirect
)
Loading

0 comments on commit 26c04f4

Please sign in to comment.