Skip to content

Commit

Permalink
services: add new service for fetching blocks from NeoFS
Browse files Browse the repository at this point in the history
Close #3496

Co-authored-by: Anna Shaleva <[email protected]>
Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland and AnnaShaleva committed Sep 5, 2024
1 parent ef0f72c commit 7696a5a
Show file tree
Hide file tree
Showing 10 changed files with 646 additions and 49 deletions.
12 changes: 12 additions & 0 deletions config/protocol.testnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,15 @@ ApplicationConfiguration:
Enabled: false
Addresses:
- ":2113"
NeoFSBlockFetcher:
Enabled: true
Addresses:
- st1.storage.fs.neo.org:8080
Timeout: 10m
DownloaderWorkersCount: 500
OIDBatchSize: 8000
BQueueSize: 16000 # must be larger than OIDBatchSize; highly recommended to be 2*OIDBatchSize or 3*OIDBatchSize
SkipIndexFilesSearch: false
ContainerID: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH"
BlockAttribute: "block"
OidAttribute: "oid"
35 changes: 35 additions & 0 deletions docs/node-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ node-related settings described in the table below.
| GarbageCollectionPeriod | `uint32` | 10000 | Controls MPT garbage collection interval (in blocks) for configurations with `RemoveUntraceableBlocks` enabled and `KeepOnlyLatestState` disabled. In this mode the node stores a number of MPT trees (corresponding to `MaxTraceableBlocks` and `StateSyncInterval`), but the DB needs to be clean from old entries from time to time. Doing it too often will cause too much processing overhead, doing it too rarely will leave more useless data in the DB. |
| KeepOnlyLatestState | `bool` | `false` | Specifies if MPT should only store the latest state (or a set of latest states, see `P2PStateExchangeExtensions` section in the ProtocolConfiguration for details). If true, DB size will be smaller, but older roots won't be accessible. This value should remain the same for the same database. | |
| LogPath | `string` | "", so only console logging | File path where to store node logs. |
| NeoFSBlockFetcher | [NeoFSBlockFetcher Configuration](#NeoFSBlockFetcher-Configuration) | | NeoFSBlockFetcher module configuration. See the [NeoFSBlockFetcher Configuration](#NeoFSBlockFetcher-Configuration) section for details. |
| Oracle | [Oracle Configuration](#Oracle-Configuration) | | Oracle module configuration. See the [Oracle Configuration](#Oracle-Configuration) section for details. |
| P2P | [P2P Configuration](#P2P-Configuration) | | Configuration values for P2P network interaction. See the [P2P Configuration](#P2P-Configuration) section for details. |
| P2PNotary | [P2P Notary Configuration](#P2P-Notary-Configuration) | | P2P Notary module configuration. See the [P2P Notary Configuration](#P2P-Notary-Configuration) section for details. |
Expand Down Expand Up @@ -323,6 +324,40 @@ where:
- `Path` is a path to wallet.
- `Password` is a wallet password.

### NeoFSBlockFetcher Configuration
`NeoFSBlockFetcher` configuration section contains settings for NeoFS block fetcher
module and has the following structure:
```
NeoFSBlockFetcher:
Enabled: true
Addresses:
- st1.storage.fs.neo.org:8080
Timeout: 10m
DownloaderWorkersCount: 500
OIDBatchSize: 8000
BQueueSize: 16000
SkipIndexFilesSearch: false
ContainerID: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH"
BlockAttribute: "block"
OidAttribute: "oid"
```
where:
- `Enabled` enables NeoFS block fetcher module.
- `UnlockWallet` contains wallet settings, see
[Unlock Wallet Configuration](#Unlock-Wallet-Configuration) section for
structure details. Without this setting, the module will use new generated private key.
- `Addresses` is a list of NeoFS storage nodes addresses.
- `Timeout` is a timeout for a single request to NeoFS storage node.
- `ContainerID` is a container ID to fetch blocks from.
- `BlockAttribute` is an attribute name of NeoFS object that contains block data.
- `OidAttribute` is an attribute name of NeoFS object that contains OIDs of blocks objects.
- `DownloaderWorkersCount` is a number of workers that download blocks from NeoFS.
- `OIDBatchSize` is a number of OIDs to search and fetch from NeoFS in one request in case of SkipIndexFilesSearch=true.
- `BQueueSize` is a size of the block queue. It must be larger than `OIDBatchSize` and
highly recommended to be 2*`OIDBatchSize` or 3*`OIDBatchSize`.
- `SkipIndexFilesSearch` is a flag that allows skipping index files search in NeoFS
storage nodes and search for blocks directly. It is set to `false` by default.

## Protocol Configuration

`ProtocolConfiguration` section of `yaml` node configuration file contains
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
golang.org/x/term v0.18.0
golang.org/x/text v0.14.0
golang.org/x/tools v0.19.0
google.golang.org/grpc v1.62.0
gopkg.in/yaml.v3 v3.0.1
)

Expand Down Expand Up @@ -69,7 +70,6 @@ require (
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect
google.golang.org/grpc v1.62.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
)
23 changes: 17 additions & 6 deletions pkg/config/application_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ type ApplicationConfiguration struct {
Pprof BasicService `yaml:"Pprof"`
Prometheus BasicService `yaml:"Prometheus"`

Relay bool `yaml:"Relay"`
Consensus Consensus `yaml:"Consensus"`
RPC RPC `yaml:"RPC"`
Oracle OracleConfiguration `yaml:"Oracle"`
P2PNotary P2PNotary `yaml:"P2PNotary"`
StateRoot StateRoot `yaml:"StateRoot"`
Relay bool `yaml:"Relay"`
Consensus Consensus `yaml:"Consensus"`
RPC RPC `yaml:"RPC"`
Oracle OracleConfiguration `yaml:"Oracle"`
P2PNotary P2PNotary `yaml:"P2PNotary"`
StateRoot StateRoot `yaml:"StateRoot"`
NeoFSBlockFetcher NeoFSBlockFetcher `yaml:"NeoFSBlockFetcher"`

Check failure on line 32 in pkg/config/application_config.go

View workflow job for this annotation

GitHub Actions / Lint

File is not `gofmt`-ed with `-s` (gofmt)
}

// EqualsButServices returns true when the o is the same as a except for services
Expand Down Expand Up @@ -141,3 +142,13 @@ func (a *ApplicationConfiguration) GetAddresses() ([]AnnounceableAddress, error)
}
return addrs, nil
}

// Validate checks ApplicationConfiguration for internal consistency and returns
// an error if any invalid settings are found. This ensures that the application
// configuration is valid and safe to use for further operations.
func (a *ApplicationConfiguration) Validate() error {
if err := a.NeoFSBlockFetcher.Validate(); err != nil {
return fmt.Errorf("failed to validate NeoFSBlockFetcher section: %w", err)
}
return nil
}
60 changes: 60 additions & 0 deletions pkg/config/blockfetcher_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package config

import (
"errors"
"fmt"
"time"

cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
)

const (
DefaultTimeout = 1 * time.Minute
DefaultOIDBatchSize = 8000
DefaultDownloaderWorkersCount = 500
)

// NeoFSBlockFetcher represents the configuration for the NeoFS block fetcher service.
type NeoFSBlockFetcher struct {
InternalService `yaml:",inline"`
Timeout time.Duration `yaml:"Timeout"`
ContainerID string `yaml:"ContainerID"`
Addresses []string `yaml:"Addresses"`
OIDBatchSize int `yaml:"OIDBatchSize"` // valid only for SkipIndexFilesSearch = true
BlockAttribute string `yaml:"BlockAttribute"`
OidAttribute string `yaml:"OidAttribute"`
HeaderAttribute string `yaml:"HeaderAttribute"`
DownloaderWorkersCount int `yaml:"DownloaderWorkersCount"`
BQueueSize int `yaml:"BQueueSize"`
SkipIndexFilesSearch bool `yaml:"SkipIndexFilesSearch"`
}

// Validate checks NeoFSBlockFetcher for internal consistency and ensures
// that all required fields are properly set. It returns an error if the
// configuration is invalid or if the ContainerID cannot be properly decoded.
func (cfg *NeoFSBlockFetcher) Validate() error {
if !cfg.Enabled {
return nil
}
if cfg.ContainerID == "" {
return errors.New("container ID is not set")
}
var containerID cid.ID
err := containerID.DecodeString(cfg.ContainerID)
if err != nil {
return fmt.Errorf("invalid container ID: %w", err)
}
if cfg.Timeout == 0 {
cfg.Timeout = DefaultTimeout
}
if cfg.OIDBatchSize == 0 {
cfg.OIDBatchSize = DefaultOIDBatchSize
}
if cfg.DownloaderWorkersCount <= 0 {
cfg.DownloaderWorkersCount = DefaultDownloaderWorkersCount
}
if cfg.BQueueSize < cfg.OIDBatchSize {
return fmt.Errorf("BQueueSize (%d) is lower than OIDBatchSize (%d)", cfg.BQueueSize, cfg.OIDBatchSize)
}
return nil
}
4 changes: 4 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ func LoadFile(configPath string, relativePath ...string) (Config, error) {
if err != nil {
return Config{}, err
}
err = config.ApplicationConfiguration.Validate()
if err != nil {
return Config{}, err
}

return config, nil
}
Expand Down
82 changes: 58 additions & 24 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/network/capability"
"github.com/nspcc-dev/neo-go/pkg/network/extpool"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/services/blockfetcher"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -103,10 +104,12 @@ type (
chain Ledger
bQueue *bqueue.Queue
bSyncQueue *bqueue.Queue
bFetcherQueue *bqueue.Queue
mempool *mempool.Pool
notaryRequestPool *mempool.Pool
extensiblePool *extpool.Pool
notaryFeer NotaryFeer
blockFetcher *blockfetcher.Service

serviceLock sync.RWMutex
services map[string]Service
Expand All @@ -133,6 +136,7 @@ type (
runFin chan struct{}
broadcastTxFin chan struct{}
runProtoFin chan struct{}
blockFetcherFin chan struct{}

transactions chan *transaction.Transaction

Expand Down Expand Up @@ -171,6 +175,7 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
newTransport func(*Server, string) Transporter,
newDiscovery func([]string, time.Duration, Transporter) Discoverer,
) (*Server, error) {
var err error
if log == nil {
return nil, errors.New("logger is a required parameter")
}
Expand All @@ -182,28 +187,30 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
}

s := &Server{
ServerConfig: config,
chain: chain,
id: randomID(),
config: chain.GetConfig().ProtocolConfiguration,
quit: make(chan struct{}),
relayFin: make(chan struct{}),
runFin: make(chan struct{}),
broadcastTxFin: make(chan struct{}),
runProtoFin: make(chan struct{}),
register: make(chan Peer),
unregister: make(chan peerDrop),
handshake: make(chan Peer),
txInMap: make(map[util.Uint256]struct{}),
peers: make(map[Peer]bool),
mempool: chain.GetMemPool(),
extensiblePool: extpool.New(chain, config.ExtensiblePoolSize),
log: log,
txin: make(chan *transaction.Transaction, 64),
transactions: make(chan *transaction.Transaction, 64),
services: make(map[string]Service),
extensHandlers: make(map[string]func(*payload.Extensible) error),
stateSync: stSync,
ServerConfig: config,
chain: chain,
id: randomID(),
config: chain.GetConfig().ProtocolConfiguration,
quit: make(chan struct{}),
relayFin: make(chan struct{}),
runFin: make(chan struct{}),
broadcastTxFin: make(chan struct{}),
runProtoFin: make(chan struct{}),
blockFetcherFin: make(chan struct{}, 1),
register: make(chan Peer),
unregister: make(chan peerDrop),
handshake: make(chan Peer),
txInMap: make(map[util.Uint256]struct{}),
peers: make(map[Peer]bool),
mempool: chain.GetMemPool(),
extensiblePool: extpool.New(chain, config.ExtensiblePoolSize),
log: log,
txin: make(chan *transaction.Transaction, 64),
transactions: make(chan *transaction.Transaction, 64),
services: make(map[string]Service),
extensHandlers: make(map[string]func(*payload.Extensible) error),
stateSync: stSync,

Check failure on line 213 in pkg/network/server.go

View workflow job for this annotation

GitHub Actions / Lint

File is not `gofmt`-ed with `-s` (gofmt)
}
if chain.P2PSigExtensionsEnabled() {
s.notaryFeer = NewNotaryFeer(chain)
Expand All @@ -219,7 +226,13 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
}, bqueue.CacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)

s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.CacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)

s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking)
s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, func() {
close(s.blockFetcherFin)
})
if err != nil {
return nil, err
}
if s.MinPeers < 0 {
s.log.Info("bad MinPeers configured, using the default value",
zap.Int("configured", s.MinPeers),
Expand Down Expand Up @@ -295,6 +308,14 @@ func (s *Server) Start() {
go s.relayBlocksLoop()
go s.bQueue.Run()
go s.bSyncQueue.Run()
go s.bFetcherQueue.Run()
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled && s.blockFetcher != nil {
err := s.blockFetcher.Start()
if err != nil {
s.log.Error("NeoFS block fetcher service:", zap.Error(err))
s.blockFetcher.Shutdown()
}
}
for _, tr := range s.transports {
go tr.Accept()
}
Expand All @@ -319,6 +340,7 @@ func (s *Server) Shutdown() {
}
s.bQueue.Discard()
s.bSyncQueue.Discard()
s.bFetcherQueue.Discard()
s.serviceLock.RLock()
for _, svc := range s.services {
svc.Shutdown()
Expand Down Expand Up @@ -546,6 +568,9 @@ func (s *Server) run() {

s.discovery.RegisterGood(p)

s.tryInitStateSync()
s.tryStartServices()
case <-s.blockFetcherFin:
s.tryInitStateSync()
s.tryStartServices()
}
Expand Down Expand Up @@ -702,7 +727,7 @@ func (s *Server) IsInSync() bool {
var peersNumber int
var notHigher int

if s.stateSync.IsActive() {
if s.stateSync.IsActive() || s.blockFetcher.IsActive() {
return false
}

Expand Down Expand Up @@ -762,6 +787,9 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {

// handleBlockCmd processes the block received from its peer.
func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
if s.blockFetcher.IsActive() {
return nil
}
if s.stateSync.IsActive() {
return s.bSyncQueue.PutBlock(block)
}
Expand All @@ -782,6 +810,9 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error {
}

func (s *Server) requestBlocksOrHeaders(p Peer) error {
if s.blockFetcher.IsActive() {
return nil
}
if s.stateSync.NeedHeaders() {
if s.chain.HeaderHeight() < p.LastBlockIndex() {
return s.requestHeaders(p)
Expand Down Expand Up @@ -1428,6 +1459,9 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
}

func (s *Server) tryInitStateSync() {
if s.blockFetcher.IsActive() {
return
}
if !s.stateSync.IsActive() {
s.bSyncQueue.Discard()
return
Expand Down
Loading

0 comments on commit 7696a5a

Please sign in to comment.