Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat : Execute dashboard query as part of health check #5676

Merged
merged 18 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion admin/worker/deployments_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,25 @@ func (w *Worker) deploymentHealthCheck(ctx context.Context, d *database.Deployme
for k, v := range annotations.ToMap() {
f = append(f, zap.String(k, v))
}
for d, err := range health.DashboardErrors {
w.logger.Error("deployment health check: dashboard error", append(f, zap.String("dashboard", d), zap.String("error", err))...)
}
onlyUnhealthyDash := true
if health.OlapError != "" {
onlyUnhealthyDash = false
f = append(f, zap.String("olap_error", health.OlapError))
}
if health.ControllerError != "" {
onlyUnhealthyDash = false
f = append(f, zap.String("controller_error", health.ControllerError))
}
if health.RepoError != "" {
onlyUnhealthyDash = false
f = append(f, zap.String("repo_error", health.RepoError))
}
if onlyUnhealthyDash {
continue
}
w.logger.Error("deployment health check: runtime instance is unhealthy", f...)
}
return instances, true
Expand All @@ -211,7 +221,7 @@ func runtimeUnhealthy(r *runtimev1.HealthResponse) bool {
}

// instanceUnhealthy reports whether the instance health response carries any
// error: an OLAP, controller or repo error, or at least one dashboard error.
//
// Only the post-change condition is kept; with the stale pre-change return in
// front of it, the dashboard-error check could never be reached.
func instanceUnhealthy(i *runtimev1.InstanceHealth) bool {
	return i.OlapError != "" || i.ControllerError != "" || i.RepoError != "" || len(i.DashboardErrors) != 0
}

func addExpectedInstance(expectedInstances map[string][]string, d *database.Deployment) {
Expand Down
2,326 changes: 1,173 additions & 1,153 deletions proto/gen/rill/runtime/v1/api.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions proto/gen/rill/runtime/v1/api.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions proto/gen/rill/runtime/v1/runtime.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4129,6 +4129,10 @@ definitions:
type: string
repoError:
type: string
dashboardErrors:
type: object
additionalProperties:
type: string
v1InstanceHealthResponse:
type: object
properties:
Expand Down
1 change: 1 addition & 0 deletions proto/rill/runtime/v1/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ message InstanceHealth {
string controller_error = 1;
string olap_error = 2;
string repo_error = 3;
map<string, string> dashboard_errors = 4;
k-anshul marked this conversation as resolved.
Show resolved Hide resolved
}

// **********
Expand Down
10 changes: 10 additions & 0 deletions runtime/drivers/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type CatalogStore interface {
UpdateModelSplitPending(ctx context.Context, modelID, splitKey string) error
UpdateModelSplitsPendingIfError(ctx context.Context, modelID string) error
DeleteModelSplits(ctx context.Context, modelID string) error

FindInstanceHealth(ctx context.Context, instanceID string) (*InstanceHealth, error)
UpsertInstanceHealth(ctx context.Context, h *InstanceHealth) error
}

// Resource is an entry in a catalog store
Expand Down Expand Up @@ -78,3 +81,10 @@ type ModelSplit struct {
// Elapsed is the duration of the last execution of the split.
Elapsed time.Duration
}

// InstanceHealth represents the health of an instance as persisted in the
// catalog store's instance_health table.
type InstanceHealth struct {
	// InstanceID is the ID of the instance the record belongs to.
	InstanceID string `db:"instance_id"`
	// Health is the serialized health payload. The runtime stores the JSON
	// encoding of its own health result here.
	Health []byte `db:"health"`
	// CreatedOn is the timestamp stored alongside the payload.
	CreatedOn time.Time `db:"created_on"`
}
begelundmuller marked this conversation as resolved.
Show resolved Hide resolved
5 changes: 3 additions & 2 deletions runtime/drivers/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ type configProperties struct {
// EmbedPort is the port to run Clickhouse locally (0 is random port).
EmbedPort int `mapstructure:"embed_port"`
// DataDir is the path to directory where db files will be created.
DataDir string `mapstructure:"data_dir"`
TempDir string `mapstructure:"temp_dir"`
DataDir string `mapstructure:"data_dir"`
TempDir string `mapstructure:"temp_dir"`
EnableHealthCheckCache bool `mapstructure:"enable_health_check_cache"`
}

// Open connects to Clickhouse using std API.
Expand Down
8 changes: 8 additions & 0 deletions runtime/drivers/duckdb/catalogv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,11 @@ func (c *connection) UpdateModelSplitsPendingIfError(ctx context.Context, modelI
func (c *connection) DeleteModelSplits(ctx context.Context, modelID string) error {
return drivers.ErrNotImplemented
}

// FindInstanceHealth is not supported by this catalog implementation.
// It always returns a nil record and drivers.ErrNotImplemented.
func (c *connection) FindInstanceHealth(ctx context.Context, instanceID string) (*drivers.InstanceHealth, error) {
return nil, drivers.ErrNotImplemented
}

// UpsertInstanceHealth is not supported by this catalog implementation.
// It always returns drivers.ErrNotImplemented and stores nothing.
func (c *connection) UpsertInstanceHealth(ctx context.Context, h *drivers.InstanceHealth) error {
return drivers.ErrNotImplemented
}
17 changes: 17 additions & 0 deletions runtime/drivers/sqlite/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,3 +355,20 @@ func (c *catalogStore) DeleteModelSplits(ctx context.Context, modelID string) er

return nil
}

// FindInstanceHealth loads the stored health record for the given instance
// from the instance_health table.
//
// The returned record carries the instance ID it was looked up by, together
// with the stored health payload and timestamp. Any error from the query or
// the scan is returned unchanged.
//
// NOTE(review): a missing row presumably surfaces as the database layer's
// "no rows" error rather than drivers.ErrNotFound — confirm whether callers
// expect the latter.
func (c *catalogStore) FindInstanceHealth(ctx context.Context, instanceID string) (*drivers.InstanceHealth, error) {
	// instance_id is not part of the SELECT list, so fill the key in from the
	// argument; otherwise the returned record would have an empty InstanceID.
	h := &drivers.InstanceHealth{InstanceID: instanceID}

	row := c.db.QueryRowContext(ctx, "SELECT health, created_on FROM instance_health WHERE instance_id=?", instanceID)
	if err := row.Scan(&h.Health, &h.CreatedOn); err != nil {
		return nil, err
	}

	return h, nil
}

// UpsertInstanceHealth writes the health payload for h.InstanceID into the
// instance_health table, replacing the payload and timestamp of an existing
// row with the same instance ID. The timestamp is always taken from the
// database (CURRENT_TIMESTAMP); h.CreatedOn is not used.
func (c *catalogStore) UpsertInstanceHealth(ctx context.Context, h *drivers.InstanceHealth) error {
	const upsertStmt = `INSERT INTO instance_health(instance_id, health, created_on) Values (?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(instance_id) DO UPDATE SET health=excluded.health, created_on=excluded.created_on;
`
	if _, err := c.db.ExecContext(ctx, upsertStmt, h.InstanceID, h.Health); err != nil {
		return err
	}
	return nil
}
8 changes: 8 additions & 0 deletions runtime/drivers/sqlite/migrations/0025.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Migration: adds the instance_health table, which stores one health payload per instance.

-- Turn on foreign key enforcement so the constraint declared below is applied.
PRAGMA foreign_keys = ON;

CREATE TABLE instance_health (
-- One row per instance: the instance ID is the primary key.
instance_id TEXT PRIMARY KEY,
-- Opaque serialized health payload.
health BLOB NOT NULL,
-- Timestamp stored with the payload.
created_on TIMESTAMP NOT NULL,
-- Deleting a row from instances also deletes its health row.
FOREIGN KEY (instance_id) REFERENCES instances(id) ON DELETE CASCADE
);
198 changes: 175 additions & 23 deletions runtime/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,45 @@ package runtime

import (
"context"
"crypto/md5"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/drivers"
)

// DashboardHealthQuery builds the Query that a health check runs against the
// metrics view called name in the instance identified by instanceID.
// A returned error is recorded as that dashboard's health error.
type DashboardHealthQuery func(ctx context.Context, instanceID, name string) (Query, error)
k-anshul marked this conversation as resolved.
Show resolved Hide resolved

// Health is the result returned by Runtime.Health.
type Health struct {
// HangingConn presumably reports a hanging-connection problem; it is set outside the visible code — TODO confirm.
HangingConn error
// Registry presumably reports a registry problem; it is set outside the visible code — TODO confirm.
Registry error
// InstancesHealth holds the per-instance health results, keyed by instance ID.
InstancesHealth map[string]*InstanceHealth
}

// InstanceHealth describes the health of a single instance. Every error is
// kept as its message string, so an empty string means "no error"; this also
// lets the whole struct be JSON-encoded for the health-check cache.
//
// The conversion to the API type is the To method defined further down in
// this file.
type InstanceHealth struct {
	// always recomputed
	Controller string
	Repo       string

	// cached
	OLAP string
	// Dashboards maps a dashboard (metrics view) name to the error its health
	// query produced. Healthy dashboards have no entry.
	Dashboards map[string]string

	// Hash is the fingerprint of the controller and metrics-view versions the
	// cached fields were computed for. A cached result is only reused while
	// the freshly computed fingerprint still matches.
	Hash string
}

func (r *Runtime) Health(ctx context.Context) (*Health, error) {
func (r *Runtime) Health(ctx context.Context, query DashboardHealthQuery) (*Health, error) {
instances, err := r.registryCache.list()
if err != nil {
return nil, err
}

ih := make(map[string]*InstanceHealth, len(instances))
for _, inst := range instances {
ih[inst.ID], err = r.InstanceHealth(ctx, inst.ID)
ih[inst.ID], err = r.InstanceHealth(ctx, inst.ID, query)
if err != nil && !errors.Is(err, drivers.ErrNotFound) {
return nil, err
}
Expand All @@ -57,6 +52,163 @@ func (r *Runtime) Health(ctx context.Context) (*Health, error) {
}, nil
}

func (r *Runtime) InstanceHealth(ctx context.Context, instanceID string) (*InstanceHealth, error) {
return r.registryCache.instanceHealth(ctx, instanceID)
func (r *Runtime) InstanceHealth(ctx context.Context, instanceID string, query DashboardHealthQuery) (*InstanceHealth, error) {
res := &InstanceHealth{}
// check repo error
repo, rr, err := r.Repo(ctx, instanceID)
if err != nil {
res.Repo = err.Error()
} else {
err = repo.(drivers.Handle).Ping(ctx)
k-anshul marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
res.Repo = err.Error()
}
rr()
}

ctrl, err := r.Controller(ctx, instanceID)
if err != nil {
res.Controller = err.Error()
return res, nil
}

// check olap error
olap, release, err := r.OLAP(ctx, ctrl.InstanceID, "")
if err != nil {
res.OLAP = err.Error()
return res, nil
}
defer release()

resources, err := ctrl.List(ctx, ResourceKindMetricsView, "", false)
if err != nil {
return nil, err
}

cacheEnabled := healthCheckCacheEnabled(olap)
if cacheEnabled {
instanceHealth, ok := r.cachedInstanceHealth(ctx, ctrl.InstanceID, ctrl.catalog.version, resources)
if ok {
res.OLAP = instanceHealth.OLAP
res.Dashboards = instanceHealth.Dashboards
return res, nil
}
}
k-anshul marked this conversation as resolved.
Show resolved Hide resolved

err = olap.(drivers.Handle).Ping(ctx)
if err != nil {
res.OLAP = err.Error()
} else if query != nil {
// run queries against dashboards
res.Dashboards = make(map[string]string, len(resources))
for _, mv := range resources {
q, err := query(ctx, ctrl.InstanceID, mv.Meta.Name.Name)
k-anshul marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
res.Dashboards[mv.Meta.Name.Name] = err.Error()
continue
}
err = r.Query(ctx, ctrl.InstanceID, q, 1)
if err != nil {
res.Dashboards[mv.Meta.Name.Name] = err.Error()
}
}
}
if !cacheEnabled {
return res, nil
}

// save to cache
hash, err := healthResultHash(ctrl.catalog.version, resources)
if err != nil {
return nil, err
}
res.Hash = hash

bytes, err := json.Marshal(res)
if err != nil {
return nil, err
}
k-anshul marked this conversation as resolved.
Show resolved Hide resolved

catalog, release, err := r.Catalog(ctx, instanceID)
if err != nil {
return nil, err
}
defer release()

err = catalog.UpsertInstanceHealth(ctx, &drivers.InstanceHealth{
InstanceID: instanceID,
Health: bytes,
})
if err != nil {
return nil, err
}

return res, nil
}

func (r *Runtime) cachedInstanceHealth(ctx context.Context, instanceID string, ctrlVersion int64, resources []*runtimev1.Resource) (*InstanceHealth, bool) {
catalog, release, err := r.Catalog(ctx, instanceID)
if err != nil {
return nil, false
}
defer release()

cached, err := catalog.FindInstanceHealth(ctx, instanceID)
if err != nil {
return nil, false
}

c := &InstanceHealth{}
err = json.Unmarshal(cached.Health, c)
if err != nil {
return nil, false
}

hash, err := healthResultHash(ctrlVersion, resources)
k-anshul marked this conversation as resolved.
Show resolved Hide resolved
if err != nil || hash != c.Hash {
return nil, false
}
return c, true
}

// healthCheckCacheEnabled reports whether health results may be cached for
// the given OLAP connection. Caching applies only to the ClickHouse dialect,
// where it is on unless the connection config carries a boolean
// "enable_health_check_cache" entry, in which case that value decides.
func healthCheckCacheEnabled(olap drivers.OLAPStore) bool {
	if olap.Dialect() == drivers.DialectClickHouse {
		cfg := olap.(drivers.Handle).Config()
		if flag, isBool := cfg["enable_health_check_cache"].(bool); isBool {
			return flag
		}
		// Key absent or not a bool: fall back to the default (enabled).
		return true
	}
	return false
}
k-anshul marked this conversation as resolved.
Show resolved Hide resolved

// healthResultHash fingerprints the state a health result was computed for.
// It feeds the controller version, followed by each resource's state version
// in slice order, into an MD5 digest as big-endian integers and returns the
// digest hex-encoded. MD5 serves only as a change detector here, not as a
// security measure.
func healthResultHash(ctrlVersion int64, resources []*runtimev1.Resource) (string, error) {
	digest := md5.New()

	if err := binary.Write(digest, binary.BigEndian, ctrlVersion); err != nil {
		return "", err
	}
	for i := range resources {
		if err := binary.Write(digest, binary.BigEndian, resources[i].Meta.StateVersion); err != nil {
			return "", err
		}
	}

	sum := digest.Sum(nil)
	return hex.EncodeToString(sum), nil
}

// To converts the health result into its API representation, copying the
// controller, repo, OLAP and dashboard errors across. A nil receiver converts
// to nil.
func (h *InstanceHealth) To() *runtimev1.InstanceHealth {
	if h != nil {
		return &runtimev1.InstanceHealth{
			ControllerError: h.Controller,
			RepoError:       h.Repo,
			OlapError:       h.OLAP,
			DashboardErrors: h.Dashboards,
		}
	}
	return nil
}
Loading
Loading