diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 5d8f7ece82..fe9b3748e7 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -64,7 +64,7 @@ const ( archiveNamespace = "clickhouse-archive" signozTraceDBName = "signoz_traces" signozHistoryDBName = "signoz_analytics" - ruleStateHistoryTableName = "distributed_rule_state_history" + ruleStateHistoryTableName = "distributed_rule_state_history_v0" signozDurationMVTable = "distributed_durationSort" signozUsageExplorerTable = "distributed_usage_explorer" signozSpansTable = "distributed_signoz_spans" @@ -5222,6 +5222,18 @@ func (r *ClickHouseReader) AddRuleStateHistory(ctx context.Context, ruleStateHis return nil } +func (r *ClickHouseReader) GetLastSavedRuleStateHistory(ctx context.Context, ruleID string) ([]v3.RuleStateHistory, error) { + query := fmt.Sprintf("SELECT * FROM %s.%s WHERE rule_id = '%s' AND state_changed = true ORDER BY unix_milli DESC LIMIT 1 BY fingerprint", + signozHistoryDBName, ruleStateHistoryTableName, ruleID) + + history := []v3.RuleStateHistory{} + err := r.db.Select(ctx, &history, query) + if err != nil { + return nil, err + } + return history, nil +} + func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID( ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.RuleStateTimeline, error) { @@ -5287,6 +5299,7 @@ func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID( signozHistoryDBName, ruleStateHistoryTableName, whereClause, params.Order, params.Limit, params.Offset) history := []v3.RuleStateHistory{} + zap.L().Debug("rule state history query", zap.String("query", query)) err := r.db.Select(ctx, &history, query) if err != nil { zap.L().Error("Error while reading rule state history", zap.Error(err)) @@ -5294,15 +5307,43 @@ func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID( } var total uint64 + zap.L().Debug("rule state history total query", zap.String("query", fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE %s", + signozHistoryDBName, ruleStateHistoryTableName, whereClause))) err = r.db.QueryRow(ctx, fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE %s", signozHistoryDBName, ruleStateHistoryTableName, whereClause)).Scan(&total) if err != nil { return nil, err } + labelsQuery := fmt.Sprintf("SELECT DISTINCT labels FROM %s.%s WHERE rule_id = $1", + signozHistoryDBName, ruleStateHistoryTableName) + rows, err := r.db.Query(ctx, labelsQuery, ruleID) + if err != nil { + return nil, err + } + defer rows.Close() + + labelsMap := make(map[string][]string) + for rows.Next() { + var rawLabel string + err = rows.Scan(&rawLabel) + if err != nil { + return nil, err + } + label := map[string]string{} + err = json.Unmarshal([]byte(rawLabel), &label) + if err != nil { + return nil, err + } + for k, v := range label { + labelsMap[k] = append(labelsMap[k], v) + } + } + timeline := &v3.RuleStateTimeline{ - Items: history, - Total: total, + Items: history, + Total: total, + Labels: labelsMap, } return timeline, nil @@ -5315,11 +5356,13 @@ func (r *ClickHouseReader) ReadRuleStateHistoryTopContributorsByRuleID( any(labels) as labels, count(*) as count FROM %s.%s - WHERE rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d + WHERE rule_id = '%s' AND (state_changed = true) AND (state = '%s') AND unix_milli >= %d AND unix_milli <= %d GROUP BY fingerprint + HAVING labels != '{}' ORDER BY count DESC`, - signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End) + signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.Start, params.End) + zap.L().Debug("rule state history top contributors query", zap.String("query", query)) contributors := []v3.RuleStateHistoryContributor{} err := r.db.Select(ctx, &contributors, query) if err != nil { @@ -5330,7 +5373,7 @@ func (r *ClickHouseReader) ReadRuleStateHistoryTopContributorsByRuleID( return contributors, nil } -func (r *ClickHouseReader) GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateTransition, error) { +func (r *ClickHouseReader) GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.ReleStateItem, error) { tmpl := `WITH firing_events AS ( SELECT @@ -5338,7 +5381,7 @@ func (r *ClickHouseReader) GetOverallStateTransitions(ctx context.Context, ruleI state, unix_milli AS firing_time FROM %s.%s - WHERE overall_state = 'firing' + WHERE overall_state = '` + model.StateFiring.String() + `' AND overall_state_changed = true AND rule_id IN ('%s') AND unix_milli >= %d AND unix_milli <= %d @@ -5349,7 +5392,7 @@ resolution_events AS ( state, unix_milli AS resolution_time FROM %s.%s - WHERE overall_state = 'normal' + WHERE overall_state = '` + model.StateInactive.String() + `' AND overall_state_changed = true AND rule_id IN ('%s') AND unix_milli >= %d AND unix_milli <= %d @@ -5374,13 +5417,87 @@ ORDER BY firing_time ASC;` signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End, signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End) + zap.L().Debug("overall state transitions query", zap.String("query", query)) + transitions := []v3.RuleStateTransition{} err := r.db.Select(ctx, &transitions, query) if err != nil { return nil, err } - return transitions, nil + stateItems := []v3.ReleStateItem{} + + for idx, item := range transitions { + start := item.FiringTime + end := item.ResolutionTime + stateItems = append(stateItems, v3.ReleStateItem{ + State: item.State, + Start: start, + End: end, + }) + if idx < len(transitions)-1 { + nextStart := transitions[idx+1].FiringTime + if nextStart > end { + stateItems = append(stateItems, v3.ReleStateItem{ + State: model.StateInactive, + Start: end, + End: nextStart, + }) + } + } + } + + // fetch the most recent overall_state from the table + var state model.AlertState + stateQuery := fmt.Sprintf("SELECT state FROM %s.%s WHERE rule_id = '%s' AND unix_milli <= %d ORDER BY unix_milli DESC LIMIT 1", + signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.End) + if err := r.db.QueryRow(ctx, stateQuery).Scan(&state); err != nil { + if err != sql.ErrNoRows { + return nil, err + } + state = model.StateInactive + } + + if len(transitions) == 0 { + // no transitions found, it is either firing or inactive for whole time range + stateItems = append(stateItems, v3.ReleStateItem{ + State: state, + Start: params.Start, + End: params.End, + }) + } else { + // there were some transitions, we need to add the last state at the end + if state == model.StateInactive { + stateItems = append(stateItems, v3.ReleStateItem{ + State: model.StateInactive, + Start: transitions[len(transitions)-1].ResolutionTime, + End: params.End, + }) + } else { + // fetch the most recent firing event from the table in the given time range + var firingTime int64 + firingQuery := fmt.Sprintf(` + SELECT + unix_milli + FROM %s.%s + WHERE rule_id = '%s' AND overall_state_changed = true AND overall_state = '%s' AND unix_milli <= %d + ORDER BY unix_milli DESC LIMIT 1`, signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.End) + if err := r.db.QueryRow(ctx, firingQuery).Scan(&firingTime); err != nil { + return nil, err + } + stateItems = append(stateItems, v3.ReleStateItem{ + State: model.StateInactive, + Start: transitions[len(transitions)-1].ResolutionTime, + End: firingTime, + }) + stateItems = append(stateItems, v3.ReleStateItem{ + State: model.StateFiring, + Start: firingTime, + End: params.End, + }) + } + } + return stateItems, nil } func (r *ClickHouseReader) GetAvgResolutionTime(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (float64, error) { @@ -5392,7 +5509,7 @@ WITH firing_events AS ( state, unix_milli AS firing_time FROM %s.%s - WHERE overall_state = 'firing' + WHERE overall_state = '` + model.StateFiring.String() + `' AND overall_state_changed = true AND rule_id IN ('%s') AND unix_milli >= %d AND unix_milli <= %d @@ -5403,7 +5520,7 @@ resolution_events AS ( state, unix_milli AS resolution_time FROM %s.%s - WHERE overall_state = 'normal' + WHERE overall_state = '` + model.StateInactive.String() + `' AND overall_state_changed = true AND rule_id IN ('%s') AND unix_milli >= %d AND unix_milli <= %d @@ -5428,6 +5545,7 @@ FROM matched_events; signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End, signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End) + zap.L().Debug("avg resolution time query", zap.String("query", query)) var avgResolutionTime float64 err := r.db.QueryRow(ctx, query).Scan(&avgResolutionTime) if err != nil { @@ -5448,7 +5566,7 @@ WITH firing_events AS ( state, unix_milli AS firing_time FROM %s.%s - WHERE overall_state = 'firing' + WHERE overall_state = '` + model.StateFiring.String() + `' AND overall_state_changed = true AND rule_id IN ('%s') AND unix_milli >= %d AND unix_milli <= %d @@ -5459,7 +5577,7 @@ resolution_events AS ( state, unix_milli AS resolution_time FROM %s.%s - WHERE overall_state = 'normal' + WHERE overall_state = '` + model.StateInactive.String() + `' AND overall_state_changed = true AND rule_id IN ('%s') AND unix_milli >= %d AND unix_milli <= %d @@ -5485,6 +5603,7 @@ ORDER BY ts ASC;` signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End, signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End, step) + zap.L().Debug("avg resolution time by interval query", zap.String("query", query)) result, err := r.GetTimeSeriesResultV3(ctx, query) if err != nil || len(result) == 0 { return nil, err @@ -5494,10 +5613,11 @@ ORDER BY ts ASC;` } func (r *ClickHouseReader) GetTotalTriggers(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (uint64, error) { - query := fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d", - signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End) + query := fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = '%s') AND unix_milli >= %d AND unix_milli <= %d", + signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.Start, params.End) var totalTriggers uint64 + err := r.db.QueryRow(ctx, query).Scan(&totalTriggers) if err != nil { return 0, err @@ -5509,8 +5629,8 @@ func (r *ClickHouseReader) GetTotalTriggers(ctx context.Context, ruleID string, func (r *ClickHouseReader) GetTriggersByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error) { step := common.MinAllowedStepInterval(params.Start, params.End) - query := fmt.Sprintf("SELECT count(*), toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d GROUP BY ts ORDER BY ts ASC", - step, signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End) + query := fmt.Sprintf("SELECT count(*), toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = '%s') AND unix_milli >= %d AND unix_milli <= %d GROUP BY ts ORDER BY ts ASC", + step, signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.Start, params.End) result, err := r.GetTimeSeriesResultV3(ctx, query) if err != nil || len(result) == 0 { diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 1210cd4f67..957ea5aaff 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -746,34 +746,12 @@ func (aH *APIHandler) getOverallStateTransitions(w http.ResponseWriter, r *http. return } - res, err := aH.reader.GetOverallStateTransitions(r.Context(), ruleID, ¶ms) + stateItems, err := aH.reader.GetOverallStateTransitions(r.Context(), ruleID, ¶ms) if err != nil { RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) return } - stateItems := []v3.ReleStateItem{} - - for idx, item := range res { - start := item.FiringTime - end := item.ResolutionTime - stateItems = append(stateItems, v3.ReleStateItem{ - State: item.State, - Start: start, - End: end, - }) - if idx < len(res)-1 { - nextStart := res[idx+1].FiringTime - if nextStart > end { - stateItems = append(stateItems, v3.ReleStateItem{ - State: "normal", - Start: end, - End: nextStart, - }) - } - } - } - aH.Respond(w, stateItems) } diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index f275579104..cfb4f9159e 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -109,13 +109,15 @@ type Reader interface { GetMetricMetadata(context.Context, string, string) (*v3.MetricMetadataResponse, error) AddRuleStateHistory(ctx context.Context, ruleStateHistory []v3.RuleStateHistory) error - GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateTransition, error) + GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.ReleStateItem, error) ReadRuleStateHistoryByRuleID(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.RuleStateTimeline, error) GetTotalTriggers(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (uint64, error) GetTriggersByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error) GetAvgResolutionTime(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (float64, error) GetAvgResolutionTimeByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error) ReadRuleStateHistoryTopContributorsByRuleID(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateHistoryContributor, error) + GetLastSavedRuleStateHistory(ctx context.Context, ruleID string) ([]v3.RuleStateHistory, error) + GetMinAndMaxTimestampForTraceID(ctx context.Context, traceID []string) (int64, int64, error) // Query Progress tracking helpers. diff --git a/pkg/query-service/migrate/migate.go b/pkg/query-service/migrate/migate.go index 60e65d6d72..2db7243f58 100644 --- a/pkg/query-service/migrate/migate.go +++ b/pkg/query-service/migrate/migate.go @@ -60,7 +60,7 @@ func ClickHouseMigrate(conn driver.Conn, cluster string) error { database := "CREATE DATABASE IF NOT EXISTS signoz_analytics ON CLUSTER %s" - localTable := `CREATE TABLE IF NOT EXISTS signoz_analytics.rule_state_history ON CLUSTER %s + localTable := `CREATE TABLE IF NOT EXISTS signoz_analytics.rule_state_history_v0 ON CLUSTER %s ( _retention_days UInt32 DEFAULT 180, rule_id LowCardinality(String), @@ -80,7 +80,7 @@ ORDER BY (rule_id, unix_milli) TTL toDateTime(unix_milli / 1000) + toIntervalDay(_retention_days) SETTINGS ttl_only_drop_parts = 1, index_granularity = 8192` - distributedTable := `CREATE TABLE IF NOT EXISTS signoz_analytics.distributed_rule_state_history ON CLUSTER %s + distributedTable := `CREATE TABLE IF NOT EXISTS signoz_analytics.distributed_rule_state_history_v0 ON CLUSTER %s ( rule_id LowCardinality(String), rule_name LowCardinality(String), @@ -93,7 +93,7 @@ SETTINGS ttl_only_drop_parts = 1, index_granularity = 8192` value Float64 CODEC(Gorilla, ZSTD(1)), labels String CODEC(ZSTD(5)), ) -ENGINE = Distributed(%s, signoz_analytics, rule_state_history, cityHash64(rule_id, rule_name, fingerprint))` +ENGINE = Distributed(%s, signoz_analytics, rule_state_history_v0, cityHash64(rule_id, rule_name, fingerprint))` // check if db exists dbExists := `SELECT count(*) FROM system.databases WHERE name = 'signoz_analytics'` @@ -111,7 +111,7 @@ ENGINE = Distributed(%s, signoz_analytics, rule_state_history, cityHash64(rule_i } // check if table exists - tableExists := `SELECT count(*) FROM system.tables WHERE name = 'rule_state_history' AND database = 'signoz_analytics'` + tableExists := `SELECT count(*) FROM system.tables WHERE name = 'rule_state_history_v0' AND database = 'signoz_analytics'` var tableCount uint64 err = conn.QueryRow(context.Background(), tableExists).Scan(&tableCount) if err != nil { @@ -126,7 +126,7 @@ ENGINE = Distributed(%s, signoz_analytics, rule_state_history, cityHash64(rule_i } // check if distributed table exists - distributedTableExists := `SELECT count(*) FROM system.tables WHERE name = 'distributed_rule_state_history' AND database = 'signoz_analytics'` + distributedTableExists := `SELECT count(*) FROM system.tables WHERE name = 'distributed_rule_state_history_v0' AND database = 'signoz_analytics'` var distributedTableCount uint64 err = conn.QueryRow(context.Background(), distributedTableExists).Scan(&distributedTableCount) if err != nil { diff --git a/pkg/query-service/model/alerting.go b/pkg/query-service/model/alerting.go new file mode 100644 index 0000000000..c065fdfae6 --- /dev/null +++ b/pkg/query-service/model/alerting.go @@ -0,0 +1,90 @@ +package model + +import ( + "database/sql/driver" + "encoding/json" + + "github.com/pkg/errors" +) + +// AlertState denotes the state of an active alert. +type AlertState int + +const ( + StateInactive AlertState = iota + StatePending + StateFiring + StateNoData + StateDisabled +) + +func (s AlertState) String() string { + switch s { + case StateInactive: + return "inactive" + case StatePending: + return "pending" + case StateFiring: + return "firing" + case StateNoData: + return "nodata" + case StateDisabled: + return "disabled" + } + panic(errors.Errorf("unknown alert state: %d", s)) +} + +func (s AlertState) MarshalJSON() ([]byte, error) { + return json.Marshal(s.String()) +} + +func (s *AlertState) UnmarshalJSON(b []byte) error { + var v interface{} + if err := json.Unmarshal(b, &v); err != nil { + return err + } + switch value := v.(type) { + case string: + switch value { + case "inactive": + *s = StateInactive + case "pending": + *s = StatePending + case "firing": + *s = StateFiring + case "nodata": + *s = StateNoData + case "disabled": + *s = StateDisabled + default: + return errors.New("invalid alert state") + } + return nil + default: + return errors.New("invalid alert state") + } +} + +func (s *AlertState) Scan(value interface{}) error { + v, ok := value.(string) + if !ok { + return errors.New("invalid alert state") + } + switch v { + case "inactive": + *s = StateInactive + case "pending": + *s = StatePending + case "firing": + *s = StateFiring + case "nodata": + *s = StateNoData + case "disabled": + *s = StateDisabled + } + return nil +} + +func (s *AlertState) Value() (driver.Value, error) { + return s.String(), nil +} diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index b0e786a6d6..dbc0092ea8 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -1182,23 +1182,24 @@ func (l LabelsString) String() string { } type RuleStateTimeline struct { - Items []RuleStateHistory `json:"items"` - Total uint64 `json:"total"` + Items []RuleStateHistory `json:"items"` + Total uint64 `json:"total"` + Labels map[string][]string `json:"labels"` } type RuleStateHistory struct { RuleID string `json:"ruleID" ch:"rule_id"` RuleName string `json:"ruleName" ch:"rule_name"` // One of ["normal", "firing"] - OverallState string `json:"overallState" ch:"overall_state"` - OverallStateChanged bool `json:"overallStateChanged" ch:"overall_state_changed"` + OverallState model.AlertState `json:"overallState" ch:"overall_state"` + OverallStateChanged bool `json:"overallStateChanged" ch:"overall_state_changed"` // One of ["normal", "firing", "no_data", "muted"] - State string `json:"state" ch:"state"` - StateChanged bool `json:"stateChanged" ch:"state_changed"` - UnixMilli int64 `json:"unixMilli" ch:"unix_milli"` - Labels LabelsString `json:"labels" ch:"labels"` - Fingerprint uint64 `json:"fingerprint" ch:"fingerprint"` - Value float64 `json:"value" ch:"value"` + State model.AlertState `json:"state" ch:"state"` + StateChanged bool `json:"stateChanged" ch:"state_changed"` + UnixMilli int64 `json:"unixMilli" ch:"unix_milli"` + Labels LabelsString `json:"labels" ch:"labels"` + Fingerprint uint64 `json:"fingerprint" ch:"fingerprint"` + Value float64 `json:"value" ch:"value"` RelatedTracesLink string `json:"relatedTracesLink"` RelatedLogsLink string `json:"relatedLogsLink"` @@ -1236,16 +1237,16 @@ type RuleStateHistoryContributor struct { } type RuleStateTransition struct { - RuleID string `json:"ruleID" ch:"rule_id"` - State string `json:"state" ch:"state"` - FiringTime int64 `json:"firingTime" ch:"firing_time"` - ResolutionTime int64 `json:"resolutionTime" ch:"resolution_time"` + RuleID string `json:"ruleID" ch:"rule_id"` + State model.AlertState `json:"state" ch:"state"` + FiringTime int64 `json:"firingTime" ch:"firing_time"` + ResolutionTime int64 `json:"resolutionTime" ch:"resolution_time"` } type ReleStateItem struct { - State string `json:"state"` - Start int64 `json:"start"` - End int64 `json:"end"` + State model.AlertState `json:"state"` + Start int64 `json:"start"` + End int64 `json:"end"` } type Stats struct { diff --git a/pkg/query-service/rules/alerting.go b/pkg/query-service/rules/alerting.go index 7c7fb40ed6..f6826ed3d8 100644 --- a/pkg/query-service/rules/alerting.go +++ b/pkg/query-service/rules/alerting.go @@ -8,6 +8,7 @@ import ( "time" "github.com/pkg/errors" + "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/utils/labels" ) @@ -37,61 +38,8 @@ const ( HealthBad RuleHealth = "err" ) -// AlertState denotes the state of an active alert. -type AlertState int - -const ( - StateInactive AlertState = iota - StatePending - StateFiring - StateDisabled -) - -func (s AlertState) String() string { - switch s { - case StateInactive: - return "inactive" - case StatePending: - return "pending" - case StateFiring: - return "firing" - case StateDisabled: - return "disabled" - } - panic(errors.Errorf("unknown alert state: %d", s)) -} - -func (s AlertState) MarshalJSON() ([]byte, error) { - return json.Marshal(s.String()) -} - -func (s *AlertState) UnmarshalJSON(b []byte) error { - var v interface{} - if err := json.Unmarshal(b, &v); err != nil { - return err - } - switch value := v.(type) { - case string: - switch value { - case "inactive": - *s = StateInactive - case "pending": - *s = StatePending - case "firing": - *s = StateFiring - case "disabled": - *s = StateDisabled - default: - return errors.New("invalid alert state") - } - return nil - default: - return errors.New("invalid alert state") - } -} - type Alert struct { - State AlertState + State model.AlertState Labels labels.BaseLabels Annotations labels.BaseLabels @@ -114,7 +62,7 @@ type Alert struct { } func (a *Alert) needsSending(ts time.Time, resendDelay time.Duration) bool { - if a.State == StatePending { + if a.State == model.StatePending { return false } diff --git a/pkg/query-service/rules/api_params.go b/pkg/query-service/rules/api_params.go index d460d39973..6d3288ece1 100644 --- a/pkg/query-service/rules/api_params.go +++ b/pkg/query-service/rules/api_params.go @@ -259,8 +259,8 @@ type GettableRules struct { // GettableRule has info for an alerting rules. type GettableRule struct { - Id string `json:"id"` - State AlertState `json:"state"` + Id string `json:"id"` + State model.AlertState `json:"state"` PostableRule CreatedAt *time.Time `json:"createAt"` CreatedBy *string `json:"createBy"` diff --git a/pkg/query-service/rules/manager.go b/pkg/query-service/rules/manager.go index 768c753cb8..37fe39adba 100644 --- a/pkg/query-service/rules/manager.go +++ b/pkg/query-service/rules/manager.go @@ -617,7 +617,7 @@ func (m *Manager) ListRuleStates(ctx context.Context) (*GettableRules, error) { // fetch state of rule from memory if rm, ok := m.rules[ruleResponse.Id]; !ok { - ruleResponse.State = StateDisabled + ruleResponse.State = model.StateDisabled ruleResponse.Disabled = true } else { ruleResponse.State = rm.State() @@ -644,7 +644,7 @@ func (m *Manager) GetRule(ctx context.Context, id string) (*GettableRule, error) r.Id = fmt.Sprintf("%d", s.Id) // fetch state of rule from memory if rm, ok := m.rules[r.Id]; !ok { - r.State = StateDisabled + r.State = model.StateDisabled r.Disabled = true } else { r.State = rm.State() @@ -751,7 +751,7 @@ func (m *Manager) PatchRule(ctx context.Context, ruleStr string, ruleId string) // fetch state of rule from memory if rm, ok := m.rules[ruleId]; !ok { - response.State = StateDisabled + response.State = model.StateDisabled response.Disabled = true } else { response.State = rm.State() diff --git a/pkg/query-service/rules/prom_rule.go b/pkg/query-service/rules/prom_rule.go index a9890a9503..2241d32a4b 100644 --- a/pkg/query-service/rules/prom_rule.go +++ b/pkg/query-service/rules/prom_rule.go @@ -15,6 +15,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/converter" "go.signoz.io/signoz/pkg/query-service/formatter" "go.signoz.io/signoz/pkg/query-service/interfaces" + "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" qslabels "go.signoz.io/signoz/pkg/query-service/utils/labels" "go.signoz.io/signoz/pkg/query-service/utils/times" @@ -56,6 +57,8 @@ type PromRule struct { opts PromRuleOpts reader interfaces.Reader + + handledRestart bool } func NewPromRule( @@ -218,9 +221,9 @@ func (r *PromRule) GetEvaluationTimestamp() time.Time { // State returns the maximum state of alert instances for this rule. // StateFiring > StatePending > StateInactive -func (r *PromRule) State() AlertState { +func (r *PromRule) State() model.AlertState { - maxState := StateInactive + maxState := model.StateInactive for _, a := range r.active { if a.State > maxState { maxState = a.State @@ -338,6 +341,102 @@ func (r *PromRule) compareOp() CompareOp { return r.ruleCondition.CompareOp } +// TODO(srikanthccv): implement base rule and use for all types of rules +func (r *PromRule) recordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []v3.RuleStateHistory) error { + zap.L().Debug("recording rule state history", zap.String("ruleid", r.ID()), zap.Any("prevState", prevState), zap.Any("currentState", currentState), zap.Any("itemsToAdd", itemsToAdd)) + revisedItemsToAdd := map[uint64]v3.RuleStateHistory{} + + lastSavedState, err := r.reader.GetLastSavedRuleStateHistory(ctx, r.ID()) + if err != nil { + return err + } + // if the query-service has been restarted, or the rule has been modified (which re-initializes the rule), + // the state would reset so we need to add the corresponding state changes to previously saved states + if !r.handledRestart && len(lastSavedState) > 0 { + zap.L().Debug("handling restart", zap.String("ruleid", r.ID()), zap.Any("lastSavedState", lastSavedState)) + l := map[uint64]v3.RuleStateHistory{} + for _, item := range itemsToAdd { + l[item.Fingerprint] = item + } + + shouldSkip := map[uint64]bool{} + + for _, item := range lastSavedState { + // for the last saved item with fingerprint, check if there is a corresponding entry in the current state + currentState, ok := l[item.Fingerprint] + if !ok { + // there was a state change in the past, but not in the current state + // if the state was firing, then we should add a resolved state change + if item.State == model.StateFiring || item.State == model.StateNoData { + item.State = model.StateInactive + item.StateChanged = true + item.UnixMilli = time.Now().UnixMilli() + revisedItemsToAdd[item.Fingerprint] = item + } + // there is nothing to do if the prev state was normal + } else { + if item.State != currentState.State { + item.State = currentState.State + item.StateChanged = true + item.UnixMilli = time.Now().UnixMilli() + revisedItemsToAdd[item.Fingerprint] = item + } + } + // do not add this item to revisedItemsToAdd as it is already processed + shouldSkip[item.Fingerprint] = true + } + zap.L().Debug("after lastSavedState loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) + + // if there are any new state changes that were not saved, add them to the revised items + for _, item := range itemsToAdd { + if _, ok := revisedItemsToAdd[item.Fingerprint]; !ok && !shouldSkip[item.Fingerprint] { + revisedItemsToAdd[item.Fingerprint] = item + } + } + zap.L().Debug("after itemsToAdd loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) + + newState := model.StateInactive + for _, item := range revisedItemsToAdd { + if item.State == model.StateFiring || item.State == model.StateNoData { + newState = model.StateFiring + break + } + } + zap.L().Debug("newState", zap.String("ruleid", r.ID()), zap.Any("newState", newState)) + + // if there is a change in the overall state, update the overall state + if lastSavedState[0].OverallState != newState { + for fingerprint, item := range revisedItemsToAdd { + item.OverallState = newState + item.OverallStateChanged = true + revisedItemsToAdd[fingerprint] = item + } + } + zap.L().Debug("revisedItemsToAdd after newState", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) + + } else { + for _, item := range itemsToAdd { + revisedItemsToAdd[item.Fingerprint] = item + } + } + + if len(revisedItemsToAdd) > 0 && r.reader != nil { + zap.L().Debug("writing rule state history", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) + + entries := make([]v3.RuleStateHistory, 0, len(revisedItemsToAdd)) + for _, item := range revisedItemsToAdd { + entries = append(entries, item) + } + err := r.reader.AddRuleStateHistory(ctx, entries) + if err != nil { + zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd)) + } + } + r.handledRestart = true + + return nil +} + func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (interface{}, error) { prevState := r.State() @@ -442,7 +541,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( QueryResultLables: resultLabels, Annotations: annotations, ActiveAt: ts, - State: StatePending, + State: model.StatePending, Value: alertSmpl.F, GeneratorURL: r.GeneratorURL(), Receivers: r.preferredChannels, @@ -454,7 +553,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( for h, a := range alerts { // Check whether we already have alerting state for the identifying label set. // Update the last value and annotations if so, create a new alert entry otherwise. - if alert, ok := r.active[h]; ok && alert.State != StateInactive { + if alert, ok := r.active[h]; ok && alert.State != model.StateInactive { alert.Value = a.Value alert.Annotations = a.Annotations alert.Receivers = r.preferredChannels @@ -469,23 +568,23 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( // Check if any pending alerts should be removed or fire now. Write out alert timeseries. for fp, a := range r.active { - labelsJSON, err := json.Marshal(a.Labels) + labelsJSON, err := json.Marshal(a.QueryResultLables) if err != nil { zap.L().Error("error marshaling labels", zap.Error(err), zap.String("name", r.Name())) } if _, ok := resultFPs[fp]; !ok { // If the alert was previously firing, keep it around for a given // retention time so it is reported as resolved to the AlertManager. - if a.State == StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) { + if a.State == model.StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) { delete(r.active, fp) } - if a.State != StateInactive { - a.State = StateInactive + if a.State != model.StateInactive { + a.State = model.StateInactive a.ResolvedAt = ts itemsToAdd = append(itemsToAdd, v3.RuleStateHistory{ RuleID: r.ID(), RuleName: r.Name(), - State: "normal", + State: model.StateInactive, StateChanged: true, UnixMilli: ts.UnixMilli(), Labels: v3.LabelsString(labelsJSON), @@ -495,12 +594,12 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( continue } - if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration { - a.State = StateFiring + if a.State == model.StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration { + a.State = model.StateFiring a.FiredAt = ts - state := "firing" + state := model.StateFiring if a.Missing { - state = "no_data" + state = model.StateNoData } itemsToAdd = append(itemsToAdd, v3.RuleStateHistory{ RuleID: r.ID(), @@ -520,23 +619,14 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( currentState := r.State() - if currentState != prevState { - for idx := range itemsToAdd { - if currentState == StateInactive { - itemsToAdd[idx].OverallState = "normal" - } else { - itemsToAdd[idx].OverallState = currentState.String() - } - itemsToAdd[idx].OverallStateChanged = true - } + overallStateChanged := currentState != prevState + for idx, item := range itemsToAdd { + item.OverallStateChanged = overallStateChanged + item.OverallState = currentState + itemsToAdd[idx] = item } - if len(itemsToAdd) > 0 && r.reader != nil { - err := r.reader.AddRuleStateHistory(ctx, itemsToAdd) - if err != nil { - zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd)) - } - } + r.recordRuleStateHistory(ctx, prevState, currentState, itemsToAdd) return len(r.active), nil } diff --git a/pkg/query-service/rules/prom_rule_task.go b/pkg/query-service/rules/prom_rule_task.go index f2f11cd494..820b52aeb0 100644 --- a/pkg/query-service/rules/prom_rule_task.go +++ b/pkg/query-service/rules/prom_rule_task.go @@ -192,7 +192,7 @@ func (g *PromRuleTask) HasAlertingRules() bool { defer g.mtx.Unlock() for _, rule := range g.rules { - if _, ok := rule.(*ThresholdRule); ok { + if _, ok := rule.(*PromRule); ok { return true } } @@ -284,11 +284,11 @@ func (g *PromRuleTask) CopyState(fromTask Task) error { g.seriesInPreviousEval[i] = from.seriesInPreviousEval[fi] ruleMap[nameAndLabels] = indexes[1:] - ar, ok := rule.(*ThresholdRule) + ar, ok := rule.(*PromRule) if !ok { continue } - far, ok := from.rules[fi].(*ThresholdRule) + far, ok := from.rules[fi].(*PromRule) if !ok { continue } @@ -296,6 +296,7 @@ func (g *PromRuleTask) CopyState(fromTask Task) error { for fp, a := range far.active { ar.active[fp] = a } + ar.handledRestart = far.handledRestart } // Handle deleted and unmatched duplicate rules. diff --git a/pkg/query-service/rules/rule.go b/pkg/query-service/rules/rule.go index 8228f70c8f..eeb7de9066 100644 --- a/pkg/query-service/rules/rule.go +++ b/pkg/query-service/rules/rule.go @@ -4,6 +4,7 @@ import ( "context" "time" + "go.signoz.io/signoz/pkg/query-service/model" "go.signoz.io/signoz/pkg/query-service/utils/labels" ) @@ -17,7 +18,7 @@ type Rule interface { Labels() labels.BaseLabels Annotations() labels.BaseLabels Condition() *RuleCondition - State() AlertState + State() model.AlertState ActiveAlerts() []*Alert PreferredChannels() []string diff --git a/pkg/query-service/rules/rule_task.go b/pkg/query-service/rules/rule_task.go index eb657c9f7c..313751dec3 100644 --- a/pkg/query-service/rules/rule_task.go +++ b/pkg/query-service/rules/rule_task.go @@ -288,6 +288,7 @@ func (g *RuleTask) CopyState(fromTask Task) error { for fp, a := range far.active { ar.active[fp] = a } + ar.handledRestart = far.handledRestart } return nil diff --git a/pkg/query-service/rules/threshold_rule.go b/pkg/query-service/rules/threshold_rule.go index e657af9288..e50fb4b761 100644 --- a/pkg/query-service/rules/threshold_rule.go +++ b/pkg/query-service/rules/threshold_rule.go @@ -20,6 +20,7 @@ import ( "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/converter" + "go.signoz.io/signoz/pkg/query-service/model" "go.signoz.io/signoz/pkg/query-service/postprocess" "go.signoz.io/signoz/pkg/query-service/app/querier" @@ -100,6 +101,8 @@ type ThresholdRule struct { reader interfaces.Reader evalDelay time.Duration + + handledRestart bool } type ThresholdRuleOpts struct { @@ -309,9 +312,9 @@ func (r *ThresholdRule) GetEvaluationTimestamp() time.Time { // State returns the maximum state of alert instances for this rule. // StateFiring > StatePending > StateInactive -func (r *ThresholdRule) State() AlertState { +func (r *ThresholdRule) State() model.AlertState { - maxState := StateInactive + maxState := model.StateInactive for _, a := range r.active { if a.State > maxState { maxState = a.State @@ -898,6 +901,102 @@ func normalizeLabelName(name string) string { return normalized } +// TODO(srikanthccv): implement base rule and use for all types of rules +func (r *ThresholdRule) recordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []v3.RuleStateHistory) error { + zap.L().Debug("recording rule state history", zap.String("ruleid", r.ID()), zap.Any("prevState", prevState), zap.Any("currentState", currentState), zap.Any("itemsToAdd", itemsToAdd)) + revisedItemsToAdd := map[uint64]v3.RuleStateHistory{} + + lastSavedState, err := r.reader.GetLastSavedRuleStateHistory(ctx, r.ID()) + if err != nil { + return err + } + // if the query-service has been restarted, or the rule has been modified (which re-initializes the rule), + // the state would reset so we need to add the corresponding state changes to previously saved states + if !r.handledRestart && len(lastSavedState) > 0 { + zap.L().Debug("handling restart", zap.String("ruleid", r.ID()), zap.Any("lastSavedState", lastSavedState)) + l := map[uint64]v3.RuleStateHistory{} + for _, item := range itemsToAdd { + l[item.Fingerprint] = item + } + + shouldSkip := map[uint64]bool{} + + for _, item := range lastSavedState { + // for the last saved item with fingerprint, check if there is a corresponding entry in the current state + currentState, ok := l[item.Fingerprint] + if !ok { + // there was a state change in the past, but not in the current state + // if the state was firing, then we should add a resolved state change + if item.State == model.StateFiring || item.State == model.StateNoData { + item.State = model.StateInactive + item.StateChanged = true + item.UnixMilli = time.Now().UnixMilli() + revisedItemsToAdd[item.Fingerprint] = item + } + // there is nothing to do if the prev state was normal + } else { + if item.State != currentState.State { + item.State = currentState.State + item.StateChanged = true + item.UnixMilli = time.Now().UnixMilli() + revisedItemsToAdd[item.Fingerprint] = item + } + } + // do not add this item to revisedItemsToAdd as it is already processed + shouldSkip[item.Fingerprint] = true + } + zap.L().Debug("after lastSavedState loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) + + // if there are any new state changes that were not saved, add them to the revised items + for _, item := range itemsToAdd { + if _, ok := revisedItemsToAdd[item.Fingerprint]; !ok && !shouldSkip[item.Fingerprint] { + revisedItemsToAdd[item.Fingerprint] = item + } + } + zap.L().Debug("after itemsToAdd loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) + + newState := model.StateInactive + for _, item := range revisedItemsToAdd { + if item.State == model.StateFiring || item.State == model.StateNoData { + newState = model.StateFiring + break + } + } + zap.L().Debug("newState", zap.String("ruleid", r.ID()), zap.Any("newState", newState)) + + // if there is a change in the overall state, update the overall state + if lastSavedState[0].OverallState != newState { + for fingerprint, item := range revisedItemsToAdd { + item.OverallState = newState + item.OverallStateChanged = true + revisedItemsToAdd[fingerprint] = item + } + } + zap.L().Debug("revisedItemsToAdd after newState", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) + + } else { + for _, item := range itemsToAdd { + revisedItemsToAdd[item.Fingerprint] = item + } + } + + if len(revisedItemsToAdd) > 0 && r.reader != nil { + zap.L().Debug("writing rule state history", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) + + entries := make([]v3.RuleStateHistory, 0, len(revisedItemsToAdd)) + for _, item := range revisedItemsToAdd { + entries = append(entries, item) + } + err := r.reader.AddRuleStateHistory(ctx, entries) + if err != nil { + zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd)) + } + } + r.handledRestart = true + + return nil +} + func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (interface{}, error) { prevState := r.State() @@ -1005,7 +1104,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie QueryResultLables: resultLabels, Annotations: annotations, ActiveAt: ts, - State: StatePending, + State: model.StatePending, Value: smpl.V, GeneratorURL: r.GeneratorURL(), Receivers: r.preferredChannels, @@ -1019,7 +1118,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie for h, a := range alerts { // Check whether we already have alerting state for the identifying label set. // Update the last value and annotations if so, create a new alert entry otherwise. - if alert, ok := r.active[h]; ok && alert.State != StateInactive { + if alert, ok := r.active[h]; ok && alert.State != model.StateInactive { alert.Value = a.Value alert.Annotations = a.Annotations @@ -1042,31 +1141,32 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie if _, ok := resultFPs[fp]; !ok { // If the alert was previously firing, keep it around for a given // retention time so it is reported as resolved to the AlertManager. - if a.State == StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) { + if a.State == model.StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) { delete(r.active, fp) } - if a.State != StateInactive { - a.State = StateInactive + if a.State != model.StateInactive { + a.State = model.StateInactive a.ResolvedAt = ts itemsToAdd = append(itemsToAdd, v3.RuleStateHistory{ RuleID: r.ID(), RuleName: r.Name(), - State: "normal", + State: model.StateInactive, StateChanged: true, UnixMilli: ts.UnixMilli(), Labels: v3.LabelsString(labelsJSON), Fingerprint: a.QueryResultLables.Hash(), + Value: a.Value, }) } continue } - if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration { - a.State = StateFiring + if a.State == model.StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration { + a.State = model.StateFiring a.FiredAt = ts - state := "firing" + state := model.StateFiring if a.Missing { - state = "no_data" + state = model.StateNoData } itemsToAdd = append(itemsToAdd, v3.RuleStateHistory{ RuleID: r.ID(), @@ -1083,28 +1183,15 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie currentState := r.State() - if currentState != prevState { - for idx := range itemsToAdd { - if currentState == StateInactive { - itemsToAdd[idx].OverallState = "normal" - } else { - itemsToAdd[idx].OverallState = currentState.String() - } - itemsToAdd[idx].OverallStateChanged = true - } - } else { - for idx := range itemsToAdd { - itemsToAdd[idx].OverallState = currentState.String() - itemsToAdd[idx].OverallStateChanged = false - } + overallStateChanged := currentState != prevState + for idx, item := range itemsToAdd { + item.OverallStateChanged = overallStateChanged + item.OverallState = currentState + itemsToAdd[idx] = item } - if len(itemsToAdd) > 0 && r.reader != nil { - err := r.reader.AddRuleStateHistory(ctx, itemsToAdd) - if err != nil { - zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd)) - } - } + r.recordRuleStateHistory(ctx, prevState, currentState, itemsToAdd) + r.health = HealthGood r.lastError = err