Skip to content

Commit

Permalink
fix: return correct count of ErrNotExecuted (#22273)
Browse files Browse the repository at this point in the history
executeQuery() iterates over statements until each is
processed or if an error is encountered that causes
the loop to exit pre-maturely. It should return
ErrNotExecuted for each remaining statement in the
query

closes #19136
  • Loading branch information
davidby-influx authored Aug 24, 2021
1 parent 27e5f97 commit 0090c5b
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
- [#22090](https:/influxdata/influxdb/pull/22090): fix: systemd service -- handle https, 40x, and block indefinitely
- [#22195](https:/influxdata/influxdb/pull/22195): fix: avoid compaction queue stats flutter
- [#22283](https:/influxdata/influxdb/pull/22283): fix: require database authorization to see continuous queries
- [#22273](https:/influxdata/influxdb/pull/22273): fix: return correct count of ErrNotExecuted

v1.9.2 [unreleased]
- [#21631](https:/influxdata/influxdb/pull/21631): fix: group by returns multiple results per group in some circumstances
Expand Down
1 change: 0 additions & 1 deletion query/execution_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func (ctx *ExecutionContext) Value(key interface{}) interface{} {
// send sends a Result to the Results channel and will exit if the query has
// been aborted.
func (ctx *ExecutionContext) send(result *Result) error {
result.StatementID = ctx.statementID
select {
case <-ctx.AbortCh:
return ErrQueryAborted
Expand Down
4 changes: 2 additions & 2 deletions query/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ LOOP:
// Normalize each statement if possible.
if normalizer, ok := e.StatementExecutor.(StatementNormalizer); ok {
if err := normalizer.NormalizeStatement(stmt, defaultDB, opt.RetentionPolicy); err != nil {
if err := ctx.send(&Result{Err: err}); err == ErrQueryAborted {
if err := ctx.send(&Result{Err: err, StatementID: i}); err == ErrQueryAborted {
return
}
break
Expand Down Expand Up @@ -380,7 +380,7 @@ LOOP:
}

// Send error results for any statements which were not executed.
for ; i < len(query.Statements)-1; i++ {
for i++; i < len(query.Statements); i++ {
if err := ctx.send(&Result{
StatementID: i,
Err: ErrNotExecuted,
Expand Down
67 changes: 67 additions & 0 deletions query/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ func (e *StatementExecutor) ExecuteStatement(ctx *query.ExecutionContext, stmt i
return e.ExecuteStatementFn(stmt, ctx)
}

type StatementNormalizerExecutor struct {
StatementExecutor
NormalizeStatementFn func(stmt influxql.Statement, database, retentionPolicy string) error
}

func (e *StatementNormalizerExecutor) NormalizeStatement(stmt influxql.Statement, database, retentionPolicy string) error {
return e.NormalizeStatementFn(stmt, database, retentionPolicy)
}

func NewQueryExecutor() *query.Executor {
return query.NewExecutor()
}
Expand Down Expand Up @@ -478,6 +487,64 @@ func TestQueryExecutor_Panic(t *testing.T) {
}
}

const goodStatement = `SELECT count(value) FROM cpu`

func TestQueryExecutor_NotExecuted(t *testing.T) {
var executorFailIndex int
var executorCallCount int
queryStatements := []string{goodStatement, goodStatement, goodStatement, goodStatement, goodStatement}
queryStr := strings.Join(queryStatements, ";")
var closing chan struct{}

q, err := influxql.ParseQuery(queryStr)
if err != nil {
t.Fatalf("parsing %s: %v", queryStr, err)
}

e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
defer func() { executorCallCount++ }()
if executorFailIndex == executorCallCount {
closing <- struct{}{}
close(closing)
select {
case <-ctx.Done():
return nil
}
} else {
return ctx.Send(&query.Result{Err: nil})
}
},
}
testFn := func(testName string, i int) {
results := e.ExecuteQuery(q, query.ExecutionOptions{}, closing)
checkNotExecutedResults(t, results, testName, i, len(q.Statements))
}
for i := 0; i < len(q.Statements); i++ {
closing = make(chan struct{})
executorFailIndex = i
executorCallCount = 0
testFn("executor", i)
}
}

func checkNotExecutedResults(t *testing.T, results <-chan *query.Result, testName string, failIndex int, lenQuery int) {
notExecutedIndex := failIndex + 1
for result := range results {
if result.Err == query.ErrNotExecuted {
if result.StatementID != notExecutedIndex {
t.Fatalf("StatementID for ErrNotExecuted in wrong order - expected: %d, got: %d", notExecutedIndex, result.StatementID)
} else {
notExecutedIndex++
}
}
}
if notExecutedIndex != lenQuery {
t.Fatalf("wrong number of results from %s with fail index of %d - got: %d, expected: %d", testName, failIndex, notExecutedIndex - (1 + failIndex), lenQuery-(1+failIndex))
}
}

func TestQueryExecutor_InvalidSource(t *testing.T) {
e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
Expand Down

0 comments on commit 0090c5b

Please sign in to comment.