Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Removing time window used by worker to query for exposure batches available for export. #521

Merged
merged 4 commits into from
Jun 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions internal/export/database/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func (db *ExportDB) AddExportBatches(ctx context.Context, batches []*model.Expor
}

// LeaseBatch returns a leased ExportBatch for the worker to process. If no work to do, nil will be returned.
func (db *ExportDB) LeaseBatch(ctx context.Context, ttl time.Duration, now time.Time) (*model.ExportBatch, error) {
func (db *ExportDB) LeaseBatch(ctx context.Context, ttl time.Duration, batchMaxCloseTime time.Time) (*model.ExportBatch, error) {
// Lookup a set of candidate batch IDs.
var openBatchIDs []int64
err := func() error { // Use a func to allow defer conn.Release() to work.
Expand All @@ -427,7 +427,7 @@ func (db *ExportDB) LeaseBatch(ctx context.Context, ttl time.Duration, now time.
AND
end_timestamp < $3
LIMIT 100
`, model.ExportBatchOpen, model.ExportBatchPending, now)
`, model.ExportBatchOpen, model.ExportBatchPending, batchMaxCloseTime)
if err != nil {
return err
}
Expand Down Expand Up @@ -475,7 +475,7 @@ func (db *ExportDB) LeaseBatch(ctx context.Context, ttl time.Duration, now time.
return err
}

if status == model.ExportBatchComplete || (expires != nil && status == model.ExportBatchPending && now.Before(*expires)) {
if status == model.ExportBatchComplete || (expires != nil && status == model.ExportBatchPending && batchMaxCloseTime.Before(*expires)) {
// Something beat us to this batch, it's no longer available.
return nil
}
Expand All @@ -487,7 +487,7 @@ func (db *ExportDB) LeaseBatch(ctx context.Context, ttl time.Duration, now time.
status = $1, lease_expires = $2
WHERE
batch_id = $3
`, model.ExportBatchPending, now.Add(ttl), bid)
`, model.ExportBatchPending, batchMaxCloseTime.Add(ttl), bid)
if err != nil {
return err
}
Expand Down
6 changes: 1 addition & 5 deletions internal/export/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,8 @@ func (s *Server) WorkerHandler(w http.ResponseWriter, r *http.Request) {
return
}

// Only consider batches that closed a few minutes ago to allow the publish
// windows to close properly.
minutesAgo := time.Now().Add(-5 * time.Minute)

// Check for a batch and obtain a lease for it.
batch, err := exportDB.LeaseBatch(ctx, s.config.WorkerTimeout, minutesAgo)
batch, err := exportDB.LeaseBatch(ctx, s.config.WorkerTimeout, time.Now())
if err != nil {
logger.Errorf("Failed to lease batch: %v", err)
continue
Expand Down