Skip to content

Commit

Permalink
Correctly ignore attributes when serializing
Browse files Browse the repository at this point in the history
  • Loading branch information
cschleiden committed Mar 10, 2023
1 parent b4553df commit 3a90503
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
14 changes: 8 additions & 6 deletions backend/redis/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type eventWithoutAttributes struct {
func (e *eventWithoutAttributes) MarshalJSON() ([]byte, error) {
return json.Marshal(&struct {
*history.Event
Attributes interface{} `json:"attributes"`
Attributes interface{} `json:"attr"`
}{
Event: e.Event,
Attributes: nil,
Expand All @@ -38,7 +38,7 @@ func marshalEventWithoutAttributes(event *history.Event) (string, error) {
// ARGV[1..n] - payload values
var addPayloadsCmd = redis.NewScript(`
for i = 1, #ARGV do
redis.call("SET", KEYS[i], ARGV[i], "NX")
redis.pcall("SET", KEYS[i], ARGV[i], "NX")
end
return 0
Expand Down Expand Up @@ -107,6 +107,7 @@ func addEventsToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string
return nil
}

// Adds an event to be delivered in the future. Not cluster-safe.
// KEYS[1] - future event zset key
// KEYS[2] - future event key
// KEYS[3] - future event payload key
Expand All @@ -116,7 +117,7 @@ func addEventsToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string
// ARGV[4] - event payload
var addFutureEventCmd = redis.NewScript(`
redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2])
redis.call("HSET", KEYS[2], "instance", ARGV[2], "event", ARGV[3])
redis.call("HSET", KEYS[2], "instance", ARGV[2], "event", ARGV[3], "payload", KEYS[3])
redis.call("SET", KEYS[3], ARGV[4], "NX")
return 0
`)
Expand All @@ -142,17 +143,18 @@ func addFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.Work
).Err()
}

// Remove a scheduled future event. Not cluster-safe.
// KEYS[1] - future event zset key
// KEYS[2] - future event key
// KEYS[3] - future event payload key
var removeFutureEventCmd = redis.NewScript(`
redis.call("ZREM", KEYS[1], KEYS[2])
redis.call("DEL", KEYS[3])
local k = redis.call("HGET", KEYS[2], "payload")
redis.call("DEL", k)
return redis.call("DEL", KEYS[2])
`)

// removeFutureEvent removes a scheduled future event for the given event. Events are associated via their ScheduleEventID
func removeFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) {
key := futureEventKey(instance.InstanceID, event.ScheduleEventID)
removeFutureEventCmd.Run(ctx, p, []string{futureEventsKey(), key, payloadKey(event.ID)})
removeFutureEventCmd.Run(ctx, p, []string{futureEventsKey(), key})
}
14 changes: 14 additions & 0 deletions backend/redis/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,30 @@ func (rb *redisBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
return nil, err
}

payloadKeys := make([]string, 0, len(msgs))
var events []*history.Event
for _, msg := range msgs {
var event *history.Event
if err := json.Unmarshal([]byte(msg.Values["event"].(string)), &event); err != nil {
return nil, fmt.Errorf("unmarshaling event: %w", err)
}

payloadKeys = append(payloadKeys, payloadKey(event.ID))
events = append(events, event)
}

res, err := rb.rdb.MGet(ctx, payloadKeys...).Result()
if err != nil {
return nil, fmt.Errorf("reading payloads: %w", err)
}

for i, event := range events {
event.Attributes, err = history.DeserializeAttributes(event.Type, []byte(res[i].(string)))
if err != nil {
return nil, fmt.Errorf("deserializing attributes for event %v: %w", event.Type, err)
}
}

return events, nil
}

Expand Down

0 comments on commit 3a90503

Please sign in to comment.