Skip to content

Commit

Permalink
Preserve some time to return part of the result if the context is tim…
Browse files Browse the repository at this point in the history
…ing out (#4265)

* Preserve some time to return part of the result if the context is timing out
  • Loading branch information
yux0 authored Jun 10, 2021
1 parent 0f93d57 commit 2b68481
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package history

import (
"context"
"math"
"sync"
"time"

Expand Down Expand Up @@ -821,16 +822,24 @@ func (c *clientImpl) GetReplicationMessages(
req.Tokens = append(req.Tokens, token)
}

// preserve 5% timeout to return partial of the result if context is timing out
now := time.Now()
deadline, ok := ctx.Deadline()
if !ok {
deadline = now.Add(c.timeout)
}
requestTimeout := time.Duration(math.Ceil(float64(deadline.Sub(now)) * 0.95))
requestContext, cancel := context.WithTimeout(ctx, requestTimeout)
defer cancel()

var wg sync.WaitGroup
wg.Add(len(requestsByClient))
respChan := make(chan *types.GetReplicationMessagesResponse, len(requestsByClient))
errChan := make(chan error, 1)

for client, req := range requestsByClient {
go func(client Client, request *types.GetReplicationMessagesRequest) {
go func(ctx context.Context, client Client, request *types.GetReplicationMessagesRequest) {
defer wg.Done()

ctx, cancel := c.createContext(ctx)
defer cancel()
resp, err := client.GetReplicationMessages(ctx, request, opts...)
if err != nil {
c.logger.Warn("Failed to get replication tasks from client", tag.Error(err))
Expand All @@ -844,7 +853,7 @@ func (c *clientImpl) GetReplicationMessages(
return
}
respChan <- resp
}(client, req)
}(requestContext, client, req)
}

wg.Wait()
Expand Down

0 comments on commit 2b68481

Please sign in to comment.