
fix: Improved multinode proxy #249

Open · wants to merge 6 commits into base: main · Changes from 2 commits
183 changes: 156 additions & 27 deletions deps/apiinfo.go
@@ -2,20 +2,24 @@ package deps

import (
"context"
"errors"
"fmt"
"math/rand"
"net/http"
"reflect"
"sync"
"time"

"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/big"

"github.com/filecoin-project/curio/api"

"github.com/filecoin-project/lotus/chain/types"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/lib/retry"
)

func GetFullNodeAPIV1Curio(ctx *cli.Context, ainfoCfg []string) (api.Chain, jsonrpc.ClientCloser, error) {
@@ -55,10 +59,8 @@ func GetFullNodeAPIV1Curio(ctx *cli.Context, ainfoCfg []string) (api.Chain, jsonrpc.ClientCloser, error)
closers = append(closers, closer)
}

-// When running in cluster mode and trying to establish connections to multiple nodes, fail
-// if less than 2 lotus nodes are actually running
-if len(httpHeads) > 1 && len(fullNodes) < 2 {
-return nil, nil, xerrors.Errorf("Not able to establish connection to more than a single node")
+if len(fullNodes) == 0 {
+return nil, nil, xerrors.Errorf("failed to establish connection with all nodes")
}

finalCloser := func() {
@@ -96,54 +98,152 @@ func newChainNodeRPCV1(ctx context.Context, addr string, requestHeader http.Header
return &res, closer, err
}

const initialBackoff = time.Second
const maxRetryAttempts = 5
const maxBehindBestHealthy = 1

var errorsToRetry = []error{&jsonrpc.RPCConnectionError{}, &jsonrpc.ErrClient{}}

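// nextHealthyProvider returns this sentinel when every provider is currently
// marked unhealthy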
const preferredAllBad = -1

// FullNodeProxy creates a proxy for the Chain API
// TODO: port improvements here from https://github.com/filecoin-project/lotus/pull/11470
func FullNodeProxy[T api.Chain](ins []T, outstr *api.ChainStruct) {
providerCount := len(ins)

var healthyLk sync.Mutex
unhealthyProviders := make([]bool, providerCount)

nextHealthyProvider := func(start int) int {
healthyLk.Lock()
defer healthyLk.Unlock()

for i := 0; i < providerCount; i++ {
idx := (start + i) % providerCount
if !unhealthyProviders[idx] {
return idx
}
}
return preferredAllBad
}

// watch provider health
startWatch := func() {
if len(ins) == 1 {
// not like we have any other node to go to..
return
}

// don't bother for short-running commands
time.Sleep(250 * time.Millisecond)

var bestKnownTipset, nextBestKnownTipset *types.TipSet

for {
var wg sync.WaitGroup
wg.Add(providerCount)

for i := 0; i < providerCount; i++ {
go func(i int) {
defer wg.Done()

toctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) // todo better timeout
ch, err := ins[i].ChainHead(toctx)
cancel()

// error is definitely not healthy
if err != nil {
healthyLk.Lock()
unhealthyProviders[i] = true
healthyLk.Unlock()

log.Errorw("rpc check chain head call failed", "fail_type", "rpc_error", "provider", i, "error", err)
return
}

healthyLk.Lock()
// maybe set best next
if nextBestKnownTipset == nil || big.Cmp(ch.ParentWeight(), nextBestKnownTipset.ParentWeight()) > 0 || len(ch.Blocks()) > len(nextBestKnownTipset.Blocks()) {
nextBestKnownTipset = ch
}

if bestKnownTipset != nil {
// if we're behind the best tipset, mark as unhealthy
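// e.g. with maxBehindBestHealthy = 1, a provider at height 98 is marked
// unhealthy once the best known head reaches height 100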
unhealthyProviders[i] = ch.Height() < bestKnownTipset.Height()-maxBehindBestHealthy
if unhealthyProviders[i] {
log.Errorw("rpc check chain head call failed", "fail_type", "behind_best", "provider", i, "height", ch.Height(), "best_height", bestKnownTipset.Height())
}
}
healthyLk.Unlock()
}(i)
}

wg.Wait()
bestKnownTipset = nextBestKnownTipset

time.Sleep(5 * time.Second)
}
}
var startWatchOnce sync.Once

// populate output api proxy

outs := api.GetInternalStructs(outstr)

-var rins []reflect.Value
+var apiProviders []reflect.Value
for _, in := range ins {
-rins = append(rins, reflect.ValueOf(in))
+apiProviders = append(apiProviders, reflect.ValueOf(in))
}

for _, out := range outs {
-rProxyInternal := reflect.ValueOf(out).Elem()
+rOutStruct := reflect.ValueOf(out).Elem()

-for f := 0; f < rProxyInternal.NumField(); f++ {
-field := rProxyInternal.Type().Field(f)
+for f := 0; f < rOutStruct.NumField(); f++ {
+field := rOutStruct.Type().Field(f)

-var fns []reflect.Value
-for _, rin := range rins {
-fns = append(fns, rin.MethodByName(field.Name))
+var providerFuncs []reflect.Value
+for _, rin := range apiProviders {
+mv := rin.MethodByName(field.Name)
+if !mv.IsValid() {
+continue
+}
+providerFuncs = append(providerFuncs, mv)
}

-rProxyInternal.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) {
-errorsToRetry := []error{&jsonrpc.RPCConnectionError{}, &jsonrpc.ErrClient{}}
-initialBackoff, err := time.ParseDuration("1s")
-if err != nil {
-return nil
-}
+rOutStruct.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) {
+startWatchOnce.Do(func() {
+go startWatch()
+})

ctx := args[0].Interface().(context.Context)

-curr := -1
+preferredProvider := new(int)
+*preferredProvider = nextHealthyProvider(0)
+if *preferredProvider == preferredAllBad {
+// select at random, retry will do its best
+*preferredProvider = rand.Intn(providerCount)
+}

// for calls that need to be performed on the same node
// primarily for miner when calling create block and submit block subsequently
key := contextKey("retry-node")
if ctx.Value(key) != nil {
if (*ctx.Value(key).(**int)) == nil {
-*ctx.Value(key).(**int) = &curr
+*ctx.Value(key).(**int) = preferredProvider
} else {
-curr = **ctx.Value(key).(**int) - 1
+preferredProvider = *ctx.Value(key).(**int)
}
}

-total := len(rins)
-result, _ := retry.Retry(ctx, 5, initialBackoff, errorsToRetry, func() ([]reflect.Value, error) {
-curr = (curr + 1) % total
+result, _ := Retry(ctx, maxRetryAttempts, initialBackoff, errorsToRetry, func(isRetry bool) ([]reflect.Value, error) {
+if isRetry {
+pp := nextHealthyProvider(*preferredProvider + 1)
+if pp == preferredAllBad {
+return nil, xerrors.Errorf("no healthy providers")
+}
+*preferredProvider = pp
+}

-result := fns[curr].Call(args)
+result := providerFuncs[*preferredProvider].Call(args)
if result[len(result)-1].IsNil() {
return result, nil
}
@@ -155,3 +255,32 @@ func FullNodeProxy[T api.Chain](ins []T, outstr *api.ChainStruct) {
}
}
}
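For reviewers, a minimal sketch of how a caller pins consecutive calls (e.g. the miner creating a block and then submitting it) to one node via the "retry-node" key handled above. OnSingleNode is hypothetical here, though lotus ships a similar helper in cliutil; contextKey is the key type already used in this file:

func OnSingleNode(ctx context.Context) context.Context {
	// seeding the context with a pointer to a nil *int makes the proxy
	// record whichever provider serves the first call and reuse it for
	// every later call made with this context
	return context.WithValue(ctx, contextKey("retry-node"), new(*int))
}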

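// Retry calls f up to attempts times, sleeping between tries and doubling the
// backoff after each failure. Only errors matching errorTypes (see ErrorIsIn)
// trigger a retry; f receives isRetry=true on every attempt after the first.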
func Retry[T any](ctx context.Context, attempts int, initialBackoff time.Duration, errorTypes []error, f func(isRetry bool) (T, error)) (result T, err error) {
for i := 0; i < attempts; i++ {
if i > 0 {
log.Info("Retrying after error:", err)
time.Sleep(initialBackoff)
initialBackoff *= 2
}
result, err = f(i > 0)
if err == nil || !ErrorIsIn(err, errorTypes) {
return result, err
}
if ctx.Err() != nil {
return result, ctx.Err()
}
}
log.Errorf("Failed after %d attempts, last error: %s", attempts, err)
return result, err
}
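Illustrative use of the new Retry signature, assuming node is one of the api.Chain clients passed to the proxy:

head, err := Retry(ctx, maxRetryAttempts, initialBackoff, errorsToRetry,
	func(isRetry bool) (*types.TipSet, error) {
		// isRetry is false on the first attempt, true on backed-off retries
		return node.ChainHead(ctx)
	})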

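// ErrorIsIn reports whether err, or any error it wraps, has the same concrete
// type as one of errorTypes. Entries are pointers to error structs, e.g.
// &jsonrpc.ErrClient{}; reflection builds a fresh **T target for errors.As.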
func ErrorIsIn(err error, errorTypes []error) bool {
for _, etype := range errorTypes {
tmp := reflect.New(reflect.PointerTo(reflect.ValueOf(etype).Elem().Type())).Interface()
if errors.As(err, tmp) {
return true
}
}
return false
}
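For example (a sketch; errorsToRetry is the package-level list defined above, and fmt and errors are already imported):

wrapped := fmt.Errorf("rpc failed: %w", &jsonrpc.RPCConnectionError{})
ErrorIsIn(wrapped, errorsToRetry)            // true: wraps a retryable type
ErrorIsIn(errors.New("boom"), errorsToRetry) // false: no type match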