diff --git a/bigquery/arrow.go b/bigquery/arrow.go new file mode 100644 index 000000000000..30a862e46107 --- /dev/null +++ b/bigquery/arrow.go @@ -0,0 +1,221 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bigquery + +import ( + "bytes" + "encoding/base64" + "errors" + "fmt" + "math/big" + + "cloud.google.com/go/civil" + "github.com/apache/arrow/go/v10/arrow" + "github.com/apache/arrow/go/v10/arrow/array" + "github.com/apache/arrow/go/v10/arrow/ipc" +) + +type arrowDecoder struct { + tableSchema Schema + rawArrowSchema []byte + arrowSchema *arrow.Schema +} + +func newArrowDecoderFromSession(session *readSession, schema Schema) (*arrowDecoder, error) { + bqSession := session.bqSession + if bqSession == nil { + return nil, errors.New("read session not initialized") + } + arrowSerializedSchema := bqSession.GetArrowSchema().GetSerializedSchema() + buf := bytes.NewBuffer(arrowSerializedSchema) + r, err := ipc.NewReader(buf) + if err != nil { + return nil, err + } + defer r.Release() + p := &arrowDecoder{ + tableSchema: schema, + rawArrowSchema: arrowSerializedSchema, + arrowSchema: r.Schema(), + } + return p, nil +} + +func (ap *arrowDecoder) createIPCReaderForBatch(serializedArrowRecordBatch []byte) (*ipc.Reader, error) { + buf := bytes.NewBuffer(ap.rawArrowSchema) + buf.Write(serializedArrowRecordBatch) + return ipc.NewReader(buf, ipc.WithSchema(ap.arrowSchema)) +} + +// decodeArrowRecords decodes BQ ArrowRecordBatch into rows of []Value. +func (ap *arrowDecoder) decodeArrowRecords(serializedArrowRecordBatch []byte) ([][]Value, error) { + r, err := ap.createIPCReaderForBatch(serializedArrowRecordBatch) + if err != nil { + return nil, err + } + defer r.Release() + rs := make([][]Value, 0) + for r.Next() { + rec := r.Record() + values, err := ap.convertArrowRecordValue(rec) + if err != nil { + return nil, err + } + rs = append(rs, values...) + } + return rs, nil +} + +// decodeRetainedArrowRecords decodes BQ ArrowRecordBatch into a list of retained arrow.Record. +func (ap *arrowDecoder) decodeRetainedArrowRecords(serializedArrowRecordBatch []byte) ([]arrow.Record, error) { + r, err := ap.createIPCReaderForBatch(serializedArrowRecordBatch) + if err != nil { + return nil, err + } + defer r.Release() + records := []arrow.Record{} + for r.Next() { + rec := r.Record() + rec.Retain() + records = append(records, rec) + } + return records, nil +} + +// convertArrowRows converts an arrow.Record into a series of Value slices. +func (ap *arrowDecoder) convertArrowRecordValue(record arrow.Record) ([][]Value, error) { + rs := make([][]Value, record.NumRows()) + for i := range rs { + rs[i] = make([]Value, record.NumCols()) + } + for j, col := range record.Columns() { + fs := ap.tableSchema[j] + ft := ap.arrowSchema.Field(j).Type + for i := 0; i < col.Len(); i++ { + v, err := convertArrowValue(col, i, ft, fs) + if err != nil { + return nil, fmt.Errorf("found arrow type %s, but could not convert value: %v", ap.arrowSchema.Field(j).Type, err) + } + rs[i][j] = v + } + } + return rs, nil +} + +// convertArrow gets row value in the given column and converts to a Value. +// Arrow is a colunar storage, so we navigate first by column and get the row value. +// More details on conversions can be seen here: https://cloud.google.com/bigquery/docs/reference/storage#arrow_schema_details +func convertArrowValue(col arrow.Array, i int, ft arrow.DataType, fs *FieldSchema) (Value, error) { + if !col.IsValid(i) { + return nil, nil + } + switch ft.(type) { + case *arrow.BooleanType: + v := col.(*array.Boolean).Value(i) + return convertBasicType(fmt.Sprintf("%v", v), fs.Type) + case *arrow.Int8Type: + v := col.(*array.Int8).Value(i) + return convertBasicType(fmt.Sprintf("%v", v), fs.Type) + case *arrow.Int16Type: + v := col.(*array.Int16).Value(i) + return convertBasicType(fmt.Sprintf("%v", v), fs.Type) + case *arrow.Int32Type: + v := col.(*array.Int32).Value(i) + return convertBasicType(fmt.Sprintf("%v", v), fs.Type) + case *arrow.Int64Type: + v := col.(*array.Int64).Value(i) + return convertBasicType(fmt.Sprintf("%v", v), fs.Type) + case *arrow.Float16Type: + v := col.(*array.Float16).Value(i) + return convertBasicType(fmt.Sprintf("%v", v.Float32()), fs.Type) + case *arrow.Float32Type: + v := col.(*array.Float32).Value(i) + return convertBasicType(fmt.Sprintf("%v", v), fs.Type) + case *arrow.Float64Type: + v := col.(*array.Float64).Value(i) + return convertBasicType(fmt.Sprintf("%v", v), fs.Type) + case *arrow.BinaryType: + v := col.(*array.Binary).Value(i) + encoded := base64.StdEncoding.EncodeToString(v) + return convertBasicType(encoded, fs.Type) + case *arrow.StringType: + v := col.(*array.String).Value(i) + return convertBasicType(v, fs.Type) + case *arrow.Date32Type: + v := col.(*array.Date32).Value(i) + return convertBasicType(v.FormattedString(), fs.Type) + case *arrow.Date64Type: + v := col.(*array.Date64).Value(i) + return convertBasicType(v.FormattedString(), fs.Type) + case *arrow.TimestampType: + v := col.(*array.Timestamp).Value(i) + dft := ft.(*arrow.TimestampType) + t := v.ToTime(dft.Unit) + if dft.TimeZone == "" { // Datetime + return Value(civil.DateTimeOf(t)), nil + } + return Value(t.UTC()), nil // Timestamp + case *arrow.Time32Type: + v := col.(*array.Time32).Value(i) + return convertBasicType(v.FormattedString(arrow.Microsecond), fs.Type) + case *arrow.Time64Type: + v := col.(*array.Time64).Value(i) + return convertBasicType(v.FormattedString(arrow.Microsecond), fs.Type) + case *arrow.Decimal128Type: + dft := ft.(*arrow.Decimal128Type) + v := col.(*array.Decimal128).Value(i) + rat := big.NewRat(1, 1) + rat.Num().SetBytes(v.BigInt().Bytes()) + d := rat.Denom() + d.Exp(big.NewInt(10), big.NewInt(int64(dft.Scale)), nil) + return Value(rat), nil + case *arrow.Decimal256Type: + dft := ft.(*arrow.Decimal256Type) + v := col.(*array.Decimal256).Value(i) + rat := big.NewRat(1, 1) + rat.Num().SetBytes(v.BigInt().Bytes()) + d := rat.Denom() + d.Exp(big.NewInt(10), big.NewInt(int64(dft.Scale)), nil) + return Value(rat), nil + case *arrow.ListType: + arr := col.(*array.List) + dft := ft.(*arrow.ListType) + values := []Value{} + start, end := arr.ValueOffsets(i) + slice := array.NewSlice(arr.ListValues(), start, end) + for j := 0; j < slice.Len(); j++ { + v, err := convertArrowValue(slice, j, dft.Elem(), fs) + if err != nil { + return nil, err + } + values = append(values, v) + } + return values, nil + case *arrow.StructType: + arr := col.(*array.Struct) + nestedValues := []Value{} + fields := ft.(*arrow.StructType).Fields() + for fIndex, f := range fields { + v, err := convertArrowValue(arr.Field(fIndex), i, f.Type, fs.Schema[fIndex]) + if err != nil { + return nil, err + } + nestedValues = append(nestedValues, v) + } + return nestedValues, nil + default: + return nil, fmt.Errorf("unknown arrow type: %v", ft) + } +} diff --git a/bigquery/bigquery.go b/bigquery/bigquery.go index 8bb92ba683ca..c95ec5435ba5 100644 --- a/bigquery/bigquery.go +++ b/bigquery/bigquery.go @@ -57,6 +57,7 @@ type Client struct { projectID string bqs *bq.Service + rc *readClient } // DetectProjectID is a sentinel value that instructs NewClient to detect the @@ -97,6 +98,21 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio return c, nil } +// EnableStorageReadClient sets up Storage API connection to be used when fetching +// large datasets from tables, jobs or queries. +// Calling this method twice will return an error. +func (c *Client) EnableStorageReadClient(ctx context.Context, opts ...option.ClientOption) error { + if c.rc != nil { + return fmt.Errorf("failed: storage read client already set up") + } + rc, err := newReadClient(ctx, c.projectID, opts...) + if err != nil { + return err + } + c.rc = rc + return nil +} + // Project returns the project ID or number for this instance of the client, which may have // either been explicitly specified or autodetected. func (c *Client) Project() string { @@ -107,6 +123,12 @@ func (c *Client) Project() string { // Close should be called when the client is no longer needed. // It need not be called at program exit. func (c *Client) Close() error { + if c.rc != nil { + err := c.rc.close() + if err != nil { + return err + } + } return nil } diff --git a/bigquery/examples_test.go b/bigquery/examples_test.go index 4c5681386367..db376378054d 100644 --- a/bigquery/examples_test.go +++ b/bigquery/examples_test.go @@ -162,6 +162,29 @@ func ExampleQuery_Read() { _ = it // TODO: iterate using Next or iterator.Pager. } +func ExampleQuery_Read_accelerated() { + ctx := context.Background() + client, err := bigquery.NewClient(ctx, "project-id") + if err != nil { + // TODO: Handle error. + } + + // Enable Storage API usage for fetching data + err = client.EnableStorageReadClient(ctx) + if err != nil { + // TODO: Handle error. + } + + sql := fmt.Sprintf(`SELECT name, number, state FROM %s WHERE state = "CA"`, `bigquery-public-data.usa_names.usa_1910_current`) + q := client.Query(sql) + it, err := q.Read(ctx) + if err != nil { + // TODO: Handle error. + } + + _ = it // TODO: iterate using Next or iterator.Pager. +} + func ExampleRowIterator_Next() { ctx := context.Background() client, err := bigquery.NewClient(ctx, "project-id") diff --git a/bigquery/go.mod b/bigquery/go.mod index c2243e92c957..a2a3551b5f3c 100644 --- a/bigquery/go.mod +++ b/bigquery/go.mod @@ -7,6 +7,7 @@ require ( cloud.google.com/go/datacatalog v1.8.1 cloud.google.com/go/iam v0.8.0 cloud.google.com/go/storage v1.28.1 + github.com/apache/arrow/go/v10 v10.0.1 github.com/google/go-cmp v0.5.9 github.com/google/uuid v1.3.0 github.com/googleapis/gax-go/v2 v2.7.0 @@ -22,13 +23,27 @@ require ( require ( cloud.google.com/go/compute v1.14.0 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect + github.com/andybalholm/brotli v1.0.4 // indirect + github.com/apache/thrift v0.16.0 // indirect + github.com/goccy/go-json v0.9.11 // indirect github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect github.com/golang/protobuf v1.5.2 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/google/flatbuffers v2.0.8+incompatible // indirect github.com/google/martian/v3 v3.2.1 // indirect github.com/googleapis/enterprise-certificate-proxy v0.2.1 // indirect + github.com/klauspost/asmfmt v1.3.2 // indirect + github.com/klauspost/compress v1.15.9 // indirect + github.com/klauspost/cpuid/v2 v2.0.9 // indirect + github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect + github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect + golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/net v0.0.0-20221014081412-f15817d10f9b // indirect golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect - golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 // indirect + golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect golang.org/x/text v0.5.0 // indirect + golang.org/x/tools v0.1.12 // indirect google.golang.org/appengine v1.6.7 // indirect ) diff --git a/bigquery/go.sum b/bigquery/go.sum index c37f02418faf..9ff688b69ac4 100644 --- a/bigquery/go.sum +++ b/bigquery/go.sum @@ -13,21 +13,32 @@ cloud.google.com/go/longrunning v0.3.0 h1:NjljC+FYPV3uh5/OwWT6pVU+doBqMg2x/rZlE+ cloud.google.com/go/storage v1.28.1 h1:F5QDG5ChchaAVQhINh24U99OWHURqrW8OmQcGKXcbgI= cloud.google.com/go/storage v1.28.1/go.mod h1:Qnisd4CqDdo6BGs2AD5LLnEsmSQ80wQ5ogcBBKhU86Y= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= +github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= +github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= +github.com/apache/arrow/go/v10 v10.0.1 h1:n9dERvixoC/1JjDmBcs9FPaEryoANa2sCgVFo6ez9cI= +github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0Ih0vcRo/gZ1M0= +github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY= +github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk= +github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -42,8 +53,11 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/flatbuffers v2.0.8+incompatible h1:ivUb1cGomAB101ZM1T0nOiWz9pSrTMoa9+EiY7igmkM= +github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -62,6 +76,19 @@ github.com/googleapis/enterprise-certificate-proxy v0.2.1 h1:RY7tHKZcRlk788d5WSo github.com/googleapis/enterprise-certificate-proxy v0.2.1/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k= github.com/googleapis/gax-go/v2 v2.7.0 h1:IcsPKeInNvYi7eqSaDjiZqDDKu5rsmunY0Y1YupQSSQ= github.com/googleapis/gax-go/v2 v2.7.0/go.mod h1:TEop28CZZQ2y+c0VxMUmu1lV+fQx57QpBWsYpwqHJx8= +github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= +github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= +github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= +github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -70,21 +97,31 @@ github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpE github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 h1:tnebWN09GYg9OLPss1KXj8txwZc6X6uMr6VFdcGNbHw= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20221014081412-f15817d10f9b h1:tvrvnPFcdzp294diPnrdZZZ8XUt2Tyj7svb7X52iDuU= @@ -101,8 +138,8 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg= -golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 h1:v6hYoSR9T5oet+pMXwUWkbiVqx/63mlHjefrHmxwfeY= +golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -113,9 +150,15 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E= google.golang.org/api v0.107.0 h1:I2SlFjD8ZWabaIFOfeEDg3pf0BHJDh6iYQ1ic3Yu/UU= google.golang.org/api v0.107.0/go.mod h1:2Ts0XTHNVWxypznxWOYUeI4g3WdP9Pk2Qk58+a/O9MY= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= @@ -152,6 +195,7 @@ google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go index 0f79caad7f22..81c1b123444c 100644 --- a/bigquery/integration_test.go +++ b/bigquery/integration_test.go @@ -54,6 +54,7 @@ var record = flag.Bool("record", false, "record RPCs") var ( client *Client + storageOptimizedClient *Client storageClient *storage.Client connectionsClient *connection.Client policyTagManagerClient *datacatalog.PolicyTagManagerClient @@ -121,6 +122,14 @@ func initIntegrationTest() func() { if err != nil { log.Fatal(err) } + storageOptimizedClient, err = NewClient(ctx, projID, option.WithHTTPClient(hc)) + if err != nil { + log.Fatal(err) + } + err = storageOptimizedClient.EnableStorageReadClient(ctx) + if err != nil { + log.Fatal(err) + } storageClient, err = storage.NewClient(ctx, option.WithHTTPClient(hc)) if err != nil { log.Fatal(err) @@ -145,6 +154,7 @@ func initIntegrationTest() func() { log.Print("replay not supported for Go versions before 1.8") } client = nil + storageOptimizedClient = nil storageClient = nil connectionsClient = nil return func() {} @@ -204,6 +214,14 @@ func initIntegrationTest() func() { if err != nil { log.Fatalf("NewClient: %v", err) } + storageOptimizedClient, err = NewClient(ctx, projID, bqOpts...) + if err != nil { + log.Fatalf("NewClient: %v", err) + } + err = storageOptimizedClient.EnableStorageReadClient(ctx) + if err != nil { + log.Fatalf("ConfigureStorageReadClient: %v", err) + } storageClient, err = storage.NewClient(ctx, sOpts...) if err != nil { log.Fatalf("storage.NewClient: %v", err) @@ -1902,12 +1920,16 @@ func TestIntegration_QuerySessionSupport(t *testing.T) { } -func TestIntegration_QueryParameters(t *testing.T) { - if client == nil { - t.Skip("Integration tests skipped") - } - ctx := context.Background() +var ( + queryParameterTestCases = []struct { + query string + parameters []QueryParameter + wantRow []Value + wantConfig interface{} + }{} +) +func initQueryParameterTestCases() { d := civil.Date{Year: 2016, Month: 3, Day: 20} tm := civil.Time{Hour: 15, Minute: 04, Second: 05, Nanosecond: 3008} rtm := tm @@ -1928,7 +1950,7 @@ func TestIntegration_QueryParameters(t *testing.T) { SubStructArray []ss } - testCases := []struct { + queryParameterTestCases = []struct { query string parameters []QueryParameter wantRow []Value @@ -2000,6 +2022,22 @@ func TestIntegration_QueryParameters(t *testing.T) { []Value{rtm}, rtm, }, + { + "SELECT @val", + []QueryParameter{ + { + Name: "val", + Value: &QueryParameterValue{ + Type: StandardSQLDataType{ + TypeKind: "JSON", + }, + Value: "{\"alpha\":\"beta\"}", + }, + }, + }, + []Value{"{\"alpha\":\"beta\"}"}, + "{\"alpha\":\"beta\"}", + }, { "SELECT @val", []QueryParameter{{Name: "val", Value: s{ts, []string{"a", "b"}, ss{"c"}, []ss{{"d"}, {"e"}}}}}, @@ -2167,7 +2205,17 @@ func TestIntegration_QueryParameters(t *testing.T) { }, }, } - for _, c := range testCases { +} + +func TestIntegration_QueryParameters(t *testing.T) { + if client == nil { + t.Skip("Integration tests skipped") + } + ctx := context.Background() + + initQueryParameterTestCases() + + for _, c := range queryParameterTestCases { q := client.Query(c.query) q.Parameters = c.parameters job, err := q.Run(ctx) diff --git a/bigquery/internal/query/order.go b/bigquery/internal/query/order.go new file mode 100644 index 000000000000..a634210c63fe --- /dev/null +++ b/bigquery/internal/query/order.go @@ -0,0 +1,67 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package query + +import ( + "strings" +) + +type qnode struct { + query string + children []*qnode +} + +// HasOrderedResults checks if a given SQL query returns ordered results. +// This function uses a naive approach of checking the root level query +// ( ignoring subqueries, functions calls, etc ) and checking +// if it contains an ORDER BY clause. +func HasOrderedResults(sql string) bool { + cleanedQuery := strings.TrimSpace(sql) + if !strings.HasPrefix(cleanedQuery, "(") { + cleanedQuery = "(" + cleanedQuery + ")" + } + root := &qnode{query: cleanedQuery, children: []*qnode{}} + curNode := root + indexStack := []int{} + nodeStack := []*qnode{} + for i, c := range cleanedQuery { + if c == '(' { + indexStack = append(indexStack, i) + nextNode := &qnode{children: []*qnode{}} + curNode.children = append(curNode.children, nextNode) + nodeStack = append(nodeStack, curNode) + curNode = nextNode + } + if c == ')' { + if len(indexStack) == 0 { + // unbalanced + return false + } + start := indexStack[len(indexStack)-1] + indexStack = indexStack[:len(indexStack)-1] + + curNode.query = cleanedQuery[start+1 : i] + + curNode = nodeStack[len(nodeStack)-1] + nodeStack = nodeStack[:len(nodeStack)-1] + } + } + curNode = root.children[0] + q := curNode.query + for _, c := range curNode.children { + q = strings.Replace(q, c.query, "", 1) + } + return strings.Contains(strings.ToUpper(q), "ORDER BY") +} diff --git a/bigquery/internal/query/order_test.go b/bigquery/internal/query/order_test.go new file mode 100644 index 000000000000..7bc555235bb2 --- /dev/null +++ b/bigquery/internal/query/order_test.go @@ -0,0 +1,51 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package query + +import "testing" + +func TestDetectOrderedResults(t *testing.T) { + testCases := []struct { + query string + isOrdered bool + }{ + {query: "SELECT 1 AS num FROM a ORDER BY num", isOrdered: true}, + {query: "SELECT 1 AS num FROM a", isOrdered: false}, + {query: "SELECT 1 AS num FROM (SELECT x FROM b ORDER BY x)", isOrdered: false}, + {query: "SELECT x AS num FROM (SELECT x FROM b ORDER BY x) a inner join b on b.x = a.x ORDER BY x", isOrdered: true}, + {query: "SELECT SUM(1) FROM a", isOrdered: false}, + {query: "SELECT COUNT(a.x) FROM a ORDER BY b", isOrdered: true}, + {query: "SELECT FOO(a.x, a.y) FROM a ORDER BY a", isOrdered: true}, + // Invalid queries + {query: "SELECT 1 AS num FROM a (", isOrdered: false}, + {query: "SELECT 1 AS num FROM a )", isOrdered: false}, + // Script statements + {query: "CALL foo()", isOrdered: false}, + {query: "DECLARE x INT64", isOrdered: false}, + {query: "SET x = 4;", isOrdered: false}, + } + + for _, tc := range testCases { + got := HasOrderedResults(tc.query) + if got != tc.isOrdered { + if tc.isOrdered { + t.Fatalf("expected query `%s` to be ordered", tc.query) + } else { + t.Fatalf("expected query `%s` to not be ordered", tc.query) + } + + } + } +} diff --git a/bigquery/iterator.go b/bigquery/iterator.go index 43ffb19dc66e..620ed16dcd75 100644 --- a/bigquery/iterator.go +++ b/bigquery/iterator.go @@ -44,6 +44,8 @@ type RowIterator struct { ctx context.Context src *rowSource + arrowIterator *arrowIterator + pageInfo *iterator.PageInfo nextFunc func() error pf pageFetcher diff --git a/bigquery/job.go b/bigquery/job.go index d761650fed89..1319e322cf46 100644 --- a/bigquery/job.go +++ b/bigquery/job.go @@ -321,16 +321,25 @@ func (j *Job) read(ctx context.Context, waitForQuery func(context.Context, strin if err != nil { return nil, err } - // Shave off some potential overhead by only retaining the minimal job representation in the iterator. - itJob := &Job{ - c: j.c, - projectID: j.projectID, - jobID: j.jobID, - location: j.location, + var it *RowIterator + if j.c.rc != nil { + it, err = newStorageRowIteratorFromJob(ctx, j) + if err != nil { + it = nil + } + } + if it == nil { + // Shave off some potential overhead by only retaining the minimal job representation in the iterator. + itJob := &Job{ + c: j.c, + projectID: j.projectID, + jobID: j.jobID, + location: j.location, + } + it = newRowIterator(ctx, &rowSource{j: itJob}, pf) + it.TotalRows = totalRows } - it := newRowIterator(ctx, &rowSource{j: itJob}, pf) it.Schema = schema - it.TotalRows = totalRows return it, nil } @@ -925,6 +934,28 @@ func (j *Job) isQuery() bool { return j.config != nil && j.config.Query != nil } +func (j *Job) isScript() bool { + return j.hasStatementType("SCRIPT") +} + +func (j *Job) isSelectQuery() bool { + return j.hasStatementType("SELECT") +} + +func (j *Job) hasStatementType(statementType string) bool { + if !j.isQuery() { + return false + } + if j.lastStatus == nil { + return false + } + queryStats, ok := j.lastStatus.Statistics.Details.(*QueryStatistics) + if !ok { + return false + } + return queryStats.StatementType == statementType +} + var stateMap = map[string]State{"PENDING": Pending, "RUNNING": Running, "DONE": Done} func (j *Job) setStatus(qs *bq.JobStatus) error { diff --git a/bigquery/query.go b/bigquery/query.go index d2d3cc15500e..95f5565aa00d 100644 --- a/bigquery/query.go +++ b/bigquery/query.go @@ -149,6 +149,9 @@ type QueryConfig struct { // Experimental: this option is experimental and may be modified or removed in future versions, // regardless of any other documented package stability guarantees. JobTimeout time.Duration + + // Force usage of Storage API if client is available. For test scenarios + forceStorageAPI bool } func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) { @@ -382,11 +385,13 @@ func (q *Query) Read(ctx context.Context) (it *RowIterator, err error) { } return job.Read(ctx) } + // we have a config, run on fastPath. resp, err := q.client.runQuery(ctx, queryRequest) if err != nil { return nil, err } + // construct a minimal job for backing the row iterator. minimalJob := &Job{ c: q.client, @@ -394,7 +399,20 @@ func (q *Query) Read(ctx context.Context) (it *RowIterator, err error) { location: resp.JobReference.Location, projectID: resp.JobReference.ProjectId, } + if resp.JobComplete { + // If more pages are available, discard and use the Storage API instead + if resp.PageToken != "" && q.client.rc != nil { + // Needed to fetch destination table + job, err := q.client.JobFromID(ctx, resp.JobReference.JobId) + if err != nil { + return nil, err + } + it, err = newStorageRowIteratorFromJob(ctx, job) + if err == nil { + return it, nil + } + } rowSource := &rowSource{ j: minimalJob, // RowIterator can precache results from the iterator to save a lookup. @@ -421,6 +439,9 @@ func (q *Query) Read(ctx context.Context) (it *RowIterator, err error) { // user's Query configuration. If all the options set on the job are supported on the // faster query path, this method returns a QueryRequest suitable for execution. func (q *Query) probeFastPath() (*bq.QueryRequest, error) { + if q.forceStorageAPI && q.client.rc != nil { + return nil, fmt.Errorf("force Storage API usage") + } // This is a denylist of settings which prevent us from composing an equivalent // bq.QueryRequest due to differences between configuration parameters accepted // by jobs.insert vs jobs.query. diff --git a/bigquery/query_test.go b/bigquery/query_test.go index 98b96a88c568..39d121b8f253 100644 --- a/bigquery/query_test.go +++ b/bigquery/query_test.go @@ -401,6 +401,7 @@ func TestQuery(t *testing.T) { diff := testutil.Diff(jc.(*QueryConfig), &wantConfig, cmp.Comparer(tableEqual), cmp.Comparer(externalDataEqual), + cmp.AllowUnexported(QueryConfig{}), ) if diff != "" { t.Errorf("#%d: (got=-, want=+:\n%s", i, diff) diff --git a/bigquery/storage_bench_test.go b/bigquery/storage_bench_test.go new file mode 100644 index 000000000000..690a561f02fc --- /dev/null +++ b/bigquery/storage_bench_test.go @@ -0,0 +1,105 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bigquery + +import ( + "context" + "fmt" + "testing" + + "google.golang.org/api/iterator" +) + +func BenchmarkIntegration_StorageReadQuery(b *testing.B) { + if storageOptimizedClient == nil { + b.Skip("Integration tests skipped") + } + ctx := context.Background() + table := "`bigquery-public-data.usa_names.usa_1910_current`" + benchCases := []struct { + name string + filter string + }{ + {name: "usa_1910_current_full", filter: ""}, + {name: "usa_1910_current_state_eq_fl", filter: "where state = \"FL\""}, + {name: "usa_1910_current_state_eq_ca", filter: "where state = \"CA\""}, + {name: "usa_1910_current_full_ordered", filter: "order by name"}, + } + + type S struct { + Name string + Number int + State string + Nested struct { + Name string + N int + } + } + + for _, bc := range benchCases { + sql := fmt.Sprintf(`SELECT name, number, state, STRUCT(name as name, number as n) as nested FROM %s %s`, table, bc.filter) + for _, maxStreamCount := range []int{0, 1} { + storageOptimizedClient.rc.settings.maxStreamCount = maxStreamCount + b.Run(fmt.Sprintf("storage_api_%d_max_streams_%s", maxStreamCount, bc.name), func(b *testing.B) { + for i := 0; i < b.N; i++ { + q := storageOptimizedClient.Query(sql) + q.forceStorageAPI = true + it, err := q.Read(ctx) + if err != nil { + b.Fatal(err) + } + if !it.IsAccelerated() { + b.Fatal("expected query execution to be accelerated") + } + for { + var s S + err := it.Next(&s) + if err == iterator.Done { + break + } + if err != nil { + b.Fatalf("failed to fetch via storage API: %v", err) + } + } + b.ReportMetric(float64(it.TotalRows), "rows") + bqSession := it.arrowIterator.session.bqSession + b.ReportMetric(float64(len(bqSession.Streams)), "parallel_streams") + b.ReportMetric(float64(maxStreamCount), "max_streams") + } + }) + } + b.Run(fmt.Sprintf("rest_api_%s", bc.name), func(b *testing.B) { + for i := 0; i < b.N; i++ { + q := client.Query(sql) + it, err := q.Read(ctx) + if err != nil { + b.Fatal(err) + } + for { + var s S + err := it.Next(&s) + if err == iterator.Done { + break + } + if err != nil { + b.Fatalf("failed to fetch via query API: %v", err) + } + } + b.ReportMetric(float64(it.TotalRows), "rows") + b.ReportMetric(1, "parallel_streams") + } + }) + } +} diff --git a/bigquery/storage_client.go b/bigquery/storage_client.go new file mode 100644 index 000000000000..aa04e2100883 --- /dev/null +++ b/bigquery/storage_client.go @@ -0,0 +1,168 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bigquery + +import ( + "context" + "fmt" + "runtime" + + "cloud.google.com/go/bigquery/internal" + storage "cloud.google.com/go/bigquery/storage/apiv1" + "cloud.google.com/go/bigquery/storage/apiv1/storagepb" + "cloud.google.com/go/internal/detect" + gax "github.com/googleapis/gax-go/v2" + "google.golang.org/api/option" + "google.golang.org/grpc" +) + +// readClient is a managed BigQuery Storage read client scoped to a single project. +type readClient struct { + rawClient *storage.BigQueryReadClient + projectID string + + settings readClientSettings +} + +type readClientSettings struct { + maxStreamCount int + maxWorkerCount int +} + +func defaultReadClientSettings() readClientSettings { + maxWorkerCount := runtime.GOMAXPROCS(0) + return readClientSettings{ + // with zero, the server will provide a value of streams so as to produce reasonable throughput + maxStreamCount: 0, + maxWorkerCount: maxWorkerCount, + } +} + +// newReadClient instantiates a new storage read client. +func newReadClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *readClient, err error) { + numConns := runtime.GOMAXPROCS(0) + if numConns > 4 { + numConns = 4 + } + o := []option.ClientOption{ + option.WithGRPCConnectionPool(numConns), + option.WithUserAgent(fmt.Sprintf("%s/%s", userAgentPrefix, internal.Version)), + } + o = append(o, opts...) + + rawClient, err := storage.NewBigQueryReadClient(ctx, o...) + if err != nil { + return nil, err + } + rawClient.SetGoogleClientInfo("gccl", internal.Version) + + // Handle project autodetection. + projectID, err = detect.ProjectID(ctx, projectID, "", opts...) + if err != nil { + return nil, err + } + + settings := defaultReadClientSettings() + rc := &readClient{ + rawClient: rawClient, + projectID: projectID, + settings: settings, + } + + return rc, nil +} + +// close releases resources held by the client. +func (c *readClient) close() error { + if c.rawClient == nil { + return fmt.Errorf("already closed") + } + c.rawClient.Close() + c.rawClient = nil + return nil +} + +// sessionForTable establishes a new session to fetch from a table using the Storage API +func (c *readClient) sessionForTable(ctx context.Context, table *Table, ordered bool) (*readSession, error) { + tableID, err := table.Identifier(StorageAPIResourceID) + if err != nil { + return nil, err + } + + // copy settings for a given session, to avoid overrides for all sessions + settings := c.settings + if ordered { + settings.maxStreamCount = 1 + } + + rs := &readSession{ + ctx: ctx, + table: table, + tableID: tableID, + settings: settings, + readRowsFunc: c.rawClient.ReadRows, + createReadSessionFunc: c.rawClient.CreateReadSession, + } + return rs, nil +} + +// ReadSession is the abstraction over a storage API read session. +type readSession struct { + settings readClientSettings + + ctx context.Context + table *Table + tableID string + + bqSession *storagepb.ReadSession + + // decouple from readClient to enable testing + createReadSessionFunc func(context.Context, *storagepb.CreateReadSessionRequest, ...gax.CallOption) (*storagepb.ReadSession, error) + readRowsFunc func(context.Context, *storagepb.ReadRowsRequest, ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) +} + +// Start initiates a read session +func (rs *readSession) start() error { + createReadSessionRequest := &storagepb.CreateReadSessionRequest{ + Parent: fmt.Sprintf("projects/%s", rs.table.ProjectID), + ReadSession: &storagepb.ReadSession{ + Table: rs.tableID, + DataFormat: storagepb.DataFormat_ARROW, + }, + MaxStreamCount: int32(rs.settings.maxStreamCount), + } + rpcOpts := gax.WithGRPCOptions( + // Read API can send batches up to 128MB + // https://cloud.google.com/bigquery/quotas#storage-limits + grpc.MaxCallRecvMsgSize(1024 * 1024 * 129), + ) + session, err := rs.createReadSessionFunc(rs.ctx, createReadSessionRequest, rpcOpts) + if err != nil { + return err + } + rs.bqSession = session + return nil +} + +// readRows returns a more direct iterators to the underlying Storage API row stream. +func (rs *readSession) readRows(req *storagepb.ReadRowsRequest) (storagepb.BigQueryRead_ReadRowsClient, error) { + if rs.bqSession == nil { + err := rs.start() + if err != nil { + return nil, err + } + } + return rs.readRowsFunc(rs.ctx, req) +} diff --git a/bigquery/storage_integration_test.go b/bigquery/storage_integration_test.go new file mode 100644 index 000000000000..6def5d51b03c --- /dev/null +++ b/bigquery/storage_integration_test.go @@ -0,0 +1,386 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bigquery + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "cloud.google.com/go/internal/testutil" + "google.golang.org/api/iterator" +) + +func TestIntegration_StorageReadBasicTypes(t *testing.T) { + if client == nil { + t.Skip("Integration tests skipped") + } + ctx := context.Background() + + initQueryParameterTestCases() + + for _, c := range queryParameterTestCases { + q := storageOptimizedClient.Query(c.query) + q.Parameters = c.parameters + q.forceStorageAPI = true + it, err := q.Read(ctx) + if err != nil { + t.Fatal(err) + } + err = checkIteratorRead(it, c.wantRow) + if err != nil { + t.Fatalf("error on query `%s`[%v]: %v", c.query, c.parameters, err) + } + if !it.IsAccelerated() { + t.Fatal("expected storage api to be used") + } + } +} + +func TestIntegration_StorageReadEmptyResultSet(t *testing.T) { + if client == nil { + t.Skip("Integration tests skipped") + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + table := storageOptimizedClient.Dataset(dataset.DatasetID).Table(tableIDs.New()) + err := table.Create(ctx, &TableMetadata{ + Schema: Schema{ + {Name: "name", Type: StringFieldType, Required: true}, + }, + }) + if err != nil { + t.Fatal(err) + } + defer table.Delete(ctx) + + it := table.Read(ctx) + err = checkIteratorRead(it, []Value{}) + if err != nil { + t.Fatalf("failed to read empty table: %v", err) + } + if !it.IsAccelerated() { + t.Fatal("expected storage api to be used") + } +} + +func TestIntegration_StorageReadFromSources(t *testing.T) { + if client == nil { + t.Skip("Integration tests skipped") + } + ctx := context.Background() + + dstTable := dataset.Table(tableIDs.New()) + dstTable.c = storageOptimizedClient + + sql := `SELECT 1 as num, 'one' as str +UNION ALL +SELECT 2 as num, 'two' as str +UNION ALL +SELECT 3 as num, 'three' as str +ORDER BY num` + q := storageOptimizedClient.Query(sql) + q.Dst = dstTable + job, err := q.Run(ctx) + if err != nil { + t.Fatal(err) + } + status, err := job.Wait(ctx) + if err != nil { + t.Fatal(err) + } + if err := status.Err(); err != nil { + t.Fatal(err) + } + expectedRows := [][]Value{ + {int64(1), "one"}, + {int64(2), "two"}, + {int64(3), "three"}, + } + tableRowIt := dstTable.Read(ctx) + if err = checkRowsRead(tableRowIt, expectedRows); err != nil { + t.Fatalf("checkRowsRead(table): %v", err) + } + if !tableRowIt.IsAccelerated() { + t.Fatalf("reading from table should use Storage API") + } + jobRowIt, err := job.Read(ctx) + if err != nil { + t.Fatalf("ReadJobResults(job): %v", err) + } + if err = checkRowsRead(jobRowIt, expectedRows); err != nil { + t.Fatalf("checkRowsRead(job): %v", err) + } + if !jobRowIt.IsAccelerated() { + t.Fatalf("reading job should use Storage API") + } + q.Dst = nil + q.forceStorageAPI = true + qRowIt, err := q.Read(ctx) + if err != nil { + t.Fatalf("ReadQuery(query): %v", err) + } + if !qRowIt.IsAccelerated() { + t.Fatalf("reading query should use Storage API") + } + if err = checkRowsRead(qRowIt, expectedRows); err != nil { + t.Fatalf("checkRowsRead(query): %v", err) + } +} + +func TestIntegration_StorageReadScriptJob(t *testing.T) { + if client == nil { + t.Skip("Integration tests skipped") + } + tableID := tableIDs.New() + ctx := context.Background() + + sql := fmt.Sprintf(` +-- Statement 0 +DECLARE x INT64; +SET x = 4; +-- Statement 1 +SELECT 1 as foo; +-- Statement 2 +SELECT 1 as num, 'one' as str +UNION ALL +SELECT 2 as num, 'two' as str; +-- Statement 3 +SELECT 1 as num, 'one' as str +UNION ALL +SELECT 2 as num, 'two' as str +UNION ALL +SELECT 3 as num, 'three' as str +UNION ALL +SELECT x as num, 'four' as str +ORDER BY num; +-- Statement 4 +CREATE TABLE %s.%s ( num INT64, str STRING ); +-- Statement 5 +DROP TABLE %s.%s; +`, dataset.DatasetID, tableID, dataset.DatasetID, tableID) + q := storageOptimizedClient.Query(sql) + q.forceStorageAPI = true + it, err := q.Read(ctx) + if err != nil { + t.Fatal(err) + } + expectedRows := [][]Value{ + {int64(1), "one"}, + {int64(2), "two"}, + {int64(3), "three"}, + {int64(4), "four"}, + } + if err = checkRowsRead(it, expectedRows); err != nil { + t.Fatalf("checkRowsRead(it): %v", err) + } + if !it.IsAccelerated() { + t.Fatalf("reading job should use Storage API") + } +} + +func TestIntegration_StorageReadQueryOrdering(t *testing.T) { + if client == nil { + t.Skip("Integration tests skipped") + } + ctx := context.Background() + + table := "`bigquery-public-data.usa_names.usa_1910_current`" + testCases := []struct { + name string + query string + maxExpectedStreams int + }{ + { + name: "Non_Ordered_Query", + query: fmt.Sprintf(`SELECT name, number, state FROM %s`, table), + maxExpectedStreams: -1, // No limit + }, + { + name: "Ordered_Query", + query: fmt.Sprintf(`SELECT name, number, state FROM %s order by name`, table), + maxExpectedStreams: 1, + }, + } + + type S struct { + Name string + Number int + State string + } + + for _, tc := range testCases { + q := storageOptimizedClient.Query(tc.query) + q.forceStorageAPI = true + + it, err := q.Read(ctx) + if err != nil { + t.Fatal(err) + } + + total, err := countIteratorRows(it) + if err != nil { + t.Fatal(err) + } + bqSession := it.arrowIterator.session.bqSession + if len(bqSession.Streams) == 0 { + t.Fatalf("%s: expected to use at least one stream but found %d", tc.name, len(bqSession.Streams)) + } + streamSettings := it.arrowIterator.session.settings.maxStreamCount + if tc.maxExpectedStreams > 0 { + if streamSettings > tc.maxExpectedStreams { + t.Fatalf("%s: expected stream settings to be at most %d streams but found %d", tc.name, tc.maxExpectedStreams, streamSettings) + } + if len(bqSession.Streams) > tc.maxExpectedStreams { + t.Fatalf("%s: expected server to set up at most %d streams but found %d", tc.name, tc.maxExpectedStreams, len(bqSession.Streams)) + } + } else { + if streamSettings != 0 { + t.Fatalf("%s: expected stream settings to be 0 (server defines amount of stream) but found %d", tc.name, streamSettings) + } + } + if total != it.TotalRows { + t.Fatalf("%s: should have read %d rows, but read %d", tc.name, it.TotalRows, total) + } + if !it.IsAccelerated() { + t.Fatalf("%s: expected query to be accelerated by Storage API", tc.name) + } + } +} + +func TestIntegration_StorageReadQueryMorePages(t *testing.T) { + if client == nil { + t.Skip("Integration tests skipped") + } + ctx := context.Background() + table := "`bigquery-public-data.samples.github_timeline`" + sql := fmt.Sprintf(`SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM %s`, table) + // Don't forceStorageAPI usage and still see internally Storage API is selected + q := storageOptimizedClient.Query(sql) + it, err := q.Read(ctx) + if err != nil { + t.Fatal(err) + } + if !it.IsAccelerated() { + t.Fatal("expected query to use Storage API") + } + + type S struct { + URL NullString + Owner NullString + Forks NullInt64 + } + + total, err := countIteratorRows(it) + if err != nil { + t.Fatal(err) + } + bqSession := it.arrowIterator.session.bqSession + if len(bqSession.Streams) == 0 { + t.Fatalf("should use more than one stream but found %d", len(bqSession.Streams)) + } + if total != it.TotalRows { + t.Fatalf("should have read %d rows, but read %d", it.TotalRows, total) + } +} + +func TestIntegration_StorageReadCancel(t *testing.T) { + if client == nil { + t.Skip("Integration tests skipped") + } + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + table := "`bigquery-public-data.samples.github_timeline`" + sql := fmt.Sprintf(`SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM %s`, table) + storageOptimizedClient.rc.settings.maxWorkerCount = 1 + q := storageOptimizedClient.Query(sql) + q.forceStorageAPI = true + it, err := q.Read(ctx) + if err != nil { + t.Fatal(err) + } + if !it.IsAccelerated() { + t.Fatal("expected query to use Storage API") + } + + for { + var dst []Value + err := it.Next(&dst) + if err == iterator.Done { + break + } + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + break + } + t.Fatalf("failed to fetch via storage API: %v", err) + } + } + // resources are cleaned asynchronously + time.Sleep(500 * time.Millisecond) + if !it.arrowIterator.isDone() { + t.Fatal("expected stream to be done") + } +} + +func countIteratorRows(it *RowIterator) (total uint64, err error) { + for { + var dst []Value + err := it.Next(&dst) + if err == iterator.Done { + break + } + if err != nil { + return total, fmt.Errorf("failed to fetch via storage API: %w", err) + } + total++ + } + return total, err +} + +func checkRowsRead(it *RowIterator, expectedRows [][]Value) error { + if int(it.TotalRows) != len(expectedRows) { + return fmt.Errorf("expected %d rows, found %d", len(expectedRows), it.TotalRows) + } + for _, row := range expectedRows { + err := checkIteratorRead(it, row) + if err != nil { + return err + } + } + return nil +} + +func checkIteratorRead(it *RowIterator, expectedRow []Value) error { + var outRow []Value + err := it.Next(&outRow) + if err == iterator.Done { + return nil + } + if err != nil { + return fmt.Errorf("failed to fetch via storage API: %v", err) + } + if len(outRow) != len(expectedRow) { + return fmt.Errorf("expected %d columns, but got %d", len(expectedRow), len(outRow)) + } + if !testutil.Equal(outRow, expectedRow) { + return fmt.Errorf("got %v, want %v", outRow, expectedRow) + } + return nil +} diff --git a/bigquery/storage_iterator.go b/bigquery/storage_iterator.go new file mode 100644 index 000000000000..702b90b67694 --- /dev/null +++ b/bigquery/storage_iterator.go @@ -0,0 +1,334 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bigquery + +import ( + "context" + "errors" + "fmt" + "io" + "sync" + "sync/atomic" + "time" + + "cloud.google.com/go/bigquery/internal/query" + "cloud.google.com/go/bigquery/storage/apiv1/storagepb" + "github.com/googleapis/gax-go/v2" + "golang.org/x/sync/semaphore" + "google.golang.org/api/iterator" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// arrowIterator is a raw interface for getting data from Storage Read API +type arrowIterator struct { + done uint32 // atomic flag + errs chan error + ctx context.Context + + schema Schema + decoder *arrowDecoder + records chan arrowRecordBatch + + session *readSession +} + +type arrowRecordBatch []byte + +func newStorageRowIteratorFromTable(ctx context.Context, table *Table, ordered bool) (*RowIterator, error) { + md, err := table.Metadata(ctx) + if err != nil { + return nil, err + } + rs, err := table.c.rc.sessionForTable(ctx, table, ordered) + if err != nil { + return nil, err + } + it, err := newStorageRowIterator(rs) + if err != nil { + return nil, err + } + it.arrowIterator.schema = md.Schema + return it, nil +} + +func newStorageRowIteratorFromJob(ctx context.Context, job *Job) (*RowIterator, error) { + cfg, err := job.Config() + if err != nil { + return nil, err + } + qcfg := cfg.(*QueryConfig) + if qcfg.Dst == nil { + if !job.isScript() { + return nil, fmt.Errorf("job has no destination table to read") + } + lastJob, err := resolveLastChildSelectJob(ctx, job) + if err != nil { + return nil, err + } + return newStorageRowIteratorFromJob(ctx, lastJob) + } + ordered := query.HasOrderedResults(qcfg.Q) + return newStorageRowIteratorFromTable(ctx, qcfg.Dst, ordered) +} + +func resolveLastChildSelectJob(ctx context.Context, job *Job) (*Job, error) { + childJobs := []*Job{} + it := job.Children(ctx) + for { + job, err := it.Next() + if err == iterator.Done { + break + } + if err != nil { + return nil, fmt.Errorf("failed to resolve table for script job: %w", err) + } + if !job.isSelectQuery() { + continue + } + childJobs = append(childJobs, job) + } + if len(childJobs) == 0 { + return nil, fmt.Errorf("failed to resolve table for script job: no child jobs found") + } + return childJobs[0], nil +} + +func newRawStorageRowIterator(rs *readSession) (*arrowIterator, error) { + arrowIt := &arrowIterator{ + ctx: rs.ctx, + session: rs, + records: make(chan arrowRecordBatch, rs.settings.maxWorkerCount+1), + errs: make(chan error, rs.settings.maxWorkerCount+1), + } + if rs.bqSession == nil { + err := rs.start() + if err != nil { + return nil, err + } + } + return arrowIt, nil +} + +func newStorageRowIterator(rs *readSession) (*RowIterator, error) { + arrowIt, err := newRawStorageRowIterator(rs) + if err != nil { + return nil, err + } + totalRows := arrowIt.session.bqSession.EstimatedRowCount + it := &RowIterator{ + ctx: rs.ctx, + arrowIterator: arrowIt, + TotalRows: uint64(totalRows), + rows: [][]Value{}, + } + it.nextFunc = nextFuncForStorageIterator(it) + it.pageInfo = &iterator.PageInfo{ + Token: "", + MaxSize: int(totalRows), + } + return it, nil +} + +func nextFuncForStorageIterator(it *RowIterator) func() error { + return func() error { + if len(it.rows) > 0 { + return nil + } + arrowIt := it.arrowIterator + record, err := arrowIt.next() + if err == iterator.Done { + if len(it.rows) == 0 { + return iterator.Done + } + return nil + } + if err != nil { + return err + } + + rows, err := arrowIt.decoder.decodeArrowRecords(record) + if err != nil { + return err + } + it.rows = rows + return nil + } +} + +func (it *arrowIterator) init() error { + if it.decoder != nil { // Already initialized + return nil + } + + bqSession := it.session.bqSession + if bqSession == nil { + return errors.New("read session not initialized") + } + + streams := bqSession.Streams + if len(streams) == 0 { + return iterator.Done + } + + if it.schema == nil { + meta, err := it.session.table.Metadata(it.ctx) + if err != nil { + return err + } + it.schema = meta.Schema + } + + decoder, err := newArrowDecoderFromSession(it.session, it.schema) + if err != nil { + return err + } + it.decoder = decoder + + wg := sync.WaitGroup{} + wg.Add(len(streams)) + sem := semaphore.NewWeighted(int64(it.session.settings.maxWorkerCount)) + go func() { + wg.Wait() + close(it.records) + close(it.errs) + it.markDone() + }() + + go func() { + for _, readStream := range streams { + err := sem.Acquire(it.ctx, 1) + if err != nil { + wg.Done() + continue + } + go func(readStreamName string) { + it.processStream(readStreamName) + sem.Release(1) + wg.Done() + }(readStream.Name) + } + }() + return nil +} + +func (it *arrowIterator) markDone() { + atomic.StoreUint32(&it.done, 1) +} + +func (it *arrowIterator) isDone() bool { + return atomic.LoadUint32(&it.done) != 0 +} + +func (it *arrowIterator) processStream(readStream string) { + bo := gax.Backoff{} + var offset int64 + for { + rowStream, err := it.session.readRows(&storagepb.ReadRowsRequest{ + ReadStream: readStream, + Offset: offset, + }) + if err != nil { + if it.session.ctx.Err() != nil { // context cancelled, don't try again + return + } + backoff, shouldRetry := retryReadRows(bo, err) + if shouldRetry { + if err := gax.Sleep(it.ctx, backoff); err != nil { + return // context cancelled + } + continue + } + it.errs <- fmt.Errorf("failed to read rows on stream %s: %w", readStream, err) + continue + } + offset, err = it.consumeRowStream(readStream, rowStream, offset) + if errors.Is(err, io.EOF) { + return + } + if err != nil { + if it.session.ctx.Err() != nil { // context cancelled, don't queue error + return + } + // try to re-open row stream with updated offset + } + } +} + +func retryReadRows(bo gax.Backoff, err error) (time.Duration, bool) { + s, ok := status.FromError(err) + if !ok { + return bo.Pause(), false + } + switch s.Code() { + case codes.Aborted, + codes.Canceled, + codes.DeadlineExceeded, + codes.FailedPrecondition, + codes.Internal, + codes.Unavailable: + return bo.Pause(), true + } + return bo.Pause(), false +} + +func (it *arrowIterator) consumeRowStream(readStream string, rowStream storagepb.BigQueryRead_ReadRowsClient, offset int64) (int64, error) { + for { + r, err := rowStream.Recv() + if err != nil { + if err == io.EOF { + return offset, err + } + return offset, fmt.Errorf("failed to consume rows on stream %s: %w", readStream, err) + } + if r.RowCount > 0 { + offset += r.RowCount + arrowRecordBatch := r.GetArrowRecordBatch() + it.records <- arrowRecordBatch.SerializedRecordBatch + } + } +} + +// next return the next batch of rows as an arrow.Record. +// Accessing Arrow Records directly has the drawnback of having to deal +// with memory management. +func (it *arrowIterator) next() (arrowRecordBatch, error) { + if err := it.init(); err != nil { + return nil, err + } + if len(it.records) > 0 { + return <-it.records, nil + } + if it.isDone() { + return nil, iterator.Done + } + select { + case record := <-it.records: + if record == nil { + return nil, iterator.Done + } + return record, nil + case err := <-it.errs: + return nil, err + case <-it.ctx.Done(): + return nil, it.ctx.Err() + } +} + +// IsAccelerated check if the current RowIterator is +// being accelerated by Storage API. +func (it *RowIterator) IsAccelerated() bool { + return it.arrowIterator != nil +} diff --git a/bigquery/storage_iterator_test.go b/bigquery/storage_iterator_test.go new file mode 100644 index 000000000000..03293af069bc --- /dev/null +++ b/bigquery/storage_iterator_test.go @@ -0,0 +1,131 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bigquery + +import ( + "context" + "errors" + "fmt" + "io" + "testing" + "time" + + "cloud.google.com/go/bigquery/storage/apiv1/storagepb" + gax "github.com/googleapis/gax-go/v2" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestStorageIteratorRetry(t *testing.T) { + cancelledCtx, cancel := context.WithCancel(context.Background()) + cancel() + testCases := []struct { + ctx context.Context + desc string + errors []error + wantFail bool + }{ + { + desc: "no error", + errors: []error{nil}, + wantFail: false, + }, + { + desc: "transient failures", + errors: []error{ + status.Errorf(codes.DeadlineExceeded, "try 1"), + status.Errorf(codes.Unavailable, "try 2"), + status.Errorf(codes.Canceled, "try 3"), + status.Errorf(codes.Internal, "try 4"), + nil, + }, + wantFail: false, + }, + { + desc: "permanent error", + errors: []error{ + status.Errorf(codes.InvalidArgument, "invalid args"), + }, + wantFail: true, + }, + { + ctx: cancelledCtx, + desc: "context cancelled/deadline exceeded", + errors: []error{ + fmt.Errorf("random error"), + fmt.Errorf("another random error"), + fmt.Errorf("yet another random error"), + }, + wantFail: true, + }, + } + + for _, tc := range testCases { + baseCtx := tc.ctx + if baseCtx == nil { + baseCtx = context.Background() + } + ctx, cancel := context.WithTimeout(baseCtx, 5*time.Second) + defer cancel() + it, err := newRawStorageRowIterator(&readSession{ + ctx: ctx, + settings: defaultReadClientSettings(), + readRowsFunc: func(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) { + if len(tc.errors) == 0 { + return &testReadRowsClient{}, nil + } + err := tc.errors[0] + tc.errors = tc.errors[1:] + if err != nil { + return nil, err + } + return &testReadRowsClient{}, nil + }, + bqSession: &storagepb.ReadSession{}, + }) + if err != nil { + t.Fatalf("case %s: newRawStorageRowIterator: %v", tc.desc, err) + } + + it.processStream("test-stream") + + if errors.Is(it.ctx.Err(), context.Canceled) || errors.Is(it.ctx.Err(), context.DeadlineExceeded) { + if tc.wantFail { + continue + } + t.Fatalf("case %s: deadline exceeded", tc.desc) + } + if tc.wantFail && len(it.errs) == 0 { + t.Fatalf("case %s:want test to fail, but found no errors", tc.desc) + } + if !tc.wantFail && len(it.errs) > 0 { + t.Fatalf("case %s:test should not fail, but found %d errors", tc.desc, len(it.errs)) + } + } +} + +type testReadRowsClient struct { + storagepb.BigQueryRead_ReadRowsClient + responses []*storagepb.ReadRowsResponse +} + +func (trrc *testReadRowsClient) Recv() (*storagepb.ReadRowsResponse, error) { + if len(trrc.responses) == 0 { + return nil, io.EOF + } + r := trrc.responses[0] + trrc.responses = trrc.responses[:1] + return r, nil +} diff --git a/bigquery/table.go b/bigquery/table.go index 642476d18da5..f0ec29297929 100644 --- a/bigquery/table.go +++ b/bigquery/table.go @@ -813,6 +813,12 @@ func (t *Table) Read(ctx context.Context) *RowIterator { } func (t *Table) read(ctx context.Context, pf pageFetcher) *RowIterator { + if t.c.rc != nil { + it, err := newStorageRowIteratorFromTable(ctx, t, false) + if err == nil { + return it + } + } return newRowIterator(ctx, &rowSource{t: t}, pf) }