diff --git a/cmd/bosun/database/config_data.go b/cmd/bosun/database/config_data.go new file mode 100644 index 0000000000..e1737289d0 --- /dev/null +++ b/cmd/bosun/database/config_data.go @@ -0,0 +1,47 @@ +package database + +import ( + "crypto/md5" + "encoding/base64" + + "bosun.org/_third_party/github.com/garyburd/redigo/redis" + + "bosun.org/collect" + "bosun.org/opentsdb" +) + +type ConfigDataAccess interface { + SaveTempConfig(text string) (hash string, err error) + GetTempConfig(hash string) (text string, err error) +} + +func (d *dataAccess) Configs() ConfigDataAccess { + return d +} + +const configLifetime = 60 * 60 * 24 * 14 // 2 weeks, in seconds (the unit of Redis EX / EXPIRE) + +func (d *dataAccess) SaveTempConfig(text string) (string, error) { + defer collect.StartTimer("redis", opentsdb.TagSet{"op": "SaveTempConfig"})() + conn := d.GetConnection() + defer conn.Close() + + sig := md5.Sum([]byte(text)) + b64 := base64.StdEncoding.EncodeToString(sig[0:8]) + _, err := conn.Do("SET", "tempConfig:"+b64, text, "EX", configLifetime) + return b64, err +} + +func (d *dataAccess) GetTempConfig(hash string) (string, error) { + defer collect.StartTimer("redis", opentsdb.TagSet{"op": "GetTempConfig"})() + conn := d.GetConnection() + defer conn.Close() + + key := "tempConfig:" + hash + dat, err := redis.String(conn.Do("GET", key)) + if err != nil { + return "", err + } + _, err = conn.Do("EXPIRE", key, configLifetime) + return dat, err +} diff --git a/cmd/bosun/database/database.go b/cmd/bosun/database/database.go index a7e1544589..d44c7c8d36 100644 --- a/cmd/bosun/database/database.go +++ b/cmd/bosun/database/database.go @@ -18,10 +18,12 @@ import ( // Core data access interface for everything sched needs type DataAccess interface { Metadata() MetadataDataAccess + Configs() ConfigDataAccess Search() SearchDataAccess Errors() ErrorDataAccess State() StateDataAccess Silence() SilenceDataAccess + Notifications() NotificationDataAccess } type MetadataDataAccess interface { diff --git
a/cmd/bosun/database/notification_data.go b/cmd/bosun/database/notification_data.go new file mode 100644 index 0000000000..d5090654b9 --- /dev/null +++ b/cmd/bosun/database/notification_data.go @@ -0,0 +1,133 @@ +package database + +import ( + "fmt" + "strings" + "time" + + "bosun.org/_third_party/github.com/garyburd/redigo/redis" + + "bosun.org/collect" + "bosun.org/models" + "bosun.org/opentsdb" + "bosun.org/slog" +) + +/* + +pendingNotifications: ZSET timestamp ak:notification + +notsByAlert:alert SET of notifications possible per alert. used to clear alerts by alert key + +*/ + +const ( + pendingNotificationsKey = "pendingNotifications" +) + +func notsByAlertKeyKey(ak models.AlertKey) string { + return fmt.Sprintf("notsByAlert:%s", ak.Name()) +} + +type NotificationDataAccess interface { + InsertNotification(ak models.AlertKey, notification string, dueAt time.Time) error + + //Get notifications that are currently due or past due. Does not delete. + GetDueNotifications() (map[models.AlertKey]map[string]time.Time, error) + + //Clear all notifications due on or before a given timestamp. Intended is to use the max returned from GetDueNotifications once you have processed them. 
+ ClearNotificationsBefore(time.Time) error + + ClearNotifications(ak models.AlertKey) error + + GetNextNotificationTime() (time.Time, error) +} + +func (d *dataAccess) Notifications() NotificationDataAccess { + return d +} + +func (d *dataAccess) InsertNotification(ak models.AlertKey, notification string, dueAt time.Time) error { + defer collect.StartTimer("redis", opentsdb.TagSet{"op": "InsertNotification"})() + conn := d.GetConnection() + defer conn.Close() + + _, err := conn.Do("ZADD", pendingNotificationsKey, dueAt.UTC().Unix(), fmt.Sprintf("%s:%s", ak, notification)) + if err != nil { + return slog.Wrap(err) + } + _, err = conn.Do("SADD", notsByAlertKeyKey(ak), notification) + return slog.Wrap(err) +} + +func (d *dataAccess) GetDueNotifications() (map[models.AlertKey]map[string]time.Time, error) { + defer collect.StartTimer("redis", opentsdb.TagSet{"op": "GetDueNotifications"})() + conn := d.GetConnection() + defer conn.Close() + m, err := redis.Int64Map(conn.Do("ZRANGEBYSCORE", pendingNotificationsKey, 0, time.Now().UTC().Unix(), "WITHSCORES")) + if err != nil { + return nil, slog.Wrap(err) + } + results := map[models.AlertKey]map[string]time.Time{} + for key, t := range m { + last := strings.LastIndex(key, ":") + if last == -1 { + continue + } + ak, not := models.AlertKey(key[:last]), key[last+1:] + if results[ak] == nil { + results[ak] = map[string]time.Time{} + } + results[ak][not] = time.Unix(t, 0).UTC() + } + return results, err +} + +func (d *dataAccess) ClearNotificationsBefore(t time.Time) error { + defer collect.StartTimer("redis", opentsdb.TagSet{"op": "ClearNotificationsBefore"})() + conn := d.GetConnection() + defer conn.Close() + + _, err := conn.Do("ZREMRANGEBYSCORE", pendingNotificationsKey, 0, t.UTC().Unix()) + return slog.Wrap(err) +} + +func (d *dataAccess) ClearNotifications(ak models.AlertKey) error { + defer collect.StartTimer("redis", opentsdb.TagSet{"op": "ClearNotifications"})() + conn := d.GetConnection() + defer conn.Close() + + 
nots, err := redis.Strings(conn.Do("SMEMBERS", notsByAlertKeyKey(ak))) + if err != nil { + return slog.Wrap(err) + } + + if len(nots) == 0 { + return nil + } + + args := []interface{}{pendingNotificationsKey} + for _, not := range nots { + key := fmt.Sprintf("%s:%s", ak, not) + args = append(args, key) + } + _, err = conn.Do("ZREM", args...) + return slog.Wrap(err) +} + +func (d *dataAccess) GetNextNotificationTime() (time.Time, error) { + defer collect.StartTimer("redis", opentsdb.TagSet{"op": "GetNextNotificationTime"})() + conn := d.GetConnection() + defer conn.Close() + + m, err := redis.Int64Map(conn.Do("ZRANGE", pendingNotificationsKey, 0, 0, "WITHSCORES")) + if err != nil { + return time.Time{}, slog.Wrap(err) + } + // default time is one hour from now if no pending notifications exist + t := time.Now().UTC().Add(time.Hour).Truncate(time.Second) + for _, i := range m { + t = time.Unix(i, 0).UTC() + } + return t, nil +} diff --git a/cmd/bosun/database/test/notifications_test.go b/cmd/bosun/database/test/notifications_test.go new file mode 100644 index 0000000000..e374b1689f --- /dev/null +++ b/cmd/bosun/database/test/notifications_test.go @@ -0,0 +1,68 @@ +package dbtest + +import ( + "testing" + "time" + + "bosun.org/models" +) + +func TestNotifications_RoundTrip(t *testing.T) { + + nd := testData.Notifications() + notTime := time.Now().UTC().Add(-10 * time.Hour).Truncate(time.Second) + future := time.Now().UTC().Add(time.Hour).Truncate(time.Second) + oneMin := time.Now().UTC().Add(time.Minute).Truncate(time.Second) + + // with nothing pending, next time should be an hour from now + next, err := nd.GetNextNotificationTime() + check(t, err) + if next != future { + t.Fatalf("wrong next time. 
%s != %s", next, future) + } + + // add notifications + err = nd.InsertNotification(models.AlertKey("notak{foo=a}"), "chat", notTime) + check(t, err) + err = nd.InsertNotification(models.AlertKey("notak{foo=b}"), "chat", oneMin) + check(t, err) + err = nd.InsertNotification(models.AlertKey("notak{foo=c}"), "chat", future) + check(t, err) + + // next time should be correct + next, err = nd.GetNextNotificationTime() + check(t, err) + if next != notTime { + t.Fatalf("wrong next time. %s != %s", next, notTime) + } + + // make sure only one due + due, err := nd.GetDueNotifications() + check(t, err) + if len(due) != 1 { + t.Fatalf("Wrong number of due notifications. %d != %d", len(due), 1) + } + + // next time should still be correct + next, err = nd.GetNextNotificationTime() + check(t, err) + if next != notTime { + t.Fatalf("wrong next time. %s != %s", next, notTime) + } + + check(t, nd.ClearNotificationsBefore(notTime)) + // next time should be 1 minute + next, err = nd.GetNextNotificationTime() + check(t, err) + if next != oneMin { + t.Fatalf("wrong next time. %s != %s", next, oneMin) + } + + check(t, nd.ClearNotifications(models.AlertKey("notak{foo=b}"))) + // next time should be 1 hour + next, err = nd.GetNextNotificationTime() + check(t, err) + if next != future { + t.Fatalf("wrong next time. 
%s != %s", next, future) + } +} diff --git a/cmd/bosun/sched/alertRunner.go b/cmd/bosun/sched/alertRunner.go index f6debd1568..74be03025a 100644 --- a/cmd/bosun/sched/alertRunner.go +++ b/cmd/bosun/sched/alertRunner.go @@ -19,7 +19,6 @@ func (s *Schedule) Run() error { go s.PingHosts() } go s.dispatchNotifications() - go s.performSave() go s.updateCheckContext() for _, a := range s.Conf.Alerts { go s.RunAlert(a) @@ -28,7 +27,7 @@ func (s *Schedule) Run() error { } func (s *Schedule) updateCheckContext() { for { - ctx := &checkContext{time.Now(), cache.New(0)} + ctx := &checkContext{utcNow(), cache.New(0)} s.ctx = ctx time.Sleep(s.Conf.CheckFrequency) s.Lock("CollectStates") @@ -40,7 +39,7 @@ func (s *Schedule) RunAlert(a *conf.Alert) { for { wait := time.After(s.Conf.CheckFrequency * time.Duration(a.RunEvery)) s.checkAlert(a) - s.LastCheck = time.Now() + s.LastCheck = utcNow() <-wait } } @@ -51,7 +50,7 @@ func (s *Schedule) checkAlert(a *conf.Alert) { rh := s.NewRunHistory(checkTime, checkCache) s.CheckAlert(nil, rh, a) - start := time.Now() + start := utcNow() s.RunHistory(rh) slog.Infof("runHistory on %s took %v\n", a.Name, time.Since(start)) } diff --git a/cmd/bosun/sched/bolt.go b/cmd/bosun/sched/bolt.go index bcc2a3577b..6efdcfa333 100644 --- a/cmd/bosun/sched/bolt.go +++ b/cmd/bosun/sched/bolt.go @@ -3,109 +3,30 @@ package sched import ( "bytes" "compress/gzip" - "crypto/md5" - "encoding/base64" "encoding/gob" - "encoding/json" "fmt" - "io" - "os" "time" "bosun.org/_third_party/github.com/boltdb/bolt" - "bosun.org/cmd/bosun/conf" "bosun.org/cmd/bosun/database" "bosun.org/cmd/bosun/expr" - "bosun.org/collect" "bosun.org/metadata" "bosun.org/models" "bosun.org/opentsdb" "bosun.org/slog" ) -func (s *Schedule) performSave() { - for { - time.Sleep(60 * 10 * time.Second) // wait 10 minutes to throttle. 
- s.save() - } -} - -type counterWriter struct { - written int - w io.Writer -} - -func (c *counterWriter) Write(p []byte) (n int, err error) { - n, err = c.w.Write(p) - c.written += n - return n, err -} - const ( dbBucket = "bindata" dbConfigTextBucket = "configText" - dbNotifications = "notifications" ) -func (s *Schedule) save() { - if s.db == nil { - return - } - s.Lock("Save") - store := map[string]interface{}{ - dbNotifications: s.Notifications, - } - tostore := make(map[string][]byte) - for name, data := range store { - f := new(bytes.Buffer) - gz := gzip.NewWriter(f) - cw := &counterWriter{w: gz} - enc := gob.NewEncoder(cw) - if err := enc.Encode(data); err != nil { - slog.Errorf("error saving %s: %v", name, err) - s.Unlock() - return - } - if err := gz.Flush(); err != nil { - slog.Errorf("gzip flush error saving %s: %v", name, err) - } - if err := gz.Close(); err != nil { - slog.Errorf("gzip close error saving %s: %v", name, err) - } - tostore[name] = f.Bytes() - slog.Infof("wrote %s: %v", name, conf.ByteSize(cw.written)) - collect.Put("statefile.size", opentsdb.TagSet{"object": name}, cw.written) - } - s.Unlock() - err := s.db.Update(func(tx *bolt.Tx) error { - b, err := tx.CreateBucketIfNotExists([]byte(dbBucket)) - if err != nil { - return err - } - for name, data := range tostore { - if err := b.Put([]byte(name), data); err != nil { - return err - } - } - return nil - }) - if err != nil { - slog.Errorf("save db update error: %v", err) - return - } - fi, err := os.Stat(s.Conf.StateFile) - if err == nil { - collect.Put("statefile.size", opentsdb.TagSet{"object": "total"}, fi.Size()) - } - slog.Infoln("save to db complete") -} - func decode(db *bolt.DB, name string, dst interface{}) error { var data []byte err := db.View(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(dbBucket)) if b == nil { - return fmt.Errorf("unknown bucket: %v", dbBucket) + return nil } data = b.Get([]byte(name)) return nil @@ -127,81 +48,16 @@ func decode(db *bolt.DB, name string, 
dst interface{}) error { // RestoreState restores notification and alert state from the file on disk. func (s *Schedule) RestoreState() error { defer func() { - bosunStartupTime = time.Now() + bosunStartupTime = utcNow() }() slog.Infoln("RestoreState") - start := time.Now() + start := utcNow() s.Lock("RestoreState") defer s.Unlock() s.Search.Lock() defer s.Search.Unlock() - s.Notifications = nil - db := s.db - notifications := make(map[models.AlertKey]map[string]time.Time) - if err := decode(db, dbNotifications, &notifications); err != nil { - slog.Errorln(dbNotifications, err) - } - s.Notifications = notifications - - //status := make(States) - // if err := decode(db, dbStatus, &status); err != nil { - // slog.Errorln(dbStatus, err) - // } - // clear := func(r *models.Result) { - // if r == nil { - // return - // } - // r.Computations = nil - //} - //TODO: ??? - // for ak, st := range status { - // a, present := s.Conf.Alerts[ak.Name()] - // if !present { - // slog.Errorln("sched: alert no longer present, ignoring:", ak) - // continue - // } else if s.Conf.Squelched(a, st.Group) { - // slog.Infoln("sched: alert now squelched:", ak) - // continue - // } else { - // t := a.Unknown - // if t == 0 { - // t = s.Conf.CheckFrequency - // } - // if t == 0 && st.Last().Status == StUnknown { - // st.Append(&Event{Status: StNormal, IncidentId: st.Last().IncidentId}) - // } - // } - // clear(st.Result) - // newHistory := []Event{} - // for _, e := range st.History { - // clear(e.Warn) - // clear(e.Crit) - // // Remove error events which no longer are a thing.
- // if e.Status <= StUnknown { - // newHistory = append(newHistory, e) - // } - // } - // st.History = newHistory - // s.status[ak] = st - // if a.Log && st.Open { - // st.Open = false - // slog.Infof("sched: alert %s is now log, closing, was %s", ak, st.Status()) - // } - // for name, t := range notifications[ak] { - // n, present := s.Conf.Notifications[name] - // if !present { - // slog.Infoln("sched: notification not present during restore:", name) - // continue - // } - // if a.Log { - // slog.Infoln("sched: alert is now log, removing notification:", ak) - // continue - // } - // s.AddNotification(ak, n, t) - // } - //} - if err := migrateOldDataToRedis(db, s.DataAccess); err != nil { + if err := migrateOldDataToRedis(s.db, s.DataAccess, s); err != nil { return err } // delete metrictags if they exist. @@ -215,60 +71,7 @@ type storedConfig struct { LastUsed time.Time } -// Saves the provided config text in state file for later access. -// Returns a hash of the file to be used as a retreival key. -func (s *Schedule) SaveTempConfig(text string) (hash string, err error) { - sig := md5.Sum([]byte(text)) - b64 := base64.StdEncoding.EncodeToString(sig[0:5]) - data := storedConfig{Text: text, LastUsed: time.Now()} - bindata, err := json.Marshal(data) - if err != nil { - return "", err - } - err = s.db.Update(func(tx *bolt.Tx) error { - b, err := tx.CreateBucketIfNotExists([]byte(dbConfigTextBucket)) - if err != nil { - return err - } - return b.Put([]byte(b64), bindata) - }) - if err != nil { - return "", err - } - return b64, nil -} - -// Retreive the specified config text from state file. 
-func (s *Schedule) LoadTempConfig(hash string) (text string, err error) { - config := storedConfig{} - err = s.db.View(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte(dbConfigTextBucket)) - data := b.Get([]byte(hash)) - if data == nil || len(data) == 0 { - return fmt.Errorf("Config text '%s' not found", hash) - } - return json.Unmarshal(data, &config) - }) - if err != nil { - return "", err - } - go s.SaveTempConfig(config.Text) //refresh timestamp. - return config.Text, nil -} - -func (s *Schedule) GetStateFileBackup() ([]byte, error) { - buf := bytes.Buffer{} - err := s.db.View(func(tx *bolt.Tx) error { - _, err := tx.WriteTo(&buf) - return err - }) - if err != nil { - return nil, err - } - return buf.Bytes(), nil -} - -func migrateOldDataToRedis(db *bolt.DB, data database.DataAccess) error { +func migrateOldDataToRedis(db *bolt.DB, data database.DataAccess, s *Schedule) error { if err := migrateMetricMetadata(db, data); err != nil { return err } @@ -284,6 +87,37 @@ func migrateOldDataToRedis(db *bolt.DB, data database.DataAccess) error { if err := migrateState(db, data); err != nil { return err } + if err := migrateNotifications(db, s); err != nil { + return err + } + return nil +} + +func migrateNotifications(db *bolt.DB, s *Schedule) error { + migrated, err := isMigrated(db, "notifications") + if err != nil { + return err + } + if !migrated { + slog.Info("Migrating notifications to new database format") + nots := map[models.AlertKey]map[string]time.Time{} + err := decode(db, "notifications", &nots) + if err != nil { + return err + } + for ak, ns := range nots { + for n, t := range ns { + not := s.Conf.Notifications[n] + if not == nil { + continue + } + if err = s.DataAccess.Notifications().InsertNotification(ak, n, t.Add(not.Timeout)); err != nil { + return err + } + } + } + setMigrated(db, "notifications") + } return nil } @@ -390,7 +224,7 @@ func migrateSearch(db *bolt.DB, data database.DataAccess) error { for tk, time := range v {
data.Search().AddTagKeyForMetric(metric, tk, time) } - data.Search().AddMetric(metric, time.Now().Unix()) + data.Search().AddMetric(metric, utcNow().Unix()) } } else { return err diff --git a/cmd/bosun/sched/check.go b/cmd/bosun/sched/check.go index 1ec1c8fa53..aa25b203b2 100644 --- a/cmd/bosun/sched/check.go +++ b/cmd/bosun/sched/check.go @@ -40,7 +40,7 @@ func init() { func NewIncident(ak models.AlertKey) *models.IncidentState { s := &models.IncidentState{} - s.Start = time.Now() + s.Start = utcNow() s.AlertKey = ak s.Alert = ak.Name() s.Tags = ak.Group().Tags() @@ -106,7 +106,7 @@ func (s *Schedule) RunHistory(r *RunHistory) { func (s *Schedule) runHistory(r *RunHistory, ak models.AlertKey, event *models.Event, silenced SilenceTester) (checkNotify bool, err error) { event.Time = r.Start data := s.DataAccess.State() - err = data.TouchAlertKey(ak, time.Now()) + err = data.TouchAlertKey(ak, utcNow()) if err != nil { return } @@ -176,7 +176,7 @@ func (s *Schedule) runHistory(r *RunHistory, ak models.AlertKey, event *models.E notify := func(ns *conf.Notifications) { if a.Log { lastLogTime := s.lastLogTimes[ak] - now := time.Now() + now := utcNow() if now.Before(lastLogTime.Add(a.MaxLogFrequency)) { return } @@ -207,15 +207,14 @@ func (s *Schedule) runHistory(r *RunHistory, ak models.AlertKey, event *models.E notify(a.WarnNotification) } } - clearOld := func() { - incident.NeedAck = false - delete(s.Notifications, ak) - } // lock while we change notifications. 
s.Lock("RunHistory") if shouldNotify { - clearOld() + incident.NeedAck = false + if err = s.DataAccess.Notifications().ClearNotifications(ak); err != nil { + return + } notifyCurrent() } @@ -394,7 +393,7 @@ func (s *Schedule) CollectStates() { ts := opentsdb.TagSet{"notification": notification} var ago time.Duration if !timeStamp.Equal(time.Unix(1<<63-62135596801, 999999999)) { - ago = time.Now().UTC().Sub(timeStamp) + ago = utcNow().Sub(timeStamp) } err := collect.Put("alerts.oldest_unacked_by_notification", ts, @@ -456,11 +455,11 @@ func (s *Schedule) GetUnknownAndUnevaluatedAlertKeys(alert string) (unknown, une return unknown, uneval } -var bosunStartupTime = time.Now() +var bosunStartupTime = utcNow() func (s *Schedule) findUnknownAlerts(now time.Time, alert string) []models.AlertKey { keys := []models.AlertKey{} - if time.Now().Sub(bosunStartupTime) < s.Conf.CheckFrequency { + if utcNow().Sub(bosunStartupTime) < s.Conf.CheckFrequency { return keys } if !s.AlertSuccessful(alert) { @@ -485,7 +484,7 @@ func (s *Schedule) findUnknownAlerts(now time.Time, alert string) []models.Alert func (s *Schedule) CheckAlert(T miniprofiler.Timer, r *RunHistory, a *conf.Alert) { slog.Infof("check alert %v start", a.Name) - start := time.Now() + start := utcNow() for _, ak := range s.findUnknownAlerts(r.Start, a.Name) { r.Events[ak] = &models.Event{Status: models.StUnknown} } diff --git a/cmd/bosun/sched/check_test.go b/cmd/bosun/sched/check_test.go index f9c68c57e9..9ecf399e18 100644 --- a/cmd/bosun/sched/check_test.go +++ b/cmd/bosun/sched/check_test.go @@ -126,11 +126,11 @@ func TestCheckSilence(t *testing.T) { if err != nil { t.Fatal(err) } - _, err = s.AddSilence(time.Now().Add(-time.Hour), time.Now().Add(time.Hour), "a", "", false, true, "", "user", "message") + _, err = s.AddSilence(utcNow().Add(-time.Hour), utcNow().Add(time.Hour), "a", "", false, true, "", "user", "message") if err != nil { t.Fatal(err) } - check(s, time.Now()) + check(s, utcNow()) 
s.CheckNotifications() select { case <-done: @@ -220,7 +220,7 @@ func TestCheckNotify(t *testing.T) { if err != nil { t.Fatal(err) } - check(s, time.Now()) + check(s, utcNow()) s.CheckNotifications() select { case r := <-nc: @@ -396,7 +396,7 @@ func TestCheckNotifyLog(t *testing.T) { if err != nil { t.Fatal(err) } - check(s, time.Now()) + check(s, utcNow()) s.CheckNotifications() gotA := false gotB := false diff --git a/cmd/bosun/sched/host.go b/cmd/bosun/sched/host.go index 8284a84e18..bb17e0a673 100644 --- a/cmd/bosun/sched/host.go +++ b/cmd/bosun/sched/host.go @@ -43,7 +43,7 @@ func (s *Schedule) Host(filter string) (map[string]*HostData, error) { } return byKey, nil } - oldTimestamp := time.Now().Add(-timeFilterAge).Unix() + oldTimestamp := utcNow().Add(-timeFilterAge).Unix() oldOrErr := func(ts int64, err error) bool { if ts < oldTimestamp || err != nil { return true @@ -256,7 +256,7 @@ func (s *Schedule) Host(filter string) (map[string]*HostData, error) { ps.Status = statusString(int64(fstatus), 0, "Ok", "Bad") host.Hardware.PowerSupplies[id] = ps for _, m := range hostMetadata { - if m.Name != "psMeta" || m.Time.Before(time.Now().Add(-timeFilterAge)) || !m.Tags.Equal(ts) { + if m.Name != "psMeta" || m.Time.Before(utcNow().Add(-timeFilterAge)) || !m.Tags.Equal(ts) { continue } if val, ok := m.Value.(string); ok { @@ -305,7 +305,7 @@ func (s *Schedule) Host(filter string) (map[string]*HostData, error) { host.Hardware.Storage.PhysicalDisks[id] = pd } for _, m := range hostMetadata { - if m.Name != "physicalDiskMeta" || m.Time.Before(time.Now().Add(-timeFilterAge)) || !m.Tags.Equal(ts) { + if m.Name != "physicalDiskMeta" || m.Time.Before(utcNow().Add(-timeFilterAge)) || !m.Tags.Equal(ts) { continue } if val, ok := m.Value.(string); ok { @@ -344,7 +344,7 @@ func (s *Schedule) Host(filter string) (map[string]*HostData, error) { host.Hardware.Storage.Controllers[id] = c } for _, m := range hostMetadata { - if m.Name != "controllerMeta" || 
m.Time.Before(time.Now().Add(-timeFilterAge)) || !m.Tags.Equal(ts) { + if m.Name != "controllerMeta" || m.Time.Before(utcNow().Add(-timeFilterAge)) || !m.Tags.Equal(ts) { continue } if val, ok := m.Value.(string); ok { @@ -370,7 +370,7 @@ func (s *Schedule) Host(filter string) (map[string]*HostData, error) { } host.Disks[disk] = d for _, m := range hostMetadata { - if m.Name != "label" || m.Time.Before(time.Now().Add(-timeFilterAge)) || !m.Tags.Equal(ts) { + if m.Name != "label" || m.Time.Before(utcNow().Add(-timeFilterAge)) || !m.Tags.Equal(ts) { continue } if label, ok := m.Value.(string); ok { @@ -391,7 +391,7 @@ func (s *Schedule) Host(filter string) (map[string]*HostData, error) { host.Memory.UsedBytes, _, _ = s.Search.GetLast("os.mem.used", hostTagSet.String(), false) host.UptimeSeconds, _, _ = s.Search.GetLastInt64("os.system.uptime", hostTagSet.String(), false) for _, m := range hostMetadata { - if m.Time.Before(time.Now().Add(-timeFilterAge)) { + if m.Time.Before(utcNow().Add(-timeFilterAge)) { continue } var iface *HostInterface diff --git a/cmd/bosun/sched/notify.go b/cmd/bosun/sched/notify.go index aa00e5ee12..dfda390d91 100644 --- a/cmd/bosun/sched/notify.go +++ b/cmd/bosun/sched/notify.go @@ -15,13 +15,21 @@ import ( func (s *Schedule) dispatchNotifications() { ticker := time.NewTicker(s.Conf.CheckFrequency * 2) - nextScheduled := time.After(s.CheckNotifications()) + var next <-chan time.Time + nextAt := func(t time.Time) { + diff := t.Sub(utcNow()) + if diff <= 0 { + diff = time.Millisecond + } + next = time.After(diff) + } + nextAt(utcNow()) for { select { - case <-nextScheduled: - nextScheduled = time.After(s.CheckNotifications()) + case <-next: + nextAt(s.CheckNotifications()) case <-s.nc: - nextScheduled = time.After(s.CheckNotifications()) + nextAt(s.CheckNotifications()) case <-ticker.C: s.sendUnknownNotifications() } @@ -36,14 +44,17 @@ func (s *Schedule) Notify(st *models.IncidentState, n *conf.Notification) { s.pendingNotifications[n] = 
append(s.pendingNotifications[n], st) } -// CheckNotifications processes past notification events. It returns the -// duration until the soonest notification triggers. -func (s *Schedule) CheckNotifications() time.Duration { +// CheckNotifications processes past notification events. It returns the next time a notification is needed. +func (s *Schedule) CheckNotifications() time.Time { silenced := s.Silenced() s.Lock("CheckNotifications") defer s.Unlock() - notifications := s.Notifications - s.Notifications = nil + latestTime := utcNow() + notifications, err := s.DataAccess.Notifications().GetDueNotifications() + if err != nil { + slog.Error("Error getting notifications", err) + return utcNow().Add(time.Minute) + } for ak, ns := range notifications { if si := silenced(ak); si != nil { slog.Infoln("silencing", ak) @@ -54,12 +65,6 @@ func (s *Schedule) CheckNotifications() time.Duration { if !present { continue } - remaining := t.Add(n.Timeout).Sub(time.Now()) - if remaining > 0 { - s.AddNotification(ak, n, t) - continue - } - //If alert is currently unevaluated because of a dependency, //simply requeue it until the dependency resolves itself. 
_, uneval := s.GetUnknownAndUnevaluatedAlertKeys(ak.Name()) @@ -71,7 +76,7 @@ func (s *Schedule) CheckNotifications() time.Duration { } } if unevaluated { - s.AddNotification(ak, n, t) + s.QueueNotification(ak, n, t.Add(time.Minute)) continue } st, err := s.DataAccess.State().GetLatestIncident(ak) @@ -82,25 +87,20 @@ func (s *Schedule) CheckNotifications() time.Duration { if st == nil { continue } - s.Notify(st, n) } } s.sendNotifications(silenced) s.pendingNotifications = nil - timeout := time.Hour - now := time.Now() - for _, ns := range s.Notifications { - for name, t := range ns { - n, present := s.Conf.Notifications[name] - if !present { - continue - } - remaining := t.Add(n.Timeout).Sub(now) - if remaining < timeout { - timeout = remaining - } - } + err = s.DataAccess.Notifications().ClearNotificationsBefore(latestTime) + if err != nil { + slog.Error("Error clearing notifications", err) + return utcNow().Add(time.Minute) + } + timeout, err := s.DataAccess.Notifications().GetNextNotificationTime() + if err != nil { + slog.Error("Error getting next notification time", err) + return utcNow().Add(time.Minute) } return timeout } @@ -126,7 +126,7 @@ func (s *Schedule) sendNotifications(silenced SilenceTester) { s.notify(st, n) } if n.Next != nil { - s.AddNotification(ak, n.Next, time.Now().UTC()) + s.QueueNotification(ak, n.Next, utcNow()) } } } @@ -195,7 +195,7 @@ func (s *Schedule) notify(st *models.IncidentState, n *conf.Notification) { // utnotify is single notification for N unknown groups into a single notification func (s *Schedule) utnotify(groups map[string]models.AlertKeys, n *conf.Notification) { var total int - now := time.Now().UTC() + now := utcNow() for _, group := range groups { // Don't know what the following line does, just copied from unotify s.Group[now] = group @@ -230,7 +230,7 @@ var defaultUnknownTemplate = &conf.Template{ func (s *Schedule) unotify(name string, group models.AlertKeys, n *conf.Notification) { subject := new(bytes.Buffer) 
body := new(bytes.Buffer) - now := time.Now().UTC() + now := utcNow() s.Group[now] = group t := s.Conf.UnknownTemplate if t == nil { @@ -250,14 +250,8 @@ func (s *Schedule) unotify(name string, group models.AlertKeys, n *conf.Notifica n.Notify(subject.String(), body.String(), subject.Bytes(), body.Bytes(), s.Conf, name) } -func (s *Schedule) AddNotification(ak models.AlertKey, n *conf.Notification, started time.Time) { - if s.Notifications == nil { - s.Notifications = make(map[models.AlertKey]map[string]time.Time) - } - if s.Notifications[ak] == nil { - s.Notifications[ak] = make(map[string]time.Time) - } - s.Notifications[ak][n.Name] = started +func (s *Schedule) QueueNotification(ak models.AlertKey, n *conf.Notification, started time.Time) error { + return s.DataAccess.Notifications().InsertNotification(ak, n.Name, started.Add(n.Timeout)) } var actionNotificationSubjectTemplate *ttemplate.Template diff --git a/cmd/bosun/sched/sched.go b/cmd/bosun/sched/sched.go index 84be85738e..92450f05fd 100644 --- a/cmd/bosun/sched/sched.go +++ b/cmd/bosun/sched/sched.go @@ -25,6 +25,10 @@ import ( "bosun.org/slog" ) +func utcNow() time.Time { + return time.Now().UTC() +} + func init() { gob.Register(expr.Number(0)) gob.Register(expr.Scalar(0)) @@ -45,8 +49,7 @@ type Schedule struct { nc chan interface{} //notifications to be sent immediately pendingNotifications map[*conf.Notification][]*models.IncidentState - //notifications we are currently tracking, potentially with future or repeated actions. - Notifications map[models.AlertKey]map[string]time.Time + //unknown states that need to be notified about. Collected and sent in batches. 
pendingUnknowns map[*conf.Notification][]*models.IncidentState @@ -70,8 +73,8 @@ func (s *Schedule) Init(c *conf.Conf) error { s.Group = make(map[time.Time]models.AlertKeys) s.pendingUnknowns = make(map[*conf.Notification][]*models.IncidentState) s.lastLogTimes = make(map[models.AlertKey]time.Time) - s.LastCheck = time.Now() - s.ctx = &checkContext{time.Now(), cache.New(0)} + s.LastCheck = utcNow() + s.ctx = &checkContext{utcNow(), cache.New(0)} if s.DataAccess == nil { if c.RedisHost != "" { s.DataAccess = database.NewDataAccess(c.RedisHost, true, c.RedisDb, c.RedisPassword) @@ -111,9 +114,9 @@ func init() { } func (s *Schedule) Lock(method string) { - start := time.Now() + start := utcNow() s.mutex.Lock() - s.mutexAquired = time.Now() + s.mutexAquired = utcNow() s.mutexHolder = method s.mutexWaitTime = int64(s.mutexAquired.Sub(start) / time.Millisecond) // remember this so we don't have to call put until we leave the critical section. } @@ -137,7 +140,7 @@ func (s *Schedule) PutMetadata(k metadata.Metakey, v interface{}) error { isCoreMeta := (k.Name == "desc" || k.Name == "unit" || k.Name == "rate") if !isCoreMeta { - s.DataAccess.Metadata().PutTagMetadata(k.TagSet(), k.Name, fmt.Sprint(v), time.Now().UTC()) + s.DataAccess.Metadata().PutTagMetadata(k.TagSet(), k.Name, fmt.Sprint(v), utcNow()) return nil } if k.Metric == "" { @@ -505,12 +508,6 @@ func Close() { } func (s *Schedule) Close() { - s.save() - s.Lock("Close") - if s.db != nil { - s.db.Close() - } - s.Unlock() err := s.Search.BackupLast() if err != nil { slog.Error(err) @@ -585,12 +582,8 @@ func (s *Schedule) Action(user, message string, t models.ActionType, ak models.A if st == nil { return fmt.Errorf("no such alert key: %v", ak) } - ack := func() { - delete(s.Notifications, ak) - st.NeedAck = false - } isUnknown := st.LastAbnormalStatus == models.StUnknown - timestamp := time.Now().UTC() + timestamp := utcNow() switch t { case models.ActionAcknowledge: if !st.NeedAck { @@ -599,11 +592,11 @@ func (s 
*Schedule) Action(user, message string, t models.ActionType, ak models.A if !st.Open { return fmt.Errorf("cannot acknowledge closed alert") } - ack() - case models.ActionClose: - if st.NeedAck { - ack() + st.NeedAck = false + if err := s.DataAccess.Notifications().ClearNotifications(ak); err != nil { + return err } + case models.ActionClose: if st.IsActive() { return fmt.Errorf("cannot close active alert") } diff --git a/cmd/bosun/sched/silence.go b/cmd/bosun/sched/silence.go index f380fb2a4b..522022e10b 100644 --- a/cmd/bosun/sched/silence.go +++ b/cmd/bosun/sched/silence.go @@ -14,7 +14,7 @@ type SilenceTester func(models.AlertKey) *models.Silence // Silenced returns a function that will determine if the given alert key is silenced at the current time. // A function is returned to avoid needing to enumerate all alert keys unneccesarily. func (s *Schedule) Silenced() SilenceTester { - now := time.Now() + now := utcNow() silences, err := s.DataAccess.Silence().GetActiveSilences() if err != nil { slog.Error("Error fetching silences.", err) diff --git a/cmd/bosun/web/expr.go b/cmd/bosun/web/expr.go index 0991a6f181..620fd8384a 100644 --- a/cmd/bosun/web/expr.go +++ b/cmd/bosun/web/expr.go @@ -446,7 +446,7 @@ func buildConfig(r *http.Request) (c *conf.Conf, a *conf.Alert, hash string, err } c.StateFile = "" - hash, err = sched.DefaultSched.SaveTempConfig(string(config)) + hash, err = sched.DefaultSched.DataAccess.Configs().SaveTempConfig(string(config)) if err != nil { return nil, nil, "", err } diff --git a/cmd/bosun/web/web.go b/cmd/bosun/web/web.go index ddddefb829..b3aa67355b 100644 --- a/cmd/bosun/web/web.go +++ b/cmd/bosun/web/web.go @@ -87,7 +87,6 @@ func Listen(listenAddr string, devMode bool, tsdbHost string) error { router.HandleFunc("/api/", APIRedirect) router.Handle("/api/action", JSON(Action)) router.Handle("/api/alerts", JSON(Alerts)) - router.Handle("/api/backup", JSON(Backup)) router.Handle("/api/config", miniprofiler.NewHandler(Config)) 
router.Handle("/api/config_test", miniprofiler.NewHandler(ConfigTest)) router.Handle("/api/egraph/{bs}.svg", JSON(ExprGraph)) @@ -419,15 +418,6 @@ func Alerts(t miniprofiler.Timer, w http.ResponseWriter, r *http.Request) (inter return schedule.MarshalGroups(t, r.FormValue("filter")) } -func Backup(t miniprofiler.Timer, w http.ResponseWriter, r *http.Request) (interface{}, error) { - data, err := schedule.GetStateFileBackup() - if err != nil { - return nil, err - } - _, err = w.Write(data) - return nil, err -} - func IncidentEvents(t miniprofiler.Timer, w http.ResponseWriter, r *http.Request) (interface{}, error) { id := r.FormValue("id") if id == "" { @@ -659,7 +649,7 @@ func Config(t miniprofiler.Timer, w http.ResponseWriter, r *http.Request) { var text string var err error if hash := r.FormValue("hash"); hash != "" { - text, err = schedule.LoadTempConfig(hash) + text, err = schedule.DataAccess.Configs().GetTempConfig(hash) if err != nil { serveError(w, err) return