Skip to content

Commit

Permalink
Migrate ingress-handler
Browse files Browse the repository at this point in the history
  • Loading branch information
creydr committed Jul 28, 2023
1 parent 74c1552 commit 9728713
Showing 1 changed file with 6 additions and 41 deletions.
47 changes: 6 additions & 41 deletions pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,9 @@ import (
"knative.dev/eventing/pkg/eventtype"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/tracing"
"knative.dev/eventing/pkg/utils"
)

const (
// noDuration signals that the dispatch step hasn't started
noDuration = -1
defaultMaxIdleConnections = 1000
defaultMaxIdleConnectionsPerHost = 1000
)
Expand Down Expand Up @@ -229,7 +226,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
}

statusCode, dispatchTime := h.receive(ctx, request.Header, event, brokerNamespace, brokerName)
if dispatchTime > noDuration {
if dispatchTime > kncloudevents.NoDuration {
_ = h.Reporter.ReportEventDispatchTime(reporterArgs, statusCode, dispatchTime)
}
_ = h.Reporter.ReportEventCount(reporterArgs, statusCode)
Expand Down Expand Up @@ -259,7 +256,6 @@ func toKReference(broker *eventingv1.Broker) *duckv1.KReference {
}

func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloudevents.Event, brokerNamespace, brokerName string) (int, time.Duration) {

// Setting the extension as a string as the CloudEvents sdk does not support non-string extensions.
event.SetExtension(broker.EventArrivalTime, cloudevents.Timestamp{Time: time.Now()})
if h.Defaulter != nil {
Expand All @@ -269,51 +265,20 @@ func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloud

if ttl, err := broker.GetTTL(event.Context); err != nil || ttl <= 0 {
h.Logger.Debug("dropping event based on TTL status.", zap.Int32("TTL", ttl), zap.String("event.id", event.ID()), zap.Error(err))
return http.StatusBadRequest, noDuration
return http.StatusBadRequest, kncloudevents.NoDuration
}

channelAddress, err := h.getChannelAddress(brokerName, brokerNamespace)
if err != nil {
h.Logger.Warn("Broker not found in the namespace", zap.Error(err))
return http.StatusBadRequest, noDuration
return http.StatusBadRequest, kncloudevents.NoDuration
}

return h.send(ctx, headers, event, *channelAddress)
}

func (h *Handler) send(ctx context.Context, headers http.Header, event *cloudevents.Event, target duckv1.Addressable) (int, time.Duration) {

request, err := kncloudevents.NewCloudEventRequest(ctx, target)
if err != nil {
h.Logger.Error("failed to create event request.", zap.Error(err))
return http.StatusInternalServerError, noDuration
}

message := binding.ToMessage(event)
defer message.Finish(nil)

additionalHeaders := utils.PassThroughHeaders(headers)
err = kncloudevents.WriteRequestWithAdditionalHeaders(ctx, message, request, additionalHeaders)
if err != nil {
h.Logger.Error("failed to write request additionalHeaders.", zap.Error(err))
return http.StatusInternalServerError, noDuration
}

resp, dispatchTime, err := h.sendAndRecordDispatchTime(request)
if resp != nil {
defer resp.Body.Close()
}
dispatchInfo, err := kncloudevents.SendEvent(ctx, *event, *channelAddress, kncloudevents.WithHeader(headers))
if err != nil {
h.Logger.Error("failed to dispatch event", zap.Error(err))
return http.StatusInternalServerError, dispatchTime
return http.StatusInternalServerError, kncloudevents.NoDuration
}

return resp.StatusCode, dispatchTime
}

func (h *Handler) sendAndRecordDispatchTime(request *kncloudevents.CloudEventRequest) (*http.Response, time.Duration, error) {
start := time.Now()
resp, err := request.Send()
dispatchTime := time.Since(start)
return resp, dispatchTime, err
return dispatchInfo.ResponseCode, dispatchInfo.Duration
}

0 comments on commit 9728713

Please sign in to comment.