Skip to content

Commit

Permalink
Merge pull request #116 from ydb-platform/race-on-close
Browse files Browse the repository at this point in the history
v3.9.4: Fixed data race on closing session pool
  • Loading branch information
asmyasnikov authored Feb 11, 2022
2 parents 4a6ed70 + 599adfd commit 63089e5
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 26 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.9.4
* Fixed data race on closing session pool

## 3.9.3
* Fixed busy loop on call internal logger with external logger implementation of `log.Logger`

Expand Down
2 changes: 1 addition & 1 deletion internal/meta/version.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package meta

const (
Version = "ydb-go-sdk/3.9.3"
Version = "ydb-go-sdk/3.9.4"
)
75 changes: 50 additions & 25 deletions internal/table/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@ func (c *client) createSession(ctx context.Context) (Session, error) {
return
}
delete(c.index, r.s)

if c.closed {
return
}

c.notify(nil)
if info.idle != nil {
panic("ydb: table: session closed while still in idle client")
Expand Down Expand Up @@ -354,6 +359,7 @@ func (c *client) Put(ctx context.Context, s Session) (err error) {
}()

c.mu.Lock()

switch {
case c.closed:
err = ErrSessionPoolClosed
Expand All @@ -369,10 +375,18 @@ func (c *client) Put(ctx context.Context, s Session) (err error) {
c.pushIdle(s, timeutil.Now())
}
}

c.mu.Unlock()

if err != nil {
_ = c.closeSession(ctx, s)
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(
deadline.ContextWithoutDeadline(ctx),
c.config.DeleteTimeout(),
)
defer cancel()

_ = s.Close(ctx)
}

return err
Expand Down Expand Up @@ -482,40 +496,46 @@ func (c *client) Close(ctx context.Context) (err error) {
}

c.mu.Lock()
c.closed = true

keeperDone := c.keeperDone
if ch := c.keeperStop; ch != nil {
close(ch)
}
c.mu.Unlock()

if keeperDone != nil {
<-keeperDone
}

c.mu.Lock()
idle := c.idle
waitq := c.waitq
for el := c.waitq.Front(); el != nil; el = el.Next() {
ch := el.Value.(*chan Session)
close(*ch)
}

issues := make([]error, 0, len(c.index))

for e := c.idle.Front(); e != nil; e = e.Next() {
if err = c.closeSession(ctx, e.Value.(Session)); err != nil {
issues = append(issues, err)
}
}

c.limit = 0
c.idle = list.New()
c.waitq = list.New()
c.index = make(map[Session]sessionInfo)
c.closed = true

c.mu.Unlock()

for el := waitq.Front(); el != nil; el = el.Next() {
ch := el.Value.(*chan Session)
close(*ch)
}
for e := idle.Front(); e != nil; e = e.Next() {
s := e.Value.(Session)
func() {
closeCtx, cancel := context.WithTimeout(deadline.ContextWithoutDeadline(ctx), c.config.DeleteTimeout())
_ = s.Close(closeCtx)
cancel()
}()
}
c.wgClosed.Wait()

if len(issues) > 0 {
return errors.NewWithIssues(
"table client closed with issues",
issues...,
)
}

return nil
}

Expand Down Expand Up @@ -835,25 +855,30 @@ func (c *client) notify(s Session) (notified bool) {
func (c *client) CloseSession(ctx context.Context, s Session) error {
onDone := trace.TableOnPoolSessionClose(c.config.Trace().Compose(trace.ContextTable(ctx)), &ctx, s)
defer onDone()

c.mu.Lock()
defer c.mu.Unlock()

return c.closeSession(ctx, s)
}

// closeSession is an async func which close session, but without `trace.OnPoolSessionClose` tracing
// c.mu must be locked
func (c *client) closeSession(ctx context.Context, s Session) error {
c.wgClosed.Add(1)
fn := func() {

go func() {
defer c.wgClosed.Done()

closeCtx, cancel := context.WithTimeout(
deadline.ContextWithoutDeadline(ctx),
c.config.DeleteTimeout(),
)
defer cancel()

_ = s.Close(closeCtx)
}
if c.isClosed() {
fn()
} else {
go fn()
}
}()

return nil
}

Expand Down
57 changes: 57 additions & 0 deletions internal/table/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/ydb-platform/ydb-go-sdk/v3/internal/errors"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/rand"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/config"
"github.com/ydb-platform/ydb-go-sdk/v3/testutil"
"github.com/ydb-platform/ydb-go-sdk/v3/testutil/timeutil"
Expand Down Expand Up @@ -322,6 +323,62 @@ func TestSessionPoolClose(t *testing.T) {
}
}

func TestRaceWgClosed(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 55*time.Second)
defer cancel()

defer func() {
if e := recover(); e != nil {
t.Fatal(e)
}
}()

limit := 100

for {
select {
case <-ctx.Done():
return
default:
t.Log("start")
wg := sync.WaitGroup{}
p := newClientWithStubBuilder(
t,
testutil.NewDB(testutil.WithInvokeHandlers(testutil.InvokeHandlers{
// nolint:unparam
testutil.TableCreateSession: func(interface{}) (proto.Message, error) {
return &Ydb_Table.CreateSessionResult{
SessionId: testutil.SessionID(),
}, nil
},
})),
limit,
config.WithSizeLimit(limit),
)
for j := 0; j < limit*10; j++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
err := p.Do(
ctx,
func(ctx context.Context, s table.Session) error {
return nil
},
)
if errors.Is(err, ErrSessionPoolClosed) {
return
}
}
}()
}
_ = p.Close(context.Background())
wg.Wait()
t.Log("done")
}
}
}

func TestSessionPoolDeleteReleaseWait(t *testing.T) {
for _, test := range []struct {
name string
Expand Down

0 comments on commit 63089e5

Please sign in to comment.