Skip to content

Commit

Permalink
feat(clickhouse): include migrations for clickhouse
Browse files Browse the repository at this point in the history
  • Loading branch information
yquansah committed Jan 30, 2024
1 parent 2522702 commit 81b037e
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 10 deletions.
53 changes: 44 additions & 9 deletions internal/server/analytics/clickhouse/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package clickhouse
import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/golang-migrate/migrate/v4"
clickhouseMigrate "github.com/golang-migrate/migrate/v4/database/clickhouse"
"go.uber.org/zap"
)

Expand All @@ -33,31 +36,63 @@ type Client struct {
// New constructs a new clickhouse client that conforms to the analytics.Client contract.
func New(logger *zap.Logger, connectionString string) (*Client, error) {
var (
conn *sql.DB
connectErr error
conn *sql.DB
clickhouseErr error
)

dbOnce.Do(func() {
err := runMigrations(logger, connectionString)
if err != nil {
clickhouseErr = err
return
}

connection, err := connect(connectionString)
if err != nil {
connectErr = err
clickhouseErr = err
return
}

conn = connection
})

if connectErr != nil {
return nil, connectErr
if clickhouseErr != nil {
return nil, clickhouseErr
}

return &Client{conn: conn}, nil
}

func runMigrations(logger *zap.Logger, connectionString string) error {
db := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{connectionString},
})

driver, err := clickhouseMigrate.WithInstance(db, &clickhouseMigrate.Config{
MigrationsTableEngine: "MergeTree",
})
if err != nil {
logger.Error("error creating driver for clickhouse migrations", zap.Error(err))
return err
}

m, err := migrate.NewWithDatabaseInstance("file://sql", "clickhouse", driver)
if err != nil {
logger.Error("error creating clickhouse DB instance for migrations", zap.Error(err))
return err
}

if err := m.Up(); err != nil && errors.Is(err, migrate.ErrNoChange) {
logger.Error("error running migrations on clickhouse", zap.Error(err))
return err
}

return nil
}

func connect(connectionString string) (*sql.DB, error) {
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{connectionString},
Protocol: clickhouse.HTTP,
Addr: []string{connectionString},
})

if err := conn.Ping(); err != nil {
Expand Down Expand Up @@ -136,8 +171,8 @@ func getStepFromDuration(from time.Duration) *Step {

// IncrementFlagEvaluation inserts a row into Clickhouse that corresponds to a time when a flag was evaluated.
// This acts as a "prometheus-like" counter metric.
func (c *Client) IncrementFlagEvaluation(ctx context.Context, flagKey string) error {
_, err := c.conn.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s VALUES (toDateTime(?),?,?,?)", counterAnalyticsTable), time.Now().Format(timeFormat), counterAnalyticsName, flagKey, 1)
func (c *Client) IncrementFlagEvaluation(ctx context.Context, namespaceKey, flagKey string) error {
_, err := c.conn.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s VALUES (toDateTime(?),?,?,?,?)", counterAnalyticsTable), time.Now().Format(timeFormat), counterAnalyticsName, namespaceKey, flagKey, 1)

return err
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CREATE TABLE IF NOT EXISTS flipt_counter_analytics (
`timestamp` DateTime, `name` String, `flag_key` String, `namespace_key` String, `value` UInt32
`timestamp` DateTime, `name` String, `namespace_key` String, `flag_key` String, `value` UInt32
) Engine = MergeTree
ORDER BY timestamp
TTL timestamp + INTERVAL 1 WEEK;

0 comments on commit 81b037e

Please sign in to comment.