diff --git a/.github/workflows/changelog.yml b/.github/workflows/changelog.yml new file mode 100644 index 000000000..3ccff7d60 --- /dev/null +++ b/.github/workflows/changelog.yml @@ -0,0 +1,19 @@ +on: + pull_request_target: + +name: changelog + +jobs: + changelog: + name: changelog + runs-on: ubuntu-latest + steps: + - name: Checkout sources + uses: actions/checkout@v2 + + - name: Changelog updated + uses: Zomzog/changelog-checker@v1.2.0 + with: + fileName: CHANGELOG.md + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/version.yml b/.github/workflows/version.yml new file mode 100644 index 000000000..0567e14ea --- /dev/null +++ b/.github/workflows/version.yml @@ -0,0 +1,19 @@ +on: + pull_request_target: + +name: version + +jobs: + changelog: + name: version + runs-on: ubuntu-latest + steps: + - name: Checkout sources + uses: actions/checkout@v2 + + - name: Version updated + uses: Zomzog/changelog-checker@v1.2.0 + with: + fileName: internal/meta/version.go + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/AUTHORS b/AUTHORS index 898b45d08..483a9f3e6 100644 --- a/AUTHORS +++ b/AUTHORS @@ -1,4 +1,4 @@ The following authors have created the source code of "Yandex Database GO SDK" published and distributed by YANDEX LLC as the owner: -Sergey Kamardin +Aleksey Myasnikov diff --git a/CHANGELOG.md b/CHANGELOG.md index 1dc381e37..8f105aca3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,16 @@ +## 3.0.0 +* Refactored sources for splitting public interfaces and internal + implementation for core changes in the future without change major version +* Refactored of transport level of driver - now we use grpc code generation by stock `protoc-gen-go` instead internal protoc codegen. New API provide operate from codegen grpc-clients with driver as a single grpc client connection. But driver hide inside self a pool of grpc connections to different cluster endpoints YDB. All communications with YDB (base services includes to driver: table, discovery, coordiantion and ratelimiter) provides stock codegen grpc-clients now. +* Much changed API of driver for easy usage. +* Dropped package `ydbsql` (moved to external project) +* Extracted yandex-cloud authentication to external project +* Extracted examples to external project +* Changed of traces API for next usage in jaeger и prometheus +* Dropped old APIs marked as `deprecated` +* Added integration tests with docker ydb container +* Changed table session and endpoint link type from string address to integer NodeID + ## 2.10.1 * Fixed race on ydbsql concurrent connect. This hotfix only for v2 version diff --git a/NEXT_MAJOR_RELEASE.md b/NEXT_MAJOR_RELEASE.md index 6bcf8035e..7a54b58f7 100644 --- a/NEXT_MAJOR_RELEASE.md +++ b/NEXT_MAJOR_RELEASE.md @@ -1,30 +1,2 @@ # Breaking changes for the next major release -- [x] Delete deprecated api package -- [x] Delete deprecated internalapi package -- [x] Delete deprecated parameter KeepAliveBatchSize from session pool -- [x] Delete deprecated ConnUsePolicy, EndpointInfo, WithEndpointInfo, WithEndpointInfoAndPolicy, ContextConn -- [x] Delete deprecated client option DefaultMaxQueryCacheSize, MaxQueryCacheSize and client query cache -- [x] Change `proto` codegen code in `api` from `internal/cmd/protoc-gen` tool to standard `protoc-gen-go` tool. - This need for change imports to standard. Current imports are deprecated and linters alarms -- [x] Replace grpc and protobuf libraries to actual -- [x] Replace all internal usages of `driver.Call()` and `driver.StreamRead()` to code-generated grpc-clients, - which will be use driver as `grpc.ClientConnInterface` provider. -- [x] Delete deprecated Driver interface -- [x] Remove or hide (do private) deprecated API for new `scanner`. -- [x] Delete deprecated ready statistics from session pool -- [x] Hide (do private) entity `table.Client` or `table.SessionPool` because it most difficultly for SDK users -- [x] Drop `table.SessionProvider.PutBusy()` interface func -- [x] Drop `Retry.MustCheckSession()` func -- [x] Drop `RetryCheckSession` constant -- [x] Drop `table.SessionPoolStats.BusyCheck` counter -- [x] Drop `ydbsql.WithSessionPoolBusyCheckInterval()` -- [x] Drop `connect.WithSessionPoolBusyCheckInterval()` -- [x] Drop marked as deprecated some retry constants -- [x] Extract auth package to neighbour project(-s) for isolation ydb-go-sdk from unnecessary dependencies -- [x] Extract coordination package to neighbour project as plugin over ydb-go-sdk -- [x] Extract ratelimiter package to neighbour project as plugin over ydb-go-sdk -- [x] Extract experimental package to neighbour project as plugin over ydb-go-sdk -- [x] Refactoring Trace API: exclude duplicates of data from closure Trace functions -- [x] Move traceutil from internal to root -- [X] Drop ReadConnStats() public method. Use Cluster.Stats() instead -- [x] Drop deprecated NextStreamSet (merged logic into NextSet) \ No newline at end of file +- [ ] diff --git a/README.md b/README.md index b098e77a6..e620fda6c 100644 --- a/README.md +++ b/README.md @@ -59,17 +59,12 @@ The straightforward example of querying data may look similar to this: // Determine timeout for connect or do nothing ctx := context.Background() - connectParams, err := ydb.ConnectionString(os.Getenv("YDB")) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "cannot create connect params from connection string env['YDB'] = '%s': %v\n", os.Getenv("YDB"), err) - os.Exit(1) - } - // connect package helps to connect to database, returns connection object which // provide necessary clients such as table.Client, scheme.Client, etc. db, err := ydb.New( ctx, connectParams, + ydb.WithConnectionString(ydb.ConnectionString(os.Getenv("YDB_CONNECTION_STRING"))), ydb.WithDialTimeout(3 * time.Second), ydb.WithCertificatesFromFile("~/.ydb/CA.pem"), ydb.WithSessionPoolIdleThreshold(time.Second * 5), @@ -80,35 +75,42 @@ The straightforward example of querying data may look similar to this: if err != nil { // handle error } - defer func() { _ = db.Close() }() - - // Create session for execute queries - session, err := db.Table().CreateSession(ctx) - if err != nil { - // handle error - } - defer session.Close(ctx) + defer func() { _ = db.Close(ctx) }() // Prepare transaction control for upcoming query execution. // NOTE: result of TxControl() may be reused. txc := table.TxControl( - table.BeginTx(table.WithSerializableReadWrite()), - table.CommitTx(), + table.BeginTx(table.WithSerializableReadWrite()), + table.CommitTx(), ) - // Execute text query without preparation and with given "autocommit" - // transaction control. That is, transaction will be committed without - // additional calls. Notice the "_" unused variable – it stands for created - // transaction during execution, but as said above, transaction is committed - // for us and `ydb-go-sdk` do not want to do anything with it. - _, res, err := session.Execute(ctx, txc, - `--!syntax_v1 - DECLARE $mystr AS Utf8?; - SELECT 42 as id, $mystr as mystr - `, - table.NewQueryParameters( - table.ValueParam("$mystr", types.OptionalValue(types.UTF8Value("test"))), - ), + var res *table.Result + + // Do() provide the best effort for executing operation + // Do implements internal busy loop until one of the following conditions occurs: + // - deadline was cancelled or deadlined + // - operation returned nil as error + // Note that in case of prepared statements call to Prepare() must be made + // inside the function body. + err := c.Do( + ctx, + func(ctx context.Context, s table.Session) (err error) { + // Execute text query without preparation and with given "autocommit" + // transaction control. That is, transaction will be committed without + // additional calls. Notice the "_" unused variable – it stands for created + // transaction during execution, but as said above, transaction is committed + // for us and `ydb-go-sdk` do not want to do anything with it. + _, res, err := session.Execute(ctx, txc, + `--!syntax_v1 + DECLARE $mystr AS Utf8?; + SELECT 42 as id, $mystr as mystr + `, + table.NewQueryParameters( + table.ValueParam("$mystr", types.OptionalValue(types.UTF8Value("test"))), + ), + ) + return err + }, ) if err != nil { return err // handle error @@ -139,35 +141,13 @@ The straightforward example of querying data may look similar to this: } ``` -This example can be tested as https://github.com/ydb-platform/ydb-go-examples/tree/master/from_readme - YDB sessions may become staled and appropriate error will be returned. To -reduce boilerplate overhead for such cases `ydb-go-sdk` provides generic retry logic: - -```go - var res *table.Result - // Retry() provide the best effort fo retrying operation - // Retry implements internal busy loop until one of the following conditions occurs: - // - deadline was cancelled or deadlined - // - retry operation returned nil as error - // Note that in case of prepared statements call to Prepare() must be made - // inside the function body. - err := c.Retry(ctx, false, - func(ctx context.Context, s table.Session) (err error) { - res, err = s.Execute(...) - return - }, - ) -``` - -That is, instead of manual creation of `table.Session`, we give a -`table.Client` such responsibility. It holds instances of active sessions and -"pings" them periodically to keep them alive. +reduce boilerplate overhead for such cases `ydb-go-sdk` provides generic retry logic ## Credentials There are different variants to get `ydb.Credentials` object to get authorized. -Usage examples can be found [here](https://github.com/ydb-platform/ydb-go-examples/tree/master/auth). +Usage examples can be found [here](https://github.com/ydb-platform/ydb-go-examples/tree/master/cmd/auth). ## Examples diff --git a/go.mod b/go.mod index b75c9ea0e..b11fc4ee9 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,15 @@ module github.com/ydb-platform/ydb-go-sdk/v3 go 1.14 require ( + github.com/ydb-platform/ydb-api-protos v0.0.0-20210921091122-f697ac767e19 // indirect github.com/ydb-platform/ydb-go-genproto v0.0.0-20210916081217-f4e55570b874 - google.golang.org/grpc v1.37.0 + google.golang.org/grpc v1.38.0 google.golang.org/protobuf v1.26.0 ) + +retract ( + v3.0.0 + v3.0.1 + v3.0.2 + v3.0.3 +) \ No newline at end of file diff --git a/go.sum b/go.sum index 097f8e03d..090bf00ac 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/ydb-platform/ydb-api-protos v0.0.0-20210921091122-f697ac767e19 h1:uywQn4qvfAgy7muv7G+C8t+KbKnTkQhs82O45/9CcPg= +github.com/ydb-platform/ydb-api-protos v0.0.0-20210921091122-f697ac767e19/go.mod h1:I0NWnA2hX8d2tKctW3AO2ne1Xc+FhvhA8PbP4iXQa5M= github.com/ydb-platform/ydb-go-genproto v0.0.0-20210916081217-f4e55570b874 h1:wbpYXEn87uEDkH3RvG37XW2ipEkqxpJUCGAoU1on0yE= github.com/ydb-platform/ydb-go-genproto v0.0.0-20210916081217-f4e55570b874/go.mod h1:cc138nptTn9eKptCQl/grxP6pBKpo/bnXDiOxuVZtps= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -70,8 +72,9 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= -google.golang.org/grpc v1.37.0 h1:uSZWeQJX5j11bIQ4AJoj+McDBo29cY1MCoC1wO3ts+c= google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= +google.golang.org/grpc v1.38.0 h1:/9BgsAsa5nWe26HqOlvlgJnqBuktYOLCgjCPqsa56W0= +google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/internal/driver/cluster/balancer/conn/conn.go b/internal/driver/cluster/balancer/conn/conn.go index 8052a3f23..86be13f4a 100644 --- a/internal/driver/cluster/balancer/conn/conn.go +++ b/internal/driver/cluster/balancer/conn/conn.go @@ -1,7 +1,5 @@ package conn -import "C" - import ( "context" "sync" diff --git a/internal/meta/version.go b/internal/meta/version.go index 0765a354d..0018ad8da 100644 --- a/internal/meta/version.go +++ b/internal/meta/version.go @@ -1,5 +1,5 @@ package meta const ( - Version = "ydb-go-sdk/2.10.1" + Version = "ydb-go-sdk/3.0.0" ) diff --git a/internal/response/response.go b/internal/response/response.go index 88f89690d..ee114dc55 100644 --- a/internal/response/response.go +++ b/internal/response/response.go @@ -1,93 +1,7 @@ package response -import ( - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations" -) +import "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations" -//type Response interface { -// GetOpReady() bool -// GetOpID() string -// GetStatus() Ydb.StatusIds_StatusCode -// GetIssues() []*Ydb_Issue.IssueMessage -// GetResult() *anypb.Any -// GetResponseProto() proto.Message -//} -// type Response interface { - //proto.Message GetOperation() *Ydb_Operations.Operation } - -//var _ Response = &opResponseWrapper{} -// -//type opResponseWrapper struct { -// response Response -//} -// -//func WrapOpResponse(resp Response) Response { -// return &opResponseWrapper{response: resp} -//} -// -//func (r *opResponseWrapper) GetOpReady() bool { -// return r.response.GetOperation().GetReady() -//} -// -//func (r *opResponseWrapper) GetOpID() string { -// return r.response.GetOperation().GetId() -//} -// -//func (r *opResponseWrapper) GetStatus() Ydb.StatusIds_StatusCode { -// return r.response.GetOperation().GetStatus() -//} -// -//func (r *opResponseWrapper) GetIssues() []*Ydb_Issue.IssueMessage { -// return r.response.GetOperation().GetIssues() -//} -// -//func (r *opResponseWrapper) GetResult() *anypb.Any { -// return r.response.GetOperation().GetResult() -//} -// -//func (r *opResponseWrapper) GetResponseProto() proto.Message { -// return r.response -//} -// -//type NoOpResponse interface { -// proto.Message -// GetStatus() Ydb.StatusIds_StatusCode -// GetIssues() []*Ydb_Issue.IssueMessage -//} -// -//var _ Response = &noOpResponseWrapper{} -// -//type noOpResponseWrapper struct { -// response NoOpResponse -//} -// -//func WrapNoOpResponse(resp NoOpResponse) Response { -// return &noOpResponseWrapper{response: resp} -//} -// -//func (r *noOpResponseWrapper) GetIssues() []*Ydb_Issue.IssueMessage { -// return r.response.GetIssues() -//} -// -//func (r *noOpResponseWrapper) GetOpReady() bool { -// return true -//} -// -//func (r *noOpResponseWrapper) GetOpID() string { -// return "" -//} -// -//func (r *noOpResponseWrapper) GetResponseProto() proto.Message { -// return r.response -//} -// -//func (r *noOpResponseWrapper) GetResult() *anypb.Any { -// return nil -//} -// -//func (r *noOpResponseWrapper) GetStatus() Ydb.StatusIds_StatusCode { -// return r.response.GetStatus() -//} diff --git a/internal/table/client.go b/internal/table/client.go index 6c4764dd9..da72e40af 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -43,8 +43,6 @@ type Client interface { Get(ctx context.Context) (s Session, err error) Take(ctx context.Context, s Session) (took bool, err error) Put(ctx context.Context, s Session) (err error) - Create(ctx context.Context) (s Session, err error) - Retry(ctx context.Context, isIdempotentOperation bool, op table.RetryOperation) error Close(ctx context.Context) error } @@ -114,14 +112,6 @@ type client struct { testHookGetWaitCh func() // nil except some tests. } -func (c *client) RetryIdempotent(ctx context.Context, op table.RetryOperation) (err error) { - return c.Retry(ctx, true, op) -} - -func (c *client) RetryNonIdempotent(ctx context.Context, op table.RetryOperation) (err error) { - return c.Retry(ctx, false, op) -} - func (c *client) isClosed() bool { c.mu.Lock() defer c.mu.Unlock() @@ -536,18 +526,22 @@ func (c *client) Close(ctx context.Context) (err error) { return nil } -// Retry provide the best effort fo retrying operation -// Retry implements internal busy loop until one of the following conditions is met: +// Do provide the best effort for execute operation +// Do implements internal busy loop until one of the following conditions is met: // - deadline was canceled or deadlined // - retry operation returned nil as error // Warning: if deadline without deadline or cancellation func Retry will be worked infinite -func (c *client) Retry(ctx context.Context, isOperationIdempotent bool, op table.RetryOperation) (err error) { +func (c *client) Do(ctx context.Context, op table.Operation, opts ...table.Option) (err error) { + options := table.Options{} + for _, o := range opts { + o(&options) + } return retryBackoff( ctx, c, retry.FastBackoff, retry.SlowBackoff, - isOperationIdempotent, + options.Idempotent, op, ) } diff --git a/internal/table/retry.go b/internal/table/retry.go index 169ec6162..ba95842f1 100644 --- a/internal/table/retry.go +++ b/internal/table/retry.go @@ -25,24 +25,26 @@ type SessionProvider interface { // CloseSession must be fast. If necessary, can be async. CloseSession(ctx context.Context, s Session) error - // Retry provide the best effort fo retrying operation - // Retry implements internal busy loop until one of the following conditions is met: - // - deadline was canceled or deadlined - // - retry operation returned nil as error - // If deadline without deadline used build client RetryTimeout - Retry(ctx context.Context, retryNoIdempotent bool, op table.RetryOperation) (err error) - // Close provide cleanup sessions Close(ctx context.Context) error + + // Do provide the best effort for execute operation + // Do implements internal busy loop until one of the following conditions is met: + // - deadline was canceled or deadlined + // - retry operation returned nil as error + // Warning: if deadline without deadline or cancellation func Retry will be worked infinite + Do(ctx context.Context, op table.Operation, opts ...table.Option) (err error) } type SessionProviderFunc struct { OnGet func(context.Context) (Session, error) OnPut func(context.Context, Session) error - OnRetry func(context.Context, table.RetryOperation) error + OnDo func(context.Context, table.Operation, ...table.Option) error OnClose func(context.Context) error } +var _ SessionProvider = SessionProviderFunc{} + func (f SessionProviderFunc) Close(ctx context.Context) error { if f.OnClose == nil { return nil @@ -64,11 +66,11 @@ func (f SessionProviderFunc) Put(ctx context.Context, s Session) error { return f.OnPut(ctx, s) } -func (f SessionProviderFunc) Retry(ctx context.Context, _ bool, op table.RetryOperation) (err error) { - if f.OnRetry == nil { +func (f SessionProviderFunc) Do(ctx context.Context, op table.Operation, opts ...table.Option) (err error) { + if f.OnDo == nil { return retryBackoff(ctx, f, nil, nil, false, op) } - return f.OnRetry(ctx, op) + return f.OnDo(ctx, op) } func (f SessionProviderFunc) CloseSession(ctx context.Context, s Session) error { @@ -93,12 +95,16 @@ type singleSession struct { empty bool } -func (s *singleSession) Close(ctx context.Context) error { - return s.CloseSession(ctx, s.s) +func (s *singleSession) Do(ctx context.Context, op table.Operation, opts ...table.Option) (err error) { + options := table.Options{} + for _, o := range opts { + o(&options) + } + return retryBackoff(ctx, s, s.b, s.b, options.Idempotent, op) } -func (s *singleSession) Retry(ctx context.Context, _ bool, op table.RetryOperation) (err error) { - return retryBackoff(ctx, s, s.b, s.b, false, op) +func (s *singleSession) Close(ctx context.Context) error { + return s.CloseSession(ctx, s.s) } func (s *singleSession) Get(context.Context) (Session, error) { @@ -137,7 +143,7 @@ func retryBackoff( fastBackoff retry.Backoff, slowBackoff retry.Backoff, isOperationIdempotent bool, - op table.RetryOperation, + op table.Operation, ) (err error) { var ( s Session diff --git a/internal/table/retry_test.go b/internal/table/retry_test.go index cd5be0eef..7c8c2a288 100644 --- a/internal/table/retry_test.go +++ b/internal/table/retry_test.go @@ -43,7 +43,7 @@ func TestRetryerBackoffRetryCancelation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) results := make(chan error) go func() { - err := p.Retry(ctx, false, func(ctx context.Context, _ table.Session) error { + err := p.Do(ctx, func(ctx context.Context, _ table.Session) error { return testErr }) results <- err @@ -86,9 +86,8 @@ func TestRetryerBadSession(t *testing.T) { sessions []table.Session ) ctx, cancel := context.WithCancel(context.Background()) - err := p.Retry( + err := p.Do( ctx, - false, func(ctx context.Context, s table.Session) error { sessions = append(sessions, s) i++ @@ -136,15 +135,14 @@ func TestRetryerImmediateReturn(t *testing.T) { t.Fatalf("unexpected panic: %v", e) } }() - pool := SingleSession( + p := SingleSession( simpleSession(t), testutil.BackoffFunc(func(n int) <-chan time.Time { panic("this code will not be called") }), ) - err := pool.Retry( + err := p.Do( context.Background(), - false, func(ctx context.Context, _ table.Session) error { return testErr }, @@ -256,7 +254,7 @@ func TestRetryContextDeadline(t *testing.T) { client := &client{ cluster: testutil.NewCluster(testutil.WithInvokeHandlers(testutil.InvokeHandlers{})), } - pool := SessionProviderFunc{ + p := SessionProviderFunc{ OnGet: client.createSession, } for i := range timeouts { @@ -267,7 +265,7 @@ func TestRetryContextDeadline(t *testing.T) { random := rand.New(rand.NewSource(time.Now().Unix())) ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - _ = pool.Retry( + _ = p.Do( trace.WithRetry( ctx, trace.Retry{ @@ -280,7 +278,6 @@ func TestRetryContextDeadline(t *testing.T) { }, }, ), - false, func(ctx context.Context, _ table.Session) error { select { case <-ctx.Done(): diff --git a/scheme.go b/scheme.go index 3b3252029..dbee954ba 100644 --- a/scheme.go +++ b/scheme.go @@ -109,7 +109,7 @@ func (s *lazyScheme) CleanupDatabase(ctx context.Context, prefix string, names . } case scheme.EntryTable: - err = s.db.Table().RetryNonIdempotent(ctx, func(ctx context.Context, session table.Session) (err error) { + err = s.db.Table().Do(ctx, func(ctx context.Context, session table.Session) (err error) { return session.DropTable(ctx, pt) }) if err != nil { diff --git a/table.go b/table.go index 995b5041e..de9e9e9aa 100644 --- a/table.go +++ b/table.go @@ -18,9 +18,9 @@ type lazyTable struct { m sync.Mutex } -func (t *lazyTable) Pool(ctx context.Context) internal.Client { +func (t *lazyTable) Do(ctx context.Context, op table.Operation, opts ...table.Option) (err error) { t.init(ctx) - return t.client.(internal.Client) + return t.client.Do(ctx, op, opts...) } func (t *lazyTable) Close(ctx context.Context) error { @@ -35,17 +35,6 @@ func (t *lazyTable) Close(ctx context.Context) error { return t.client.Close(ctx) } -func (t *lazyTable) RetryIdempotent(ctx context.Context, op table.RetryOperation) (err error) { - t.init(ctx) - return t.client.RetryIdempotent(ctx, op) -} - -func (t *lazyTable) RetryNonIdempotent(ctx context.Context, op table.RetryOperation) (err error) { - t.init(ctx) - return t.client.RetryNonIdempotent(ctx, op) - -} - func (t *lazyTable) init(ctx context.Context) { t.m.Lock() if t.client == nil { diff --git a/table/client.go b/table/client.go index 2242a510a..022494ad9 100644 --- a/table/client.go +++ b/table/client.go @@ -4,25 +4,29 @@ import ( "context" ) -// RetryOperation is the interface that holds an operation for retry. -type RetryOperation func(context.Context, Session) (err error) +// Operation is the interface that holds an operation for retry. +type Operation func(context.Context, Session) (err error) + +type Option func(o *Options) + +type Options struct { + Idempotent bool +} + +func WithIdempotent() Option { + return func(o *Options) { + o.Idempotent = true + } +} type Client interface { + // Close closes table client Close(ctx context.Context) error - // RetryIdempotent retries the retry operation as a idempotent - // operation (i.e. the operation can't change the state of the database) - // while the operation returns an error - // - // This is an experimental API that can be changed without changing - // the major version. - RetryIdempotent(ctx context.Context, op RetryOperation) (err error) - - // RetryNonIdempotent retries the retry operation as a non-idempotent - // operation (i.e. the operation can change the state of the database) - // while the operation returns an error - // - // This is an experimental API that can be changed without changing - // the major version. - RetryNonIdempotent(ctx context.Context, op RetryOperation) (err error) + // Do provide the best effort for execute operation + // Do implements internal busy loop until one of the following conditions is met: + // - deadline was canceled or deadlined + // - retry operation returned nil as error + // Warning: if deadline without deadline or cancellation func Retry will be worked infinite + Do(ctx context.Context, op Operation, opts ...Option) (err error) } diff --git a/test/series.go b/test/series.go index 583a9c0b2..7c44fd1fa 100644 --- a/test/series.go +++ b/test/series.go @@ -73,7 +73,7 @@ FROM AS_TABLE($episodesData); func readTable(ctx context.Context, c table.Client, path string) error { var res resultset.Result - err := c.RetryIdempotent( + err := c.Do( ctx, func(ctx context.Context, s table.Session) (err error) { res, err = s.StreamReadTable(ctx, path, @@ -133,7 +133,7 @@ func readTable(ctx context.Context, c table.Client, path string) error { func describeTableOptions(ctx context.Context, c table.Client) error { var desc options.TableOptionsDescription - err := c.RetryIdempotent( + err := c.Do( ctx, func(ctx context.Context, s table.Session) (err error) { desc, err = s.DescribeTableOptions(ctx) @@ -196,7 +196,7 @@ func selectSimple(ctx context.Context, c table.Client, prefix string) error { table.CommitTx(), ) var res resultset.Result - err := c.RetryIdempotent( + err := c.Do( ctx, func(ctx context.Context, s table.Session) (err error) { _, res, err = s.Execute(ctx, readTx, query, @@ -254,7 +254,7 @@ func scanQuerySelect(ctx context.Context, c table.Client, prefix string) error { ) var res resultset.Result - err := c.RetryIdempotent( + err := c.Do( ctx, func(ctx context.Context, s table.Session) (err error) { res, err = s.StreamExecuteScanQuery(ctx, query, @@ -300,7 +300,7 @@ func Fill(ctx context.Context, c table.Client, prefix string) error { ), table.CommitTx(), ) - err := c.RetryNonIdempotent( + err := c.Do( ctx, func(ctx context.Context, s table.Session) (err error) { stmt, err := s.Prepare(ctx, render(fill, templateConfig{ @@ -321,7 +321,7 @@ func Fill(ctx context.Context, c table.Client, prefix string) error { } func createTables(ctx context.Context, c table.Client, prefix string) error { - err := c.RetryNonIdempotent( + err := c.Do( ctx, func(ctx context.Context, s table.Session) (err error) { return s.CreateTable(ctx, path.Join(prefix, "series"), @@ -338,7 +338,7 @@ func createTables(ctx context.Context, c table.Client, prefix string) error { return err } - err = c.RetryNonIdempotent( + err = c.Do( ctx, func(ctx context.Context, s table.Session) (err error) { return s.CreateTable(ctx, path.Join(prefix, "seasons"), @@ -355,7 +355,7 @@ func createTables(ctx context.Context, c table.Client, prefix string) error { return err } - err = c.RetryNonIdempotent( + err = c.Do( ctx, func(ctx context.Context, s table.Session) (err error) { return s.CreateTable(ctx, path.Join(prefix, "episodes"), @@ -372,7 +372,7 @@ func createTables(ctx context.Context, c table.Client, prefix string) error { } func describeTable(ctx context.Context, c table.Client, path string) (err error) { - err = c.RetryIdempotent( + err = c.Do( ctx, func(ctx context.Context, s table.Session) (err error) { desc, err := s.DescribeTable(ctx, path)