Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

op-node: handle crit error events, lift event-system out of driver #11932

Merged
merged 1 commit into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@ import (
"sync/atomic"
"time"

"github.com/ethereum-optimism/optimism/op-node/rollup/sequencing"

"github.com/hashicorp/go-multierror"
"github.com/libp2p/go-libp2p/core/peer"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/event"
gethevent "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"

altda "github.com/ethereum-optimism/optimism/op-alt-da"
Expand All @@ -24,6 +22,8 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-node/rollup/sequencing"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
Expand Down Expand Up @@ -52,6 +52,9 @@ type OpNode struct {
l1SafeSub ethereum.Subscription // Subscription to get L1 safe blocks, a.k.a. justified data (polling)
l1FinalizedSub ethereum.Subscription // Subscription to get L1 safe blocks, a.k.a. justified data (polling)

eventSys event.System
eventDrain event.Drainer

l1Source *sources.L1Client // L1 Client to fetch data from
l2Driver *driver.Driver // L2 Engine to Sync
l2Source *sources.EngineClient // L2 Execution Engine RPC bindings
Expand Down Expand Up @@ -126,6 +129,7 @@ func (n *OpNode) init(ctx context.Context, cfg *Config) error {
if err := n.initTracer(ctx, cfg); err != nil {
return fmt.Errorf("failed to init the trace: %w", err)
}
n.initEventSystem()
if err := n.initL1(ctx, cfg); err != nil {
return fmt.Errorf("failed to init L1: %w", err)
}
Expand Down Expand Up @@ -159,6 +163,16 @@ func (n *OpNode) init(ctx context.Context, cfg *Config) error {
return nil
}

func (n *OpNode) initEventSystem() {
// This executor will be configurable in the future, for parallel event processing
executor := event.NewGlobalSynchronous(n.resourcesCtx)
sys := event.NewSystem(n.log, executor)
sys.AddTracer(event.NewMetricsTracer(n.metrics))
sys.Register("node", event.DeriverFunc(n.onEvent), event.DefaultRegisterOpts())
n.eventSys = sys
n.eventDrain = executor
}

func (n *OpNode) initTracer(ctx context.Context, cfg *Config) error {
if cfg.Tracer != nil {
n.tracer = cfg.Tracer
Expand All @@ -185,7 +199,7 @@ func (n *OpNode) initL1(ctx context.Context, cfg *Config) error {
}

// Keep subscribed to the L1 heads, which keeps the L1 maintainer pointing to the best headers to sync
n.l1HeadsSub = event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) {
n.l1HeadsSub = gethevent.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (gethevent.Subscription, error) {
if err != nil {
n.log.Warn("resubscribing after failed L1 subscription", "err", err)
}
Expand Down Expand Up @@ -410,7 +424,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config) error {
} else {
n.safeDB = safedb.Disabled
}
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source,
n.l2Driver = driver.NewDriver(n.eventSys, n.eventDrain, &cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source,
n.supervisor, n.beacon, n, n, n.log, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, altDA)
return nil
}
Expand Down Expand Up @@ -509,6 +523,20 @@ func (n *OpNode) Start(ctx context.Context) error {
return nil
}

// onEvent handles broadcast events.
// The OpNode itself is a deriver to catch system-critical events.
// Other event-handling should be encapsulated into standalone derivers.
func (n *OpNode) onEvent(ev event.Event) bool {
switch x := ev.(type) {
case rollup.CriticalErrorEvent:
n.log.Error("Critical error", "err", x.Err)
n.cancel(fmt.Errorf("critical error: %w", x.Err))
return true
default:
return false
}
}

func (n *OpNode) OnNewL1Head(ctx context.Context, sig eth.L1BlockRef) {
n.tracer.OnNewL1Head(ctx, sig)

Expand Down Expand Up @@ -679,6 +707,10 @@ func (n *OpNode) Stop(ctx context.Context) error {
}
}

if n.eventSys != nil {
n.eventSys.Stop()
}

if n.safeDB != nil {
if err := n.safeDB.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close safe head db: %w", err))
Expand Down
22 changes: 8 additions & 14 deletions op-node/rollup/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,14 @@ type SequencerStateListener interface {
SequencerStopped() error
}

type Drain interface {
Drain() error
}

// NewDriver composes an events handler that tracks L1 state, triggers L2 Derivation, and optionally sequences new L2 blocks.
func NewDriver(
sys event.Registry,
drain Drain,
driverCfg *Config,
cfg *rollup.Config,
l2 L2Chain,
Expand All @@ -170,17 +176,6 @@ func NewDriver(
) *Driver {
driverCtx, driverCancel := context.WithCancel(context.Background())

var executor event.Executor
var drain func() error
// This instantiation will be one of more options: soon there will be a parallel events executor
{
s := event.NewGlobalSynchronous(driverCtx)
executor = s
drain = s.Drain
}
sys := event.NewSystem(log, executor)
sys.AddTracer(event.NewMetricsTracer(metrics))

opts := event.DefaultRegisterOpts()

// If interop is scheduled we start the driver.
Expand Down Expand Up @@ -236,7 +231,7 @@ func NewDriver(
L2: l2,
Log: log,
Ctx: driverCtx,
Drain: drain,
Drain: drain.Drain,
}
sys.Register("sync", syncDeriver, opts)

Expand All @@ -260,12 +255,11 @@ func NewDriver(

driverEmitter := sys.Register("driver", nil, opts)
driver := &Driver{
eventSys: sys,
statusTracker: statusTracker,
SyncDeriver: syncDeriver,
sched: schedDeriv,
emitter: driverEmitter,
drain: drain,
drain: drain.Drain,
stateReq: make(chan chan struct{}),
forceReset: make(chan chan struct{}, 10),
driverConfig: driverCfg,
Expand Down
24 changes: 0 additions & 24 deletions op-node/rollup/driver/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
type SyncStatus = eth.SyncStatus

type Driver struct {
eventSys event.System

statusTracker SyncStatusTracker

*SyncDeriver
Expand Down Expand Up @@ -100,7 +98,6 @@ func (s *Driver) Start() error {
func (s *Driver) Close() error {
s.driverCancel()
s.wg.Wait()
s.eventSys.Stop()
s.sequencer.Close()
return nil
}
Expand Down Expand Up @@ -282,27 +279,6 @@ func (s *Driver) eventLoop() {
}
}

// OnEvent handles broadcasted events.
// The Driver itself is a deriver to catch system-critical events.
// Other event-handling should be encapsulated into standalone derivers.
func (s *Driver) OnEvent(ev event.Event) bool {
switch x := ev.(type) {
case rollup.CriticalErrorEvent:
s.Log.Error("Derivation process critical error", "err", x.Err)
// we need to unblock event-processing to be able to close
go func() {
logger := s.Log
err := s.Close()
if err != nil {
logger.Error("Failed to shutdown driver on critical error", "err", err)
}
}()
return true
default:
return false
}
}

type SyncDeriver struct {
// The derivation pipeline is reset whenever we reorg.
// The derivation pipeline determines the new l2Safe.
Expand Down
11 changes: 2 additions & 9 deletions op-node/rollup/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,5 @@ func (ev ResetEvent) String() string {
return "reset-event"
}

type CriticalErrorEvent struct {
Err error
}

var _ event.Event = CriticalErrorEvent{}

func (ev CriticalErrorEvent) String() string {
return "critical-error"
}
// CriticalErrorEvent is an alias for event.CriticalErrorEvent
type CriticalErrorEvent = event.CriticalErrorEvent
10 changes: 10 additions & 0 deletions op-node/rollup/event/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,13 @@ func (fn DeriverFunc) OnEvent(ev Event) bool {
type NoopEmitter struct{}

func (e NoopEmitter) Emit(ev Event) {}

type CriticalErrorEvent struct {
Err error
}

var _ Event = CriticalErrorEvent{}

func (ev CriticalErrorEvent) String() string {
return "critical-error"
}
19 changes: 18 additions & 1 deletion op-node/rollup/event/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/ethereum/go-ethereum/log"
)

type System interface {
type Registry interface {
// Register registers a named event-emitter, optionally processing events itself:
// deriver may be nil, not all registrants have to process events.
// A non-nil deriver may implement AttachEmitter to automatically attach the Emitter to it,
Expand All @@ -20,6 +20,10 @@ type System interface {
// Unregister removes a named emitter,
// also removing it from the set of events-receiving derivers (if registered with non-nil deriver).
Unregister(name string) (old Emitter)
}

type System interface {
Registry
// AddTracer registers a tracer to capture all event deriver/emitter work. It runs until RemoveTracer is called.
// Duplicate tracers are allowed.
AddTracer(t Tracer)
Expand Down Expand Up @@ -73,6 +77,10 @@ func (r *systemActor) RunEvent(ev AnnotatedEvent) {
if r.ctx.Err() != nil {
return
}
if r.sys.abort.Load() && !Is[CriticalErrorEvent](ev.Event) {
// if aborting, and not the CriticalErrorEvent itself, then do not process the event
return
}

prev := r.currentEvent
start := time.Now()
Expand All @@ -99,6 +107,9 @@ type Sys struct {

tracers []Tracer
tracersLock sync.RWMutex

// if true, no events may be processed, except CriticalError itself
abort atomic.Bool
}

func NewSystem(log log.Logger, ex Executor) *Sys {
Expand Down Expand Up @@ -240,6 +251,12 @@ func (s *Sys) emit(name string, derivContext uint64, ev Event) {
emitContext := s.emitContext.Add(1)
annotated := AnnotatedEvent{Event: ev, EmitContext: emitContext}

// As soon as anything emits a critical event,
// make the system aware, before the executor event schedules it for processing.
if Is[CriticalErrorEvent](ev) {
s.abort.Store(true)
}

emitTime := time.Now()
s.recordEmit(name, annotated, derivContext, emitTime)

Expand Down
44 changes: 44 additions & 0 deletions op-node/rollup/event/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package event

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -104,3 +105,46 @@ func TestSystemBroadcast(t *testing.T) {
require.Equal(t, 3, fooCount)
require.Equal(t, 3, barCount)
}

func TestCriticalError(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
count := 0
seenCrit := 0
deriverFn := DeriverFunc(func(ev Event) bool {
switch ev.(type) {
case CriticalErrorEvent:
seenCrit += 1
default:
count += 1
}
return true
})
exec := NewGlobalSynchronous(context.Background())
sys := NewSystem(logger, exec)
emitterA := sys.Register("a", deriverFn, DefaultRegisterOpts())
emitterB := sys.Register("b", deriverFn, DefaultRegisterOpts())

require.NoError(t, exec.Drain(), "can drain, even if empty")
emitterA.Emit(TestEvent{})
require.Equal(t, 0, count, "no processing yet, queued event")
require.NoError(t, exec.Drain())
require.Equal(t, 2, count, "both A and B processed the event")

emitterA.Emit(TestEvent{})
emitterB.Emit(TestEvent{})
testErr := errors.New("test crit error")
emitterB.Emit(CriticalErrorEvent{Err: testErr})
require.Equal(t, 2, count, "no processing yet, queued events")
require.Equal(t, 0, seenCrit, "critical error events are still scheduled like normal")
require.True(t, sys.abort.Load(), "we are aware of the crit")
require.NoError(t, exec.Drain())
require.Equal(t, 2, count, "still no processing, since we hit a crit error, the events are ignored")
require.Equal(t, 2, seenCrit, "but everyone has seen the crit now")

// We are able to stop the processing now
sys.Stop()

emitterA.Emit(TestEvent{})
require.NoError(t, exec.Drain(), "system is closed, no further event processing")
require.Equal(t, 2, count)
}
Loading