Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Work on v1.5 #710

Merged
merged 4 commits into from
Jul 13, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/generate/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Config struct {
MaxSameStartIntervalKeys int `env:"MAX_SAME_START_INTERVAL_KEYS, default=2"`
SimulateSameDayRelease bool `env:"SIMULATE_SAME_DAY_RELEASE, default=false"`
MaxIntervalAge time.Duration `env:"MAX_INTERVAL_AGE_ON_PUBLISH, default=360h"`
MaxSymptomOnsetDays int `env:"MAX_SYMPTOM_ONSET_DAYS, default=21"`
TruncateWindow time.Duration `env:"TRUNCATE_WINDOW, default=1h"`
DefaultRegion string `env:"DEFAULT_REGOIN, default=US"`
}
Expand Down
16 changes: 9 additions & 7 deletions internal/generate/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/google/exposure-notifications-server/internal/publish/model"
"github.com/google/exposure-notifications-server/internal/serverenv"
"github.com/google/exposure-notifications-server/internal/util"
"github.com/google/exposure-notifications-server/internal/verification"
verifyapi "github.com/google/exposure-notifications-server/pkg/api/v1alpha1"
)

Expand All @@ -39,7 +40,7 @@ func NewHandler(ctx context.Context, config *Config, env *serverenv.ServerEnv) (
return nil, fmt.Errorf("missing database in server environment")
}

transformer, err := model.NewTransformer(config.MaxKeysOnPublish, config.MaxSameStartIntervalKeys, config.MaxIntervalAge, config.TruncateWindow, false)
transformer, err := model.NewTransformer(config.MaxKeysOnPublish, config.MaxSameStartIntervalKeys, config.MaxIntervalAge, config.TruncateWindow, config.MaxSymptomOnsetDays, false)
mikehelmick marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, fmt.Errorf("model.NewTransformer: %w", err)
}
Expand Down Expand Up @@ -78,12 +79,8 @@ func (h *generateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
for i := 0; i < h.config.NumExposures; i++ {
logger.Infof("Generating exposure %v of %v", i+1, h.config.NumExposures)

tr, err := util.RandomTransmissionRisk()
if err != nil {
tr = 0
}
publish := verifyapi.Publish{
Keys: util.GenerateExposureKeys(h.config.KeysPerExposure, tr, false),
Keys: util.GenerateExposureKeys(h.config.KeysPerExposure, 0, false),
Regions: regions,
AppPackageName: "generated.data",
}
Expand All @@ -101,7 +98,12 @@ func (h *generateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

exposures, err := h.transformer.TransformPublish(ctx, &publish, batchTime)
claims := verification.VerifiedClaims{
ReportType: util.RandomReportType(),
SymptomOnsetInterval: uint32(publish.Keys[0].IntervalNumber),
}

exposures, err := h.transformer.TransformPublish(ctx, &publish, &claims, batchTime)
if err != nil {
message := fmt.Sprintf("Error transforming generated exposures: %v", err)
span.SetStatus(trace.Status{Code: trace.StatusCodeInternal, Message: message})
Expand Down
1 change: 1 addition & 0 deletions internal/publish/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Config struct {
// Provides compatibility w/ 1.5 release.
MaxSameStartIntervalKeys int `env:"MAX_SAME_START_INTERVAL_KEYS, default=3"`
MaxIntervalAge time.Duration `env:"MAX_INTERVAL_AGE_ON_PUBLISH, default=360h"`
MaxSymptomOnsetDays int `env:"MAX_SYMPTOM_ONSET_DAYS, default=21"`
TruncateWindow time.Duration `env:"TRUNCATE_WINDOW, default=1h"`

// Flags for local development and testing. This will cause still valid keys
Expand Down
101 changes: 93 additions & 8 deletions internal/publish/database/exposure.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package database

import (
"context"
"database/sql"
"encoding/base64"
"fmt"
"strconv"
Expand Down Expand Up @@ -53,6 +54,7 @@ type IterateExposuresCriteria struct {
SinceTimestamp time.Time
UntilTimestamp time.Time
LastCursor string
RevisedKeys bool // If true, only revised keys that match will be selected.

// OnlyLocalProvenance indicates that only exposures with LocalProvenance=true will be returned.
OnlyLocalProvenance bool
Expand Down Expand Up @@ -112,7 +114,8 @@ func (db *PublishDB) IterateExposures(ctx context.Context, criteria IterateExpos
syncID *int64
)
if err := rows.Scan(&encodedKey, &m.TransmissionRisk, &m.AppPackageName, &m.Regions, &m.IntervalNumber,
&m.IntervalCount, &m.CreatedAt, &m.LocalProvenance, &syncID); err != nil {
&m.IntervalCount, &m.CreatedAt, &m.LocalProvenance, &syncID, &m.HealthAuthorityID, &m.ReportType,
&m.DaysSinceSymptomOnset, &m.RevisedReportType, &m.RevisedAt, &m.RevisedDaysSinceSymptomOnset); err != nil {
return cursor(), err
}
var err error
Expand All @@ -139,7 +142,8 @@ func generateExposureQuery(criteria IterateExposuresCriteria) (string, []interfa
q := `
SELECT
exposure_key, transmission_risk, LOWER(app_package_name), regions, interval_number, interval_count,
created_at, local_provenance, sync_id
created_at, local_provenance, sync_id, health_authority_id, report_type,
days_since_symptom_onset, revised_report_type, revised_at, revised_days_since_symptom_onset
FROM
Exposure
WHERE 1=1
Expand All @@ -155,19 +159,31 @@ func generateExposureQuery(criteria IterateExposuresCriteria) (string, []interfa
q += fmt.Sprintf(" AND NOT (regions && $%d)", len(args)) // Operation "&&" means "array overlaps / intersects"
}

if criteria.RevisedKeys {
q += " AND revised_at IS NOT NULL"
}

// It is important for StartTimestamp to be inclusive (as opposed to exclusive). When the exposure keys are
// published, they are truncated to a time boundary (e.g., time.Hour). Even though the exposure keys might arrive
// during a current open export batch window, the exposure keys are truncated to the start of that window,
// which would make them fall into the _previous_ (already processed) batch if StartTimestamp is exclusive
// (in the case where the publish window and the export period align).
if !criteria.SinceTimestamp.IsZero() {
args = append(args, criteria.SinceTimestamp)
q += fmt.Sprintf(" AND created_at >= $%d", len(args))
if criteria.RevisedKeys {
mikehelmick marked this conversation as resolved.
Show resolved Hide resolved
q += fmt.Sprintf(" AND revised_at >= $%d", len(args))
} else {
q += fmt.Sprintf(" AND created_at >= $%d", len(args))
}
}

if !criteria.UntilTimestamp.IsZero() {
args = append(args, criteria.UntilTimestamp)
q += fmt.Sprintf(" AND created_at < $%d", len(args))
if criteria.RevisedKeys {
q += fmt.Sprintf(" AND revised_at < $%d", len(args))
} else {
q += fmt.Sprintf(" AND created_at < $%d", len(args))
}
}

if criteria.OnlyLocalProvenance {
Expand All @@ -190,6 +206,73 @@ func generateExposureQuery(criteria IterateExposuresCriteria) (string, []interfa
return q, args, nil
}

// ReadExposures will read an existing set of exposures from the database.
// This is necessary in case a key needs to be revised.
// In the return map, the key is the base64 of the ExposureKey.
func (db *PublishDB) ReadExposures(ctx context.Context, b64keys []string) (map[string]*model.Exposure, error) {
conn, err := db.db.Pool.Acquire(ctx)
if err != nil {
return nil, fmt.Errorf("acquiring connection: %v", err)
}
defer conn.Release()

query := `
SELECT
exposure_key, transmission_risk, app_package_name, regions,
interval_number, interval_count, created_at, local_provenance, sync_id,
health_authority_id, report_type, days_since_symptom_onset,
revised_report_type, revised_at, revised_days_since_symptom_onset
FROM
Exposure
WHERE exposure_key = ANY($1)`
rows, err := conn.Query(ctx, query, b64keys)
if err != nil {
return nil, err
}
defer rows.Close()

result := make(map[string]*model.Exposure)
for rows.Next() {
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterating rows: %w", err)
}

var encodedKey string
var syncID sql.NullInt64

var exposure model.Exposure
if err := rows.Scan(
&encodedKey, &exposure.TransmissionRisk, &exposure.AppPackageName,
&exposure.Regions, &exposure.IntervalNumber, &exposure.IntervalCount,
&exposure.CreatedAt, &exposure.LocalProvenance, &syncID,
&exposure.HealthAuthorityID, &exposure.ReportType, &exposure.DaysSinceSymptomOnset,
&exposure.RevisedReportType, &exposure.RevisedAt, &exposure.RevisedDaysSinceSymptomOnset,
); err != nil {
return nil, err
}

// Base64 decode the exposure key
exposure.ExposureKey, err = decodeExposureKey(encodedKey)
if err != nil {
return nil, err
}
// Optionally set all of the nullable columns.
if syncID.Valid {
exposure.FederationSyncID = syncID.Int64
}

result[exposure.ExposureKeyBase64()] = &exposure
}

return result, nil
}

// ReviseExposures transactionally revises and inserts a set of keys as necessary.
func (db *PublishDB) ReviseExposures(ctx context.Context, exposures []*model.Exposure) error {
// TODO(mikehelmick): implement revise exposures functionality.
return fmt.Errorf("REVISE EXPOSURES NOT YET IMPLEMENTED")
}

// InsertExposures inserts a set of exposures.
func (db *PublishDB) InsertExposures(ctx context.Context, exposures []*model.Exposure) error {
return db.db.InTx(ctx, pgx.ReadCommitted, func(tx pgx.Tx) error {
Expand All @@ -198,9 +281,9 @@ func (db *PublishDB) InsertExposures(ctx context.Context, exposures []*model.Exp
INSERT INTO
Exposure
(exposure_key, transmission_risk, app_package_name, regions, interval_number, interval_count,
created_at, local_provenance, sync_id)
created_at, local_provenance, sync_id, health_authority_id, report_type, days_since_symptom_onset)
VALUES
($1, $2, LOWER($3), $4, $5, $6, $7, $8, $9)
($1, $2, LOWER($3), $4, $5, $6, $7, $8, $9, $10, $11, $12)
ON CONFLICT (exposure_key) DO NOTHING
`)
if err != nil {
Expand All @@ -212,8 +295,10 @@ func (db *PublishDB) InsertExposures(ctx context.Context, exposures []*model.Exp
if inf.FederationSyncID != 0 {
syncID = &inf.FederationSyncID
}
_, err := tx.Exec(ctx, stmtName, encodeExposureKey(inf.ExposureKey), inf.TransmissionRisk, inf.AppPackageName, inf.Regions, inf.IntervalNumber, inf.IntervalCount,
inf.CreatedAt, inf.LocalProvenance, syncID)
_, err := tx.Exec(ctx, stmtName, encodeExposureKey(inf.ExposureKey), inf.TransmissionRisk,
inf.AppPackageName, inf.Regions, inf.IntervalNumber, inf.IntervalCount,
inf.CreatedAt, inf.LocalProvenance, syncID,
inf.HealthAuthorityID, inf.ReportType, inf.DaysSinceSymptomOnset)
if err != nil {
return fmt.Errorf("inserting exposure: %v", err)
}
Expand Down
65 changes: 65 additions & 0 deletions internal/publish/database/exposure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,80 @@ import (
"context"
"errors"
"fmt"
"sort"
"strings"
"testing"
"time"

"github.com/google/exposure-notifications-server/internal/database"

"github.com/google/exposure-notifications-server/internal/publish/model"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)

var (
approxTime = cmp.Options{cmpopts.EquateApproxTime(time.Second)}
)

func TestReadExposures(t *testing.T) {
t.Parallel()

testDB := database.NewTestDatabase(t)
testPublishDB := New(testDB)
ctx := context.Background()

// Insert some Exposures.
createdAt := time.Date(2020, 5, 1, 0, 0, 0, 0, time.UTC).Truncate(time.Microsecond)
exposures := []*model.Exposure{
{
ExposureKey: []byte("ABC123"),
Regions: []string{"US"},
IntervalNumber: 100,
IntervalCount: 144,
CreatedAt: createdAt,
LocalProvenance: true,
},
{
ExposureKey: []byte("DEF456"),
Regions: []string{"US"},
IntervalNumber: 244,
IntervalCount: 144,
CreatedAt: createdAt,
LocalProvenance: true,
},
}
if err := testPublishDB.InsertExposures(ctx, exposures); err != nil {
t.Fatal(err)
}

keys := make([]string, 0, len(exposures))
for _, e := range exposures {
keys = append(keys, e.ExposureKeyBase64())
}

readBack, err := testPublishDB.ReadExposures(ctx, keys)
if err != nil {
t.Fatal(err)
}
got := make([]*model.Exposure, 0, len(exposures))
for _, v := range readBack {
got = append(got, v)
}

sorter := cmp.Transformer("Sort", func(in []*model.Exposure) []*model.Exposure {
out := append([]*model.Exposure(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i int, j int) bool {
return strings.Compare(out[i].ExposureKeyBase64(), out[j].ExposureKeyBase64()) <= 0
})
return out
})

if diff := cmp.Diff(exposures, got, approxTime, sorter); diff != "" {
t.Errorf("ReadExposures mismatch (-want, +got):\n%s", diff)
}
}

func TestExposures(t *testing.T) {
t.Parallel()

Expand Down
Loading