From 67779d6c2c88d2354ab68ccaa658eaecb8ad25ab Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Fri, 31 May 2024 17:43:13 +0530 Subject: [PATCH] chore: scheduled maintenance query-service impl (#4834) --- pkg/query-service/app/dashboards/model.go | 16 + pkg/query-service/app/http_handler.go | 102 +++++ pkg/query-service/rules/db.go | 93 +++++ pkg/query-service/rules/maintenance.go | 423 ++++++++++++++++++++ pkg/query-service/rules/maintenance_test.go | 230 +++++++++++ pkg/query-service/rules/manager.go | 8 +- pkg/query-service/rules/promRuleTask.go | 34 +- pkg/query-service/rules/ruleTask.go | 26 +- pkg/query-service/rules/task.go | 6 +- 9 files changed, 927 insertions(+), 11 deletions(-) create mode 100644 pkg/query-service/rules/maintenance.go create mode 100644 pkg/query-service/rules/maintenance_test.go diff --git a/pkg/query-service/app/dashboards/model.go b/pkg/query-service/app/dashboards/model.go index e7f48f8f87..64e4abcf3e 100644 --- a/pkg/query-service/app/dashboards/model.go +++ b/pkg/query-service/app/dashboards/model.go @@ -83,6 +83,22 @@ func InitDB(dataSourceName string) (*sqlx.DB, error) { return nil, fmt.Errorf("error in creating notification_channles table: %s", err.Error()) } + tableSchema := `CREATE TABLE IF NOT EXISTS planned_maintenance ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + description TEXT, + alert_ids TEXT, + schedule TEXT NOT NULL, + created_at datetime NOT NULL, + created_by TEXT NOT NULL, + updated_at datetime NOT NULL, + updated_by TEXT NOT NULL + );` + _, err = db.Exec(tableSchema) + if err != nil { + return nil, fmt.Errorf("error in creating planned_maintenance table: %s", err.Error()) + } + table_schema = `CREATE TABLE IF NOT EXISTS ttl_status ( id INTEGER PRIMARY KEY AUTOINCREMENT, transaction_id TEXT NOT NULL, diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 5da041edc7..6792e58008 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -375,6 +375,12 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) { router.HandleFunc("/api/v1/rules/{id}", am.EditAccess(aH.patchRule)).Methods(http.MethodPatch) router.HandleFunc("/api/v1/testRule", am.EditAccess(aH.testRule)).Methods(http.MethodPost) + router.HandleFunc("/api/v1/downtime_schedules", am.OpenAccess(aH.listDowntimeSchedules)).Methods(http.MethodGet) + router.HandleFunc("/api/v1/downtime_schedules/{id}", am.OpenAccess(aH.getDowntimeSchedule)).Methods(http.MethodGet) + router.HandleFunc("/api/v1/downtime_schedules", am.OpenAccess(aH.createDowntimeSchedule)).Methods(http.MethodPost) + router.HandleFunc("/api/v1/downtime_schedules/{id}", am.OpenAccess(aH.editDowntimeSchedule)).Methods(http.MethodPut) + router.HandleFunc("/api/v1/downtime_schedules/{id}", am.OpenAccess(aH.deleteDowntimeSchedule)).Methods(http.MethodDelete) + router.HandleFunc("/api/v1/dashboards", am.ViewAccess(aH.getDashboards)).Methods(http.MethodGet) router.HandleFunc("/api/v1/dashboards", am.EditAccess(aH.createDashboards)).Methods(http.MethodPost) router.HandleFunc("/api/v1/dashboards/grafana", am.EditAccess(aH.createDashboardsTransform)).Methods(http.MethodPost) @@ -535,6 +541,102 @@ func (aH *APIHandler) populateTemporality(ctx context.Context, qp *v3.QueryRange return nil } +func (aH *APIHandler) listDowntimeSchedules(w http.ResponseWriter, r *http.Request) { + schedules, err := aH.ruleManager.RuleDB().GetAllPlannedMaintenance(r.Context()) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + // The schedules are stored as JSON in the database, so we need to filter them here + // Since the number of schedules is expected to be small, this should be fine + + if r.URL.Query().Get("active") != "" { + activeSchedules := make([]rules.PlannedMaintenance, 0) + active, _ := strconv.ParseBool(r.URL.Query().Get("active")) + for _, schedule := range schedules { + now := time.Now().In(time.FixedZone(schedule.Schedule.Timezone, 0)) + if schedule.IsActive(now) == active { + activeSchedules = append(activeSchedules, schedule) + } + } + schedules = activeSchedules + } + + if r.URL.Query().Get("recurring") != "" { + recurringSchedules := make([]rules.PlannedMaintenance, 0) + recurring, _ := strconv.ParseBool(r.URL.Query().Get("recurring")) + for _, schedule := range schedules { + if schedule.IsRecurring() == recurring { + recurringSchedules = append(recurringSchedules, schedule) + } + } + schedules = recurringSchedules + } + + aH.Respond(w, schedules) +} + +func (aH *APIHandler) getDowntimeSchedule(w http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["id"] + schedule, err := aH.ruleManager.RuleDB().GetPlannedMaintenanceByID(r.Context(), id) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + aH.Respond(w, schedule) +} + +func (aH *APIHandler) createDowntimeSchedule(w http.ResponseWriter, r *http.Request) { + var schedule rules.PlannedMaintenance + err := json.NewDecoder(r.Body).Decode(&schedule) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + if err := schedule.Validate(); err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + + _, err = aH.ruleManager.RuleDB().CreatePlannedMaintenance(r.Context(), schedule) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + aH.Respond(w, nil) +} + +func (aH *APIHandler) editDowntimeSchedule(w http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["id"] + var schedule rules.PlannedMaintenance + err := json.NewDecoder(r.Body).Decode(&schedule) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + if err := schedule.Validate(); err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + _, err = aH.ruleManager.RuleDB().EditPlannedMaintenance(r.Context(), schedule, id) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + aH.Respond(w, nil) +} + +func (aH *APIHandler) deleteDowntimeSchedule(w http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["id"] + _, err := aH.ruleManager.RuleDB().DeletePlannedMaintenance(r.Context(), id) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + aH.Respond(w, nil) +} + func (aH *APIHandler) listRules(w http.ResponseWriter, r *http.Request) { rules, err := aH.ruleManager.ListRuleStates(r.Context()) diff --git a/pkg/query-service/rules/db.go b/pkg/query-service/rules/db.go index 23372ce911..abf584a375 100644 --- a/pkg/query-service/rules/db.go +++ b/pkg/query-service/rules/db.go @@ -7,6 +7,7 @@ import ( "time" "github.com/jmoiron/sqlx" + "go.signoz.io/signoz/pkg/query-service/auth" "go.signoz.io/signoz/pkg/query-service/common" "go.uber.org/zap" ) @@ -27,6 +28,21 @@ type RuleDB interface { // GetStoredRule for a given ID from DB GetStoredRule(ctx context.Context, id string) (*StoredRule, error) + + // CreatePlannedMaintenance stores a given maintenance in db + CreatePlannedMaintenance(ctx context.Context, maintenance PlannedMaintenance) (int64, error) + + // DeletePlannedMaintenance deletes the given maintenance in the db + DeletePlannedMaintenance(ctx context.Context, id string) (string, error) + + // GetPlannedMaintenanceByID fetches the maintenance definition from db by id + GetPlannedMaintenanceByID(ctx context.Context, id string) (*PlannedMaintenance, error) + + // EditPlannedMaintenance updates the given maintenance in the db + EditPlannedMaintenance(ctx context.Context, maintenance PlannedMaintenance, id string) (string, error) + + // GetAllPlannedMaintenance fetches the maintenance definitions from db + GetAllPlannedMaintenance(ctx context.Context) ([]PlannedMaintenance, error) } type StoredRule struct { @@ -202,3 +218,80 @@ func (r *ruleDB) GetStoredRule(ctx context.Context, id string) (*StoredRule, err return rule, nil } + +func (r *ruleDB) GetAllPlannedMaintenance(ctx context.Context) ([]PlannedMaintenance, error) { + maintenances := []PlannedMaintenance{} + + query := "SELECT id, name, description, schedule, alert_ids, created_at, created_by, updated_at, updated_by FROM planned_maintenance" + + err := r.Select(&maintenances, query) + + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return nil, err + } + + return maintenances, nil +} + +func (r *ruleDB) GetPlannedMaintenanceByID(ctx context.Context, id string) (*PlannedMaintenance, error) { + maintenance := &PlannedMaintenance{} + + query := "SELECT id, name, description, schedule, alert_ids, created_at, created_by, updated_at, updated_by FROM planned_maintenance WHERE id=$1" + err := r.Get(maintenance, query, id) + + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return nil, err + } + + return maintenance, nil +} + +func (r *ruleDB) CreatePlannedMaintenance(ctx context.Context, maintenance PlannedMaintenance) (int64, error) { + + email, _ := auth.GetEmailFromJwt(ctx) + maintenance.CreatedBy = email + maintenance.CreatedAt = time.Now() + maintenance.UpdatedBy = email + maintenance.UpdatedAt = time.Now() + + query := "INSERT INTO planned_maintenance (name, description, schedule, alert_ids, created_at, created_by, updated_at, updated_by) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" + + result, err := r.Exec(query, maintenance.Name, maintenance.Description, maintenance.Schedule, maintenance.AlertIds, maintenance.CreatedAt, maintenance.CreatedBy, maintenance.UpdatedAt, maintenance.UpdatedBy) + + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return 0, err + } + + return result.LastInsertId() +} + +func (r *ruleDB) DeletePlannedMaintenance(ctx context.Context, id string) (string, error) { + query := "DELETE FROM planned_maintenance WHERE id=$1" + _, err := r.Exec(query, id) + + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return "", err + } + + return "", nil +} + +func (r *ruleDB) EditPlannedMaintenance(ctx context.Context, maintenance PlannedMaintenance, id string) (string, error) { + email, _ := auth.GetEmailFromJwt(ctx) + maintenance.UpdatedBy = email + maintenance.UpdatedAt = time.Now() + + query := "UPDATE planned_maintenance SET name=$1, description=$2, schedule=$3, alert_ids=$4, updated_at=$5, updated_by=$6 WHERE id=$7" + _, err := r.Exec(query, maintenance.Name, maintenance.Description, maintenance.Schedule, maintenance.AlertIds, maintenance.UpdatedAt, maintenance.UpdatedBy, id) + + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return "", err + } + + return "", nil +} diff --git a/pkg/query-service/rules/maintenance.go b/pkg/query-service/rules/maintenance.go new file mode 100644 index 0000000000..014e8ac669 --- /dev/null +++ b/pkg/query-service/rules/maintenance.go @@ -0,0 +1,423 @@ +package rules + +import ( + "database/sql/driver" + "encoding/json" + "slices" + "strings" + "time" + + "github.com/pkg/errors" + "go.uber.org/zap" +) + +var ( + ErrMissingName = errors.New("missing name") + ErrMissingSchedule = errors.New("missing schedule") + ErrMissingTimezone = errors.New("missing timezone") + ErrMissingRepeatType = errors.New("missing repeat type") + ErrMissingDuration = errors.New("missing duration") +) + +type PlannedMaintenance struct { + Id int64 `json:"id" db:"id"` + Name string `json:"name" db:"name"` + Description string `json:"description" db:"description"` + Schedule *Schedule `json:"schedule" db:"schedule"` + AlertIds *AlertIds `json:"alertIds" db:"alert_ids"` + CreatedAt time.Time `json:"createdAt" db:"created_at"` + CreatedBy string `json:"createdBy" db:"created_by"` + UpdatedAt time.Time `json:"updatedAt" db:"updated_at"` + UpdatedBy string `json:"updatedBy" db:"updated_by"` + Status string `json:"status"` + Kind string `json:"kind"` +} + +type AlertIds []string + +func (a *AlertIds) Scan(src interface{}) error { + if data, ok := src.([]byte); ok { + return json.Unmarshal(data, a) + } + return nil +} + +func (a *AlertIds) Value() (driver.Value, error) { + return json.Marshal(a) +} + +type Schedule struct { + Timezone string `json:"timezone"` + StartTime time.Time `json:"startTime,omitempty"` + EndTime time.Time `json:"endTime,omitempty"` + Recurrence *Recurrence `json:"recurrence"` +} + +func (s *Schedule) Scan(src interface{}) error { + if data, ok := src.([]byte); ok { + return json.Unmarshal(data, s) + } + return nil +} + +func (s *Schedule) Value() (driver.Value, error) { + return json.Marshal(s) +} + +type RepeatType string + +const ( + RepeatTypeDaily RepeatType = "daily" + RepeatTypeWeekly RepeatType = "weekly" + RepeatTypeMonthly RepeatType = "monthly" +) + +type RepeatOn string + +const ( + RepeatOnSunday RepeatOn = "sunday" + RepeatOnMonday RepeatOn = "monday" + RepeatOnTuesday RepeatOn = "tuesday" + RepeatOnWednesday RepeatOn = "wednesday" + RepeatOnThursday RepeatOn = "thursday" + RepeatOnFriday RepeatOn = "friday" + RepeatOnSaturday RepeatOn = "saturday" +) + +type Recurrence struct { + StartTime time.Time `json:"startTime"` + EndTime *time.Time `json:"endTime,omitempty"` + Duration Duration `json:"duration"` + RepeatType RepeatType `json:"repeatType"` + RepeatOn []RepeatOn `json:"repeatOn"` +} + +func (r *Recurrence) Scan(src interface{}) error { + if data, ok := src.([]byte); ok { + return json.Unmarshal(data, r) + } + return nil +} + +func (r *Recurrence) Value() (driver.Value, error) { + return json.Marshal(r) +} + +func (s Schedule) MarshalJSON() ([]byte, error) { + loc, err := time.LoadLocation(s.Timezone) + if err != nil { + return nil, err + } + + var startTime, endTime time.Time + if !s.StartTime.IsZero() { + startTime = time.Date(s.StartTime.Year(), s.StartTime.Month(), s.StartTime.Day(), s.StartTime.Hour(), s.StartTime.Minute(), s.StartTime.Second(), s.StartTime.Nanosecond(), loc) + } + if !s.EndTime.IsZero() { + endTime = time.Date(s.EndTime.Year(), s.EndTime.Month(), s.EndTime.Day(), s.EndTime.Hour(), s.EndTime.Minute(), s.EndTime.Second(), s.EndTime.Nanosecond(), loc) + } + + var recurrence *Recurrence + if s.Recurrence != nil { + recStartTime := time.Date(s.Recurrence.StartTime.Year(), s.Recurrence.StartTime.Month(), s.Recurrence.StartTime.Day(), s.Recurrence.StartTime.Hour(), s.Recurrence.StartTime.Minute(), s.Recurrence.StartTime.Second(), s.Recurrence.StartTime.Nanosecond(), loc) + var recEndTime *time.Time + if s.Recurrence.EndTime != nil { + end := time.Date(s.Recurrence.EndTime.Year(), s.Recurrence.EndTime.Month(), s.Recurrence.EndTime.Day(), s.Recurrence.EndTime.Hour(), s.Recurrence.EndTime.Minute(), s.Recurrence.EndTime.Second(), s.Recurrence.EndTime.Nanosecond(), loc) + recEndTime = &end + } + recurrence = &Recurrence{ + StartTime: recStartTime, + EndTime: recEndTime, + Duration: s.Recurrence.Duration, + RepeatType: s.Recurrence.RepeatType, + RepeatOn: s.Recurrence.RepeatOn, + } + } + + return json.Marshal(&struct { + Timezone string `json:"timezone"` + StartTime string `json:"startTime"` + EndTime string `json:"endTime"` + Recurrence *Recurrence `json:"recurrence,omitempty"` + }{ + Timezone: s.Timezone, + StartTime: startTime.Format(time.RFC3339), + EndTime: endTime.Format(time.RFC3339), + Recurrence: recurrence, + }) +} + +func (s *Schedule) UnmarshalJSON(data []byte) error { + aux := &struct { + Timezone string `json:"timezone"` + StartTime string `json:"startTime"` + EndTime string `json:"endTime"` + Recurrence *Recurrence `json:"recurrence,omitempty"` + }{} + if err := json.Unmarshal(data, aux); err != nil { + return err + } + + loc, err := time.LoadLocation(aux.Timezone) + if err != nil { + return err + } + + var startTime time.Time + if aux.StartTime != "" { + startTime, err = time.Parse(time.RFC3339, aux.StartTime) + if err != nil { + return err + } + s.StartTime = time.Date(startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), startTime.Second(), startTime.Nanosecond(), loc) + } + + var endTime time.Time + if aux.EndTime != "" { + endTime, err = time.Parse(time.RFC3339, aux.EndTime) + if err != nil { + return err + } + s.EndTime = time.Date(endTime.Year(), endTime.Month(), endTime.Day(), endTime.Hour(), endTime.Minute(), endTime.Second(), endTime.Nanosecond(), loc) + } + + s.Timezone = aux.Timezone + + if aux.Recurrence != nil { + recStartTime, err := time.Parse(time.RFC3339, aux.Recurrence.StartTime.Format(time.RFC3339)) + if err != nil { + return err + } + + var recEndTime *time.Time + if aux.Recurrence.EndTime != nil { + end, err := time.Parse(time.RFC3339, aux.Recurrence.EndTime.Format(time.RFC3339)) + if err != nil { + return err + } + endConverted := time.Date(end.Year(), end.Month(), end.Day(), end.Hour(), end.Minute(), end.Second(), end.Nanosecond(), loc) + recEndTime = &endConverted + } + + s.Recurrence = &Recurrence{ + StartTime: time.Date(recStartTime.Year(), recStartTime.Month(), recStartTime.Day(), recStartTime.Hour(), recStartTime.Minute(), recStartTime.Second(), recStartTime.Nanosecond(), loc), + EndTime: recEndTime, + Duration: aux.Recurrence.Duration, + RepeatType: aux.Recurrence.RepeatType, + RepeatOn: aux.Recurrence.RepeatOn, + } + } + return nil +} + +func (m *PlannedMaintenance) shouldSkip(ruleID string, now time.Time) bool { + + found := false + if m.AlertIds != nil { + for _, alertID := range *m.AlertIds { + if alertID == ruleID { + found = true + break + } + } + } + + // If no alert ids, then skip all alerts + if m.AlertIds == nil || len(*m.AlertIds) == 0 { + found = true + } + + if found { + + zap.L().Info("alert found in maintenance", zap.String("alert", ruleID), zap.Any("maintenance", m.Name)) + + // If alert is found, we check if it should be skipped based on the schedule + // If it should be skipped, we return true + // If it should not be skipped, we return false + + // fixed schedule + if !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() { + // if the current time in the timezone is between the start and end time + loc, err := time.LoadLocation(m.Schedule.Timezone) + if err != nil { + zap.L().Error("Error loading location", zap.String("timezone", m.Schedule.Timezone), zap.Error(err)) + return false + } + + currentTime := now.In(loc) + zap.L().Info("checking fixed schedule", zap.Any("rule", ruleID), zap.String("maintenance", m.Name), zap.Time("currentTime", currentTime), zap.Time("startTime", m.Schedule.StartTime), zap.Time("endTime", m.Schedule.EndTime)) + if currentTime.After(m.Schedule.StartTime) && currentTime.Before(m.Schedule.EndTime) { + return true + } + } + + // recurring schedule + if m.Schedule.Recurrence != nil { + zap.L().Info("evaluating recurrence schedule") + start := m.Schedule.Recurrence.StartTime + end := m.Schedule.Recurrence.StartTime.Add(time.Duration(m.Schedule.Recurrence.Duration)) + // if the current time in the timezone is between the start and end time + loc, err := time.LoadLocation(m.Schedule.Timezone) + if err != nil { + zap.L().Error("Error loading location", zap.String("timezone", m.Schedule.Timezone), zap.Error(err)) + return false + } + currentTime := now.In(loc) + + zap.L().Info("checking recurring schedule", zap.Any("rule", ruleID), zap.String("maintenance", m.Name), zap.Time("currentTime", currentTime), zap.Time("startTime", start), zap.Time("endTime", end)) + + // make sure the start time is not after the current time + if currentTime.Before(start.In(loc)) { + zap.L().Info("current time is before start time", zap.Any("rule", ruleID), zap.String("maintenance", m.Name), zap.Time("currentTime", currentTime), zap.Time("startTime", start.In(loc))) + return false + } + + var endTime time.Time + if m.Schedule.Recurrence.EndTime != nil { + endTime = *m.Schedule.Recurrence.EndTime + } + if !endTime.IsZero() && currentTime.After(endTime.In(loc)) { + zap.L().Info("current time is after end time", zap.Any("rule", ruleID), zap.String("maintenance", m.Name), zap.Time("currentTime", currentTime), zap.Time("endTime", end.In(loc))) + return false + } + + switch m.Schedule.Recurrence.RepeatType { + case RepeatTypeDaily: + // take the hours and minutes from the start time and add them to the current time + startTime := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), start.Hour(), start.Minute(), 0, 0, loc) + endTime := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), end.Hour(), end.Minute(), 0, 0, loc) + zap.L().Info("checking daily schedule", zap.Any("rule", ruleID), zap.String("maintenance", m.Name), zap.Time("currentTime", currentTime), zap.Time("startTime", startTime), zap.Time("endTime", endTime)) + + if currentTime.After(startTime) && currentTime.Before(endTime) { + return true + } + case RepeatTypeWeekly: + // if the current time in the timezone is between the start and end time on the RepeatOn day + startTime := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), start.Hour(), start.Minute(), 0, 0, loc) + endTime := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), end.Hour(), end.Minute(), 0, 0, loc) + zap.L().Info("checking weekly schedule", zap.Any("rule", ruleID), zap.String("maintenance", m.Name), zap.Time("currentTime", currentTime), zap.Time("startTime", startTime), zap.Time("endTime", endTime)) + if currentTime.After(startTime) && currentTime.Before(endTime) { + if len(m.Schedule.Recurrence.RepeatOn) == 0 { + return true + } else if slices.Contains(m.Schedule.Recurrence.RepeatOn, RepeatOn(strings.ToLower(currentTime.Weekday().String()))) { + return true + } + } + case RepeatTypeMonthly: + // if the current time in the timezone is between the start and end time on the day of the current month + startTime := time.Date(currentTime.Year(), currentTime.Month(), start.Day(), start.Hour(), start.Minute(), 0, 0, loc) + endTime := time.Date(currentTime.Year(), currentTime.Month(), end.Day(), end.Hour(), end.Minute(), 0, 0, loc) + zap.L().Info("checking monthly schedule", zap.Any("rule", ruleID), zap.String("maintenance", m.Name), zap.Time("currentTime", currentTime), zap.Time("startTime", startTime), zap.Time("endTime", endTime)) + if currentTime.After(startTime) && currentTime.Before(endTime) && currentTime.Day() == start.Day() { + return true + } + } + } + } + // If alert is not found, we return false + return false +} + +func (m *PlannedMaintenance) IsActive(now time.Time) bool { + ruleID := "maintenance" + if m.AlertIds != nil && len(*m.AlertIds) > 0 { + ruleID = (*m.AlertIds)[0] + } + return m.shouldSkip(ruleID, now) +} + +func (m *PlannedMaintenance) IsUpcoming() bool { + now := time.Now().In(time.FixedZone(m.Schedule.Timezone, 0)) + if !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() { + return now.Before(m.Schedule.StartTime) + } + if m.Schedule.Recurrence != nil { + return now.Before(m.Schedule.Recurrence.StartTime) + } + return false +} + +func (m *PlannedMaintenance) IsRecurring() bool { + return m.Schedule.Recurrence != nil +} + +func (m *PlannedMaintenance) Validate() error { + if m.Name == "" { + return ErrMissingName + } + if m.Schedule == nil { + return ErrMissingSchedule + } + if m.Schedule.Timezone == "" { + return ErrMissingTimezone + } + + _, err := time.LoadLocation(m.Schedule.Timezone) + if err != nil { + return errors.New("invalid timezone") + } + + if !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() { + if m.Schedule.StartTime.After(m.Schedule.EndTime) { + return errors.New("start time cannot be after end time") + } + } + + if m.Schedule.Recurrence != nil { + if m.Schedule.Recurrence.RepeatType == "" { + return ErrMissingRepeatType + } + if m.Schedule.Recurrence.Duration == 0 { + return ErrMissingDuration + } + if m.Schedule.Recurrence.EndTime != nil && m.Schedule.Recurrence.EndTime.Before(m.Schedule.Recurrence.StartTime) { + return errors.New("end time cannot be before start time") + } + } + return nil +} + +func (m PlannedMaintenance) MarshalJSON() ([]byte, error) { + now := time.Now().In(time.FixedZone(m.Schedule.Timezone, 0)) + var status string + if m.IsActive(now) { + status = "active" + } else if m.IsUpcoming() { + status = "upcoming" + } else { + status = "expired" + } + var kind string + + if !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() && m.Schedule.EndTime.After(m.Schedule.StartTime) { + kind = "fixed" + } else { + kind = "recurring" + } + + return json.Marshal(struct { + Id int64 `json:"id" db:"id"` + Name string `json:"name" db:"name"` + Description string `json:"description" db:"description"` + Schedule *Schedule `json:"schedule" db:"schedule"` + AlertIds *AlertIds `json:"alertIds" db:"alert_ids"` + CreatedAt time.Time `json:"createdAt" db:"created_at"` + CreatedBy string `json:"createdBy" db:"created_by"` + UpdatedAt time.Time `json:"updatedAt" db:"updated_at"` + UpdatedBy string `json:"updatedBy" db:"updated_by"` + Status string `json:"status"` + Kind string `json:"kind"` + }{ + Id: m.Id, + Name: m.Name, + Description: m.Description, + Schedule: m.Schedule, + AlertIds: m.AlertIds, + CreatedAt: m.CreatedAt, + CreatedBy: m.CreatedBy, + UpdatedAt: m.UpdatedAt, + UpdatedBy: m.UpdatedBy, + Status: status, + Kind: kind, + }) +} diff --git a/pkg/query-service/rules/maintenance_test.go b/pkg/query-service/rules/maintenance_test.go new file mode 100644 index 0000000000..aaf5edbb91 --- /dev/null +++ b/pkg/query-service/rules/maintenance_test.go @@ -0,0 +1,230 @@ +package rules + +import ( + "testing" + "time" +) + +func TestShouldSkipMaintenance(t *testing.T) { + + cases := []struct { + name string + maintenance *PlannedMaintenance + ts time.Time + expected bool + }{ + { + name: "fixed planned maintenance start <= ts <= end", + maintenance: &PlannedMaintenance{ + Schedule: &Schedule{ + Timezone: "UTC", + StartTime: time.Now().UTC().Add(-time.Hour), + EndTime: time.Now().UTC().Add(time.Hour * 2), + }, + }, + ts: time.Now().UTC(), + expected: true, + }, + { + name: "fixed planned maintenance start >= ts", + maintenance: &PlannedMaintenance{ + Schedule: &Schedule{ + Timezone: "UTC", + StartTime: time.Now().UTC().Add(time.Hour), + EndTime: time.Now().UTC().Add(time.Hour * 2), + }, + }, + ts: time.Now().UTC(), + expected: false, + }, + { + name: "fixed planned maintenance ts < start", + maintenance: &PlannedMaintenance{ + Schedule: &Schedule{ + Timezone: "UTC", + StartTime: time.Now().UTC().Add(time.Hour), + EndTime: time.Now().UTC().Add(time.Hour * 2), + }, + }, + ts: time.Now().UTC().Add(-time.Hour), + expected: false, + }, + { + name: "recurring maintenance, repeat daily from 12:00 to 14:00", + maintenance: &PlannedMaintenance{ + Schedule: &Schedule{ + Timezone: "UTC", + Recurrence: &Recurrence{ + StartTime: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC), + Duration: Duration(time.Hour * 2), + RepeatType: RepeatTypeDaily, + }, + }, + }, + ts: time.Date(2024, 1, 1, 12, 10, 0, 0, time.UTC), + expected: true, + }, + { + name: "recurring maintenance, repeat daily from 12:00 to 14:00", + maintenance: &PlannedMaintenance{ + Schedule: &Schedule{ + Timezone: "UTC", + Recurrence: &Recurrence{ + StartTime: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC), + Duration: Duration(time.Hour * 2), + RepeatType: RepeatTypeDaily, + }, + }, + }, + ts: time.Date(2024, 1, 1, 14, 0, 0, 0, time.UTC), + expected: false, + }, + { + name: "recurring maintenance, repeat daily from 12:00 to 14:00", + maintenance: &PlannedMaintenance{ + Schedule: &Schedule{ + Timezone: "UTC", + Recurrence: &Recurrence{ + StartTime: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC), + Duration: Duration(time.Hour * 2), + RepeatType: RepeatTypeDaily, + }, + }, + }, + ts: time.Date(2024, 04, 1, 12, 10, 0, 0, time.UTC), + expected: true, + }, + { + name: "recurring maintenance, repeat weekly on monday from 12:00 to 14:00", + maintenance: &PlannedMaintenance{ + Schedule: &Schedule{ + Timezone: "UTC", + Recurrence: &Recurrence{ + StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC), + Duration: Duration(time.Hour * 2), + RepeatType: RepeatTypeWeekly, + RepeatOn: []RepeatOn{RepeatOnMonday}, + }, + }, + }, + ts: time.Date(2024, 04, 15, 12, 10, 0, 0, time.UTC), + expected: true, + }, + { + name: "recurring maintenance, repeat weekly on monday from 12:00 to 14:00", + maintenance: &PlannedMaintenance{ + Schedule: &Schedule{ + Timezone: "UTC", + Recurrence: &Recurrence{ + StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC), + Duration: Duration(time.Hour * 2), + RepeatType: RepeatTypeWeekly, + RepeatOn: []RepeatOn{RepeatOnMonday}, + }, + }, + }, + ts: time.Date(2024, 04, 14, 12, 10, 0, 0, time.UTC), // 14th 04 is sunday + expected: false, + }, + { + name: "recurring maintenance, repeat weekly on monday from 12:00 to 14:00", + maintenance: &PlannedMaintenance{ + Schedule: &Schedule{ + Timezone: "UTC", + Recurrence: &Recurrence{ + StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC), + Duration: Duration(time.Hour * 2), + RepeatType: RepeatTypeWeekly, + RepeatOn: []RepeatOn{RepeatOnMonday}, + }, + }, + }, + ts: time.Date(2024, 04, 16, 12, 10, 0, 0, time.UTC), // 16th 04 is tuesday + expected: false, + }, + { + name: "recurring maintenance, repeat weekly on monday from 12:00 to 14:00", + maintenance: &PlannedMaintenance{ + Schedule: &Schedule{ + Timezone: "UTC", + Recurrence: &Recurrence{ + StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC), + Duration: Duration(time.Hour * 2), + RepeatType: RepeatTypeWeekly, + RepeatOn: []RepeatOn{RepeatOnMonday}, + }, + }, + }, + ts: time.Date(2024, 05, 06, 12, 10, 0, 0, time.UTC), + expected: true, + }, + { + name: "recurring maintenance, repeat weekly on monday from 12:00 to 14:00", + maintenance: &PlannedMaintenance{ + Schedule: &Schedule{ + Timezone: "UTC", + Recurrence: &Recurrence{ + StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC), + Duration: Duration(time.Hour * 2), + RepeatType: RepeatTypeWeekly, + RepeatOn: []RepeatOn{RepeatOnMonday}, + }, + }, + }, + ts: time.Date(2024, 05, 06, 14, 00, 0, 0, time.UTC), + expected: false, + }, + { + name: "recurring maintenance, repeat monthly on 4th from 12:00 to 14:00", + maintenance: &PlannedMaintenance{ + Schedule: &Schedule{ + Timezone: "UTC", + Recurrence: &Recurrence{ + StartTime: time.Date(2024, 04, 04, 12, 0, 0, 0, time.UTC), + Duration: Duration(time.Hour * 2), + RepeatType: RepeatTypeMonthly, + }, + }, + }, + ts: time.Date(2024, 04, 04, 12, 10, 0, 0, time.UTC), + expected: true, + }, + { + name: "recurring maintenance, repeat monthly on 4th from 12:00 to 14:00", + maintenance: &PlannedMaintenance{ + Schedule: &Schedule{ + Timezone: "UTC", + Recurrence: &Recurrence{ + StartTime: time.Date(2024, 04, 04, 12, 0, 0, 0, time.UTC), + Duration: Duration(time.Hour * 2), + RepeatType: RepeatTypeMonthly, + }, + }, + }, + ts: time.Date(2024, 04, 04, 14, 10, 0, 0, time.UTC), + expected: false, + }, + { + name: "recurring maintenance, repeat monthly on 4th from 12:00 to 14:00", + maintenance: &PlannedMaintenance{ + Schedule: &Schedule{ + Timezone: "UTC", + Recurrence: &Recurrence{ + StartTime: time.Date(2024, 04, 04, 12, 0, 0, 0, time.UTC), + Duration: Duration(time.Hour * 2), + RepeatType: RepeatTypeMonthly, + }, + }, + }, + ts: time.Date(2024, 05, 04, 12, 10, 0, 0, time.UTC), + expected: true, + }, + } + + for _, c := range cases { + result := c.maintenance.shouldSkip(c.name, c.ts) + if result != c.expected { + t.Errorf("expected %v, got %v", c.expected, result) + } + } +} diff --git a/pkg/query-service/rules/manager.go b/pkg/query-service/rules/manager.go index e940bd9b13..20951f56a0 100644 --- a/pkg/query-service/rules/manager.go +++ b/pkg/query-service/rules/manager.go @@ -133,6 +133,10 @@ func (m *Manager) Start() { m.run() } +func (m *Manager) RuleDB() RuleDB { + return m.ruleDB +} + func (m *Manager) Pause(b bool) { m.mtx.Lock() defer m.mtx.Unlock() @@ -529,7 +533,7 @@ func (m *Manager) prepareTask(acquireLock bool, r *PostableRule, taskName string rules = append(rules, tr) // create ch rule task for evalution - task = newTask(TaskTypeCh, taskName, taskNamesuffix, time.Duration(r.Frequency), rules, m.opts, m.prepareNotifyFunc()) + task = newTask(TaskTypeCh, taskName, taskNamesuffix, time.Duration(r.Frequency), rules, m.opts, m.prepareNotifyFunc(), m.ruleDB) // add rule to memory m.rules[ruleId] = tr @@ -551,7 +555,7 @@ func (m *Manager) prepareTask(acquireLock bool, r *PostableRule, taskName string rules = append(rules, pr) // create promql rule task for evalution - task = newTask(TaskTypeProm, taskName, taskNamesuffix, time.Duration(r.Frequency), rules, m.opts, m.prepareNotifyFunc()) + task = newTask(TaskTypeProm, taskName, taskNamesuffix, time.Duration(r.Frequency), rules, m.opts, m.prepareNotifyFunc(), m.ruleDB) // add rule to memory m.rules[ruleId] = pr diff --git a/pkg/query-service/rules/promRuleTask.go b/pkg/query-service/rules/promRuleTask.go index e74106266d..57b7a58dc7 100644 --- a/pkg/query-service/rules/promRuleTask.go +++ b/pkg/query-service/rules/promRuleTask.go @@ -27,19 +27,20 @@ type PromRuleTask struct { evaluationTime time.Duration lastEvaluation time.Time - markStale bool - done chan struct{} - terminated chan struct{} - managerDone chan struct{} + markStale bool + done chan struct{} + terminated chan struct{} pause bool logger log.Logger notify NotifyFunc + + ruleDB RuleDB } // newPromRuleTask holds rules that have promql condition // and evalutes the rule at a given frequency -func newPromRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc) *PromRuleTask { +func newPromRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, ruleDB RuleDB) *PromRuleTask { zap.L().Info("Initiating a new rule group", zap.String("name", name), zap.Duration("frequency", frequency)) if time.Now() == time.Now().Add(frequency) { @@ -57,6 +58,7 @@ func newPromRuleTask(name, file string, frequency time.Duration, rules []Rule, o done: make(chan struct{}), terminated: make(chan struct{}), notify: notify, + ruleDB: ruleDB, logger: log.With(opts.Logger, "group", name), } } @@ -313,10 +315,32 @@ func (g *PromRuleTask) CopyState(fromTask Task) error { // Eval runs a single evaluation cycle in which all rules are evaluated sequentially. func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) { zap.L().Info("promql rule task", zap.String("name", g.name), zap.Time("eval started at", ts)) + + maintenance, err := g.ruleDB.GetAllPlannedMaintenance(ctx) + + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + } + for i, rule := range g.rules { if rule == nil { continue } + + shouldSkip := false + for _, m := range maintenance { + zap.L().Info("checking if rule should be skipped", zap.String("rule", rule.ID()), zap.Any("maintenance", m)) + if m.shouldSkip(rule.ID(), ts) { + shouldSkip = true + break + } + } + + if shouldSkip { + zap.L().Info("rule should be skipped", zap.String("rule", rule.ID())) + continue + } + select { case <-g.done: return diff --git a/pkg/query-service/rules/ruleTask.go b/pkg/query-service/rules/ruleTask.go index edf3957a6f..577bd453a5 100644 --- a/pkg/query-service/rules/ruleTask.go +++ b/pkg/query-service/rules/ruleTask.go @@ -30,12 +30,14 @@ type RuleTask struct { pause bool notify NotifyFunc + + ruleDB RuleDB } const DefaultFrequency = 1 * time.Minute // newRuleTask makes a new RuleTask with the given name, options, and rules. -func newRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc) *RuleTask { +func newRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, ruleDB RuleDB) *RuleTask { if time.Now() == time.Now().Add(frequency) { frequency = DefaultFrequency @@ -52,6 +54,7 @@ func newRuleTask(name, file string, frequency time.Duration, rules []Rule, opts done: make(chan struct{}), terminated: make(chan struct{}), notify: notify, + ruleDB: ruleDB, } } @@ -294,10 +297,31 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) { zap.L().Debug("rule task eval started", zap.String("name", g.name), zap.Time("start time", ts)) + maintenance, err := g.ruleDB.GetAllPlannedMaintenance(ctx) + + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + } + for i, rule := range g.rules { if rule == nil { continue } + + shouldSkip := false + for _, m := range maintenance { + zap.L().Info("checking if rule should be skipped", zap.String("rule", rule.ID()), zap.Any("maintenance", m)) + if m.shouldSkip(rule.ID(), ts) { + shouldSkip = true + break + } + } + + if shouldSkip { + zap.L().Info("rule should be skipped", zap.String("rule", rule.ID())) + continue + } + select { case <-g.done: return diff --git a/pkg/query-service/rules/task.go b/pkg/query-service/rules/task.go index bec4ff1c13..64acf6c76e 100644 --- a/pkg/query-service/rules/task.go +++ b/pkg/query-service/rules/task.go @@ -29,9 +29,9 @@ type Task interface { // newTask returns an appropriate group for // rule type -func newTask(taskType TaskType, name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc) Task { +func newTask(taskType TaskType, name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, ruleDB RuleDB) Task { if taskType == TaskTypeCh { - return newRuleTask(name, file, frequency, rules, opts, notify) + return newRuleTask(name, file, frequency, rules, opts, notify, ruleDB) } - return newPromRuleTask(name, file, frequency, rules, opts, notify) + return newPromRuleTask(name, file, frequency, rules, opts, notify, ruleDB) }