From c2adc6b96990f815fd6bc1089237189b3d83e0ee Mon Sep 17 00:00:00 2001 From: Arne Luenser Date: Mon, 19 Dec 2022 18:31:00 +0100 Subject: [PATCH 1/3] fix: add failing webhook async dispatch test --- selfservice/hook/web_hook.go | 20 +++- selfservice/hook/web_hook_integration_test.go | 97 ++++++++++++++++++- 2 files changed, 111 insertions(+), 6 deletions(-) diff --git a/selfservice/hook/web_hook.go b/selfservice/hook/web_hook.go index 98bc24fc27c3..b798f9e39914 100644 --- a/selfservice/hook/web_hook.go +++ b/selfservice/hook/web_hook.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "net/http" + "time" "github.com/pkg/errors" "github.com/tidwall/gjson" @@ -282,6 +283,8 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error { } errChan := make(chan error, 1) + e.deps.Logger().WithRequest(req.Request).Info("Dispatching webhook") + t0 := time.Now() go func() { defer close(errChan) @@ -307,9 +310,16 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error { }() if gjson.GetBytes(e.conf, "response.ignore").Bool() { + traceID, spanID := span.SpanContext().TraceID(), span.SpanContext().SpanID() go func() { - err := <-errChan - e.deps.Logger().WithError(err).Warning("A web hook request failed but the error was ignored because the configuration indicated that the upstream response should be ignored.") + if err := <-errChan; err != nil { + e.deps.Logger().WithField("otel", map[string]string{ + "trace_id": traceID.String(), + "span_id": spanID.String(), + }).WithError(err).Warning("Webhook request failed but the error was ignored because the configuration indicated that the upstream response should be ignored.") + } else { + e.deps.Logger().WithField("duration", time.Since(t0)).Info("Webhook request succeeded") + } }() return nil } @@ -323,7 +333,7 @@ func parseWebhookResponse(resp *http.Response) (err error) { } var hookResponse rawHookResponse if err := json.NewDecoder(resp.Body).Decode(&hookResponse); err != nil { - return 
errors.Wrap(err, "hook response could not be unmarshalled properly from JSON") + return errors.Wrap(err, "webhook response could not be unmarshalled properly from JSON") } var validationErrs []*schema.ValidationError @@ -343,11 +353,11 @@ func parseWebhookResponse(resp *http.Response) (err error) { Context: detail.Context, }) } - validationErrs = append(validationErrs, schema.NewHookValidationError(msg.InstancePtr, "a web-hook target returned an error", messages)) + validationErrs = append(validationErrs, schema.NewHookValidationError(msg.InstancePtr, "a webhook target returned an error", messages)) } if len(validationErrs) == 0 { - return errors.New("error while parsing hook response: got no validation errors") + return errors.New("error while parsing webhook response: got no validation errors") } return schema.NewValidationListError(validationErrs) diff --git a/selfservice/hook/web_hook_integration_test.go b/selfservice/hook/web_hook_integration_test.go index 1c7ddb21df00..bef9bef44680 100644 --- a/selfservice/hook/web_hook_integration_test.go +++ b/selfservice/hook/web_hook_integration_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/require" "github.com/ory/kratos/schema" @@ -365,7 +366,7 @@ func TestWebHooks(t *testing.T) { }`, ) - webhookError := schema.NewValidationListError([]*schema.ValidationError{schema.NewHookValidationError("#/traits/username", "a web-hook target returned an error", text.Messages{{ID: 1234, Type: "info", Text: "error message"}})}) + webhookError := schema.NewValidationListError([]*schema.ValidationError{schema.NewHookValidationError("#/traits/username", "a webhook target returned an error", text.Messages{{ID: 1234, Type: "info", Text: "error message"}})}) for _, tc := range []struct { uc string callWebHook func(wh *hook.WebHook, req *http.Request, f flow.Flow, s *session.Session) error @@ -839,3 +840,97 @@ func TestDisallowPrivateIPRanges(t *testing.T) { 
require.Contains(t, err.Error(), "192.168.178.0 is not a public IP address") }) } + +func TestAsyncWebhook(t *testing.T) { + conf, reg := internal.NewFastRegistryWithMocks(t) + _ = conf + // conf.MustSet(ctx, config.ViperKeyClientHTTPNoPrivateIPRanges, true) + // conf.MustSet(ctx, config.ViperKeyClientHTTPPrivateIPExceptionURLs, []string{webhookReceiver.URL}) + logger := logrusx.New("kratos", "test") + logHook := new(test.Hook) + logger.Logger.Hooks.Add(logHook) + whDeps := struct { + x.SimpleLoggerWithClient + *jsonnetsecure.TestProvider + }{ + x.SimpleLoggerWithClient{L: logger, C: reg.HTTPClient(context.Background()), T: otelx.NewNoop(logger, &otelx.Config{ServiceName: "kratos"})}, + jsonnetsecure.NewTestProvider(t), + } + + req := &http.Request{ + Header: map[string][]string{"Some-Header": {"Some-Value"}}, + Host: "www.ory.sh", + TLS: new(tls.ConnectionState), + URL: &url.URL{Path: "/some_end_point"}, + Method: http.MethodPost, + } + incomingCtx, incomingCancel := context.WithCancel(context.Background()) + if deadline, ok := t.Deadline(); ok { + var cleanup context.CancelFunc + incomingCtx, cleanup = context.WithDeadline(incomingCtx, deadline.Add(-time.Second)) + defer cleanup() + } + req = req.WithContext(incomingCtx) + s := &session.Session{ID: x.NewUUID(), Identity: &identity.Identity{ID: x.NewUUID()}} + f := &login.Flow{ID: x.NewUUID()} + + handlerEntered, blockHandlerOnExit := make(chan struct{}), make(chan struct{}) + webhookReceiver := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + close(handlerEntered) + <-blockHandlerOnExit + w.Write([]byte("ok")) + })) + t.Cleanup(webhookReceiver.Close) + // defer webhookReceiver.Close() + + wh := hook.NewWebHook(&whDeps, json.RawMessage(fmt.Sprintf(` + { + "url": %q, + "method": "GET", + "body": "file://stub/test_body.jsonnet", + "response": { + "ignore": true + } + }`, webhookReceiver.URL))) + err := wh.ExecuteLoginPostHook(nil, req, node.DefaultGroup, f, s) + require.NoError(t, 
err) // execution returns immediately for async webhook + select { + case <-time.After(200 * time.Millisecond): + t.Fatal("timed out waiting for webhook request to reach test handler") + case <-handlerEntered: + // ok + } + // at this point, a goroutine is in the middle of the call to our test handler and waiting for a response + incomingCancel() // simulate the incoming Kratos request having finished + testFor := time.After(200 * time.Millisecond) + for done := false; !done; { + if last := logHook.LastEntry(); last != nil { + msg, err := last.String() + require.NoError(t, err) + assert.Contains(t, msg, "Dispatching webhook") + } + + select { + case <-testFor: + done = true + case <-time.After(50 * time.Millisecond): + // continue loop + } + } + logHook.Reset() + close(blockHandlerOnExit) + testFor = time.After(200 * time.Millisecond) + for done := false; !done; { + if last := logHook.LastEntry(); last != nil { + msg, err := last.String() + require.NoError(t, err) + assert.Contains(t, msg, "Webhook request succeeded") + } + select { + case <-testFor: + done = true + case <-time.After(50 * time.Millisecond): + // continue loop + } + } +} From 134295ea93d043f073c77994a82f06e9cba01696 Mon Sep 17 00:00:00 2001 From: Arne Luenser Date: Mon, 19 Dec 2022 22:07:43 +0100 Subject: [PATCH 2/3] fix: spurious cancelation of async webhooks, better tracing Previously, async webhooks (response.ignore=true) would be canceled early once the incoming Kratos request was served and its associated context released. We now dissociate the cancellation of async hooks from the normal request processing flow. 
--- selfservice/hook/web_hook.go | 57 +++++++++++-------- selfservice/hook/web_hook_integration_test.go | 20 +++---- 2 files changed, 43 insertions(+), 34 deletions(-) diff --git a/selfservice/hook/web_hook.go b/selfservice/hook/web_hook.go index b798f9e39914..1327ea747de6 100644 --- a/selfservice/hook/web_hook.go +++ b/selfservice/hook/web_hook.go @@ -12,7 +12,9 @@ import ( "github.com/pkg/errors" "github.com/tidwall/gjson" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + semconv "go.opentelemetry.io/otel/semconv/v1.11.0" "go.opentelemetry.io/otel/trace" "github.com/ory/kratos/ui/node" @@ -30,7 +32,6 @@ import ( "github.com/ory/kratos/session" "github.com/ory/kratos/text" "github.com/ory/kratos/x" - "github.com/ory/x/otelx" ) var ( @@ -254,22 +255,6 @@ func (e *WebHook) ExecuteSettingsPrePersistHook(_ http.ResponseWriter, req *http } func (e *WebHook) execute(ctx context.Context, data *templateContext) error { - span := trace.SpanFromContext(ctx) - attrs := map[string]string{ - "webhook.http.method": data.RequestMethod, - "webhook.http.url": data.RequestURL, - "webhook.http.headers": fmt.Sprintf("%#v", data.RequestHeaders), - } - - if data.Identity != nil { - attrs["webhook.identity.id"] = data.Identity.ID.String() - } else { - attrs["webhook.identity.id"] = "" - } - - span.SetAttributes(otelx.StringAttrs(attrs)...) 
- defer span.End() - builder, err := request.NewBuilder(e.conf, e.deps) if err != nil { return err @@ -282,34 +267,60 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error { return err } - errChan := make(chan error, 1) + attrs := semconv.HTTPClientAttributesFromHTTPRequest(req.Request) + if data.Identity != nil { + attrs = append(attrs, + attribute.String("webhook.identity.id", data.Identity.ID.String()), + attribute.String("webhook.identity.nid", data.Identity.NID.String()), + ) + } + var ( + httpClient = e.deps.HTTPClient(ctx) + async = gjson.GetBytes(e.conf, "response.ignore").Bool() + parseResponse = gjson.GetBytes(e.conf, "can_interrupt").Bool() + tracer = trace.SpanFromContext(ctx).TracerProvider().Tracer("kratos-webhooks") + cancel context.CancelFunc = func() {} + spanOpts = []trace.SpanStartOption{trace.WithAttributes(attrs...)} + errChan = make(chan error, 1) + ) + if async { + // dissociate the context from the one passed into this function + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute) + spanOpts = append(spanOpts, trace.WithNewRoot()) + } + ctx, span := tracer.Start(ctx, "Webhook", spanOpts...) e.deps.Logger().WithRequest(req.Request).Info("Dispatching webhook") t0 := time.Now() go func() { defer close(errChan) + defer cancel() + defer span.End() - resp, err := e.deps.HTTPClient(ctx).Do(req.WithContext(ctx)) + resp, err := httpClient.Do(req.WithContext(ctx)) if err != nil { + span.SetStatus(codes.Error, err.Error()) errChan <- errors.WithStack(err) return } defer resp.Body.Close() + span.SetAttributes(semconv.HTTPAttributesFromHTTPStatusCode(resp.StatusCode)...) 
if resp.StatusCode >= http.StatusBadRequest { - if gjson.GetBytes(e.conf, "can_interrupt").Bool() { + span.SetStatus(codes.Error, "HTTP status code >= 400") + if parseResponse { if err := parseWebhookResponse(resp); err != nil { + span.SetStatus(codes.Error, err.Error()) errChan <- err } } - errChan <- fmt.Errorf("web hook failed with status code %v", resp.StatusCode) - span.SetStatus(codes.Error, fmt.Sprintf("web hook failed with status code %v", resp.StatusCode)) + errChan <- fmt.Errorf("webhook failed with status code %v", resp.StatusCode) return } errChan <- nil }() - if gjson.GetBytes(e.conf, "response.ignore").Bool() { + if async { traceID, spanID := span.SpanContext().TraceID(), span.SpanContext().SpanID() go func() { if err := <-errChan; err != nil { diff --git a/selfservice/hook/web_hook_integration_test.go b/selfservice/hook/web_hook_integration_test.go index bef9bef44680..7621c7609ce6 100644 --- a/selfservice/hook/web_hook_integration_test.go +++ b/selfservice/hook/web_hook_integration_test.go @@ -842,10 +842,7 @@ func TestDisallowPrivateIPRanges(t *testing.T) { } func TestAsyncWebhook(t *testing.T) { - conf, reg := internal.NewFastRegistryWithMocks(t) - _ = conf - // conf.MustSet(ctx, config.ViperKeyClientHTTPNoPrivateIPRanges, true) - // conf.MustSet(ctx, config.ViperKeyClientHTTPPrivateIPExceptionURLs, []string{webhookReceiver.URL}) + _, reg := internal.NewFastRegistryWithMocks(t) logger := logrusx.New("kratos", "test") logHook := new(test.Hook) logger.Logger.Hooks.Add(logHook) @@ -866,6 +863,7 @@ func TestAsyncWebhook(t *testing.T) { } incomingCtx, incomingCancel := context.WithCancel(context.Background()) if deadline, ok := t.Deadline(); ok { + // cancel this context one second before test timeout for clean shutdown var cleanup context.CancelFunc incomingCtx, cleanup = context.WithDeadline(incomingCtx, deadline.Add(-time.Second)) defer cleanup() @@ -881,7 +879,6 @@ func TestAsyncWebhook(t *testing.T) { w.Write([]byte("ok")) })) 
t.Cleanup(webhookReceiver.Close) - // defer webhookReceiver.Close() wh := hook.NewWebHook(&whDeps, json.RawMessage(fmt.Sprintf(` { @@ -902,7 +899,7 @@ func TestAsyncWebhook(t *testing.T) { } // at this point, a goroutine is in the middle of the call to our test handler and waiting for a response incomingCancel() // simulate the incoming Kratos request having finished - testFor := time.After(200 * time.Millisecond) + timeout := time.After(200 * time.Millisecond) for done := false; !done; { if last := logHook.LastEntry(); last != nil { msg, err := last.String() @@ -911,7 +908,7 @@ func TestAsyncWebhook(t *testing.T) { } select { - case <-testFor: + case <-timeout: done = true case <-time.After(50 * time.Millisecond): // continue loop @@ -919,16 +916,17 @@ func TestAsyncWebhook(t *testing.T) { } logHook.Reset() close(blockHandlerOnExit) - testFor = time.After(200 * time.Millisecond) - for done := false; !done; { + timeout = time.After(200 * time.Millisecond) + for { if last := logHook.LastEntry(); last != nil { msg, err := last.String() require.NoError(t, err) assert.Contains(t, msg, "Webhook request succeeded") + break } select { - case <-testFor: - done = true + case <-timeout: + t.Fatal("timed out waiting for successful webhook completion") case <-time.After(50 * time.Millisecond): // continue loop } From 1da1cd1c9006d38f67c156e1b50461e7c094c754 Mon Sep 17 00:00:00 2001 From: ory-bot <60093411+ory-bot@users.noreply.github.com> Date: Tue, 20 Dec 2022 07:57:48 +0100 Subject: [PATCH 3/3] chore: code review --- selfservice/hook/web_hook.go | 52 +++++++++++-------- selfservice/hook/web_hook_integration_test.go | 33 ++++-------- 2 files changed, 41 insertions(+), 44 deletions(-) diff --git a/selfservice/hook/web_hook.go b/selfservice/hook/web_hook.go index 1327ea747de6..d3a4107e05fd 100644 --- a/selfservice/hook/web_hook.go +++ b/selfservice/hook/web_hook.go @@ -274,29 +274,36 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error { 
attribute.String("webhook.identity.nid", data.Identity.NID.String()), ) } + var ( - httpClient = e.deps.HTTPClient(ctx) - async = gjson.GetBytes(e.conf, "response.ignore").Bool() - parseResponse = gjson.GetBytes(e.conf, "can_interrupt").Bool() - tracer = trace.SpanFromContext(ctx).TracerProvider().Tracer("kratos-webhooks") - cancel context.CancelFunc = func() {} - spanOpts = []trace.SpanStartOption{trace.WithAttributes(attrs...)} - errChan = make(chan error, 1) + httpClient = e.deps.HTTPClient(ctx) + ignoreResponse = gjson.GetBytes(e.conf, "response.ignore").Bool() + canInterrupt = gjson.GetBytes(e.conf, "can_interrupt").Bool() + tracer = trace.SpanFromContext(ctx).TracerProvider().Tracer("kratos-webhooks") + spanOpts = []trace.SpanStartOption{trace.WithAttributes(attrs...)} + errChan = make(chan error, 1) ) - if async { - // dissociate the context from the one passed into this function - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute) - spanOpts = append(spanOpts, trace.WithNewRoot()) - } + ctx, span := tracer.Start(ctx, "Webhook", spanOpts...) e.deps.Logger().WithRequest(req.Request).Info("Dispatching webhook") - t0 := time.Now() + + req = req.WithContext(ctx) + if ignoreResponse { + // This is one of the few places where spawning a context.Background() is ok. We need to do this + // because the function runs asynchronously and we don't want to cancel the request if the + // incoming request context is cancelled. + // + // The webhook will still cancel after 30 seconds as that is the configured timeout for the HTTP client. 
+ req = req.WithContext(context.Background()) + // spanOpts = append(spanOpts, trace.WithNewRoot()) + } + + startTime := time.Now() go func() { defer close(errChan) - defer cancel() defer span.End() - resp, err := httpClient.Do(req.WithContext(ctx)) + resp, err := httpClient.Do(req) if err != nil { span.SetStatus(codes.Error, err.Error()) errChan <- errors.WithStack(err) @@ -307,7 +314,7 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error { if resp.StatusCode >= http.StatusBadRequest { span.SetStatus(codes.Error, "HTTP status code >= 400") - if parseResponse { + if canInterrupt { if err := parseWebhookResponse(resp); err != nil { span.SetStatus(codes.Error, err.Error()) errChan <- err @@ -320,16 +327,17 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error { errChan <- nil }() - if async { + if ignoreResponse { traceID, spanID := span.SpanContext().TraceID(), span.SpanContext().SpanID() + logger := e.deps.Logger().WithField("otel", map[string]string{ + "trace_id": traceID.String(), + "span_id": spanID.String(), + }) go func() { if err := <-errChan; err != nil { - e.deps.Logger().WithField("otel", map[string]string{ - "trace_id": traceID.String(), - "span_id": spanID.String(), - }).WithError(err).Warning("Webhook request failed but the error was ignored because the configuration indicated that the upstream response should be ignored.") + logger.WithField("duration", time.Since(startTime)).WithError(err).Warning("Webhook request failed but the error was ignored because the configuration indicated that the upstream response should be ignored.") } else { - e.deps.Logger().WithField("duration", time.Since(t0)).Info("Webhook request succeeded") + logger.WithField("duration", time.Since(startTime)).Info("Webhook request succeeded") } }() return nil diff --git a/selfservice/hook/web_hook_integration_test.go b/selfservice/hook/web_hook_integration_test.go index 7621c7609ce6..87bec5b24003 100644 --- 
a/selfservice/hook/web_hook_integration_test.go +++ b/selfservice/hook/web_hook_integration_test.go @@ -861,6 +861,7 @@ func TestAsyncWebhook(t *testing.T) { URL: &url.URL{Path: "/some_end_point"}, Method: http.MethodPost, } + incomingCtx, incomingCancel := context.WithCancel(context.Background()) if deadline, ok := t.Deadline(); ok { // cancel this context one second before test timeout for clean shutdown @@ -868,6 +869,7 @@ func TestAsyncWebhook(t *testing.T) { incomingCtx, cleanup = context.WithDeadline(incomingCtx, deadline.Add(-time.Second)) defer cleanup() } + req = req.WithContext(incomingCtx) s := &session.Session{ID: x.NewUUID(), Identity: &identity.Identity{ID: x.NewUUID()}} f := &login.Flow{ID: x.NewUUID()} @@ -899,31 +901,17 @@ func TestAsyncWebhook(t *testing.T) { } // at this point, a goroutine is in the middle of the call to our test handler and waiting for a response incomingCancel() // simulate the incoming Kratos request having finished + close(blockHandlerOnExit) timeout := time.After(200 * time.Millisecond) - for done := false; !done; { - if last := logHook.LastEntry(); last != nil { - msg, err := last.String() - require.NoError(t, err) - assert.Contains(t, msg, "Dispatching webhook") + var found bool + for !found { + for _, entry := range logHook.AllEntries() { + if entry.Message == "Webhook request succeeded" { + found = true + break + } } - select { - case <-timeout: - done = true - case <-time.After(50 * time.Millisecond): - // continue loop - } - } - logHook.Reset() - close(blockHandlerOnExit) - timeout = time.After(200 * time.Millisecond) - for { - if last := logHook.LastEntry(); last != nil { - msg, err := last.String() - require.NoError(t, err) - assert.Contains(t, msg, "Webhook request succeeded") - break - } select { case <-timeout: t.Fatal("timed out waiting for successful webhook completion") @@ -931,4 +919,5 @@ func TestAsyncWebhook(t *testing.T) { // continue loop } } + require.True(t, found) }