Skip to content

Commit

Permalink
cmd/bosun: promote scalars to numberSets
Browse files Browse the repository at this point in the history
fixes #1123
  • Loading branch information
maddyblue committed Jul 7, 2015
1 parent 42266d2 commit fa76640
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 43 deletions.
11 changes: 10 additions & 1 deletion cmd/bosun/expr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ func (e *State) walkFunc(node *parse.FuncNode, T miniprofiler.Timer) *Results {
var res *Results
T.Step("func: "+node.Name, func(T miniprofiler.Timer) {
var in []reflect.Value
for _, a := range node.Args {
for i, a := range node.Args {
var v interface{}
switch t := a.(type) {
case *parse.StringNode:
Expand All @@ -594,6 +594,15 @@ func (e *State) walkFunc(node *parse.FuncNode, T miniprofiler.Timer) *Results {
default:
panic(fmt.Errorf("expr: unknown func arg type"))
}
if f, ok := v.(float64); ok && node.F.Args[i] == parse.TypeNumberSet {
v = &Results{
Results: ResultSlice{
&Result{
Value: Number(f),
},
},
}
}
in = append(in, reflect.ValueOf(v))
}
f := reflect.ValueOf(node.F.F)
Expand Down
28 changes: 28 additions & 0 deletions cmd/bosun/expr/expr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,31 @@ func TestQueryExpr(t *testing.T) {
}
}
}

func TestScalarPromotion(t *testing.T) {
tests := map[string]map[string]Value{
`abs(-1)`: {"": Number(1)},
}

for exprText, expected := range tests {
e, err := New(exprText)
if err != nil {
t.Fatal(err)
}
results, _, err := e.Execute(nil, nil, nil, nil, nil, queryTime, 0, false, nil, nil, nil)
if err != nil {
t.Fatal(err)
}
for _, r := range results.Results {
tag := r.Group.Tags()
ex := expected[tag]
if ex == nil {
t.Errorf("missing tag %v", tag)
continue
}
if ex != r.Value {
t.Errorf("unmatched values in %v", tag)
}
}
}
}
99 changes: 66 additions & 33 deletions cmd/bosun/expr/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ var builtins = map[string]parse.Func{
F: First,
},
"forecastlr": {
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeScalar},
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeNumberSet},
Return: parse.TypeNumberSet,
Tags: tagFirst,
F: Forecast_lr,
Expand Down Expand Up @@ -196,7 +196,7 @@ var builtins = map[string]parse.Func{
F: Min,
},
"percentile": {
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeScalar},
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeNumberSet},
Return: parse.TypeNumberSet,
Tags: tagFirst,
F: Percentile,
Expand Down Expand Up @@ -260,25 +260,25 @@ var builtins = map[string]parse.Func{
F: Des,
},
"dropge": {
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeScalar},
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeNumberSet},
Return: parse.TypeSeriesSet,
Tags: tagFirst,
F: DropGe,
},
"dropg": {
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeScalar},
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeNumberSet},
Return: parse.TypeSeriesSet,
Tags: tagFirst,
F: DropG,
},
"drople": {
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeScalar},
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeNumberSet},
Return: parse.TypeSeriesSet,
Tags: tagFirst,
F: DropLe,
},
"dropl": {
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeScalar},
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeNumberSet},
Return: parse.TypeSeriesSet,
Tags: tagFirst,
F: DropL,
Expand Down Expand Up @@ -380,39 +380,41 @@ func Duration(e *State, T miniprofiler.Timer, d string) (*Results, error) {
}, nil
}

func DropValues(e *State, T miniprofiler.Timer, series *Results, threshold float64, dropFunction func(float64, float64) bool) (*Results, error) {
for _, res := range series.Results {
func DropValues(e *State, T miniprofiler.Timer, series *Results, threshold *Results, dropFunction func(float64, float64) bool) (*Results, error) {
f := func(res *Results, s *Result, floats []float64) error {
nv := make(Series)
for k, v := range res.Value.Value().(Series) {
if !dropFunction(float64(v), threshold) {
for k, v := range s.Value.Value().(Series) {
if !dropFunction(float64(v), floats[0]) {
//preserve values which should not be discarded
nv[k] = v
}
}
if len(nv) == 0 {
return nil, fmt.Errorf("series %s is empty", res.Group)
return fmt.Errorf("series %s is empty", s.Group)
}
res.Value = nv
s.Value = nv
res.Results = append(res.Results, s)
return nil
}
return series, nil
return match(f, series, threshold)
}

func DropGe(e *State, T miniprofiler.Timer, series *Results, threshold float64) (*Results, error) {
func DropGe(e *State, T miniprofiler.Timer, series *Results, threshold *Results) (*Results, error) {
dropFunction := func(value float64, threshold float64) bool { return value >= threshold }
return DropValues(e, T, series, threshold, dropFunction)
}

func DropG(e *State, T miniprofiler.Timer, series *Results, threshold float64) (*Results, error) {
func DropG(e *State, T miniprofiler.Timer, series *Results, threshold *Results) (*Results, error) {
dropFunction := func(value float64, threshold float64) bool { return value > threshold }
return DropValues(e, T, series, threshold, dropFunction)
}

func DropLe(e *State, T miniprofiler.Timer, series *Results, threshold float64) (*Results, error) {
func DropLe(e *State, T miniprofiler.Timer, series *Results, threshold *Results) (*Results, error) {
dropFunction := func(value float64, threshold float64) bool { return value <= threshold }
return DropValues(e, T, series, threshold, dropFunction)
}

func DropL(e *State, T miniprofiler.Timer, series *Results, threshold float64) (*Results, error) {
func DropL(e *State, T miniprofiler.Timer, series *Results, threshold *Results) (*Results, error) {
dropFunction := func(value float64, threshold float64) bool { return value < threshold }
return DropValues(e, T, series, threshold, dropFunction)
}
Expand All @@ -421,7 +423,7 @@ func DropNA(e *State, T miniprofiler.Timer, series *Results) (*Results, error) {
dropFunction := func(value float64, threshold float64) bool {
return math.IsNaN(float64(value)) || math.IsInf(float64(value), 0)
}
return DropValues(e, T, series, 0, dropFunction)
return DropValues(e, T, series, fromScalar(0), dropFunction)
}

func parseGraphiteResponse(req *graphite.Request, s *graphite.Response, formatTags []string) ([]*Result, error) {
Expand Down Expand Up @@ -874,32 +876,63 @@ func Change(e *State, T miniprofiler.Timer, query, sduration, eduration string)
if err != nil {
return
}
r, err = reduce(e, T, r, change, (sd - ed).Seconds())
r, err = reduce(e, T, r, change, fromScalar((sd - ed).Seconds()))
return
}

func change(dps Series, args ...float64) float64 {
return avg(dps) * args[0]
}

func reduce(e *State, T miniprofiler.Timer, series *Results, F func(Series, ...float64) float64, args ...float64) (*Results, error) {
func fromScalar(f float64) *Results {
return &Results{
Results: ResultSlice{
&Result{
Value: Number(f),
},
},
}
}

func match(f func(res *Results, series *Result, floats []float64) error, series *Results, numberSets ...*Results) (*Results, error) {
res := *series
res.Results = nil
for _, s := range series.Results {
switch t := s.Value.(type) {
case Series:
if len(t) == 0 {
continue
var floats []float64
for _, num := range numberSets {
for _, n := range num.Results {
if len(n.Group) == 0 || s.Group.Overlaps(n.Group) {
floats = append(floats, float64(n.Value.(Number)))
break
}
}
s.Value = Number(F(t, args...))
res.Results = append(res.Results, s)
default:
panic(fmt.Errorf("expr: expected a series"))
}
if len(floats) != len(numberSets) {
if !series.IgnoreUnjoined {
return nil, fmt.Errorf("unjoined groups for %s", s.Group)
}
continue
}
if err := f(&res, s, floats); err != nil {
return nil, err
}
}
return &res, nil
}

func reduce(e *State, T miniprofiler.Timer, series *Results, F func(Series, ...float64) float64, args ...*Results) (*Results, error) {
f := func(res *Results, s *Result, floats []float64) error {
t := s.Value.(Series)
if len(t) == 0 {
return nil
}
s.Value = Number(F(t, floats...))
res.Results = append(res.Results, s)
return nil
}
return match(f, series, args...)
}

func Abs(e *State, T miniprofiler.Timer, series *Results) *Results {
for _, s := range series.Results {
s.Value = Number(math.Abs(float64(s.Value.Value().(Number))))
Expand Down Expand Up @@ -1070,7 +1103,7 @@ func (e *State) since(dps Series, args ...float64) (a float64) {
return s.Seconds()
}

func Forecast_lr(e *State, T miniprofiler.Timer, series *Results, y float64) (r *Results, err error) {
func Forecast_lr(e *State, T miniprofiler.Timer, series *Results, y *Results) (r *Results, err error) {
return reduce(e, T, series, e.forecast_lr, y)
}

Expand Down Expand Up @@ -1107,20 +1140,20 @@ func (e *State) forecast_lr(dps Series, args ...float64) float64 {
return s.Seconds()
}

func Percentile(e *State, T miniprofiler.Timer, series *Results, p float64) (r *Results, err error) {
func Percentile(e *State, T miniprofiler.Timer, series *Results, p *Results) (r *Results, err error) {
return reduce(e, T, series, percentile, p)
}

func Min(e *State, T miniprofiler.Timer, series *Results) (r *Results, err error) {
return reduce(e, T, series, percentile, 0)
return reduce(e, T, series, percentile, fromScalar(0))
}

func Median(e *State, T miniprofiler.Timer, series *Results) (r *Results, err error) {
return reduce(e, T, series, percentile, .5)
return reduce(e, T, series, percentile, fromScalar(.5))
}

func Max(e *State, T miniprofiler.Timer, series *Results) (r *Results, err error) {
return reduce(e, T, series, percentile, 1)
return reduce(e, T, series, percentile, fromScalar(1))
}

// percentile returns the value at the corresponding percentile between 0 and 1.
Expand Down
4 changes: 3 additions & 1 deletion cmd/bosun/expr/parse/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ func (f *FuncNode) Check(t *Tree) error {
for i, a := range f.Args {
ft := f.F.Args[i]
at := a.Return()
if ft != at {
if ft == TypeNumberSet && at == TypeScalar {
// Scalars are promoted to NumberSets during execution.
} else if ft != at {
return fmt.Errorf("parse: expected %v, got %v", ft, at)
}
if err := a.Check(t); err != nil {
Expand Down
15 changes: 7 additions & 8 deletions docs/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ This section documents Bosun's expression language, which is used to define the
There are three data types in Bosun's expression language:

1. **Scalar**: This is the simplest type, it is a single numeric value with no group associated with it. Keep in mind that an empty group, `{}` is still a group.
2. **NumberSet**: A number set is a group of tagged numeric values with one value per unique grouping.
2. **NumberSet**: A number set is a group of tagged numeric values with one value per unique grouping. As a special case, a **scalar** may be used in place of a **numberSet** with a single member with an empty group.
3. **SeriesSet**: A series is an array of timestamp-value pairs and an associated group.

In the vast majority of your alerts you will getting ***seriesSets*** back from your time series database and ***reducing*** them into ***numberSets***.
Expand Down Expand Up @@ -205,7 +205,7 @@ Diff returns the last point of each series minus the first point.

Returns the first (least recent) data point in each series.

## forecastlr(seriesSet, y_val scalar) numberSet
## forecastlr(seriesSet, y_val numberSet|scalar) numberSet

Returns the number of seconds until a linear regression of each series will reach y_val.

Expand All @@ -229,7 +229,7 @@ Returns the median value of each series, same as calling percentile(series, .5).

Returns the minimum value of each series, same as calling percentile(series, 0).

## percentile(seriesSet, p scalar) numberSet
## percentile(seriesSet, p numberSet|scalar) numberSet

Returns the value from each series at the percentile p. Min and Max can be simulated using `p <= 0` and `p >= 1`, respectively.

Expand Down Expand Up @@ -344,23 +344,22 @@ Returns series smoothed using Holt-Winters double exponential smoothing. Alpha
(scalar) is the data smoothing factor. Beta (scalar) is the trend smoothing
factor.

## dropg(seriesSet, scalar) seriesSet
## dropg(seriesSet, threshold numberSet|scalar) seriesSet

Remove any values greater than number from a series. Will error if this operation results in an empty series.

## dropge(seriesSet, scalar) seriesSet
## dropge(seriesSet, threshold numberSet|scalar) seriesSet

Remove any values greater than or equal to number from a series. Will error if this operation results in an empty series.

## dropl(seriesSet, scalar) seriesSet
## dropl(seriesSet, threshold numberSet|scalar) seriesSet

Remove any values lower than number from a series. Will error if this operation results in an empty series.

## drople(seriesSet, scalar) seriesSet
## drople(seriesSet, threshold numberSet|scalar) seriesSet

Remove any values lower than or equal to number from a series. Will error if this operation results in an empty series.


## dropna(seriesSet) seriesSet

Remove any NaN or Inf values from a series. Will error if this operation results in an empty series.
Expand Down

0 comments on commit fa76640

Please sign in to comment.