Skip to content

Commit

Permalink
Merge #3273
Browse files Browse the repository at this point in the history
3273: Checkpointing V6 - support concurrent checkpoint encoding/decoding r=zhangchiqing a=zhangchiqing

Closes #3075 

This PR implements the checkpointing V6. 

Checkpointing Version 6 splits the single checkpoint file into 18 files in total. The main benefits are:
- The benefit of splitting the checkpoint file is to support concurrent writes to multiple sub files which speeds up checkpoint generation, and concurrent reads which speeds up reading checkpoint.
- V6 is benefited from V5, where it builds the sub trees first to be encoded, which built the ground for allowing concurrent processing.

See complete design in this doc:
https://www.notion.so/dapperlabs/Checkpoint-V6-8c7b97937da54c5b9e6c18b5b4598f2e

Comparison between V5 and V6 using latest mainnet19 data snapshot:
- checkpoint writing is reduced from `16mins` to `3mins`, 5.3 times faster
- checkpoint reading is reduced from `12mins` to `2mins`, 6 times faster

This is a feature branch. There are more TODO items to be done in separate PRs. Once this PR is approved, I will close this PR until all TODO items are done, then I will re-open this PR, and merge to master. 

Co-authored-by: Leo Zhang (zhangchiqing) <[email protected]>
  • Loading branch information
bors[bot] and zhangchiqing authored Oct 17, 2022
2 parents 24f691a + 40e1658 commit bba471c
Show file tree
Hide file tree
Showing 18 changed files with 2,546 additions and 507 deletions.
4 changes: 4 additions & 0 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,10 @@ func (exeNode *ExecutionNode) LoadExecutionStateLedger(
return nil, fmt.Errorf("failed to initialize wal: %w", err)
}

if exeNode.exeConf.outputCheckpointV5 {
exeNode.diskWAL.UseCheckpointVersion5()
}

exeNode.ledgerStorage, err = ledger.NewLedger(exeNode.diskWAL, int(exeNode.exeConf.mTrieCacheSize), exeNode.collector, node.Logger.With().Str("subcomponent",
"ledger").Logger(), ledger.DefaultPathFinderVersion)
return exeNode.ledgerStorage, err
Expand Down
2 changes: 2 additions & 0 deletions cmd/execution_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ExecutionConfig struct {
transactionResultsCacheSize uint
checkpointDistance uint
checkpointsToKeep uint
outputCheckpointV5 bool
stateDeltasLimit uint
chunkDataPackCacheSize uint
chunkDataPackRequestsCacheSize uint32
Expand Down Expand Up @@ -63,6 +64,7 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
flags.Uint32Var(&exeConf.mTrieCacheSize, "mtrie-cache-size", 500, "cache size for MTrie")
flags.UintVar(&exeConf.checkpointDistance, "checkpoint-distance", 20, "number of WAL segments between checkpoints")
flags.UintVar(&exeConf.checkpointsToKeep, "checkpoints-to-keep", 5, "number of recent checkpoints to keep (0 to keep all)")
flags.BoolVar(&exeConf.outputCheckpointV5, "outputCheckpointV5", false, "output checkpoint file in v5")
flags.UintVar(&exeConf.stateDeltasLimit, "state-deltas-limit", 100, "maximum number of state deltas in the memory pool")
flags.UintVar(&exeConf.computationConfig.ProgramsCacheSize, "cadence-execution-cache", programs.DefaultProgramsCacheSize,
"cache size for Cadence execution")
Expand Down
49 changes: 47 additions & 2 deletions cmd/util/cmd/execution-state-extract/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
flagChain string
flagNoMigration bool
flagNoReport bool
flagVersion int
)

func getChain(chainName string) (chain flow.Chain, err error) {
Expand Down Expand Up @@ -70,6 +71,8 @@ func init() {

Cmd.Flags().BoolVar(&flagNoReport, "no-report", false,
"don't report the state")

Cmd.Flags().IntVar(&flagVersion, "version", 6, "checkpoint version")
}

func run(*cobra.Command, []string) {
Expand All @@ -86,6 +89,8 @@ func run(*cobra.Command, []string) {
log.Fatal().Err(err).Msg("malformed block hash")
}

log.Info().Msgf("extracting state by block ID: %v", blockID)

db := common.InitStorage(flagDatadir)
defer db.Close()

Expand All @@ -94,7 +99,7 @@ func run(*cobra.Command, []string) {

stateCommitment, err = getStateCommitment(commits, blockID)
if err != nil {
log.Fatal().Err(err).Msg("cannot get state commitment for block")
log.Fatal().Err(err).Msgf("cannot get state commitment for block %v", blockID)
}
}

Expand All @@ -108,6 +113,8 @@ func run(*cobra.Command, []string) {
if err != nil {
log.Fatal().Err(err).Msg("invalid state commitment length")
}

log.Info().Msgf("extracting state by state commitment: %x", stateCommitment)
}

if len(flagBlockHash) == 0 && len(flagStateCommitment) == 0 {
Expand All @@ -130,7 +137,20 @@ func run(*cobra.Command, []string) {
}
}

log.Info().Msgf("Block state commitment: %s", hex.EncodeToString(stateCommitment[:]))
log.Info().Msgf("Extracting state from %s, exporting root checkpoint to %s, version: %v",
flagExecutionStateDir,
path.Join(flagOutputDir, bootstrap.FilenameWALRootCheckpoint),
flagVersion)

log.Info().Msgf("Block state commitment: %s from %v, output dir: %s",
hex.EncodeToString(stateCommitment[:]),
flagExecutionStateDir,
flagOutputDir)

// err := ensureCheckpointFileExist(flagExecutionStateDir)
// if err != nil {
// log.Fatal().Err(err).Msgf("cannot ensure checkpoint file exist in folder %v", flagExecutionStateDir)
// }

chain, err := getChain(flagChain)
if err != nil {
Expand All @@ -143,6 +163,7 @@ func run(*cobra.Command, []string) {
flagOutputDir,
log.Logger,
chain,
flagVersion,
!flagNoMigration,
!flagNoReport,
)
Expand All @@ -151,3 +172,27 @@ func run(*cobra.Command, []string) {
log.Fatal().Err(err).Msgf("error extracting the execution state: %s", err.Error())
}
}

// func ensureCheckpointFileExist(dir string) error {
// checkpoints, err := wal.Checkpoints(dir)
// if err != nil {
// return fmt.Errorf("could not find checkpoint files: %v", err)
// }
//
// if len(checkpoints) != 0 {
// log.Info().Msgf("found checkpoint %v files: %v", len(checkpoints), checkpoints)
// return nil
// }
//
// has, err := wal.HasRootCheckpoint(dir)
// if err != nil {
// return fmt.Errorf("could not check has root checkpoint: %w", err)
// }
//
// if has {
// log.Info().Msg("found root checkpoint file")
// return nil
// }
//
// return fmt.Errorf("no checkpoint file was found, no root checkpoint file was found")
// }
14 changes: 12 additions & 2 deletions cmd/util/cmd/execution-state-extract/execution_state_extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ func extractExecutionState(
outputDir string,
log zerolog.Logger,
chain flow.Chain,
version int, // to be removed after next spork
migrate bool,
report bool,
) error {

log.Info().Msg("init WAL")

diskWal, err := wal.NewDiskWAL(
zerolog.Nop(),
log,
nil,
metrics.NewNoopCollector(),
dir,
Expand All @@ -45,6 +48,8 @@ func extractExecutionState(
return fmt.Errorf("cannot create disk WAL: %w", err)
}

log.Info().Msg("init ledger")

led, err := complete.NewLedger(
diskWal,
complete.DefaultCacheSize,
Expand All @@ -60,11 +65,15 @@ func extractExecutionState(
checkpointsToKeep = 1
)

compactor, err := complete.NewCompactor(led, diskWal, zerolog.Nop(), complete.DefaultCacheSize, checkpointDistance, checkpointsToKeep, atomic.NewBool(false))
log.Info().Msg("init compactor")

compactor, err := complete.NewCompactor(led, diskWal, log, complete.DefaultCacheSize, checkpointDistance, checkpointsToKeep, atomic.NewBool(false))
if err != nil {
return fmt.Errorf("cannot create compactor: %w", err)
}

log.Info().Msgf("waiting for compactor to load checkpoint and WAL")

<-compactor.Ready()

defer func() {
Expand Down Expand Up @@ -116,6 +125,7 @@ func extractExecutionState(
complete.DefaultPathFinderVersion,
outputDir,
bootstrap.FilenameWALRootCheckpoint,
version,
)
if err != nil {
return fmt.Errorf("cannot generate the output checkpoint: %w", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestExtractExecutionState(t *testing.T) {
outdir,
zerolog.Nop(),
flow.Emulator.Chain(),
6,
false,
false,
)
Expand Down
Loading

0 comments on commit bba471c

Please sign in to comment.