diff --git a/cmd/export/main.go b/cmd/export/main.go index ac43c1b8a..c9d823614 100644 --- a/cmd/export/main.go +++ b/cmd/export/main.go @@ -18,7 +18,6 @@ package main import ( "context" "fmt" - "net/http" "github.com/google/exposure-notifications-server/internal/export" "github.com/google/exposure-notifications-server/internal/interrupt" @@ -53,15 +52,11 @@ func realMain(ctx context.Context) error { return fmt.Errorf("export.NewServer: %w", err) } - mux := http.NewServeMux() - mux.HandleFunc("/create-batches", batchServer.CreateBatchesHandler) - mux.HandleFunc("/do-work", batchServer.WorkerHandler) - srv, err := server.New(config.Port) if err != nil { return fmt.Errorf("server.New: %w", err) } logger.Infof("listening on :%s", config.Port) - return srv.ServeHTTPHandler(ctx, mux) + return srv.ServeHTTPHandler(ctx, batchServer.Routes(ctx)) } diff --git a/internal/database/config.go b/internal/database/config.go index 4159745f1..46282c1c6 100644 --- a/internal/database/config.go +++ b/internal/database/config.go @@ -19,21 +19,21 @@ import ( ) type Config struct { - Name string `env:"DB_NAME"` - User string `env:"DB_USER"` - Host string `env:"DB_HOST, default=localhost"` - Port string `env:"DB_PORT, default=5432"` - SSLMode string `env:"DB_SSLMODE, default=require"` - ConnectionTimeout int `env:"DB_CONNECT_TIMEOUT"` + Name string `env:"DB_NAME" json:",omitempty"` + User string `env:"DB_USER" json:",omitempty"` + Host string `env:"DB_HOST, default=localhost" json:",omitempty"` + Port string `env:"DB_PORT, default=5432" json:",omitempty"` + SSLMode string `env:"DB_SSLMODE, default=require" json:",omitempty"` + ConnectionTimeout int `env:"DB_CONNECT_TIMEOUT" json:",omitempty"` Password string `env:"DB_PASSWORD" json:"-"` // ignored by zap's JSON formatter - SSLCertPath string `env:"DB_SSLCERT"` - SSLKeyPath string `env:"DB_SSLKEY"` - SSLRootCertPath string `env:"DB_SSLROOTCERT"` - PoolMinConnections string `env:"DB_POOL_MIN_CONNS"` - PoolMaxConnections string `env:"DB_POOL_MAX_CONNS"` - PoolMaxConnLife time.Duration `env:"DB_POOL_MAX_CONN_LIFETIME"` - PoolMaxConnIdle time.Duration `env:"DB_POOL_MAX_CONN_IDLE_TIME"` - PoolHealthCheck time.Duration `env:"DB_POOL_HEALTH_CHECK_PERIOD"` + SSLCertPath string `env:"DB_SSLCERT" json:",omitempty"` + SSLKeyPath string `env:"DB_SSLKEY" json:",omitempty"` + SSLRootCertPath string `env:"DB_SSLROOTCERT" json:",omitempty"` + PoolMinConnections string `env:"DB_POOL_MIN_CONNS" json:",omitempty"` + PoolMaxConnections string `env:"DB_POOL_MAX_CONNS" json:",omitempty"` + PoolMaxConnLife time.Duration `env:"DB_POOL_MAX_CONN_LIFETIME" json:",omitempty"` + PoolMaxConnIdle time.Duration `env:"DB_POOL_MAX_CONN_IDLE_TIME" json:",omitempty"` + PoolHealthCheck time.Duration `env:"DB_POOL_HEALTH_CHECK_PERIOD" json:",omitempty"` } func (c *Config) DatabaseConfig() *Config { diff --git a/internal/export/batcher.go b/internal/export/batcher.go index ec02caa42..3247e96b3 100644 --- a/internal/export/batcher.go +++ b/internal/export/batcher.go @@ -22,84 +22,89 @@ import ( "time" coredb "github.com/google/exposure-notifications-server/internal/database" - "github.com/google/exposure-notifications-server/internal/export/database" + exportdatabase "github.com/google/exposure-notifications-server/internal/export/database" "github.com/google/exposure-notifications-server/internal/export/model" publishmodel "github.com/google/exposure-notifications-server/internal/publish/model" "github.com/google/exposure-notifications-server/internal/logging" ) -// CreateBatchesHandler is a handler to iterate the rows of ExportConfig and +// handleCreateBatches is a handler to iterate the rows of ExportConfig and // create entries in ExportBatchJob as appropriate. -func (s *Server) CreateBatchesHandler(w http.ResponseWriter, r *http.Request) { - ctx, cancel := context.WithTimeout(r.Context(), s.config.CreateTimeout) - defer cancel() +func (s *Server) handleCreateBatches(ctx context.Context) http.HandlerFunc { logger := logging.FromContext(ctx) metrics := s.env.MetricsExporter(ctx) - - // Obtain lock to make sure there are no other processes working to create batches. - lock := "create_batches" - unlockFn, err := s.db.Lock(ctx, lock, s.config.CreateTimeout) - if err != nil { - if errors.Is(err, coredb.ErrAlreadyLocked) { - metrics.WriteInt("export-batcher-lock-contention", true, 1) - msg := fmt.Sprintf("Lock %s already in use, no work will be performed", lock) - logger.Infof(msg) - fmt.Fprint(w, msg) // We return status 200 here so that Cloud Scheduler does not retry. + db := s.env.Database() + + return func(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), s.config.CreateTimeout) + defer cancel() + + // Obtain lock to make sure there are no other processes working to create batches. + lock := "create_batches" + unlockFn, err := db.Lock(ctx, lock, s.config.CreateTimeout) + if err != nil { + if errors.Is(err, coredb.ErrAlreadyLocked) { + metrics.WriteInt("export-batcher-lock-contention", true, 1) + msg := fmt.Sprintf("Lock %s already in use, no work will be performed", lock) + logger.Infof(msg) + fmt.Fprint(w, msg) // We return status 200 here so that Cloud Scheduler does not retry. + return + } + logger.Errorf("Could not acquire lock %s: %v", lock, err) + http.Error(w, fmt.Sprintf("Could not acquire lock %s, check logs.", lock), http.StatusInternalServerError) return } - logger.Errorf("Could not acquire lock %s: %v", lock, err) - http.Error(w, fmt.Sprintf("Could not acquire lock %s, check logs.", lock), http.StatusInternalServerError) - return - } - defer func() { - if err := unlockFn(); err != nil { - logger.Errorf("failed to unlock: %v", err) - } - }() - - totalConfigs := 0 - totalBatches := 0 - totalConfigsWithBatches := 0 - defer func() { - logger.Infof("Processed %d configs creating %d batches across %d configs", totalConfigs, totalBatches, totalConfigsWithBatches) - }() - - effectiveTime := time.Now().Add(-1 * s.config.MinWindowAge) - err = database.New(s.db).IterateExportConfigs(ctx, effectiveTime, func(ec *model.ExportConfig) error { - totalConfigs++ - if batchesCreated, err := s.maybeCreateBatches(ctx, ec, effectiveTime); err != nil { - logger.Errorf("Failed to create batches for config %d: %v, continuing to next config", ec.ConfigID, err) - } else { - totalBatches += batchesCreated - if batchesCreated > 0 { - totalConfigsWithBatches++ + defer func() { + if err := unlockFn(); err != nil { + logger.Errorf("failed to unlock: %v", err) + } + }() + + totalConfigs := 0 + totalBatches := 0 + totalConfigsWithBatches := 0 + defer func() { + logger.Infof("Processed %d configs creating %d batches across %d configs", totalConfigs, totalBatches, totalConfigsWithBatches) + }() + + effectiveTime := time.Now().Add(-1 * s.config.MinWindowAge) + err = exportdatabase.New(db).IterateExportConfigs(ctx, effectiveTime, func(ec *model.ExportConfig) error { + totalConfigs++ + if batchesCreated, err := s.maybeCreateBatches(ctx, ec, effectiveTime); err != nil { + logger.Errorf("Failed to create batches for config %d: %v, continuing to next config", ec.ConfigID, err) + } else { + totalBatches += batchesCreated + if batchesCreated > 0 { + totalConfigsWithBatches++ + } } + return nil + }) + if err != nil { + // some specific error handling below, but just need one metric. + metrics.WriteInt("export-batcher-failed", true, 1) + } + switch { + case err == nil: + return + case errors.Is(err, context.DeadlineExceeded): + logger.Infof("Timed out creating batches, batch creation will continue on next invocation") + case errors.Is(err, context.Canceled): + logger.Infof("Canceled while creating batches, batch creation will continue on next invocation") + default: + logger.Errorf("creating batches: %v", err) + http.Error(w, "Failed to create batches, check logs.", http.StatusInternalServerError) } - return nil - }) - if err != nil { - // some specific error handling below, but just need one metric. - metrics.WriteInt("export-batcher-failed", true, 1) - } - switch { - case err == nil: - return - case errors.Is(err, context.DeadlineExceeded): - logger.Infof("Timed out creating batches, batch creation will continue on next invocation") - case errors.Is(err, context.Canceled): - logger.Infof("Canceled while creating batches, batch creation will continue on next invocation") - default: - logger.Errorf("creating batches: %v", err) - http.Error(w, "Failed to create batches, check logs.", http.StatusInternalServerError) } } func (s *Server) maybeCreateBatches(ctx context.Context, ec *model.ExportConfig, now time.Time) (int, error) { logger := logging.FromContext(ctx) metrics := s.env.MetricsExporter(ctx) + db := s.env.Database() - latestEnd, err := database.New(s.db).LatestExportBatchEnd(ctx, ec) + latestEnd, err := exportdatabase.New(db).LatestExportBatchEnd(ctx, ec) if err != nil { return 0, fmt.Errorf("fetching most recent batch for config %d: %w", ec.ConfigID, err) } @@ -128,7 +133,7 @@ func (s *Server) maybeCreateBatches(ctx context.Context, ec *model.ExportConfig, }) } - if err := database.New(s.db).AddExportBatches(ctx, batches); err != nil { + if err := exportdatabase.New(db).AddExportBatches(ctx, batches); err != nil { return 0, fmt.Errorf("creating export batches for config %d: %w", ec.ConfigID, err) } diff --git a/internal/export/database/export.go b/internal/export/database/export.go index 527b647c0..a3cfab088 100644 --- a/internal/export/database/export.go +++ b/internal/export/database/export.go @@ -125,7 +125,7 @@ func (db *ExportDB) GetAllExportConfigs(ctx context.Context) ([]*model.ExportCon } defer rows.Close() - results := []*model.ExportConfig{} + var results []*model.ExportConfig for rows.Next() { ec, err := scanOneExportConfig(rows) if err != nil { diff --git a/internal/export/server.go b/internal/export/server.go index e759787fd..2fb35b06a 100644 --- a/internal/export/server.go +++ b/internal/export/server.go @@ -15,11 +15,9 @@ package export import ( + "context" "fmt" - - coredb "github.com/google/exposure-notifications-server/internal/database" - "github.com/google/exposure-notifications-server/internal/export/database" - publishdb "github.com/google/exposure-notifications-server/internal/publish/database" + "net/http" "github.com/google/exposure-notifications-server/internal/serverenv" ) @@ -30,6 +28,9 @@ func NewServer(config *Config, env *serverenv.ServerEnv) (*Server, error) { if env.Blobstore() == nil { return nil, fmt.Errorf("export.NewBatchServer requires Blobstore present in the ServerEnv") } + if env.Database() == nil { + return nil, fmt.Errorf("export.NewBatchServer requires Database present in the ServerEnv") + } if env.KeyManager() == nil { return nil, fmt.Errorf("export.NewBatchServer requires KeyManager present in the ServerEnv") } @@ -38,19 +39,23 @@ func NewServer(config *Config, env *serverenv.ServerEnv) (*Server, error) { } return &Server{ - db: env.Database(), - exportdb: database.New(env.Database()), - publishdb: publishdb.New(env.Database()), - config: config, - env: env, + config: config, + env: env, }, nil } // Server hosts end points to manage export batches. type Server struct { - db *coredb.DB - exportdb *database.ExportDB - publishdb *publishdb.PublishDB - config *Config - env *serverenv.ServerEnv + config *Config + env *serverenv.ServerEnv +} + +// Routes defines and returns the routes for this server. +func (s *Server) Routes(ctx context.Context) *http.ServeMux { + mux := http.NewServeMux() + + mux.HandleFunc("/create-batches", s.handleCreateBatches(ctx)) + mux.HandleFunc("/do-work", s.handleDoWork(ctx)) + + return mux } diff --git a/internal/export/server_test.go b/internal/export/server_test.go index 63c74ec48..eb6649368 100644 --- a/internal/export/server_test.go +++ b/internal/export/server_test.go @@ -42,15 +42,29 @@ func TestNewServer(t *testing.T) { env: serverenv.New(ctx), err: fmt.Errorf("export.NewBatchServer requires Blobstore present in the ServerEnv"), }, + { + name: "nil Database", + env: serverenv.New(ctx, + serverenv.WithBlobStorage(emptyStorage), + ), + err: fmt.Errorf("export.NewBatchServer requires Database present in the ServerEnv"), + }, { name: "nil KeyManager", - env: serverenv.New(ctx, serverenv.WithBlobStorage(emptyStorage)), - err: fmt.Errorf("export.NewBatchServer requires KeyManager present in the ServerEnv"), + env: serverenv.New(ctx, + serverenv.WithBlobStorage(emptyStorage), + serverenv.WithDatabase(emptyDB), + ), + err: fmt.Errorf("export.NewBatchServer requires KeyManager present in the ServerEnv"), }, { name: "Fully Specified", - env: serverenv.New(ctx, serverenv.WithBlobStorage(emptyStorage), serverenv.WithKeyManager(emptyKMS), serverenv.WithDatabase(emptyDB)), - err: nil, + env: serverenv.New(ctx, + serverenv.WithBlobStorage(emptyStorage), + serverenv.WithDatabase(emptyDB), + serverenv.WithKeyManager(emptyKMS), + ), + err: nil, }, } diff --git a/internal/export/worker.go b/internal/export/worker.go index 0bb5d2878..fe7dcd6ce 100644 --- a/internal/export/worker.go +++ b/internal/export/worker.go @@ -26,8 +26,8 @@ import ( "time" coredb "github.com/google/exposure-notifications-server/internal/database" - "github.com/google/exposure-notifications-server/internal/export/database" - publishdb "github.com/google/exposure-notifications-server/internal/publish/database" + exportdatabase "github.com/google/exposure-notifications-server/internal/export/database" + publishdatabase "github.com/google/exposure-notifications-server/internal/publish/database" "github.com/google/exposure-notifications-server/internal/export/model" publishmodel "github.com/google/exposure-notifications-server/internal/publish/model" @@ -43,55 +43,60 @@ const ( blobOperationTimeout = 50 * time.Second ) -// WorkerHandler is a handler to iterate the rows of ExportBatch, and creates +// handleDoWork is a handler to iterate the rows of ExportBatch, and creates // export files. -func (s *Server) WorkerHandler(w http.ResponseWriter, r *http.Request) { - ctx, cancel := context.WithTimeout(r.Context(), s.config.WorkerTimeout) - defer cancel() +func (s *Server) handleDoWork(ctx context.Context) http.HandlerFunc { logger := logging.FromContext(ctx) - exportDB := database.New(s.db) + db := s.env.Database() + + return func(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), s.config.WorkerTimeout) + defer cancel() + + emitIndexForEmptyBatch := true + for { + if ctx.Err() != nil { + msg := "Timed out processing batches. Will continue on next invocation." + logger.Info(msg) + fmt.Fprintln(w, msg) + return + } - emitIndexForEmptyBatch := true - for { - if ctx.Err() != nil { - msg := "Timed out processing batches. Will continue on next invocation." - logger.Info(msg) - fmt.Fprintln(w, msg) - return - } + // Check for a batch and obtain a lease for it. + batch, err := exportdatabase.New(db).LeaseBatch(ctx, s.config.WorkerTimeout, time.Now()) + if err != nil { + logger.Errorf("Failed to lease batch: %v", err) + continue + } + if batch == nil { + msg := "No more work to do" + logger.Info(msg) + fmt.Fprintln(w, msg) + return + } - // Check for a batch and obtain a lease for it. - batch, err := exportDB.LeaseBatch(ctx, s.config.WorkerTimeout, time.Now()) - if err != nil { - logger.Errorf("Failed to lease batch: %v", err) - continue - } - if batch == nil { - msg := "No more work to do" - logger.Info(msg) - fmt.Fprintln(w, msg) - return - } + if err = s.exportBatch(ctx, batch, emitIndexForEmptyBatch); err != nil { + logger.Errorf("Failed to create files for batch: %v.", err) + continue + } + // We re-write the index file for empty batches for self-healing so that the + // index file reflects the ExportFile table in database. However, if a + // single worker processes a number of empty batches quickly, we want to + // avoid writing the same file repeatedly and hitting a rate limit. + emitIndexForEmptyBatch = false - if err = s.exportBatch(ctx, batch, emitIndexForEmptyBatch); err != nil { - logger.Errorf("Failed to create files for batch: %v.", err) - continue + fmt.Fprintf(w, "Batch %d marked completed. \n", batch.BatchID) } - // We re-write the index file for empty batches for self-healing so that the - // index file reflects the ExportFile table in database. However, if a - // single worker processes a number of empty batches quickly, we want to - // avoid writing the same file repeatedly and hitting a rate limit. - emitIndexForEmptyBatch = false - - fmt.Fprintf(w, "Batch %d marked completed. \n", batch.BatchID) } } func (s *Server) exportBatch(ctx context.Context, eb *model.ExportBatch, emitIndexForEmptyBatch bool) error { logger := logging.FromContext(ctx) + db := s.env.Database() + logger.Infof("Processing export batch %d (root: %q, region: %s), max records per file %d", eb.BatchID, eb.FilenameRoot, eb.OutputRegion, s.config.MaxRecords) - criteria := publishdb.IterateExposuresCriteria{ + criteria := publishdatabase.IterateExposuresCriteria{ SinceTimestamp: eb.StartTimestamp, UntilTimestamp: eb.EndTimestamp, IncludeRegions: eb.EffectiveInputRegions(), @@ -105,7 +110,7 @@ func (s *Server) exportBatch(ctx context.Context, eb *model.ExportBatch, emitInd var groups [][]*publishmodel.Exposure var exposures []*publishmodel.Exposure - _, err := s.publishdb.IterateExposures(ctx, criteria, func(exp *publishmodel.Exposure) error { + _, err := publishdatabase.New(db).IterateExposures(ctx, criteria, func(exp *publishmodel.Exposure) error { exposures = append(exposures, exp) if len(exposures) == s.config.MaxRecords { groups = append(groups, exposures) @@ -132,7 +137,7 @@ func (s *Server) exportBatch(ctx context.Context, eb *model.ExportBatch, emitInd } // Load the non-expired signature infos associated with this export batch. - sigInfos, err := s.exportdb.LookupSignatureInfos(ctx, eb.SignatureInfoIDs, time.Now()) + sigInfos, err := exportdatabase.New(db).LookupSignatureInfos(ctx, eb.SignatureInfoIDs, time.Now()) if err != nil { return fmt.Errorf("error loading signature info for batch %d, %w", eb.BatchID, err) } @@ -171,7 +176,7 @@ func (s *Server) exportBatch(ctx context.Context, eb *model.ExportBatch, emitInd } // Write the files records in database and complete the batch. - if err := s.exportdb.FinalizeBatch(ctx, eb, objectNames, batchSize); err != nil { + if err := exportdatabase.New(db).FinalizeBatch(ctx, eb, objectNames, batchSize); err != nil { return fmt.Errorf("completing batch: %w", err) } logger.Infof("Batch %d completed", eb.BatchID) @@ -219,6 +224,7 @@ func (s *Server) createFile(ctx context.Context, cfi createFileInfo) (string, er // We use a lock to make them line up after one another. func (s *Server) retryingCreateIndex(ctx context.Context, eb *model.ExportBatch, objectNames []string) error { logger := logging.FromContext(ctx) + db := s.env.Database() lockID := fmt.Sprintf("export-batch-%d", eb.BatchID) sleep := 10 * time.Second @@ -228,7 +234,7 @@ func (s *Server) retryingCreateIndex(ctx context.Context, eb *model.ExportBatch, return nil } - unlock, err := s.db.Lock(ctx, lockID, time.Minute) + unlock, err := db.Lock(ctx, lockID, time.Minute) if err != nil { if errors.Is(err, coredb.ErrAlreadyLocked) { logger.Debugf("Lock %s is locked; sleeping %v and will try again", lockID, sleep) @@ -255,8 +261,9 @@ func (s *Server) retryingCreateIndex(ctx context.Context, eb *model.ExportBatch, } func (s *Server) createIndex(ctx context.Context, eb *model.ExportBatch, newObjectNames []string) (string, int, error) { - exportDB := database.New(s.db) - objects, err := exportDB.LookupExportFiles(ctx, s.config.TTL) + db := s.env.Database() + + objects, err := exportdatabase.New(db).LookupExportFiles(ctx, s.config.TTL) if err != nil { return "", 0, fmt.Errorf("lookup available export files: %w", err) } diff --git a/internal/integration/integration_test.go b/internal/integration/integration_test.go index d54997fb7..f4d57382e 100644 --- a/internal/integration/integration_test.go +++ b/internal/integration/integration_test.go @@ -112,8 +112,7 @@ func testServer(tb testing.TB) (*serverenv.ServerEnv, *http.Client) { if err != nil { tb.Fatal(err) } - mux.HandleFunc("/export/create-batches", exportServer.CreateBatchesHandler) - mux.HandleFunc("/export/do-work", exportServer.WorkerHandler) + mux.Handle("/export/", http.StripPrefix("/export", exportServer.Routes(ctx))) // Federation federationInConfig := &federationin.Config{