From 05c92e72256e0cc23ec5af644f748ab483d59dc9 Mon Sep 17 00:00:00 2001 From: asmyasnikov <79263256394@ya.ru> Date: Fri, 11 Feb 2022 04:35:00 +0300 Subject: [PATCH 1/2] Add TestRaceWgClosed --- CHANGELOG.md | 3 ++ internal/meta/version.go | 2 +- internal/table/client_test.go | 57 +++++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ed3c9a7b..eefa70962 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 3.9.4 +* Fixed data race on closing session pool + ## 3.9.3 * Fixed busy loop on call internal logger with external logger implementation of `log.Logger` diff --git a/internal/meta/version.go b/internal/meta/version.go index f66c782a5..68f7fdcde 100644 --- a/internal/meta/version.go +++ b/internal/meta/version.go @@ -1,5 +1,5 @@ package meta const ( - Version = "ydb-go-sdk/3.9.3" + Version = "ydb-go-sdk/3.9.4" ) diff --git a/internal/table/client_test.go b/internal/table/client_test.go index 45e89375c..9f97f9875 100644 --- a/internal/table/client_test.go +++ b/internal/table/client_test.go @@ -17,6 +17,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/errors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/rand" + "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/config" "github.com/ydb-platform/ydb-go-sdk/v3/testutil" "github.com/ydb-platform/ydb-go-sdk/v3/testutil/timeutil" @@ -322,6 +323,62 @@ func TestSessionPoolClose(t *testing.T) { } } +func TestRaceWgClosed(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 55*time.Second) + defer cancel() + + defer func() { + if e := recover(); e != nil { + t.Fatal(e) + } + }() + + limit := 100 + + for { + select { + case <-ctx.Done(): + return + default: + t.Log("start") + wg := sync.WaitGroup{} + p := newClientWithStubBuilder( + t, + testutil.NewDB(testutil.WithInvokeHandlers(testutil.InvokeHandlers{ + // nolint:unparam + testutil.TableCreateSession: func(interface{}) (proto.Message, error) { + return &Ydb_Table.CreateSessionResult{ + SessionId: testutil.SessionID(), + }, nil + }, + })), + limit, + config.WithSizeLimit(limit), + ) + for j := 0; j < limit*10; j++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + err := p.Do( + ctx, + func(ctx context.Context, s table.Session) error { + return nil + }, + ) + if errors.Is(err, ErrSessionPoolClosed) { + return + } + } + }() + } + _ = p.Close(context.Background()) + wg.Wait() + t.Log("done") + } + } +} + func TestSessionPoolDeleteReleaseWait(t *testing.T) { for _, test := range []struct { name string From 599adfd0c803d2f86b5d6ae43fdd99e19cb1658e Mon Sep 17 00:00:00 2001 From: asmyasnikov <79263256394@ya.ru> Date: Fri, 11 Feb 2022 04:36:13 +0300 Subject: [PATCH 2/2] Fixed data race on closing session pool --- internal/table/client.go | 75 ++++++++++++++++++++++++++-------------- 1 file changed, 50 insertions(+), 25 deletions(-) diff --git a/internal/table/client.go b/internal/table/client.go index cd4e29b43..4b95263bf 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -209,6 +209,11 @@ func (c *client) createSession(ctx context.Context) (Session, error) { return } delete(c.index, r.s) + + if c.closed { + return + } + c.notify(nil) if info.idle != nil { panic("ydb: table: session closed while still in idle client") @@ -354,6 +359,7 @@ func (c *client) Put(ctx context.Context, s Session) (err error) { }() c.mu.Lock() + switch { case c.closed: err = ErrSessionPoolClosed @@ -369,10 +375,18 @@ func (c *client) Put(ctx context.Context, s Session) (err error) { c.pushIdle(s, timeutil.Now()) } } + c.mu.Unlock() if err != nil { - _ = c.closeSession(ctx, s) + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout( + deadline.ContextWithoutDeadline(ctx), + c.config.DeleteTimeout(), + ) + defer cancel() + + _ = s.Close(ctx) } return err @@ -482,40 +496,46 @@ func (c *client) Close(ctx context.Context) (err error) { } c.mu.Lock() - c.closed = true + keeperDone := c.keeperDone if ch := c.keeperStop; ch != nil { close(ch) } - c.mu.Unlock() if keeperDone != nil { <-keeperDone } - c.mu.Lock() - idle := c.idle - waitq := c.waitq + for el := c.waitq.Front(); el != nil; el = el.Next() { + ch := el.Value.(*chan Session) + close(*ch) + } + + issues := make([]error, 0, len(c.index)) + + for e := c.idle.Front(); e != nil; e = e.Next() { + if err = c.closeSession(ctx, e.Value.(Session)); err != nil { + issues = append(issues, err) + } + } + c.limit = 0 c.idle = list.New() c.waitq = list.New() c.index = make(map[Session]sessionInfo) + c.closed = true + c.mu.Unlock() - for el := waitq.Front(); el != nil; el = el.Next() { - ch := el.Value.(*chan Session) - close(*ch) - } - for e := idle.Front(); e != nil; e = e.Next() { - s := e.Value.(Session) - func() { - closeCtx, cancel := context.WithTimeout(deadline.ContextWithoutDeadline(ctx), c.config.DeleteTimeout()) - _ = s.Close(closeCtx) - cancel() - }() - } c.wgClosed.Wait() + if len(issues) > 0 { + return errors.NewWithIssues( + "table client closed with issues", + issues..., + ) + } + return nil } @@ -835,25 +855,30 @@ func (c *client) notify(s Session) (notified bool) { func (c *client) CloseSession(ctx context.Context, s Session) error { onDone := trace.TableOnPoolSessionClose(c.config.Trace().Compose(trace.ContextTable(ctx)), &ctx, s) defer onDone() + + c.mu.Lock() + defer c.mu.Unlock() + return c.closeSession(ctx, s) } +// closeSession is an async func which close session, but without `trace.OnPoolSessionClose` tracing +// c.mu must be locked func (c *client) closeSession(ctx context.Context, s Session) error { c.wgClosed.Add(1) - fn := func() { + + go func() { defer c.wgClosed.Done() + closeCtx, cancel := context.WithTimeout( deadline.ContextWithoutDeadline(ctx), c.config.DeleteTimeout(), ) defer cancel() + _ = s.Close(closeCtx) - } - if c.isClosed() { - fn() - } else { - go fn() - } + }() + return nil }