Skip to content

Commit

Permalink
feat(analytics): introduce migrations for analytics
Browse files Browse the repository at this point in the history
  • Loading branch information
yquansah committed Jan 31, 2024
1 parent 81b037e commit 3a83452
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 82 deletions.
4 changes: 2 additions & 2 deletions cmd/flipt/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (c *importCommand) run(cmd *cobra.Command, args []string) error {
// drop tables if specified
if c.dropBeforeImport {

migrator, err := sql.NewMigrator(*cfg, logger)
migrator, err := sql.NewMigrator(*cfg, logger, false)
if err != nil {
return err
}
Expand All @@ -128,7 +128,7 @@ func (c *importCommand) run(cmd *cobra.Command, args []string) error {
}
}

migrator, err := sql.NewMigrator(*cfg, logger)
migrator, err := sql.NewMigrator(*cfg, logger, false)
if err != nil {
return err
}
Expand Down
37 changes: 30 additions & 7 deletions cmd/flipt/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,30 @@ import (
"fmt"

"github.com/spf13/cobra"
"go.flipt.io/flipt/internal/config"
"go.flipt.io/flipt/internal/storage/sql"
"go.uber.org/zap"
)

var (
analytics bool
)

func runMigrations(cfg *config.Config, logger *zap.Logger, analytics bool) error {
migrator, err := sql.NewMigrator(*cfg, logger, analytics)
if err != nil {
return fmt.Errorf("initializing migrator %w", err)
}

defer migrator.Close()

if err := migrator.Up(true); err != nil {
return fmt.Errorf("running migrator %w", err)
}

return nil
}

func newMigrateCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "migrate",
Expand All @@ -21,21 +42,23 @@ func newMigrateCommand() *cobra.Command {
_ = logger.Sync()
}()

migrator, err := sql.NewMigrator(*cfg, logger)
if err != nil {
return fmt.Errorf("initializing migrator %w", err)
// Run the OLTP and OLAP database migrations sequentially because of
// potential danger in DB migrations in general.
if err := runMigrations(cfg, logger, false); err != nil {
return err
}

defer migrator.Close()

if err := migrator.Up(true); err != nil {
return fmt.Errorf("running migrator %w", err)
if analytics {
if err := runMigrations(cfg, logger, true); err != nil {
return err
}
}

return nil
},
}

cmd.Flags().StringVar(&providedConfigFile, "config", "", "path to config file")
cmd.Flags().BoolVar(&analytics, "analytics", false, "migrate analytics database")
return cmd
}
4 changes: 2 additions & 2 deletions internal/cmd/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func NewGRPCServer(
)

if cfg.Analytics.Clickhouse.Enabled {
client, err := clickhouse.New(logger, cfg.Analytics.Clickhouse.ConnectionString)
client, err := clickhouse.New(logger, cfg, forceMigrate)
if err != nil {
return nil, fmt.Errorf("connecting to clickhouse: %w", err)
}
Expand Down Expand Up @@ -580,7 +580,7 @@ var (

func getDB(ctx context.Context, logger *zap.Logger, cfg *config.Config, forceMigrate bool) (*sql.DB, sq.StatementBuilderType, fliptsql.Driver, errFunc, error) {
dbOnce.Do(func() {
migrator, err := fliptsql.NewMigrator(*cfg, logger)
migrator, err := fliptsql.NewMigrator(*cfg, logger, false)
if err != nil {
dbErr = err
return
Expand Down
6 changes: 3 additions & 3 deletions internal/config/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ type AnalyticsConfig struct {

// ClickhouseConfig defines the connection details for connecting Flipt to Clickhouse.
type ClickhouseConfig struct {
Enabled bool `json:"enabled.omitempty" mapstructure:"enabled" yaml:"enabled,omitempty"`
ConnectionString string `json:"connectionString,omitempty" mapstructure:"connection_string" yaml:"connection_string,omitempty"`
Enabled bool `json:"enabled.omitempty" mapstructure:"enabled" yaml:"enabled,omitempty"`
URL string `json:"url,omitempty" mapstructure:"url" yaml:"url,omitempty"`
}

func (m *AnalyticsConfig) setDefaults(v *viper.Viper) error {

Check failure on line 22 in internal/config/analytics.go

View workflow job for this annotation

GitHub Actions / Lint Go

(*AnalyticsConfig).setDefaults - result 0 (error) is always nil (unparam)
Expand All @@ -27,7 +27,7 @@ func (m *AnalyticsConfig) setDefaults(v *viper.Viper) error {
}

func (m *AnalyticsConfig) validate() error {
if m.Clickhouse.Enabled && m.Clickhouse.ConnectionString == "" {
if m.Clickhouse.Enabled && m.Clickhouse.URL == "" {
return errors.New("clickhouse connection string not provided")
}

Expand Down
8 changes: 1 addition & 7 deletions internal/server/analytics/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,13 @@ package analytics

import (
"context"
"time"

"go.flipt.io/flipt/rpc/flipt/analytics"
)

// GetFlagEvaluationsCount is the implemented RPC method that will return aggregated flag evaluation counts.
func (s *Server) GetFlagEvaluationsCount(ctx context.Context, req *analytics.GetFlagEvaluationsCountRequest) (*analytics.GetFlagEvaluationsCountResponse, error) {
duration, err := time.ParseDuration(req.From)
if err != nil {
return nil, err
}

timestamps, values, err := s.client.GetFlagEvaluationsCount(ctx, req.NamespaceKey, req.FlagKey, duration)
timestamps, values, err := s.client.GetFlagEvaluationsCount(ctx, req)
if err != nil {
return nil, err
}
Expand Down
63 changes: 32 additions & 31 deletions internal/server/analytics/clickhouse/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package clickhouse
import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/golang-migrate/migrate/v4"
clickhouseMigrate "github.com/golang-migrate/migrate/v4/database/clickhouse"
"go.flipt.io/flipt/internal/config"
fliptsql "go.flipt.io/flipt/internal/storage/sql"
"go.flipt.io/flipt/rpc/flipt/analytics"
"go.uber.org/zap"
)

Expand All @@ -30,24 +30,25 @@ const (
)

type Client struct {
conn *sql.DB
conn *sql.DB
forceMigrate bool
}

// New constructs a new clickhouse client that conforms to the analytics.Client contract.
func New(logger *zap.Logger, connectionString string) (*Client, error) {
func New(logger *zap.Logger, cfg *config.Config, forceMigrate bool) (*Client, error) {
var (
conn *sql.DB
clickhouseErr error
)

dbOnce.Do(func() {
err := runMigrations(logger, connectionString)
err := runMigrations(logger, cfg, forceMigrate)
if err != nil {
clickhouseErr = err
return
}

connection, err := connect(connectionString)
connection, err := connect(cfg.Analytics.Clickhouse.URL)
if err != nil {
clickhouseErr = err
return
Expand All @@ -60,30 +61,17 @@ func New(logger *zap.Logger, connectionString string) (*Client, error) {
return nil, clickhouseErr
}

return &Client{conn: conn}, nil
return &Client{conn: conn, forceMigrate: forceMigrate}, nil
}

func runMigrations(logger *zap.Logger, connectionString string) error {
db := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{connectionString},
})

driver, err := clickhouseMigrate.WithInstance(db, &clickhouseMigrate.Config{
MigrationsTableEngine: "MergeTree",
})
// runMigrations will run migrations for clickhouse if enabled from the client.
func runMigrations(logger *zap.Logger, cfg *config.Config, forceMigrate bool) error {
m, err := fliptsql.NewMigrator(*cfg, logger, true)
if err != nil {
logger.Error("error creating driver for clickhouse migrations", zap.Error(err))
return err
}

m, err := migrate.NewWithDatabaseInstance("file://sql", "clickhouse", driver)
if err != nil {
logger.Error("error creating clickhouse DB instance for migrations", zap.Error(err))
return err
}

if err := m.Up(); err != nil && errors.Is(err, migrate.ErrNoChange) {
logger.Error("error running migrations on clickhouse", zap.Error(err))
if err := m.Up(forceMigrate); err != nil {
return err
}

Expand All @@ -102,17 +90,30 @@ func connect(connectionString string) (*sql.DB, error) {
return conn, nil
}

func (c *Client) GetFlagEvaluationsCount(ctx context.Context, namespaceKey, flagKey string, from time.Duration) ([]string, []float32, error) {
step := getStepFromDuration(from)
func (c *Client) GetFlagEvaluationsCount(ctx context.Context, req *analytics.GetFlagEvaluationsCountRequest) ([]string, []float32, error) {
fromTime, err := time.Parse(timeFormat, req.From)
if err != nil {
return nil, nil, err
}

toTime, err := time.Parse(timeFormat, req.To)
if err != nil {
return nil, nil, err
}

duration := toTime.Sub(fromTime)

step := getStepFromDuration(duration)

rows, err := c.conn.QueryContext(ctx, fmt.Sprintf(`SELECT sum(value) AS value, toStartOfInterval(timestamp, INTERVAL %d %s) AS timestamp
FROM %s WHERE namespaceKey = ? AND flag_key = ? AND timestamp >= now() - toIntervalMinute(%f) GROUP BY timestamp ORDER BY timestamp`,
FROM %s WHERE namespaceKey = ? AND flag_key = ? AND timestamp >= %s AND timestamp < %s GROUP BY timestamp ORDER BY timestamp`,
step.intervalValue,
step.intervalStep,
counterAnalyticsTable,
from.Seconds()),
namespaceKey,
flagKey,
fromTime.String(),
toTime.String()),
req.NamespaceKey,
req.FlagKey,
)
if err != nil {
return nil, nil, err
Expand Down
3 changes: 1 addition & 2 deletions internal/server/analytics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package analytics

import (
"context"
"time"

"go.flipt.io/flipt/rpc/flipt/analytics"
"go.uber.org/zap"
Expand All @@ -12,7 +11,7 @@ import (
// Client is a contract that each analytics store needs to conform to for
// getting analytics from the implemented store.
type Client interface {
GetFlagEvaluationsCount(ctx context.Context, namespaceKey, flagKey string, from time.Duration) ([]string, []float32, error)
GetFlagEvaluationsCount(ctx context.Context, req *analytics.GetFlagEvaluationsCountRequest) ([]string, []float32, error)
}

// Server is a grpc server for Flipt analytics.
Expand Down
7 changes: 6 additions & 1 deletion internal/storage/sql/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func BuilderFor(db *sql.DB, driver Driver, preparedStatementsEnabled bool) sq.St
type Options struct {
sslDisabled bool
migrate bool
analytics bool

Check failure on line 92 in internal/storage/sql/db.go

View workflow job for this annotation

GitHub Actions / Lint Go

field `analytics` is unused (unused)
}

type Option func(*Options)
Expand Down Expand Up @@ -179,6 +180,7 @@ var (
Postgres: "postgres",
MySQL: "mysql",
CockroachDB: "cockroachdb",
Clickhouse: "clickhouse",
}

stringToDriver = map[string]Driver{
Expand All @@ -187,6 +189,7 @@ var (
"postgres": Postgres,
"mysql": MySQL,
"cockroachdb": CockroachDB,
"clickhouse": Clickhouse,
}
)

Expand Down Expand Up @@ -214,8 +217,10 @@ const (
MySQL
// CockroachDB ...
CockroachDB
// LibSQL...
// LibSQL ...
LibSQL
// Clickhouse ...
Clickhouse
)

func parse(cfg config.Config, opts Options) (Driver, *dburl.URL, error) {
Expand Down
53 changes: 51 additions & 2 deletions internal/storage/sql/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"errors"
"fmt"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database"
clickhouseMigrate "github.com/golang-migrate/migrate/v4/database/clickhouse"
"github.com/golang-migrate/migrate/v4/database/cockroachdb"
"github.com/golang-migrate/migrate/v4/database/mysql"
"github.com/golang-migrate/migrate/v4/database/postgres"
Expand All @@ -25,7 +27,7 @@ var expectedVersions = map[Driver]uint{
CockroachDB: 9,
}

// Migrator is responsible for migrating the database schema
// rdbmsMigrator is responsible for migrating the relational database schema
type Migrator struct {
db *sql.DB
driver Driver
Expand All @@ -34,7 +36,15 @@ type Migrator struct {
}

// NewMigrator creates a new Migrator
func NewMigrator(cfg config.Config, logger *zap.Logger) (*Migrator, error) {
func NewMigrator(cfg config.Config, logger *zap.Logger, analytics bool) (*Migrator, error) {
if analytics {
return newAnalyticsMigrator(cfg, logger)
}
return newRDBMSMigrator(cfg, logger)
}

// newRDBMSMigrator is a helper function to provide a Migrator instance for relational databases
func newRDBMSMigrator(cfg config.Config, logger *zap.Logger) (*Migrator, error) {
sql, driver, err := open(cfg, Options{migrate: true})
if err != nil {
return nil, fmt.Errorf("opening db: %w", err)
Expand Down Expand Up @@ -79,6 +89,45 @@ func NewMigrator(cfg config.Config, logger *zap.Logger) (*Migrator, error) {
}, nil
}

// newAnalyticsMigrator is a helper function to provide a Migrator instance for OLAP databases
func newAnalyticsMigrator(cfg config.Config, logger *zap.Logger) (*Migrator, error) {
var db *sql.DB

if cfg.Analytics.Clickhouse.Enabled {
db = clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{cfg.Analytics.Clickhouse.URL},
})

driver, err := clickhouseMigrate.WithInstance(db, &clickhouseMigrate.Config{
DatabaseName: "flipt_analytics",
MigrationsTableEngine: "MergeTree",
})
if err != nil {
return nil, fmt.Errorf("creating migrate instance: %w", err)
}

sourceDriver, err := iofs.New(migrations.FS, Clickhouse.Migrations())
if err != nil {
return nil, fmt.Errorf("getting fs: %w", err)
}

m, err := migrate.NewWithInstance("iofs", sourceDriver, Clickhouse.Migrations(), driver)
if err != nil {
logger.Error("error creating clickhouse DB instance for migrations", zap.Error(err))
return nil, err
}

return &Migrator{
db: db,
migrator: m,
logger: logger,
driver: Clickhouse,
}, nil
}

return nil, fmt.Errorf("no analytics db enabled")
}

// Close closes the source and db
func (m *Migrator) Close() (source, db error) {
return m.migrator.Close()
Expand Down
Loading

0 comments on commit 3a83452

Please sign in to comment.