Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Removing time window used by worker to query for exposure batches available for export. #521

Merged
merged 4 commits into from
Jun 5, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/export/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type Config struct {
TruncateWindow time.Duration `envconfig:"TRUNCATE_WINDOW" default:"1h"`
MinWindowAge time.Duration `envconfig:"MIN_WINDOW_AGE" default:"2h"`
TTL time.Duration `envconfig:"CLEANUP_TTL" default:"336h"`
// The worker considers export batches closed before the now() minus BatchCloseTimeShift minutes ago.
mgulimonov marked this conversation as resolved.
Show resolved Hide resolved
BatchCloseTimeShift time.Duration `envconfig:"BATCH_CLOSE_TIME_SHIFT" default:"5m"`
mgulimonov marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *Config) BlobstoreConfig() *storage.Config {
Expand Down
8 changes: 4 additions & 4 deletions internal/export/database/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func (db *ExportDB) AddExportBatches(ctx context.Context, batches []*model.Expor
}

// LeaseBatch returns a leased ExportBatch for the worker to process. If no work to do, nil will be returned.
func (db *ExportDB) LeaseBatch(ctx context.Context, ttl time.Duration, now time.Time) (*model.ExportBatch, error) {
func (db *ExportDB) LeaseBatch(ctx context.Context, ttl time.Duration, batchMaxCloseTime time.Time) (*model.ExportBatch, error) {
// Lookup a set of candidate batch IDs.
var openBatchIDs []int64
err := func() error { // Use a func to allow defer conn.Release() to work.
Expand All @@ -427,7 +427,7 @@ func (db *ExportDB) LeaseBatch(ctx context.Context, ttl time.Duration, now time.
AND
end_timestamp < $3
LIMIT 100
`, model.ExportBatchOpen, model.ExportBatchPending, now)
`, model.ExportBatchOpen, model.ExportBatchPending, batchMaxCloseTime)
if err != nil {
return err
}
Expand Down Expand Up @@ -475,7 +475,7 @@ func (db *ExportDB) LeaseBatch(ctx context.Context, ttl time.Duration, now time.
return err
}

if status == model.ExportBatchComplete || (expires != nil && status == model.ExportBatchPending && now.Before(*expires)) {
if status == model.ExportBatchComplete || (expires != nil && status == model.ExportBatchPending && batchMaxCloseTime.Before(*expires)) {
// Something beat us to this batch, it's no longer available.
return nil
}
Expand All @@ -487,7 +487,7 @@ func (db *ExportDB) LeaseBatch(ctx context.Context, ttl time.Duration, now time.
status = $1, lease_expires = $2
WHERE
batch_id = $3
`, model.ExportBatchPending, now.Add(ttl), bid)
`, model.ExportBatchPending, batchMaxCloseTime.Add(ttl), bid)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/export/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ func (s *Server) WorkerHandler(w http.ResponseWriter, r *http.Request) {

// Only consider batches that closed a few minutes ago to allow the publish
// windows to close properly.
minutesAgo := time.Now().Add(-5 * time.Minute)
batchMaxCloseTime := time.Now().Add(-s.config.BatchCloseTimeShift * time.Second)

// Check for a batch and obtain a lease for it.
batch, err := exportDB.LeaseBatch(ctx, s.config.WorkerTimeout, minutesAgo)
batch, err := exportDB.LeaseBatch(ctx, s.config.WorkerTimeout, batchMaxCloseTime)
if err != nil {
logger.Errorf("Failed to lease batch: %v", err)
continue
Expand Down