Skip to content

Commit

Permalink
services: add new service for fetching blocks from NeoFS
Browse files Browse the repository at this point in the history
Close #3496

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Aug 23, 2024
1 parent dc6c195 commit 70cfc5a
Show file tree
Hide file tree
Showing 11 changed files with 725 additions and 40 deletions.
88 changes: 88 additions & 0 deletions cli/server/dump_bin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package server

import (
"fmt"
"os"
"path/filepath"

"github.com/nspcc-dev/neo-go/cli/cmdargs"
"github.com/nspcc-dev/neo-go/cli/options"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/urfave/cli/v2"
)

func dumpBin(ctx *cli.Context) error {
if err := cmdargs.EnsureNone(ctx); err != nil {
return err

Check warning on line 17 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L15-L17

Added lines #L15 - L17 were not covered by tests
}
cfg, err := options.GetConfigFromContext(ctx)
if err != nil {
return cli.Exit(err, 1)

Check warning on line 21 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L19-L21

Added lines #L19 - L21 were not covered by tests
}
log, _, logCloser, err := options.HandleLoggingParams(ctx.Bool("debug"), cfg.ApplicationConfiguration)
if err != nil {
return cli.Exit(err, 1)

Check warning on line 25 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L23-L25

Added lines #L23 - L25 were not covered by tests
}
if logCloser != nil {
defer func() { _ = logCloser() }()

Check warning on line 28 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L27-L28

Added lines #L27 - L28 were not covered by tests
}
count := uint32(ctx.Uint("count"))
start := uint32(ctx.Uint("start"))

Check warning on line 31 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L30-L31

Added lines #L30 - L31 were not covered by tests

chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
if err != nil {
return err

Check warning on line 35 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L33-L35

Added lines #L33 - L35 were not covered by tests
}
defer func() {
pprof.ShutDown()
prometheus.ShutDown()
chain.Close()
}()

Check warning on line 41 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L37-L41

Added lines #L37 - L41 were not covered by tests

blocksCount := chain.BlockHeight() + 1
if start+count > blocksCount {
return cli.Exit(fmt.Errorf("chain is not that high (%d) to dump %d blocks starting from %d", blocksCount-1, count, start), 1)

Check warning on line 45 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L43-L45

Added lines #L43 - L45 were not covered by tests
}
if count == 0 {
count = blocksCount - start

Check warning on line 48 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L47-L48

Added lines #L47 - L48 were not covered by tests
}

out := ctx.String("out")
if out == "" {
return cli.Exit("output directory is not specified", 1)

Check warning on line 53 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L51-L53

Added lines #L51 - L53 were not covered by tests
}
if _, err = os.Stat(out); os.IsNotExist(err) {
if err = os.MkdirAll(out, os.ModePerm); err != nil {
return cli.Exit(fmt.Sprintf("failed to create directory %s: %v", out, err), 1)

Check warning on line 57 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L55-L57

Added lines #L55 - L57 were not covered by tests
}
}

for i := start; i < start+count; i++ {
bh := chain.GetHeaderHash(i)
blk, err := chain.GetBlock(bh)
if err != nil {
return cli.Exit(fmt.Sprintf("failed to get block %d: %s", i, err), 1)

Check warning on line 65 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L61-L65

Added lines #L61 - L65 were not covered by tests
}
filePath := filepath.Join(out, fmt.Sprintf("block-%d.bin", i))
if err = saveBlockToFile(blk, filePath); err != nil {
return cli.Exit(fmt.Sprintf("failed to save block %d to file %s: %s", i, filePath, err), 1)

Check warning on line 69 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L67-L69

Added lines #L67 - L69 were not covered by tests
}
}
return nil

Check warning on line 72 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L72

Added line #L72 was not covered by tests
}

func saveBlockToFile(blk *block.Block, filePath string) error {
file, err := os.Create(filePath)
if err != nil {
return err

Check warning on line 78 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L75-L78

Added lines #L75 - L78 were not covered by tests
}
defer file.Close()

Check warning on line 80 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L80

Added line #L80 was not covered by tests

writer := io.NewBinWriterFromIO(file)
blk.EncodeBinary(writer)
if writer.Err != nil {
return writer.Err

Check warning on line 85 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L82-L85

Added lines #L82 - L85 were not covered by tests
}
return nil

Check warning on line 87 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L87

Added line #L87 was not covered by tests
}
7 changes: 7 additions & 0 deletions cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ func NewCommands() []*cli.Command {
Action: dumpDB,
Flags: cfgCountOutFlags,
},
{
Name: "dump-bin",
Usage: "Dump blocks (starting with the genesis or specified block) to the directory in binary format",
UsageText: "neo-go db dump-bin -o directory [-s start] [-c count] [--config-path path] [-p/-m/-t] [--config-file file]",
Action: dumpBin,
Flags: cfgCountOutFlags,
},
{
Name: "restore",
Usage: "Restore blocks from the file",
Expand Down
16 changes: 16 additions & 0 deletions config/protocol.testnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,19 @@ ApplicationConfiguration:
Enabled: false
Addresses:
- ":2113"
# NeoFSBlockFetcher:
# Enabled: true
# UnlockWallet:
# Path: "/notary_wallet.json"
# Password: "pass"
# Addresses:
# - st1.t5.fs.neo.org:8080
# Timeout: 10s
# ContainerID: "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG"
# Mode: "indexSearch"
# BatchSize: 100
# BlockAttribute: "index_block_2"
# OidAttribute: "block_oids"
# HeaderAttribute: "index_header"


2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
golang.org/x/term v0.18.0
golang.org/x/text v0.14.0
golang.org/x/tools v0.19.0
google.golang.org/grpc v1.62.0
gopkg.in/yaml.v3 v3.0.1
)

Expand Down Expand Up @@ -67,7 +68,6 @@ require (
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect
google.golang.org/grpc v1.62.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
)
21 changes: 15 additions & 6 deletions pkg/config/application_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ type ApplicationConfiguration struct {
Pprof BasicService `yaml:"Pprof"`
Prometheus BasicService `yaml:"Prometheus"`

Relay bool `yaml:"Relay"`
Consensus Consensus `yaml:"Consensus"`
RPC RPC `yaml:"RPC"`
Oracle OracleConfiguration `yaml:"Oracle"`
P2PNotary P2PNotary `yaml:"P2PNotary"`
StateRoot StateRoot `yaml:"StateRoot"`
Relay bool `yaml:"Relay"`
Consensus Consensus `yaml:"Consensus"`
RPC RPC `yaml:"RPC"`
Oracle OracleConfiguration `yaml:"Oracle"`
P2PNotary P2PNotary `yaml:"P2PNotary"`
StateRoot StateRoot `yaml:"StateRoot"`
NeoFSBlockFetcher NeoFSBlockFetcher `yaml:"NeoFSBlockFetcher"`
}

// EqualsButServices returns true when the o is the same as a except for services
Expand Down Expand Up @@ -145,3 +146,11 @@ func (a *ApplicationConfiguration) GetAddresses() ([]AnnounceableAddress, error)
}
return addrs, nil
}

// Validate checks the configuration for correctness.
func (a *ApplicationConfiguration) Validate() error {
if err := a.NeoFSBlockFetcher.Validate(); err != nil {
return err

Check warning on line 153 in pkg/config/application_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/application_config.go#L151-L153

Added lines #L151 - L153 were not covered by tests
}
return nil

Check warning on line 155 in pkg/config/application_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/application_config.go#L155

Added line #L155 was not covered by tests
}
35 changes: 35 additions & 0 deletions pkg/config/blockfetcher_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package config

import (
"errors"
"fmt"
"time"

cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
)

// NeoFSBlockFetcher represents the configuration for the blockfetcher service.
type NeoFSBlockFetcher struct {
InternalService `yaml:",inline"`
Timeout time.Duration `yaml:"Timeout"`
ContainerID string `yaml:"ContainerID"`
Mode string `yaml:"Mode"`
Addresses []string `yaml:"Addresses"`
BatchSize int `yaml:"BatchSize"`
BlockAttribute string `yaml:"BlockAttribute"`
OidAttribute string `yaml:"OidAttribute"`
HeaderAttribute string `yaml:"HeaderAttribute"`
}

// Validate checks the configuration for the blockfetcher service.
func (f NeoFSBlockFetcher) Validate() error {
if f.ContainerID == "" {
return errors.New("container ID is not set")

Check warning on line 27 in pkg/config/blockfetcher_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/blockfetcher_config.go#L25-L27

Added lines #L25 - L27 were not covered by tests
}
var containerID cid.ID
err := containerID.DecodeString(f.ContainerID)
if err != nil {
return fmt.Errorf("invalid container ID: %w", err)

Check warning on line 32 in pkg/config/blockfetcher_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/blockfetcher_config.go#L29-L32

Added lines #L29 - L32 were not covered by tests
}
return nil

Check warning on line 34 in pkg/config/blockfetcher_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/blockfetcher_config.go#L34

Added line #L34 was not covered by tests
}
23 changes: 22 additions & 1 deletion pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/network/capability"
"github.com/nspcc-dev/neo-go/pkg/network/extpool"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/services/blockfetcher"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -103,10 +104,12 @@ type (
chain Ledger
bQueue *bqueue.Queue
bSyncQueue *bqueue.Queue
bFetcherQueue *bqueue.Queue
mempool *mempool.Pool
notaryRequestPool *mempool.Pool
extensiblePool *extpool.Pool
notaryFeer NotaryFeer
blockFetcher *blockfetcher.Service

serviceLock sync.RWMutex
services map[string]Service
Expand Down Expand Up @@ -220,6 +223,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy

s.bSyncQueue = bqueue.New(s.stateSync, log, nil, updateBlockQueueLenMetric)

s.bFetcherQueue = bqueue.New(chain, log, nil, updateBlockQueueLenMetric)
s.blockFetcher = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, s.bFetcherQueue, log)

if s.MinPeers < 0 {
s.log.Info("bad MinPeers configured, using the default value",
zap.Int("configured", s.MinPeers),
Expand Down Expand Up @@ -295,6 +301,10 @@ func (s *Server) Start() {
go s.relayBlocksLoop()
go s.bQueue.Run()
go s.bSyncQueue.Run()
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
go s.bFetcherQueue.Run()
s.blockFetcher.Start()

Check warning on line 306 in pkg/network/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/server.go#L305-L306

Added lines #L305 - L306 were not covered by tests
}
for _, tr := range s.transports {
go tr.Accept()
}
Expand All @@ -319,6 +329,7 @@ func (s *Server) Shutdown() {
}
s.bQueue.Discard()
s.bSyncQueue.Discard()
s.bFetcherQueue.Discard()
s.serviceLock.RLock()
for _, svc := range s.services {
svc.Shutdown()
Expand Down Expand Up @@ -706,7 +717,7 @@ func (s *Server) IsInSync() bool {
var peersNumber int
var notHigher int

if s.stateSync.IsActive() {
if s.stateSync.IsActive() || s.blockFetcher.IsActive() {
return false
}

Expand Down Expand Up @@ -766,6 +777,9 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {

// handleBlockCmd processes the block received from its peer.
func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
if s.blockFetcher.IsActive() {
return nil

Check warning on line 781 in pkg/network/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/server.go#L781

Added line #L781 was not covered by tests
}
if s.stateSync.IsActive() {
return s.bSyncQueue.PutBlock(block)
}
Expand All @@ -786,6 +800,9 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error {
}

func (s *Server) requestBlocksOrHeaders(p Peer) error {
if s.blockFetcher.IsActive() {
return nil

Check warning on line 804 in pkg/network/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/server.go#L804

Added line #L804 was not covered by tests
}
if s.stateSync.NeedHeaders() {
if s.chain.HeaderHeight() < p.LastBlockIndex() {
return s.requestHeaders(p)
Expand Down Expand Up @@ -1434,6 +1451,10 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
}

func (s *Server) tryInitStateSync() {
if s.blockFetcher.IsActive() {
s.log.Info("Postponing StateSync until BlockFetcher completes")
return

Check warning on line 1456 in pkg/network/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/server.go#L1455-L1456

Added lines #L1455 - L1456 were not covered by tests
}
if !s.stateSync.IsActive() {
s.bSyncQueue.Discard()
return
Expand Down
40 changes: 22 additions & 18 deletions pkg/network/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type (

// BroadcastFactor is the factor (0-100) for fan-out optimization.
BroadcastFactor int

// NeoFSBlockFetcherCfg is the configuration for the blockfetcher service.
NeoFSBlockFetcherCfg config.NeoFSBlockFetcher
}
)

Expand All @@ -89,24 +92,25 @@ func NewServerConfig(cfg config.Config) (ServerConfig, error) {
return ServerConfig{}, fmt.Errorf("failed to parse addresses: %w", err)
}
c := ServerConfig{
UserAgent: cfg.GenerateUserAgent(),
Addresses: addrs,
Net: protoConfig.Magic,
Relay: appConfig.Relay,
Seeds: protoConfig.SeedList,
DialTimeout: appConfig.P2P.DialTimeout,
ProtoTickInterval: appConfig.P2P.ProtoTickInterval,
PingInterval: appConfig.P2P.PingInterval,
PingTimeout: appConfig.P2P.PingTimeout,
MaxPeers: appConfig.P2P.MaxPeers,
AttemptConnPeers: appConfig.P2P.AttemptConnPeers,
MinPeers: appConfig.P2P.MinPeers,
TimePerBlock: protoConfig.TimePerBlock,
OracleCfg: appConfig.Oracle,
P2PNotaryCfg: appConfig.P2PNotary,
StateRootCfg: appConfig.StateRoot,
ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize,
BroadcastFactor: appConfig.P2P.BroadcastFactor,
UserAgent: cfg.GenerateUserAgent(),
Addresses: addrs,
Net: protoConfig.Magic,
Relay: appConfig.Relay,
Seeds: protoConfig.SeedList,
DialTimeout: appConfig.P2P.DialTimeout,
ProtoTickInterval: appConfig.P2P.ProtoTickInterval,
PingInterval: appConfig.P2P.PingInterval,
PingTimeout: appConfig.P2P.PingTimeout,
MaxPeers: appConfig.P2P.MaxPeers,
AttemptConnPeers: appConfig.P2P.AttemptConnPeers,
MinPeers: appConfig.P2P.MinPeers,
TimePerBlock: protoConfig.TimePerBlock,
OracleCfg: appConfig.Oracle,
P2PNotaryCfg: appConfig.P2PNotary,
StateRootCfg: appConfig.StateRoot,
ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize,
BroadcastFactor: appConfig.P2P.BroadcastFactor,
NeoFSBlockFetcherCfg: appConfig.NeoFSBlockFetcher,
}
return c, nil
}
Loading

0 comments on commit 70cfc5a

Please sign in to comment.