Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Add debug endpoints on export #609

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions cmd/export/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package main
import (
"context"
"fmt"
"net/http"

"github.com/google/exposure-notifications-server/internal/export"
"github.com/google/exposure-notifications-server/internal/interrupt"
Expand Down Expand Up @@ -53,15 +52,11 @@ func realMain(ctx context.Context) error {
return fmt.Errorf("export.NewServer: %w", err)
}

mux := http.NewServeMux()
mux.HandleFunc("/create-batches", batchServer.CreateBatchesHandler)
mux.HandleFunc("/do-work", batchServer.WorkerHandler)

srv, err := server.New(config.Port)
if err != nil {
return fmt.Errorf("server.New: %w", err)
}
logger.Infof("listening on :%s", config.Port)

return srv.ServeHTTPHandler(ctx, mux)
return srv.ServeHTTPHandler(ctx, batchServer.Routes(ctx))
}
28 changes: 14 additions & 14 deletions internal/database/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ import (
)

type Config struct {
Name string `env:"DB_NAME"`
User string `env:"DB_USER"`
Host string `env:"DB_HOST, default=localhost"`
Port string `env:"DB_PORT, default=5432"`
SSLMode string `env:"DB_SSLMODE, default=require"`
ConnectionTimeout int `env:"DB_CONNECT_TIMEOUT"`
Name string `env:"DB_NAME" json:",omitempty"`
User string `env:"DB_USER" json:",omitempty"`
Host string `env:"DB_HOST, default=localhost" json:",omitempty"`
Port string `env:"DB_PORT, default=5432" json:",omitempty"`
SSLMode string `env:"DB_SSLMODE, default=require" json:",omitempty"`
ConnectionTimeout int `env:"DB_CONNECT_TIMEOUT" json:",omitempty"`
Password string `env:"DB_PASSWORD" json:"-"` // ignored by zap's JSON formatter
SSLCertPath string `env:"DB_SSLCERT"`
SSLKeyPath string `env:"DB_SSLKEY"`
SSLRootCertPath string `env:"DB_SSLROOTCERT"`
PoolMinConnections string `env:"DB_POOL_MIN_CONNS"`
PoolMaxConnections string `env:"DB_POOL_MAX_CONNS"`
PoolMaxConnLife time.Duration `env:"DB_POOL_MAX_CONN_LIFETIME"`
PoolMaxConnIdle time.Duration `env:"DB_POOL_MAX_CONN_IDLE_TIME"`
PoolHealthCheck time.Duration `env:"DB_POOL_HEALTH_CHECK_PERIOD"`
SSLCertPath string `env:"DB_SSLCERT" json:",omitempty"`
SSLKeyPath string `env:"DB_SSLKEY" json:",omitempty"`
SSLRootCertPath string `env:"DB_SSLROOTCERT" json:",omitempty"`
PoolMinConnections string `env:"DB_POOL_MIN_CONNS" json:",omitempty"`
PoolMaxConnections string `env:"DB_POOL_MAX_CONNS" json:",omitempty"`
PoolMaxConnLife time.Duration `env:"DB_POOL_MAX_CONN_LIFETIME" json:",omitempty"`
PoolMaxConnIdle time.Duration `env:"DB_POOL_MAX_CONN_IDLE_TIME" json:",omitempty"`
PoolHealthCheck time.Duration `env:"DB_POOL_HEALTH_CHECK_PERIOD" json:",omitempty"`
}

func (c *Config) DatabaseConfig() *Config {
Expand Down
23 changes: 14 additions & 9 deletions internal/export/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,27 @@ import (
"time"

coredb "github.com/google/exposure-notifications-server/internal/database"
"github.com/google/exposure-notifications-server/internal/export/database"
exportdatabase "github.com/google/exposure-notifications-server/internal/export/database"
"github.com/google/exposure-notifications-server/internal/export/model"
publishmodel "github.com/google/exposure-notifications-server/internal/publish/model"

"github.com/google/exposure-notifications-server/internal/logging"
)

// CreateBatchesHandler is a handler to iterate the rows of ExportConfig and
// handleCreateBatches is a handler to iterate the rows of ExportConfig and
// create entries in ExportBatchJob as appropriate.
func (s *Server) CreateBatchesHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), s.config.CreateTimeout)
defer cancel()
func (s *Server) handleCreateBatches(ctx context.Context) http.HandlerFunc {
logger := logging.FromContext(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is technically a change in where the logger comes from, but materially it won't matter since we haven't configured different loggers by context ever.

metrics := s.env.MetricsExporter(ctx)
db := s.env.Database()

return func(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), s.config.CreateTimeout)
defer cancel()

// Obtain lock to make sure there are no other processes working to create batches.
lock := "create_batches"
unlockFn, err := s.db.Lock(ctx, lock, s.config.CreateTimeout)
unlockFn, err := db.Lock(ctx, lock, s.config.CreateTimeout)
if err != nil {
if errors.Is(err, coredb.ErrAlreadyLocked) {
metrics.WriteInt("export-batcher-lock-contention", true, 1)
Expand All @@ -66,7 +69,7 @@ func (s *Server) CreateBatchesHandler(w http.ResponseWriter, r *http.Request) {
}()

effectiveTime := time.Now().Add(-1 * s.config.MinWindowAge)
err = database.New(s.db).IterateExportConfigs(ctx, effectiveTime, func(ec *model.ExportConfig) error {
err = exportdatabase.New(db).IterateExportConfigs(ctx, effectiveTime, func(ec *model.ExportConfig) error {
totalConfigs++
if batchesCreated, err := s.maybeCreateBatches(ctx, ec, effectiveTime); err != nil {
logger.Errorf("Failed to create batches for config %d: %v, continuing to next config", ec.ConfigID, err)
Expand Down Expand Up @@ -94,12 +97,14 @@ func (s *Server) CreateBatchesHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Failed to create batches, check logs.", http.StatusInternalServerError)
}
}
}

func (s *Server) maybeCreateBatches(ctx context.Context, ec *model.ExportConfig, now time.Time) (int, error) {
logger := logging.FromContext(ctx)
metrics := s.env.MetricsExporter(ctx)
db := s.env.Database()

latestEnd, err := database.New(s.db).LatestExportBatchEnd(ctx, ec)
latestEnd, err := exportdatabase.New(db).LatestExportBatchEnd(ctx, ec)
if err != nil {
return 0, fmt.Errorf("fetching most recent batch for config %d: %w", ec.ConfigID, err)
}
Expand Down Expand Up @@ -128,7 +133,7 @@ func (s *Server) maybeCreateBatches(ctx context.Context, ec *model.ExportConfig,
})
}

if err := database.New(s.db).AddExportBatches(ctx, batches); err != nil {
if err := exportdatabase.New(db).AddExportBatches(ctx, batches); err != nil {
return 0, fmt.Errorf("creating export batches for config %d: %w", ec.ConfigID, err)
}

Expand Down
6 changes: 6 additions & 0 deletions internal/export/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ type Config struct {
TruncateWindow time.Duration `env:"TRUNCATE_WINDOW, default=1h"`
MinWindowAge time.Duration `env:"MIN_WINDOW_AGE, default=2h"`
TTL time.Duration `env:"CLEANUP_TTL, default=336h"`

// Debugging flags follow. These should not be enabled in production
// environments.

// DebugEndpointEnabled controls whether the debug endpoint is enabled.
DebugEndpoint bool `env:"DEBUG_ENDPOINT"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so - I think we want the debug endpoint on the exposure service not the export service.

in our current config, the export service is not accessible without IAM

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to verify that both publish and export are using same truncate window. But if getting a /debug on publish/exposure service is lower hanging fruit that's totally fine.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My plan was to do this on all services, just starting with one for now

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking more... export is guarded by auth already, so you'd need to have the right IAM permissions to invoke the /debug endpoint. publish is the only public-facing service where this makes sense, but the publisher won't have access to any of the export runtime configuration (unless we do something whacky like call the Cloud Run API, which seems like a bad ideA).

}

func (c *Config) BlobstoreConfig() *storage.Config {
Expand Down
2 changes: 1 addition & 1 deletion internal/export/database/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (db *ExportDB) GetAllExportConfigs(ctx context.Context) ([]*model.ExportCon
}
defer rows.Close()

results := []*model.ExportConfig{}
var results []*model.ExportConfig
for rows.Next() {
ec, err := scanOneExportConfig(rows)
if err != nil {
Expand Down
97 changes: 97 additions & 0 deletions internal/export/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package export

import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"

exportdatabase "github.com/google/exposure-notifications-server/internal/export/database"
exportmodel "github.com/google/exposure-notifications-server/internal/export/model"
"github.com/google/exposure-notifications-server/internal/logging"
)

func (s *Server) handleDebug(ctx context.Context) http.HandlerFunc {
logger := logging.FromContext(ctx)

type response struct {
Config *Config
ExportConfigs []*exportmodel.ExportConfig
ExportBatchEnds map[int64]time.Time
ExportBatchFiles []string

SignatureInfos []*exportmodel.SignatureInfo
}

return func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
db := s.env.Database()

exportConfigs, err := exportdatabase.New(db).GetAllExportConfigs(ctx)
if err != nil {
logger.Errorf("failed to get all export configs: %v", err)
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, http.StatusText(http.StatusInternalServerError))
return
}

exportBatchEnds := make(map[int64]time.Time, len(exportConfigs))
for _, ec := range exportConfigs {
end, err := exportdatabase.New(db).LatestExportBatchEnd(ctx, ec)
if err != nil {
logger.Errorf("failed to get latest export batch end for %d: %v", ec.ConfigID, err)
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, http.StatusText(http.StatusInternalServerError))
return
}
exportBatchEnds[ec.ConfigID] = end
}

exportBatchFiles, err := exportdatabase.New(db).LookupExportFiles(ctx, 4*time.Hour)
if err != nil {
logger.Errorf("failed to get export files: %v", err)
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, http.StatusText(http.StatusInternalServerError))
return
}

signatureInfos, err := exportdatabase.New(db).ListAllSigntureInfos(ctx)
if err != nil {
logger.Errorf("failed to get all signature infos: %v", err)
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, http.StatusText(http.StatusInternalServerError))
return
}

resp := &response{
Config: s.config,
ExportConfigs: exportConfigs,
ExportBatchEnds: exportBatchEnds,
ExportBatchFiles: exportBatchFiles,
SignatureInfos: signatureInfos,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also include the last X uploaded TEKs

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Were we planning a similar debug handler on the publish server? We could dump authorized apps and some of the config it has too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should only put on e on the exposure/publish server

}

w.Header().Set("Content-Type", "application/json")

e := json.NewEncoder(w)
e.SetIndent("", " ")
if err := e.Encode(resp); err != nil {
panic(err)
}
}
}
30 changes: 20 additions & 10 deletions internal/export/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
package export

import (
"context"
"fmt"

coredb "github.com/google/exposure-notifications-server/internal/database"
"github.com/google/exposure-notifications-server/internal/export/database"
publishdb "github.com/google/exposure-notifications-server/internal/publish/database"
"net/http"

"github.com/google/exposure-notifications-server/internal/serverenv"
)
Expand All @@ -30,6 +28,9 @@ func NewServer(config *Config, env *serverenv.ServerEnv) (*Server, error) {
if env.Blobstore() == nil {
return nil, fmt.Errorf("export.NewBatchServer requires Blobstore present in the ServerEnv")
}
if env.Database() == nil {
return nil, fmt.Errorf("export.NewBatchServer requires Database present in the ServerEnv")
}
if env.KeyManager() == nil {
return nil, fmt.Errorf("export.NewBatchServer requires KeyManager present in the ServerEnv")
}
Expand All @@ -38,19 +39,28 @@ func NewServer(config *Config, env *serverenv.ServerEnv) (*Server, error) {
}

return &Server{
db: env.Database(),
exportdb: database.New(env.Database()),
publishdb: publishdb.New(env.Database()),
config: config,
env: env,
}, nil
}

// Server hosts end points to manage export batches.
type Server struct {
db *coredb.DB
exportdb *database.ExportDB
publishdb *publishdb.PublishDB
config *Config
env *serverenv.ServerEnv
}

// Routes defines and returns the routes for this server.
func (s *Server) Routes(ctx context.Context) *http.ServeMux {
mux := http.NewServeMux()

mux.HandleFunc("/create-batches", s.handleCreateBatches(ctx))
mux.HandleFunc("/do-work", s.handleDoWork(ctx))

// Enable debug endpoint if configured.
if s.config.DebugEndpoint {
mux.HandleFunc("/debug", s.handleDebug(ctx))
}

return mux
}
18 changes: 16 additions & 2 deletions internal/export/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,28 @@ func TestNewServer(t *testing.T) {
env: serverenv.New(ctx),
err: fmt.Errorf("export.NewBatchServer requires Blobstore present in the ServerEnv"),
},
{
name: "nil Database",
env: serverenv.New(ctx,
serverenv.WithBlobStorage(emptyStorage),
),
err: fmt.Errorf("export.NewBatchServer requires Database present in the ServerEnv"),
},
{
name: "nil KeyManager",
env: serverenv.New(ctx, serverenv.WithBlobStorage(emptyStorage)),
env: serverenv.New(ctx,
serverenv.WithBlobStorage(emptyStorage),
serverenv.WithDatabase(emptyDB),
),
err: fmt.Errorf("export.NewBatchServer requires KeyManager present in the ServerEnv"),
},
{
name: "Fully Specified",
env: serverenv.New(ctx, serverenv.WithBlobStorage(emptyStorage), serverenv.WithKeyManager(emptyKMS), serverenv.WithDatabase(emptyDB)),
env: serverenv.New(ctx,
serverenv.WithBlobStorage(emptyStorage),
serverenv.WithDatabase(emptyDB),
serverenv.WithKeyManager(emptyKMS),
),
err: nil,
},
}
Expand Down
Loading