Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: alert state change and overall status #5845

Merged
merged 10 commits into from
Sep 9, 2024
154 changes: 137 additions & 17 deletions pkg/query-service/app/clickhouseReader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const (
archiveNamespace = "clickhouse-archive"
signozTraceDBName = "signoz_traces"
signozHistoryDBName = "signoz_analytics"
ruleStateHistoryTableName = "distributed_rule_state_history"
ruleStateHistoryTableName = "distributed_rule_state_history_v0"
signozDurationMVTable = "distributed_durationSort"
signozUsageExplorerTable = "distributed_usage_explorer"
signozSpansTable = "distributed_signoz_spans"
Expand Down Expand Up @@ -5222,6 +5222,18 @@ func (r *ClickHouseReader) AddRuleStateHistory(ctx context.Context, ruleStateHis
return nil
}

func (r *ClickHouseReader) GetLastSavedRuleStateHistory(ctx context.Context, ruleID string) ([]v3.RuleStateHistory, error) {
query := fmt.Sprintf("SELECT * FROM %s.%s WHERE rule_id = '%s' AND state_changed = true ORDER BY unix_milli DESC LIMIT 1 BY fingerprint",
signozHistoryDBName, ruleStateHistoryTableName, ruleID)

history := []v3.RuleStateHistory{}
err := r.db.Select(ctx, &history, query)
if err != nil {
return nil, err
}
return history, nil
}

func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID(
ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.RuleStateTimeline, error) {

Expand Down Expand Up @@ -5287,22 +5299,51 @@ func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID(
signozHistoryDBName, ruleStateHistoryTableName, whereClause, params.Order, params.Limit, params.Offset)

history := []v3.RuleStateHistory{}
zap.L().Debug("rule state history query", zap.String("query", query))
err := r.db.Select(ctx, &history, query)
if err != nil {
zap.L().Error("Error while reading rule state history", zap.Error(err))
return nil, err
}

var total uint64
zap.L().Debug("rule state history total query", zap.String("query", fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE %s",
signozHistoryDBName, ruleStateHistoryTableName, whereClause)))
err = r.db.QueryRow(ctx, fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE %s",
signozHistoryDBName, ruleStateHistoryTableName, whereClause)).Scan(&total)
if err != nil {
return nil, err
}

labelsQuery := fmt.Sprintf("SELECT DISTINCT labels FROM %s.%s WHERE rule_id = $1",
signozHistoryDBName, ruleStateHistoryTableName)
rows, err := r.db.Query(ctx, labelsQuery, ruleID)
if err != nil {
return nil, err
}
defer rows.Close()

labelsMap := make(map[string][]string)
for rows.Next() {
var rawLabel string
err = rows.Scan(&rawLabel)
if err != nil {
return nil, err
}
label := map[string]string{}
err = json.Unmarshal([]byte(rawLabel), &label)
if err != nil {
return nil, err
}
for k, v := range label {
labelsMap[k] = append(labelsMap[k], v)
}
}

timeline := &v3.RuleStateTimeline{
Items: history,
Total: total,
Items: history,
Total: total,
Labels: labelsMap,
}

return timeline, nil
Expand All @@ -5315,11 +5356,13 @@ func (r *ClickHouseReader) ReadRuleStateHistoryTopContributorsByRuleID(
any(labels) as labels,
count(*) as count
FROM %s.%s
WHERE rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d
WHERE rule_id = '%s' AND (state_changed = true) AND (state = '%s') AND unix_milli >= %d AND unix_milli <= %d
GROUP BY fingerprint
HAVING labels != '{}'
ORDER BY count DESC`,
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)
signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.Start, params.End)

zap.L().Debug("rule state history top contributors query", zap.String("query", query))
contributors := []v3.RuleStateHistoryContributor{}
err := r.db.Select(ctx, &contributors, query)
if err != nil {
Expand All @@ -5330,15 +5373,15 @@ func (r *ClickHouseReader) ReadRuleStateHistoryTopContributorsByRuleID(
return contributors, nil
}

func (r *ClickHouseReader) GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateTransition, error) {
func (r *ClickHouseReader) GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.ReleStateItem, error) {

tmpl := `WITH firing_events AS (
SELECT
rule_id,
state,
unix_milli AS firing_time
FROM %s.%s
WHERE overall_state = 'firing'
WHERE overall_state = '` + model.StateFiring.String() + `'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
Expand All @@ -5349,7 +5392,7 @@ resolution_events AS (
state,
unix_milli AS resolution_time
FROM %s.%s
WHERE overall_state = 'normal'
WHERE overall_state = '` + model.StateInactive.String() + `'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
Expand All @@ -5374,13 +5417,87 @@ ORDER BY firing_time ASC;`
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End,
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)

zap.L().Debug("overall state transitions query", zap.String("query", query))

transitions := []v3.RuleStateTransition{}
err := r.db.Select(ctx, &transitions, query)
if err != nil {
return nil, err
}

return transitions, nil
stateItems := []v3.ReleStateItem{}

for idx, item := range transitions {
start := item.FiringTime
end := item.ResolutionTime
stateItems = append(stateItems, v3.ReleStateItem{
State: item.State,
Start: start,
End: end,
})
if idx < len(transitions)-1 {
nextStart := transitions[idx+1].FiringTime
if nextStart > end {
stateItems = append(stateItems, v3.ReleStateItem{
State: model.StateInactive,
Start: end,
End: nextStart,
})
}
}
}

// fetch the most recent overall_state from the table
var state model.AlertState
stateQuery := fmt.Sprintf("SELECT state FROM %s.%s WHERE rule_id = '%s' AND unix_milli <= %d ORDER BY unix_milli DESC LIMIT 1",
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.End)
if err := r.db.QueryRow(ctx, stateQuery).Scan(&state); err != nil {
if err != sql.ErrNoRows {
return nil, err
}
state = model.StateInactive
}

if len(transitions) == 0 {
// no transitions found, it is either firing or inactive for whole time range
stateItems = append(stateItems, v3.ReleStateItem{
State: state,
Start: params.Start,
End: params.End,
})
} else {
// there were some transitions, we need to add the last state at the end
if state == model.StateInactive {
stateItems = append(stateItems, v3.ReleStateItem{
State: model.StateInactive,
Start: transitions[len(transitions)-1].ResolutionTime,
End: params.End,
})
} else {
// fetch the most recent firing event from the table in the given time range
var firingTime int64
firingQuery := fmt.Sprintf(`
SELECT
unix_milli
FROM %s.%s
WHERE rule_id = '%s' AND overall_state_changed = true AND overall_state = '%s' AND unix_milli <= %d
ORDER BY unix_milli DESC LIMIT 1`, signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.End)
if err := r.db.QueryRow(ctx, firingQuery).Scan(&firingTime); err != nil {
return nil, err
}
stateItems = append(stateItems, v3.ReleStateItem{
State: model.StateInactive,
Start: transitions[len(transitions)-1].ResolutionTime,
End: firingTime,
})
stateItems = append(stateItems, v3.ReleStateItem{
State: model.StateFiring,
Start: firingTime,
End: params.End,
})
}
}
return stateItems, nil
}

func (r *ClickHouseReader) GetAvgResolutionTime(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (float64, error) {
Expand All @@ -5392,7 +5509,7 @@ WITH firing_events AS (
state,
unix_milli AS firing_time
FROM %s.%s
WHERE overall_state = 'firing'
WHERE overall_state = '` + model.StateFiring.String() + `'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
Expand All @@ -5403,7 +5520,7 @@ resolution_events AS (
state,
unix_milli AS resolution_time
FROM %s.%s
WHERE overall_state = 'normal'
WHERE overall_state = '` + model.StateInactive.String() + `'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
Expand All @@ -5428,6 +5545,7 @@ FROM matched_events;
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End,
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)

zap.L().Debug("avg resolution time query", zap.String("query", query))
var avgResolutionTime float64
err := r.db.QueryRow(ctx, query).Scan(&avgResolutionTime)
if err != nil {
Expand All @@ -5448,7 +5566,7 @@ WITH firing_events AS (
state,
unix_milli AS firing_time
FROM %s.%s
WHERE overall_state = 'firing'
WHERE overall_state = '` + model.StateFiring.String() + `'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
Expand All @@ -5459,7 +5577,7 @@ resolution_events AS (
state,
unix_milli AS resolution_time
FROM %s.%s
WHERE overall_state = 'normal'
WHERE overall_state = '` + model.StateInactive.String() + `'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
Expand All @@ -5485,6 +5603,7 @@ ORDER BY ts ASC;`
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End,
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End, step)

zap.L().Debug("avg resolution time by interval query", zap.String("query", query))
result, err := r.GetTimeSeriesResultV3(ctx, query)
if err != nil || len(result) == 0 {
return nil, err
Expand All @@ -5494,10 +5613,11 @@ ORDER BY ts ASC;`
}

func (r *ClickHouseReader) GetTotalTriggers(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (uint64, error) {
query := fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d",
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)
query := fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = '%s') AND unix_milli >= %d AND unix_milli <= %d",
signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.Start, params.End)

var totalTriggers uint64

err := r.db.QueryRow(ctx, query).Scan(&totalTriggers)
if err != nil {
return 0, err
Expand All @@ -5509,8 +5629,8 @@ func (r *ClickHouseReader) GetTotalTriggers(ctx context.Context, ruleID string,
func (r *ClickHouseReader) GetTriggersByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error) {
step := common.MinAllowedStepInterval(params.Start, params.End)

query := fmt.Sprintf("SELECT count(*), toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d GROUP BY ts ORDER BY ts ASC",
step, signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)
query := fmt.Sprintf("SELECT count(*), toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = '%s') AND unix_milli >= %d AND unix_milli <= %d GROUP BY ts ORDER BY ts ASC",
step, signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.Start, params.End)

result, err := r.GetTimeSeriesResultV3(ctx, query)
if err != nil || len(result) == 0 {
Expand Down
24 changes: 1 addition & 23 deletions pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,34 +746,12 @@ func (aH *APIHandler) getOverallStateTransitions(w http.ResponseWriter, r *http.
return
}

res, err := aH.reader.GetOverallStateTransitions(r.Context(), ruleID, &params)
stateItems, err := aH.reader.GetOverallStateTransitions(r.Context(), ruleID, &params)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}

stateItems := []v3.ReleStateItem{}

for idx, item := range res {
start := item.FiringTime
end := item.ResolutionTime
stateItems = append(stateItems, v3.ReleStateItem{
State: item.State,
Start: start,
End: end,
})
if idx < len(res)-1 {
nextStart := res[idx+1].FiringTime
if nextStart > end {
stateItems = append(stateItems, v3.ReleStateItem{
State: "normal",
Start: end,
End: nextStart,
})
}
}
}

aH.Respond(w, stateItems)
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/query-service/interfaces/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,15 @@ type Reader interface {
GetMetricMetadata(context.Context, string, string) (*v3.MetricMetadataResponse, error)

AddRuleStateHistory(ctx context.Context, ruleStateHistory []v3.RuleStateHistory) error
GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateTransition, error)
GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.ReleStateItem, error)
ReadRuleStateHistoryByRuleID(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.RuleStateTimeline, error)
GetTotalTriggers(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (uint64, error)
GetTriggersByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error)
GetAvgResolutionTime(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (float64, error)
GetAvgResolutionTimeByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error)
ReadRuleStateHistoryTopContributorsByRuleID(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateHistoryContributor, error)
GetLastSavedRuleStateHistory(ctx context.Context, ruleID string) ([]v3.RuleStateHistory, error)

GetMinAndMaxTimestampForTraceID(ctx context.Context, traceID []string) (int64, int64, error)

// Query Progress tracking helpers.
Expand Down
10 changes: 5 additions & 5 deletions pkg/query-service/migrate/migate.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func ClickHouseMigrate(conn driver.Conn, cluster string) error {

database := "CREATE DATABASE IF NOT EXISTS signoz_analytics ON CLUSTER %s"

localTable := `CREATE TABLE IF NOT EXISTS signoz_analytics.rule_state_history ON CLUSTER %s
localTable := `CREATE TABLE IF NOT EXISTS signoz_analytics.rule_state_history_v0 ON CLUSTER %s
(
_retention_days UInt32 DEFAULT 180,
rule_id LowCardinality(String),
Expand All @@ -80,7 +80,7 @@ ORDER BY (rule_id, unix_milli)
TTL toDateTime(unix_milli / 1000) + toIntervalDay(_retention_days)
SETTINGS ttl_only_drop_parts = 1, index_granularity = 8192`

distributedTable := `CREATE TABLE IF NOT EXISTS signoz_analytics.distributed_rule_state_history ON CLUSTER %s
distributedTable := `CREATE TABLE IF NOT EXISTS signoz_analytics.distributed_rule_state_history_v0 ON CLUSTER %s
(
rule_id LowCardinality(String),
rule_name LowCardinality(String),
Expand All @@ -93,7 +93,7 @@ SETTINGS ttl_only_drop_parts = 1, index_granularity = 8192`
value Float64 CODEC(Gorilla, ZSTD(1)),
labels String CODEC(ZSTD(5)),
)
ENGINE = Distributed(%s, signoz_analytics, rule_state_history, cityHash64(rule_id, rule_name, fingerprint))`
ENGINE = Distributed(%s, signoz_analytics, rule_state_history_v0, cityHash64(rule_id, rule_name, fingerprint))`

// check if db exists
dbExists := `SELECT count(*) FROM system.databases WHERE name = 'signoz_analytics'`
Expand All @@ -111,7 +111,7 @@ ENGINE = Distributed(%s, signoz_analytics, rule_state_history, cityHash64(rule_i
}

// check if table exists
tableExists := `SELECT count(*) FROM system.tables WHERE name = 'rule_state_history' AND database = 'signoz_analytics'`
tableExists := `SELECT count(*) FROM system.tables WHERE name = 'rule_state_history_v0' AND database = 'signoz_analytics'`
var tableCount uint64
err = conn.QueryRow(context.Background(), tableExists).Scan(&tableCount)
if err != nil {
Expand All @@ -126,7 +126,7 @@ ENGINE = Distributed(%s, signoz_analytics, rule_state_history, cityHash64(rule_i
}

// check if distributed table exists
distributedTableExists := `SELECT count(*) FROM system.tables WHERE name = 'distributed_rule_state_history' AND database = 'signoz_analytics'`
distributedTableExists := `SELECT count(*) FROM system.tables WHERE name = 'distributed_rule_state_history_v0' AND database = 'signoz_analytics'`
var distributedTableCount uint64
err = conn.QueryRow(context.Background(), distributedTableExists).Scan(&distributedTableCount)
if err != nil {
Expand Down
Loading
Loading