Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moving silence to redis #1458

Merged
merged 1 commit into from
Dec 14, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/bosun/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type DataAccess interface {
Metadata() MetadataDataAccess
Search() SearchDataAccess
Errors() ErrorDataAccess
Silence() SilenceDataAccess
Incidents() IncidentDataAccess
}

Expand Down
141 changes: 141 additions & 0 deletions cmd/bosun/database/silence_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package database

import (
"encoding/json"
"log"
"time"

"bosun.org/_third_party/github.com/garyburd/redigo/redis"
"bosun.org/collect"
"bosun.org/models"
"bosun.org/opentsdb"
)

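/*

Silences : hash of silence ID -> JSON-encoded silence. The ID is a SHA of the silence's fields.

SilencesByEnd : sorted set (zset) of silence IDs, scored by end time (Unix seconds).

This makes active silences easy to find: fetch every ID whose end time is in the future, then keep only those whose start time is in the past.

*/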

// silenceHash is the redis hash key mapping silence ID to its JSON encoding.
const silenceHash = "Silences"

// silenceIdx is the redis sorted-set key holding silence IDs scored by end time.
const silenceIdx = "SilencesByEnd"

// SilenceDataAccess is the storage interface for alert silences.
type SilenceDataAccess interface {
	// AddSilence stores the given silence.
	AddSilence(s *models.Silence) error
	// DeleteSilence removes the silence with the given id.
	DeleteSilence(id string) error

	// GetActiveSilences returns the silences that are in effect at the time
	// of the call.
	GetActiveSilences() ([]*models.Silence, error)
	// ListSilences returns silences keyed by ID. In the redis-backed
	// implementation endingAfter is compared (inclusively) against each
	// silence's end time expressed in Unix seconds.
	ListSilences(endingAfter int64) (map[string]*models.Silence, error)
}

// Silence exposes the receiver through the SilenceDataAccess interface.
func (d *dataAccess) Silence() SilenceDataAccess {
	return SilenceDataAccess(d)
}

// GetActiveSilences returns the silences in effect right now: those whose end
// time (Unix seconds) is not before the current second and whose start time is
// not after the current time. It returns a nil slice when the end-time index
// holds no candidates.
func (d *dataAccess) GetActiveSilences() ([]*models.Silence, error) {
	defer collect.StartTimer("redis", opentsdb.TagSet{"op": "GetActiveSilences"})()
	conn := d.GetConnection()
	defer conn.Close()

	now := time.Now().UTC()

	// Candidates: every silence that has not ended yet.
	ids, err := redis.Strings(conn.Do("ZRANGEBYSCORE", silenceIdx, now.Unix(), "+inf"))
	if err != nil {
		return nil, err
	}
	if len(ids) == 0 {
		return nil, nil
	}

	candidates, err := getSilences(ids, conn)
	if err != nil {
		return nil, err
	}

	// Keep only the candidates that have already started.
	active := make([]*models.Silence, 0, len(candidates))
	for _, sil := range candidates {
		if !sil.Start.After(now) {
			active = append(active, sil)
		}
	}
	return active, nil
}

// getSilences fetches the silences stored under the given ids from the
// silence hash using a single HMGET on conn and decodes each JSON payload.
//
// An empty ids slice yields (nil, nil) without contacting redis. Ids that have
// no entry in the hash (for example an index entry left behind by a partially
// failed write or delete) are logged and skipped, so the result may be shorter
// than ids. Any redis or JSON decoding error is returned to the caller; this
// helper never terminates the process.
func getSilences(ids []string, conn redis.Conn) ([]*models.Silence, error) {
	if len(ids) == 0 {
		return nil, nil
	}

	// HMGET <hash> <id1> <id2> ...
	args := make([]interface{}, 0, len(ids)+1)
	args = append(args, silenceHash)
	for _, id := range ids {
		args = append(args, id)
	}

	jsons, err := redis.Strings(conn.Do("HMGET", args...))
	if err != nil {
		return nil, err
	}

	silences := make([]*models.Silence, 0, len(jsons))
	for i, j := range jsons {
		if j == "" {
			// HMGET replies nil for a field that is absent from the hash and
			// redis.Strings leaves that slot as the empty string. Treat it as
			// a dangling index entry instead of failing the whole read.
			if i < len(ids) {
				log.Printf("silence %q is indexed in %s but missing from %s; skipping", ids[i], silenceIdx, silenceHash)
			}
			continue
		}
		s := &models.Silence{}
		if err := json.Unmarshal([]byte(j), s); err != nil {
			return nil, err
		}
		silences = append(silences, s)
	}
	return silences, nil
}

// AddSilence stores s in redis: its JSON encoding goes into the silence hash
// under s.ID(), and the ID is added to the end-time index scored by the
// silence's end time in Unix seconds.
//
// The silence is marshalled before anything is written, and the hash entry is
// written before the index entry, so a failure part-way through can never
// leave the index pointing at an ID that has no stored silence.
func (d *dataAccess) AddSilence(s *models.Silence) error {
	defer collect.StartTimer("redis", opentsdb.TagSet{"op": "AddSilence"})()
	conn := d.GetConnection()
	defer conn.Close()

	dat, err := json.Marshal(s)
	if err != nil {
		return err
	}
	id := s.ID()
	if _, err := conn.Do("HSET", silenceHash, id, dat); err != nil {
		return err
	}
	_, err = conn.Do("ZADD", silenceIdx, s.End.UTC().Unix(), id)
	return err
}

// DeleteSilence removes the silence with the given id, first from the
// end-time index and then from the silence hash. The first redis error
// encountered is returned and stops the removal.
func (d *dataAccess) DeleteSilence(id string) error {
	defer collect.StartTimer("redis", opentsdb.TagSet{"op": "DeleteSilence"})()
	conn := d.GetConnection()
	defer conn.Close()

	_, err := conn.Do("ZREM", silenceIdx, id)
	if err != nil {
		return err
	}
	_, err = conn.Do("HDEL", silenceHash, id)
	return err
}

// ListSilences returns, keyed by ID, every silence whose end time (Unix
// seconds) is greater than or equal to endingAfter. When the index holds no
// such silence an empty, non-nil map is returned.
func (d *dataAccess) ListSilences(endingAfter int64) (map[string]*models.Silence, error) {
	defer collect.StartTimer("redis", opentsdb.TagSet{"op": "ListSilences"})()
	conn := d.GetConnection()
	defer conn.Close()

	matching, err := redis.Strings(conn.Do("ZRANGEBYSCORE", silenceIdx, endingAfter, "+inf"))
	if err != nil {
		return nil, err
	}
	if len(matching) == 0 {
		return map[string]*models.Silence{}, nil
	}

	list, err := getSilences(matching, conn)
	if err != nil {
		return nil, err
	}

	byID := make(map[string]*models.Silence, len(list))
	for i := range list {
		byID[list[i].ID()] = list[i]
	}
	return byID, nil
}
9 changes: 8 additions & 1 deletion cmd/bosun/database/test/database_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package dbtest

import (
"fmt"
"math/rand"
"os"
"path/filepath"
"runtime"
"testing"
"time"

Expand Down Expand Up @@ -34,6 +37,10 @@ func randString(l int) string {

// check aborts the running test when err is non-nil and does nothing
// otherwise. The failure message is prefixed with the base file name and line
// of the code that called check, so the report points at the failing call
// site rather than at this helper.
func check(t *testing.T, err error) {
	if err == nil {
		return
	}
	msg := err.Error()
	// Caller(1) skips check's own frame and reports its immediate caller.
	if _, filename, line, ok := runtime.Caller(1); ok {
		msg = fmt.Sprintf("%s:%d: %v", filepath.Base(filename), line, msg)
	}
	t.Fatal(msg)
}
39 changes: 39 additions & 0 deletions cmd/bosun/database/test/silence_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package dbtest

import (
"testing"
"time"

"bosun.org/models"
)

// TestSilence stores three silences for the same alert (one currently in
// effect, one already ended and one not yet started) and verifies that
// GetActiveSilences reports exactly one of them.
func TestSilence(t *testing.T) {
	sd := testData.Silence()

	fixtures := []*models.Silence{
		{ // started two days ago, ends in five hours
			Start: time.Now().Add(-48 * time.Hour),
			End:   time.Now().Add(5 * time.Hour),
			Alert: "Foo",
		},
		{ // started two days ago, ended five hours ago
			Start: time.Now().Add(-48 * time.Hour),
			End:   time.Now().Add(-5 * time.Hour),
			Alert: "Foo",
		},
		{ // starts in one hour, ends in two
			Start: time.Now().Add(1 * time.Hour),
			End:   time.Now().Add(2 * time.Hour),
			Alert: "Foo",
		},
	}
	for _, s := range fixtures {
		check(t, sd.AddSilence(s))
	}

	active, err := sd.GetActiveSilences()
	check(t, err)
	if got := len(active); got != 1 {
		t.Fatalf("Expected only one active silence. Got %d.", got)
	}
}
31 changes: 26 additions & 5 deletions cmd/bosun/sched/bolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ const (
dbBucket = "bindata"
dbConfigTextBucket = "configText"
dbNotifications = "notifications"
dbSilence = "silence"
dbStatus = "status"
)

Expand All @@ -55,7 +54,6 @@ func (s *Schedule) save() {
s.Lock("Save")
store := map[string]interface{}{
dbNotifications: s.Notifications,
dbSilence: s.Silence,
dbStatus: s.status,
}
tostore := make(map[string][]byte)
Expand Down Expand Up @@ -142,9 +140,6 @@ func (s *Schedule) RestoreState() error {
if err := decode(db, dbNotifications, &notifications); err != nil {
slog.Errorln(dbNotifications, err)
}
if err := decode(db, dbSilence, &s.Silence); err != nil {
slog.Errorln(dbSilence, err)
}

status := make(States)
if err := decode(db, dbStatus, &status); err != nil {
Expand Down Expand Up @@ -280,6 +275,9 @@ func migrateOldDataToRedis(db *bolt.DB, data database.DataAccess) error {
if err := migrateIncidents(db, data); err != nil {
return err
}
if err := migrateSilence(db, data); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -441,6 +439,29 @@ func migrateIncidents(db *bolt.DB, data database.DataAccess) error {
return nil
}

// migrateSilence copies the silences stored under the "silence" key of the
// bolt database into the silence store exposed by data.
//
// It does nothing when isMigrated already reports "silence" as migrated. Each
// silence has its TagString filled in from its Tags before being added. The
// "silence" migration is only marked as done after every silence was stored
// successfully; the first decode or store error is returned so the migration
// is attempted again later instead of silently losing silences.
func migrateSilence(db *bolt.DB, data database.DataAccess) error {
	migrated, err := isMigrated(db, "silence")
	if err != nil {
		return err
	}
	if migrated {
		return nil
	}
	slog.Info("migrating silence")
	silence := map[string]*models.Silence{}
	if err := decode(db, "silence", &silence); err != nil {
		return err
	}
	for _, v := range silence {
		v.TagString = v.Tags.Tags()
		if err := data.Silence().AddSilence(v); err != nil {
			return err
		}
	}
	return setMigrated(db, "silence")
}

func isMigrated(db *bolt.DB, name string) (bool, error) {
found := false
err := db.View(func(tx *bolt.Tx) error {
Expand Down
2 changes: 1 addition & 1 deletion cmd/bosun/sched/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s *Schedule) RunHistory(r *RunHistory) {
}

// RunHistory for a single alert key. Returns true if notifications were altered.
func (s *Schedule) runHistory(r *RunHistory, ak models.AlertKey, event *Event, silenced map[models.AlertKey]Silence) bool {
func (s *Schedule) runHistory(r *RunHistory, ak models.AlertKey, event *Event, silenced map[models.AlertKey]models.Silence) bool {
checkNotify := false
// get existing state object for alert key. add to schedule status if doesn't already exist
state := s.GetStatus(ak)
Expand Down
2 changes: 1 addition & 1 deletion cmd/bosun/sched/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ func (s *Schedule) Host(filter string) (map[string]*HostData, error) {
return hosts, nil
}

func processHostIncidents(host *HostData, states States, silences map[models.AlertKey]Silence) {
func processHostIncidents(host *HostData, states States, silences map[models.AlertKey]models.Silence) {
for ak, state := range states {
if stateHost, ok := state.Group["host"]; !ok {
continue
Expand Down
2 changes: 1 addition & 1 deletion cmd/bosun/sched/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *Schedule) CheckNotifications() time.Duration {
return timeout
}

func (s *Schedule) sendNotifications(silenced map[models.AlertKey]Silence) {
func (s *Schedule) sendNotifications(silenced map[models.AlertKey]models.Silence) {
if s.Conf.Quiet {
slog.Infoln("quiet mode prevented", len(s.pendingNotifications), "notifications")
return
Expand Down
12 changes: 5 additions & 7 deletions cmd/bosun/sched/sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ type Schedule struct {
mutexAquired time.Time
mutexWaitTime int64

Conf *conf.Conf
status States
Silence map[string]*Silence
Group map[time.Time]models.AlertKeys
Conf *conf.Conf
status States
Group map[time.Time]models.AlertKeys

Search *search.Search

Expand Down Expand Up @@ -68,7 +67,6 @@ func (s *Schedule) Init(c *conf.Conf) error {
//be avoided.
var err error
s.Conf = c
s.Silence = make(map[string]*Silence)
s.Group = make(map[time.Time]models.AlertKeys)
s.pendingUnknowns = make(map[*conf.Notification][]*State)
s.status = make(States)
Expand Down Expand Up @@ -225,7 +223,7 @@ type StateTuple struct {
}

// GroupStates groups by NeedAck, Active, Status, and Silenced.
func (states States) GroupStates(silenced map[models.AlertKey]Silence) map[StateTuple]States {
func (states States) GroupStates(silenced map[models.AlertKey]models.Silence) map[StateTuple]States {
r := make(map[StateTuple]States)
for ak, st := range states {
_, sil := silenced[ak]
Expand Down Expand Up @@ -362,7 +360,7 @@ type StateGroups struct {
}

func (s *Schedule) MarshalGroups(T miniprofiler.Timer, filter string) (*StateGroups, error) {
var silenced map[models.AlertKey]Silence
var silenced map[models.AlertKey]models.Silence
T.Step("Silenced", func(miniprofiler.Timer) {
silenced = s.Silenced()
})
Expand Down
19 changes: 19 additions & 0 deletions cmd/bosun/sched/sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,18 @@ type nopDataAccess struct {
database.SearchDataAccess
database.ErrorDataAccess
database.IncidentDataAccess
database.SilenceDataAccess
failingAlerts map[string]bool
idCounter uint64
incidents map[uint64]*models.Incident
silences map[string]*models.Silence
}

// Search returns the receiver itself as the search data access.
func (n *nopDataAccess) Search() database.SearchDataAccess {
	return n
}

// Metadata returns the receiver itself as the metadata data access.
func (n *nopDataAccess) Metadata() database.MetadataDataAccess {
	return n
}

// Errors returns the receiver itself as the error data access.
func (n *nopDataAccess) Errors() database.ErrorDataAccess {
	return n
}

// Incidents returns the receiver itself as the incident data access.
func (n *nopDataAccess) Incidents() database.IncidentDataAccess {
	return n
}

// Silence returns the receiver itself as the silence data access.
func (n *nopDataAccess) Silence() database.SilenceDataAccess {
	return n
}

// BackupLastInfos ignores its argument and always reports success.
func (n *nopDataAccess) BackupLastInfos(map[string]map[string]*database.LastInfo) error {
	return nil
}
func (n *nopDataAccess) LoadLastInfos() (map[string]map[string]*database.LastInfo, error) {
Expand Down Expand Up @@ -95,13 +98,29 @@ func (n *nopDataAccess) UpdateIncident(id uint64, i *models.Incident) error {
n.incidents[id] = i
return nil
}
// GetActiveSilences returns every silence held in the in-memory map, in
// unspecified order. No filtering on start or end time is applied.
func (n *nopDataAccess) GetActiveSilences() ([]*models.Silence, error) {
	all := make([]*models.Silence, len(n.silences))
	i := 0
	for _, sil := range n.silences {
		all[i] = sil
		i++
	}
	return all, nil
}

// DeleteSilence drops the silence stored under id from the in-memory map and
// always reports success.
func (n *nopDataAccess) DeleteSilence(id string) error {
	delete(n.silences, id)
	return nil
}

// AddSilence stores s in the in-memory map under its ID, replacing any silence
// already stored under that ID, and always reports success.
func (n *nopDataAccess) AddSilence(s *models.Silence) error {
	key := s.ID()
	n.silences[key] = s
	return nil
}

func initSched(c *conf.Conf) (*Schedule, error) {
c.StateFile = ""
s := new(Schedule)
s.DataAccess = &nopDataAccess{
failingAlerts: map[string]bool{},
incidents: map[uint64]*models.Incident{},
silences: map[string]*models.Silence{},
}
err := s.Init(c)
return s, err
Expand Down
Loading