Skip to content

Commit

Permalink
Update Pinto query validator failed log, minor refactor pinot visibil…
Browse files Browse the repository at this point in the history
…ity store to remove panics (#5664)

* Update Pinto query validator failed log to include more details, refactor to remove panics

* fix error wrapping
  • Loading branch information
neil-xie authored Feb 15, 2024
1 parent 8d41945 commit 9c26651
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 60 deletions.
100 changes: 68 additions & 32 deletions common/persistence/pinot/pinotVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,11 @@ func (v *pinotVisibilityStore) ListOpenWorkflowExecutions(
isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool {
return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime)
}
query := getListWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request, false)
query, err := getListWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request, false)
if err != nil {
v.logger.Error(fmt.Sprintf("failed to build list workflow executions query %v", err))
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -302,7 +306,11 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutions(
return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime)
}

query := getListWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request, true)
query, err := getListWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request, true)
if err != nil {
v.logger.Error(fmt.Sprintf("failed to build list workflow executions query %v", err))
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -320,7 +328,11 @@ func (v *pinotVisibilityStore) ListOpenWorkflowExecutionsByType(ctx context.Cont
return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime)
}

query := getListWorkflowExecutionsByTypeQuery(v.pinotClient.GetTableName(), request, false)
query, err := getListWorkflowExecutionsByTypeQuery(v.pinotClient.GetTableName(), request, false)
if err != nil {
v.logger.Error(fmt.Sprintf("failed to build list workflow executions by type query %v", err))
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -338,7 +350,11 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByType(ctx context.Co
return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime)
}

query := getListWorkflowExecutionsByTypeQuery(v.pinotClient.GetTableName(), request, true)
query, err := getListWorkflowExecutionsByTypeQuery(v.pinotClient.GetTableName(), request, true)
if err != nil {
v.logger.Error(fmt.Sprintf("failed to build list workflow executions by type query %v", err))
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -356,7 +372,11 @@ func (v *pinotVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(ctx contex
return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime)
}

query := getListWorkflowExecutionsByWorkflowIDQuery(v.pinotClient.GetTableName(), request, false)
query, err := getListWorkflowExecutionsByWorkflowIDQuery(v.pinotClient.GetTableName(), request, false)
if err != nil {
v.logger.Error(fmt.Sprintf("failed to build list workflow executions by workflowID query %v", err))
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -374,7 +394,11 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByWorkflowID(ctx cont
return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime)
}

query := getListWorkflowExecutionsByWorkflowIDQuery(v.pinotClient.GetTableName(), request, true)
query, err := getListWorkflowExecutionsByWorkflowIDQuery(v.pinotClient.GetTableName(), request, true)
if err != nil {
v.logger.Error(fmt.Sprintf("failed to build list workflow executions by workflowID query %v", err))
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -392,7 +416,11 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByStatus(ctx context.
return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime)
}

query := getListWorkflowExecutionsByStatusQuery(v.pinotClient.GetTableName(), request)
query, err := getListWorkflowExecutionsByStatusQuery(v.pinotClient.GetTableName(), request)
if err != nil {
v.logger.Error(fmt.Sprintf("failed to build list workflow executions by status query %v", err))
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand Down Expand Up @@ -439,7 +467,11 @@ func (v *pinotVisibilityStore) GetClosedWorkflowExecution(ctx context.Context, r
func (v *pinotVisibilityStore) ListWorkflowExecutions(ctx context.Context, request *p.ListWorkflowExecutionsByQueryRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
checkPageSize(request)

query := v.getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request)
query, err := v.getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request)
if err != nil {
v.logger.Error(fmt.Sprintf("failed to build list workflow executions query %v", err))
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -462,7 +494,11 @@ func (v *pinotVisibilityStore) ListWorkflowExecutions(ctx context.Context, reque
func (v *pinotVisibilityStore) ScanWorkflowExecutions(ctx context.Context, request *p.ListWorkflowExecutionsByQueryRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
checkPageSize(request)

query := v.getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request)
query, err := v.getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request)
if err != nil {
v.logger.Error(fmt.Sprintf("failed to build scan workflow executions query %v", err))
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand Down Expand Up @@ -749,14 +785,14 @@ func (v *pinotVisibilityStore) getCountWorkflowExecutionsQuery(tableName string,
return query.String()
}

func (v *pinotVisibilityStore) getListWorkflowExecutionsByQueryQuery(tableName string, request *p.ListWorkflowExecutionsByQueryRequest) string {
func (v *pinotVisibilityStore) getListWorkflowExecutionsByQueryQuery(tableName string, request *p.ListWorkflowExecutionsByQueryRequest) (string, error) {
if request == nil {
return ""
return "", nil
}

token, err := pnt.GetNextPageToken(request.NextPageToken)
if err != nil {
panic(fmt.Sprintf("deserialize next page token error: %s", err))
return "", fmt.Errorf("next page token: %w", err)
}

query := NewPinotQuery(tableName)
Expand All @@ -770,20 +806,20 @@ func (v *pinotVisibilityStore) getListWorkflowExecutionsByQueryQuery(tableName s
// if customized query is empty, directly return
if requestQuery == "" {
query.addOffsetAndLimits(token.From, request.PageSize)
return query.String()
return query.String(), nil
}

requestQuery = filterPrefix(requestQuery)
if common.IsJustOrderByClause(requestQuery) {
query.concatSorter(requestQuery)
query.addOffsetAndLimits(token.From, request.PageSize)
return query.String()
return query.String(), nil
}

comparExpr, orderBy := parseOrderBy(requestQuery)
comparExpr, err = v.pinotQueryValidator.ValidateQuery(comparExpr)
if err != nil {
v.logger.Error(fmt.Sprintf("pinot query validator error: %s", err))
return "", fmt.Errorf("pinot query validator error: %w, query: %s", err, request.Query)
}

comparExpr = filterPrefix(comparExpr)
Expand All @@ -800,7 +836,7 @@ func (v *pinotVisibilityStore) getListWorkflowExecutionsByQueryQuery(tableName s
}

query.addOffsetAndLimits(token.From, request.PageSize)
return query.String()
return query.String(), nil
}

func filterPrefix(query string) string {
Expand Down Expand Up @@ -907,14 +943,14 @@ func findLastOrderBy(list []string) int {
return 0
}

func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWorkflowExecutionsRequest, isClosed bool) string {
func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWorkflowExecutionsRequest, isClosed bool) (string, error) {
if request == nil {
return ""
return "", nil
}

token, err := pnt.GetNextPageToken(request.NextPageToken)
if err != nil {
panic(fmt.Sprintf("deserialize next page token error: %s", err))
return "", fmt.Errorf("next page token: %w", err)
}

from := token.From
Expand All @@ -939,12 +975,12 @@ func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWor
query.addPinotSorter(StartTime, DescendingOrder)
query.addOffsetAndLimits(from, pageSize)

return query.String()
return query.String(), nil
}

func getListWorkflowExecutionsByTypeQuery(tableName string, request *p.InternalListWorkflowExecutionsByTypeRequest, isClosed bool) string {
func getListWorkflowExecutionsByTypeQuery(tableName string, request *p.InternalListWorkflowExecutionsByTypeRequest, isClosed bool) (string, error) {
if request == nil {
return ""
return "", nil
}

query := NewPinotQuery(tableName)
Expand All @@ -968,19 +1004,19 @@ func getListWorkflowExecutionsByTypeQuery(tableName string, request *p.InternalL

token, err := pnt.GetNextPageToken(request.NextPageToken)
if err != nil {
panic(fmt.Sprintf("deserialize next page token error: %s", err))
return "", fmt.Errorf("next page token: %w", err)
}

from := token.From
pageSize := request.PageSize
query.addOffsetAndLimits(from, pageSize)

return query.String()
return query.String(), nil
}

func getListWorkflowExecutionsByWorkflowIDQuery(tableName string, request *p.InternalListWorkflowExecutionsByWorkflowIDRequest, isClosed bool) string {
func getListWorkflowExecutionsByWorkflowIDQuery(tableName string, request *p.InternalListWorkflowExecutionsByWorkflowIDRequest, isClosed bool) (string, error) {
if request == nil {
return ""
return "", nil
}

query := NewPinotQuery(tableName)
Expand All @@ -1004,19 +1040,19 @@ func getListWorkflowExecutionsByWorkflowIDQuery(tableName string, request *p.Int

token, err := pnt.GetNextPageToken(request.NextPageToken)
if err != nil {
panic(fmt.Sprintf("deserialize next page token error: %s", err))
return "", fmt.Errorf("next page token: %w", err)
}

from := token.From
pageSize := request.PageSize
query.addOffsetAndLimits(from, pageSize)

return query.String()
return query.String(), nil
}

func getListWorkflowExecutionsByStatusQuery(tableName string, request *p.InternalListClosedWorkflowExecutionsByStatusRequest) string {
func getListWorkflowExecutionsByStatusQuery(tableName string, request *p.InternalListClosedWorkflowExecutionsByStatusRequest) (string, error) {
if request == nil {
return ""
return "", nil
}

query := NewPinotQuery(tableName)
Expand Down Expand Up @@ -1047,14 +1083,14 @@ func getListWorkflowExecutionsByStatusQuery(tableName string, request *p.Interna

token, err := pnt.GetNextPageToken(request.NextPageToken)
if err != nil {
panic(fmt.Sprintf("deserialize next page token error: %s", err))
return "", fmt.Errorf("next page token: %w", err)
}

from := token.From
pageSize := request.PageSize
query.addOffsetAndLimits(from, pageSize)

return query.String()
return query.String(), nil
}

func getGetClosedWorkflowExecutionQuery(tableName string, request *p.InternalGetClosedWorkflowExecutionRequest) string {
Expand Down
40 changes: 25 additions & 15 deletions common/persistence/pinot/pinotVisibilityStore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,9 @@ LIMIT 0, 0

for name, test := range tests {
t.Run(name, func(t *testing.T) {
assert.NotPanics(t, func() {
output := visibilityStore.getListWorkflowExecutionsByQueryQuery(testTableName, test.input)
assert.Equal(t, test.expectedOutput, output)
})
output, err := visibilityStore.getListWorkflowExecutionsByQueryQuery(testTableName, test.input)
assert.Equal(t, test.expectedOutput, output)
assert.NoError(t, err)
})
}
}
Expand All @@ -329,9 +328,9 @@ func TestGetListWorkflowExecutionsQuery(t *testing.T) {
NextPageToken: nil,
}

closeResult := getListWorkflowExecutionsQuery(testTableName, request, true)
openResult := getListWorkflowExecutionsQuery(testTableName, request, false)
nilResult := getListWorkflowExecutionsQuery(testTableName, nil, true)
closeResult, err1 := getListWorkflowExecutionsQuery(testTableName, request, true)
openResult, err2 := getListWorkflowExecutionsQuery(testTableName, request, false)
nilResult, err3 := getListWorkflowExecutionsQuery(testTableName, nil, true)
expectCloseResult := fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
Expand All @@ -356,6 +355,9 @@ LIMIT 0, 10
assert.Equal(t, closeResult, expectCloseResult)
assert.Equal(t, openResult, expectOpenResult)
assert.Equal(t, nilResult, expectNilResult)
assert.NoError(t, err1)
assert.NoError(t, err2)
assert.NoError(t, err3)
}

func TestGetListWorkflowExecutionsByTypeQuery(t *testing.T) {
Expand All @@ -371,9 +373,9 @@ func TestGetListWorkflowExecutionsByTypeQuery(t *testing.T) {
WorkflowTypeName: testWorkflowType,
}

closeResult := getListWorkflowExecutionsByTypeQuery(testTableName, request, true)
openResult := getListWorkflowExecutionsByTypeQuery(testTableName, request, false)
nilResult := getListWorkflowExecutionsByTypeQuery(testTableName, nil, true)
closeResult, err1 := getListWorkflowExecutionsByTypeQuery(testTableName, request, true)
openResult, err2 := getListWorkflowExecutionsByTypeQuery(testTableName, request, false)
nilResult, err3 := getListWorkflowExecutionsByTypeQuery(testTableName, nil, true)
expectCloseResult := fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
Expand All @@ -400,6 +402,9 @@ LIMIT 0, 10
assert.Equal(t, closeResult, expectCloseResult)
assert.Equal(t, openResult, expectOpenResult)
assert.Equal(t, nilResult, expectNilResult)
assert.NoError(t, err1)
assert.NoError(t, err2)
assert.NoError(t, err3)
}

func TestGetListWorkflowExecutionsByWorkflowIDQuery(t *testing.T) {
Expand All @@ -415,9 +420,9 @@ func TestGetListWorkflowExecutionsByWorkflowIDQuery(t *testing.T) {
WorkflowID: testWorkflowID,
}

closeResult := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, true)
openResult := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, false)
nilResult := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, nil, true)
closeResult, err1 := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, true)
openResult, err2 := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, false)
nilResult, err3 := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, nil, true)
expectCloseResult := fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
Expand All @@ -444,6 +449,9 @@ LIMIT 0, 10
assert.Equal(t, closeResult, expectCloseResult)
assert.Equal(t, openResult, expectOpenResult)
assert.Equal(t, nilResult, expectNilResult)
assert.NoError(t, err1)
assert.NoError(t, err2)
assert.NoError(t, err3)
}

func TestGetListWorkflowExecutionsByStatusQuery(t *testing.T) {
Expand All @@ -459,8 +467,8 @@ func TestGetListWorkflowExecutionsByStatusQuery(t *testing.T) {
Status: types.WorkflowExecutionCloseStatus(0),
}

closeResult := getListWorkflowExecutionsByStatusQuery(testTableName, request)
nilResult := getListWorkflowExecutionsByStatusQuery(testTableName, nil)
closeResult, err1 := getListWorkflowExecutionsByStatusQuery(testTableName, request)
nilResult, err2 := getListWorkflowExecutionsByStatusQuery(testTableName, nil)
expectCloseResult := fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
Expand All @@ -474,6 +482,8 @@ LIMIT 0, 10

assert.Equal(t, expectCloseResult, closeResult)
assert.Equal(t, expectNilResult, nilResult)
assert.NoError(t, err1)
assert.NoError(t, err2)
}

func TestGetGetClosedWorkflowExecutionQuery(t *testing.T) {
Expand Down
Loading

0 comments on commit 9c26651

Please sign in to comment.