Skip to content

Commit

Permalink
Export handlers cleanup (google#621)
Browse files Browse the repository at this point in the history
I pulled the non-debug changes out of googleGH-609 since I'm not sure we agree
that the /debug endpoint should exist on export. However, these changes
are still worth including (especially the database check and logger
fixes).
  • Loading branch information
sethvargo authored and krazykid committed Jul 13, 2020
1 parent c426269 commit 0093608
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 144 deletions.
7 changes: 1 addition & 6 deletions cmd/export/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package main
import (
"context"
"fmt"
"net/http"

"github.com/google/exposure-notifications-server/internal/export"
"github.com/google/exposure-notifications-server/internal/interrupt"
Expand Down Expand Up @@ -53,15 +52,11 @@ func realMain(ctx context.Context) error {
return fmt.Errorf("export.NewServer: %w", err)
}

mux := http.NewServeMux()
mux.HandleFunc("/create-batches", batchServer.CreateBatchesHandler)
mux.HandleFunc("/do-work", batchServer.WorkerHandler)

srv, err := server.New(config.Port)
if err != nil {
return fmt.Errorf("server.New: %w", err)
}
logger.Infof("listening on :%s", config.Port)

return srv.ServeHTTPHandler(ctx, mux)
return srv.ServeHTTPHandler(ctx, batchServer.Routes(ctx))
}
28 changes: 14 additions & 14 deletions internal/database/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ import (
)

type Config struct {
Name string `env:"DB_NAME"`
User string `env:"DB_USER"`
Host string `env:"DB_HOST, default=localhost"`
Port string `env:"DB_PORT, default=5432"`
SSLMode string `env:"DB_SSLMODE, default=require"`
ConnectionTimeout int `env:"DB_CONNECT_TIMEOUT"`
Name string `env:"DB_NAME" json:",omitempty"`
User string `env:"DB_USER" json:",omitempty"`
Host string `env:"DB_HOST, default=localhost" json:",omitempty"`
Port string `env:"DB_PORT, default=5432" json:",omitempty"`
SSLMode string `env:"DB_SSLMODE, default=require" json:",omitempty"`
ConnectionTimeout int `env:"DB_CONNECT_TIMEOUT" json:",omitempty"`
Password string `env:"DB_PASSWORD" json:"-"` // ignored by zap's JSON formatter
SSLCertPath string `env:"DB_SSLCERT"`
SSLKeyPath string `env:"DB_SSLKEY"`
SSLRootCertPath string `env:"DB_SSLROOTCERT"`
PoolMinConnections string `env:"DB_POOL_MIN_CONNS"`
PoolMaxConnections string `env:"DB_POOL_MAX_CONNS"`
PoolMaxConnLife time.Duration `env:"DB_POOL_MAX_CONN_LIFETIME"`
PoolMaxConnIdle time.Duration `env:"DB_POOL_MAX_CONN_IDLE_TIME"`
PoolHealthCheck time.Duration `env:"DB_POOL_HEALTH_CHECK_PERIOD"`
SSLCertPath string `env:"DB_SSLCERT" json:",omitempty"`
SSLKeyPath string `env:"DB_SSLKEY" json:",omitempty"`
SSLRootCertPath string `env:"DB_SSLROOTCERT" json:",omitempty"`
PoolMinConnections string `env:"DB_POOL_MIN_CONNS" json:",omitempty"`
PoolMaxConnections string `env:"DB_POOL_MAX_CONNS" json:",omitempty"`
PoolMaxConnLife time.Duration `env:"DB_POOL_MAX_CONN_LIFETIME" json:",omitempty"`
PoolMaxConnIdle time.Duration `env:"DB_POOL_MAX_CONN_IDLE_TIME" json:",omitempty"`
PoolHealthCheck time.Duration `env:"DB_POOL_HEALTH_CHECK_PERIOD" json:",omitempty"`
}

func (c *Config) DatabaseConfig() *Config {
Expand Down
123 changes: 64 additions & 59 deletions internal/export/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,84 +22,89 @@ import (
"time"

coredb "github.com/google/exposure-notifications-server/internal/database"
"github.com/google/exposure-notifications-server/internal/export/database"
exportdatabase "github.com/google/exposure-notifications-server/internal/export/database"
"github.com/google/exposure-notifications-server/internal/export/model"
publishmodel "github.com/google/exposure-notifications-server/internal/publish/model"

"github.com/google/exposure-notifications-server/internal/logging"
)

// CreateBatchesHandler is a handler to iterate the rows of ExportConfig and
// handleCreateBatches is a handler to iterate the rows of ExportConfig and
// create entries in ExportBatchJob as appropriate.
func (s *Server) CreateBatchesHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), s.config.CreateTimeout)
defer cancel()
func (s *Server) handleCreateBatches(ctx context.Context) http.HandlerFunc {
logger := logging.FromContext(ctx)
metrics := s.env.MetricsExporter(ctx)

// Obtain lock to make sure there are no other processes working to create batches.
lock := "create_batches"
unlockFn, err := s.db.Lock(ctx, lock, s.config.CreateTimeout)
if err != nil {
if errors.Is(err, coredb.ErrAlreadyLocked) {
metrics.WriteInt("export-batcher-lock-contention", true, 1)
msg := fmt.Sprintf("Lock %s already in use, no work will be performed", lock)
logger.Infof(msg)
fmt.Fprint(w, msg) // We return status 200 here so that Cloud Scheduler does not retry.
db := s.env.Database()

return func(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), s.config.CreateTimeout)
defer cancel()

// Obtain lock to make sure there are no other processes working to create batches.
lock := "create_batches"
unlockFn, err := db.Lock(ctx, lock, s.config.CreateTimeout)
if err != nil {
if errors.Is(err, coredb.ErrAlreadyLocked) {
metrics.WriteInt("export-batcher-lock-contention", true, 1)
msg := fmt.Sprintf("Lock %s already in use, no work will be performed", lock)
logger.Infof(msg)
fmt.Fprint(w, msg) // We return status 200 here so that Cloud Scheduler does not retry.
return
}
logger.Errorf("Could not acquire lock %s: %v", lock, err)
http.Error(w, fmt.Sprintf("Could not acquire lock %s, check logs.", lock), http.StatusInternalServerError)
return
}
logger.Errorf("Could not acquire lock %s: %v", lock, err)
http.Error(w, fmt.Sprintf("Could not acquire lock %s, check logs.", lock), http.StatusInternalServerError)
return
}
defer func() {
if err := unlockFn(); err != nil {
logger.Errorf("failed to unlock: %v", err)
}
}()

totalConfigs := 0
totalBatches := 0
totalConfigsWithBatches := 0
defer func() {
logger.Infof("Processed %d configs creating %d batches across %d configs", totalConfigs, totalBatches, totalConfigsWithBatches)
}()

effectiveTime := time.Now().Add(-1 * s.config.MinWindowAge)
err = database.New(s.db).IterateExportConfigs(ctx, effectiveTime, func(ec *model.ExportConfig) error {
totalConfigs++
if batchesCreated, err := s.maybeCreateBatches(ctx, ec, effectiveTime); err != nil {
logger.Errorf("Failed to create batches for config %d: %v, continuing to next config", ec.ConfigID, err)
} else {
totalBatches += batchesCreated
if batchesCreated > 0 {
totalConfigsWithBatches++
defer func() {
if err := unlockFn(); err != nil {
logger.Errorf("failed to unlock: %v", err)
}
}()

totalConfigs := 0
totalBatches := 0
totalConfigsWithBatches := 0
defer func() {
logger.Infof("Processed %d configs creating %d batches across %d configs", totalConfigs, totalBatches, totalConfigsWithBatches)
}()

effectiveTime := time.Now().Add(-1 * s.config.MinWindowAge)
err = exportdatabase.New(db).IterateExportConfigs(ctx, effectiveTime, func(ec *model.ExportConfig) error {
totalConfigs++
if batchesCreated, err := s.maybeCreateBatches(ctx, ec, effectiveTime); err != nil {
logger.Errorf("Failed to create batches for config %d: %v, continuing to next config", ec.ConfigID, err)
} else {
totalBatches += batchesCreated
if batchesCreated > 0 {
totalConfigsWithBatches++
}
}
return nil
})
if err != nil {
// some specific error handling below, but just need one metric.
metrics.WriteInt("export-batcher-failed", true, 1)
}
switch {
case err == nil:
return
case errors.Is(err, context.DeadlineExceeded):
logger.Infof("Timed out creating batches, batch creation will continue on next invocation")
case errors.Is(err, context.Canceled):
logger.Infof("Canceled while creating batches, batch creation will continue on next invocation")
default:
logger.Errorf("creating batches: %v", err)
http.Error(w, "Failed to create batches, check logs.", http.StatusInternalServerError)
}
return nil
})
if err != nil {
// some specific error handling below, but just need one metric.
metrics.WriteInt("export-batcher-failed", true, 1)
}
switch {
case err == nil:
return
case errors.Is(err, context.DeadlineExceeded):
logger.Infof("Timed out creating batches, batch creation will continue on next invocation")
case errors.Is(err, context.Canceled):
logger.Infof("Canceled while creating batches, batch creation will continue on next invocation")
default:
logger.Errorf("creating batches: %v", err)
http.Error(w, "Failed to create batches, check logs.", http.StatusInternalServerError)
}
}

func (s *Server) maybeCreateBatches(ctx context.Context, ec *model.ExportConfig, now time.Time) (int, error) {
logger := logging.FromContext(ctx)
metrics := s.env.MetricsExporter(ctx)
db := s.env.Database()

latestEnd, err := database.New(s.db).LatestExportBatchEnd(ctx, ec)
latestEnd, err := exportdatabase.New(db).LatestExportBatchEnd(ctx, ec)
if err != nil {
return 0, fmt.Errorf("fetching most recent batch for config %d: %w", ec.ConfigID, err)
}
Expand Down Expand Up @@ -128,7 +133,7 @@ func (s *Server) maybeCreateBatches(ctx context.Context, ec *model.ExportConfig,
})
}

if err := database.New(s.db).AddExportBatches(ctx, batches); err != nil {
if err := exportdatabase.New(db).AddExportBatches(ctx, batches); err != nil {
return 0, fmt.Errorf("creating export batches for config %d: %w", ec.ConfigID, err)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/export/database/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (db *ExportDB) GetAllExportConfigs(ctx context.Context) ([]*model.ExportCon
}
defer rows.Close()

results := []*model.ExportConfig{}
var results []*model.ExportConfig
for rows.Next() {
ec, err := scanOneExportConfig(rows)
if err != nil {
Expand Down
33 changes: 19 additions & 14 deletions internal/export/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
package export

import (
"context"
"fmt"

coredb "github.com/google/exposure-notifications-server/internal/database"
"github.com/google/exposure-notifications-server/internal/export/database"
publishdb "github.com/google/exposure-notifications-server/internal/publish/database"
"net/http"

"github.com/google/exposure-notifications-server/internal/serverenv"
)
Expand All @@ -30,6 +28,9 @@ func NewServer(config *Config, env *serverenv.ServerEnv) (*Server, error) {
if env.Blobstore() == nil {
return nil, fmt.Errorf("export.NewBatchServer requires Blobstore present in the ServerEnv")
}
if env.Database() == nil {
return nil, fmt.Errorf("export.NewBatchServer requires Database present in the ServerEnv")
}
if env.KeyManager() == nil {
return nil, fmt.Errorf("export.NewBatchServer requires KeyManager present in the ServerEnv")
}
Expand All @@ -38,19 +39,23 @@ func NewServer(config *Config, env *serverenv.ServerEnv) (*Server, error) {
}

return &Server{
db: env.Database(),
exportdb: database.New(env.Database()),
publishdb: publishdb.New(env.Database()),
config: config,
env: env,
config: config,
env: env,
}, nil
}

// Server hosts end points to manage export batches.
type Server struct {
db *coredb.DB
exportdb *database.ExportDB
publishdb *publishdb.PublishDB
config *Config
env *serverenv.ServerEnv
config *Config
env *serverenv.ServerEnv
}

// Routes defines and returns the routes for this server.
func (s *Server) Routes(ctx context.Context) *http.ServeMux {
mux := http.NewServeMux()

mux.HandleFunc("/create-batches", s.handleCreateBatches(ctx))
mux.HandleFunc("/do-work", s.handleDoWork(ctx))

return mux
}
22 changes: 18 additions & 4 deletions internal/export/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,29 @@ func TestNewServer(t *testing.T) {
env: serverenv.New(ctx),
err: fmt.Errorf("export.NewBatchServer requires Blobstore present in the ServerEnv"),
},
{
name: "nil Database",
env: serverenv.New(ctx,
serverenv.WithBlobStorage(emptyStorage),
),
err: fmt.Errorf("export.NewBatchServer requires Database present in the ServerEnv"),
},
{
name: "nil KeyManager",
env: serverenv.New(ctx, serverenv.WithBlobStorage(emptyStorage)),
err: fmt.Errorf("export.NewBatchServer requires KeyManager present in the ServerEnv"),
env: serverenv.New(ctx,
serverenv.WithBlobStorage(emptyStorage),
serverenv.WithDatabase(emptyDB),
),
err: fmt.Errorf("export.NewBatchServer requires KeyManager present in the ServerEnv"),
},
{
name: "Fully Specified",
env: serverenv.New(ctx, serverenv.WithBlobStorage(emptyStorage), serverenv.WithKeyManager(emptyKMS), serverenv.WithDatabase(emptyDB)),
err: nil,
env: serverenv.New(ctx,
serverenv.WithBlobStorage(emptyStorage),
serverenv.WithDatabase(emptyDB),
serverenv.WithKeyManager(emptyKMS),
),
err: nil,
},
}

Expand Down
Loading

0 comments on commit 0093608

Please sign in to comment.