Skip to content

Commit

Permalink
fix: call cancel() as soon as possible to release the resources assoc…
Browse files Browse the repository at this point in the history
…iatd with context (#691)
  • Loading branch information
fengberlin authored Jul 6, 2021
1 parent 2bdc662 commit 8612bf7
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 8 deletions.
12 changes: 9 additions & 3 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,9 @@ func (c *rmqClient) InvokeSync(ctx context.Context, addr string, request *remote
if c.close {
return nil, ErrServiceState
}
ctx, _ = context.WithTimeout(ctx, timeoutMillis)
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeoutMillis)
defer cancel()
return c.remoteClient.InvokeSync(ctx, addr, request)
}

Expand Down Expand Up @@ -524,14 +526,16 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
}
cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())

ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
response, err := c.remoteClient.InvokeSync(ctx, addr, cmd)
if err != nil {
cancel()
rlog.Warning("send heart beat to broker error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return true
}
cancel()
if response.Code == ResSuccess {
c.namesrvs.AddBrokerVersion(brokerName, addr, int32(response.Version))
rlog.Debug("send heart beat to broker success", map[string]interface{}{
Expand Down Expand Up @@ -633,7 +637,9 @@ func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
// PullMessage with sync
func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error) {
cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
ctx, _ = context.WithTimeout(ctx, 30*time.Second)
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, 30*time.Second)
defer cancel()
res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd)
if err != nil {
return nil, err
Expand Down
6 changes: 4 additions & 2 deletions internal/remote/remote_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func TestResponseFutureTimeout(t *testing.T) {
}

func TestResponseFutureWaitResponse(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(1000))
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(1000))
defer cancel()
future := NewResponseFuture(ctx, 10, nil)
if _, err := future.waitResponse(); err != utils.ErrRequestTimeout {
t.Errorf("wrong ResponseFuture waitResponse. want=%v, got=%v",
Expand Down Expand Up @@ -289,7 +290,8 @@ func TestInvokeAsyncTimeout(t *testing.T) {
clientSend.Add(1)
go func() {
clientSend.Wait()
ctx, _ := context.WithTimeout(context.Background(), time.Duration(10*time.Second))
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(10*time.Second))
defer cancel()
err := client.InvokeAsync(ctx, addr, clientSendRemtingCommand,
func(r *ResponseFuture) {
assert.NotNil(t, r.Err)
Expand Down
4 changes: 3 additions & 1 deletion internal/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,12 +371,14 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData,

for i := 0; i < s.Size(); i++ {
rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
ctx, _ := context.WithTimeout(context.Background(), requestTimeout)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
response, err = s.nameSrvClient.InvokeSync(ctx, s.getNameServerAddress(), rc)

if err == nil {
cancel()
break
}
cancel()
}
if err != nil {
rlog.Error("connect to namesrv failed.", map[string]interface{}{
Expand Down
3 changes: 2 additions & 1 deletion internal/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,8 @@ func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, dat
}

var req = td.buildSendRequest(mq, msg)
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) {
resp := primitive.NewSendResult()
if e != nil {
Expand Down
4 changes: 3 additions & 1 deletion producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,9 @@ func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message,
return errors.Errorf("topic=%s route info not found", mq.Topic)
}

ctx, _ = context.WithTimeout(ctx, 3*time.Second)
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, 3*time.Second)
defer cancel()
return p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) {
resp := primitive.NewSendResult()
if err != nil {
Expand Down

0 comments on commit 8612bf7

Please sign in to comment.