Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: multiframe altda channel #12400

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions op-batcher/batcher/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,22 +163,22 @@ func (s *channel) ID() derive.ChannelID {
// NextTxData should only be called after HasTxData returned true.
func (s *channel) NextTxData() txData {
nf := s.cfg.MaxFramesPerTx()
txdata := txData{frames: make([]frameData, 0, nf), asBlob: s.cfg.UseBlobs}
txdata := txData{frames: make([]frameData, 0, nf), daType: s.cfg.DaType}
for i := 0; i < nf && s.channelBuilder.HasFrame(); i++ {
frame := s.channelBuilder.NextFrame()
txdata.frames = append(txdata.frames, frame)
}

id := txdata.ID().String()
s.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames), "as_blob", txdata.asBlob)
s.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames), "da_type", txdata.daType)
s.pendingTransactions[id] = txdata

return txdata
}

func (s *channel) HasTxData() bool {
if s.IsFull() || // If the channel is full, we should start to submit it
!s.cfg.UseBlobs { // If using calldata, we only send one frame per tx
s.cfg.DaType == DaTypeCalldata { // If using calldata, we only send one frame per tx
return s.channelBuilder.HasFrame()
}
// Collect enough frames if channel is not full yet
Expand Down
11 changes: 7 additions & 4 deletions op-batcher/batcher/channel_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ type ChannelConfig struct {
// BatchType indicates whether the channel uses SingularBatch or SpanBatch.
BatchType uint

// UseBlobs indicates that this channel should be sent as a multi-blob
// transaction with one blob per frame.
UseBlobs bool
// DaType indicates how the frames in this channel should be sent to the L1.
DaType DaType
}

func (cc ChannelConfig) UseBlobs() bool {
return cc.DaType == DaTypeBlob
}

// ChannelConfig returns a copy of the receiver.
Expand Down Expand Up @@ -93,7 +96,7 @@ func (cc *ChannelConfig) ReinitCompressorConfig() {
}

func (cc *ChannelConfig) MaxFramesPerTx() int {
if !cc.UseBlobs {
if cc.DaType == DaTypeCalldata {
return 1
}
return cc.TargetNumFrames
Expand Down
3 changes: 2 additions & 1 deletion op-batcher/batcher/channel_config_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ func TestDynamicEthChannelConfig_ChannelConfig(t *testing.T) {
calldataCfg := ChannelConfig{
MaxFrameSize: 120_000 - 1,
TargetNumFrames: 1,
DaType: DaTypeCalldata,
}
blobCfg := ChannelConfig{
MaxFrameSize: eth.MaxBlobDataSize - 1,
TargetNumFrames: 3, // gets closest to amortized fixed tx costs
UseBlobs: true,
DaType: DaTypeBlob,
}

tests := []struct {
Expand Down
10 changes: 5 additions & 5 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,16 +177,16 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
newCfg := s.cfgProvider.ChannelConfig()

// No change:
if newCfg.UseBlobs == s.defaultCfg.UseBlobs {
if newCfg.UseBlobs() == s.defaultCfg.UseBlobs() {
s.log.Debug("Recomputing optimal ChannelConfig: no need to switch DA type",
"useBlobs", s.defaultCfg.UseBlobs)
"useBlobs", s.defaultCfg.UseBlobs())
return s.nextTxData(channel)
}

// Change:
s.log.Info("Recomputing optimal ChannelConfig: changing DA type and requeing blocks...",
"useBlobsBefore", s.defaultCfg.UseBlobs,
"useBlobsAfter", newCfg.UseBlobs)
"useBlobsBefore", s.defaultCfg.UseBlobs(),
"useBlobsAfter", newCfg.UseBlobs())
s.Requeue(newCfg)
channel, err = s.getReadyChannel(l1Head)
if err != nil {
Expand Down Expand Up @@ -282,7 +282,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
"compression_algo", cfg.CompressorConfig.CompressionAlgo,
"target_num_frames", cfg.TargetNumFrames,
"max_frame_size", cfg.MaxFrameSize,
"use_blobs", cfg.UseBlobs,
"da_type", cfg.DaType,
)
s.metr.RecordChannelOpened(pc.ID(), s.blocks.Len())

Expand Down
9 changes: 5 additions & 4 deletions op-batcher/batcher/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,11 +509,12 @@ func newFakeDynamicEthChannelConfig(lgr log.Logger,
calldataCfg := ChannelConfig{
MaxFrameSize: 120_000 - 1,
TargetNumFrames: 1,
DaType: DaTypeCalldata,
}
blobCfg := ChannelConfig{
MaxFrameSize: eth.MaxBlobDataSize - 1,
TargetNumFrames: 3, // gets closest to amortized fixed tx costs
UseBlobs: true,
DaType: DaTypeBlob,
}
calldataCfg.InitNoneCompressor()
blobCfg.InitNoneCompressor()
Expand Down Expand Up @@ -565,7 +566,7 @@ func TestChannelManager_TxData(t *testing.T) {

cfg.chooseBlobs = tc.chooseBlobsWhenChannelCreated
m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
require.Equal(t, tc.chooseBlobsWhenChannelCreated, m.defaultCfg.UseBlobs)
require.Equal(t, tc.chooseBlobsWhenChannelCreated, m.defaultCfg.DaType == DaTypeBlob)

// Seed channel manager with a block
rng := rand.New(rand.NewSource(99))
Expand Down Expand Up @@ -602,8 +603,8 @@ func TestChannelManager_TxData(t *testing.T) {
}

require.Equal(t, tc.numExpectedAssessments, cfg.assessments)
require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, data.asBlob)
require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, m.defaultCfg.UseBlobs)
require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, data.daType == DaTypeBlob)
require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, m.defaultCfg.DaType == DaTypeBlob)
})
}

Expand Down
4 changes: 2 additions & 2 deletions op-batcher/batcher/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) {
const n = 6
lgr := testlog.Logger(t, log.LevelWarn)
ch, err := newChannel(lgr, metrics.NoopMetrics, ChannelConfig{
UseBlobs: false,
DaType: DaTypeCalldata,
TargetNumFrames: n,
CompressorConfig: compressor.Config{
CompressionAlgo: derive.Zlib,
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) {
const n = eth.MaxBlobsPerBlobTx
lgr := testlog.Logger(t, log.LevelWarn)
ch, err := newChannel(lgr, metrics.NoopMetrics, ChannelConfig{
UseBlobs: true,
DaType: DaTypeBlob,
TargetNumFrames: n,
CompressorConfig: compressor.Config{
CompressionAlgo: derive.Zlib,
Expand Down
29 changes: 12 additions & 17 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,14 +592,6 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh

// publishToAltDAAndL1 posts the txdata to the DA Provider and then sends the commitment to L1.
func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) {
// sanity checks
if nf := len(txdata.frames); nf != 1 {
l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
}
if txdata.asBlob {
l.Log.Crit("Unexpected blob txdata with AltDA enabled")
}

// when posting txdata to an external DA Provider, we use a goroutine to avoid blocking the main loop
// since it may take a while for the request to return.
goroutineSpawned := daGroup.TryGo(func() error {
Expand Down Expand Up @@ -633,29 +625,32 @@ func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[t
// The method will block if the queue's MaxPendingTransactions is exceeded.
func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) error {
var err error

// if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
if l.Config.UseAltDA {
var candidate *txmgr.TxCandidate
switch txdata.daType {
case DaTypeAltDA:
if !l.Config.UseAltDA {
l.Log.Crit("Received AltDA type txdata without AltDA being enabled")
}
// if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
l.publishToAltDAAndL1(txdata, queue, receiptsCh, daGroup)
// we return nil to allow publishStateToL1 to keep processing the next txdata
return nil
}

var candidate *txmgr.TxCandidate
if txdata.asBlob {
case DaTypeBlob:
if candidate, err = l.blobTxCandidate(txdata); err != nil {
// We could potentially fall through and try a calldata tx instead, but this would
// likely result in the chain spending more in gas fees than it is tuned for, so best
// to just fail. We do not expect this error to trigger unless there is a serious bug
// or configuration issue.
return fmt.Errorf("could not create blob tx candidate: %w", err)
}
} else {
case DaTypeCalldata:
// sanity check
if nf := len(txdata.frames); nf != 1 {
l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
}
candidate = l.calldataTxCandidate(txdata.CallData())
default:
l.Log.Crit("Unknown DA type", "da_type", txdata.daType)
}

l.sendTx(txdata, false, candidate, queue, receiptsCh)
Expand All @@ -673,7 +668,7 @@ func (l *BatchSubmitter) sendTx(txdata txData, isCancel bool, candidate *txmgr.T
candidate.GasLimit = intrinsicGas
}

queue.Send(txRef{id: txdata.ID(), isCancel: isCancel, isBlob: txdata.asBlob}, *candidate, receiptsCh)
queue.Send(txRef{id: txdata.ID(), isCancel: isCancel, isBlob: txdata.daType == DaTypeBlob}, *candidate, receiptsCh)
}

func (l *BatchSubmitter) blobTxCandidate(data txData) (*txmgr.TxCandidate, error) {
Expand Down
40 changes: 25 additions & 15 deletions op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,30 +204,40 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
TargetNumFrames: cfg.TargetNumFrames,
SubSafetyMargin: cfg.SubSafetyMargin,
BatchType: cfg.BatchType,
// DaType: set below
}

switch cfg.DataAvailabilityType {
case flags.BlobsType, flags.AutoType:
if !cfg.TestUseMaxTxSizeForBlobs {
// account for version byte prefix
cc.MaxFrameSize = eth.MaxBlobDataSize - 1
if bs.UseAltDA {
if cfg.DataAvailabilityType == flags.CalldataType {
cc.DaType = DaTypeAltDA
} else {
return fmt.Errorf("altDA is currently only supported with calldata DA Type")
}
cc.UseBlobs = true
case flags.CalldataType: // do nothing
default:
return fmt.Errorf("unknown data availability type: %v", cfg.DataAvailabilityType)
}
if cc.MaxFrameSize > altda.MaxInputSize {
return fmt.Errorf("max frame size %d exceeds altDA max input size %d", cc.MaxFrameSize, altda.MaxInputSize)
}
} else {

if bs.UseAltDA && cc.MaxFrameSize > altda.MaxInputSize {
return fmt.Errorf("max frame size %d exceeds altDA max input size %d", cc.MaxFrameSize, altda.MaxInputSize)
switch cfg.DataAvailabilityType {
case flags.BlobsType, flags.AutoType:
if !cfg.TestUseMaxTxSizeForBlobs {
// account for version byte prefix
cc.MaxFrameSize = eth.MaxBlobDataSize - 1
}
cc.DaType = DaTypeBlob
case flags.CalldataType: // do nothing
cc.DaType = DaTypeCalldata
default:
return fmt.Errorf("unknown data availability type: %v", cfg.DataAvailabilityType)
}
}

cc.InitCompressorConfig(cfg.ApproxComprRatio, cfg.Compressor, cfg.CompressionAlgo)

if cc.UseBlobs && !bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
if cc.UseBlobs() && !bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
return errors.New("cannot use Blobs before Ecotone")
}
if !cc.UseBlobs && bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
if !cc.UseBlobs() && bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
bs.Log.Warn("Ecotone upgrade is active, but batcher is not configured to use Blobs!")
}

Expand Down Expand Up @@ -259,7 +269,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
calldataCC := cc
calldataCC.TargetNumFrames = 1
calldataCC.MaxFrameSize = 120_000
calldataCC.UseBlobs = false
calldataCC.DaType = DaTypeCalldata
calldataCC.ReinitCompressorConfig()

bs.ChannelConfig = NewDynamicEthChannelConfig(bs.Log, 10*time.Second, bs.TxManager, cc, calldataCC)
Expand Down
2 changes: 1 addition & 1 deletion op-batcher/batcher/test_batch_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (l *TestBatchSubmitter) JamTxPool(ctx context.Context) error {
var candidate *txmgr.TxCandidate
var err error
cc := l.state.cfgProvider.ChannelConfig()
if cc.UseBlobs {
if cc.UseBlobs() {
candidate = l.calldataTxCandidate([]byte{})
} else if candidate, err = l.blobTxCandidate(emptyTxData); err != nil {
return err
Expand Down
11 changes: 10 additions & 1 deletion op-batcher/batcher/tx_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,23 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
)

// DaType determines how txData is submitted to L1.
type DaType int

const (
DaTypeCalldata DaType = iota
DaTypeBlob
DaTypeAltDA
)

// txData represents the data for a single transaction.
//
// Note: The batcher currently sends exactly one frame per transaction. This
// might change in the future to allow for multiple frames from possibly
// different channels.
type txData struct {
frames []frameData
asBlob bool // indicates whether this should be sent as blob
daType DaType
}

func singleFrameTxData(frame frameData) txData {
Expand Down
6 changes: 4 additions & 2 deletions op-batcher/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,10 @@ var (
EnvVars: prefixEnvVars("MAX_BLOCKS_PER_SPAN_BATCH"),
}
TargetNumFramesFlag = &cli.IntFlag{
Name: "target-num-frames",
Usage: "The target number of frames to create per channel. Controls number of blobs per blob tx, if using Blob DA.",
Name: "target-num-frames",
Usage: "The target number of frames to create per channel. " +
"Controls number of blobs per blob tx, if using Blob DA, " +
"or number of frames per blob, if using altDA.",
Value: 1,
EnvVars: prefixEnvVars("TARGET_NUM_FRAMES"),
}
Expand Down