diff --git a/cmd/cannon.go b/cmd/cannon.go index 12d8065b..c4f27081 100644 --- a/cmd/cannon.go +++ b/cmd/cannon.go @@ -14,6 +14,84 @@ var ( cannonCfgFile string ) +type CannonOverride struct { + FlagHelper func(cmd *cobra.Command) + Setter func(cmd *cobra.Command, overrides *cannon.Override) error +} + +type CannonOverrideConfig struct { + FlagName string + EnvName string + Description string + OverrideFunc func(val string, overrides *cannon.Override) +} + +func createCannonOverride(config CannonOverrideConfig) CannonOverride { + return CannonOverride{ + FlagHelper: func(cmd *cobra.Command) { + cmd.Flags().String(config.FlagName, "", config.Description+` (env: `+config.EnvName+`)`) + }, + Setter: func(cmd *cobra.Command, overrides *cannon.Override) error { + val := "" + + if cmd.Flags().Changed(config.FlagName) { + val = cmd.Flags().Lookup(config.FlagName).Value.String() + } + + if os.Getenv(config.EnvName) != "" { + val = os.Getenv(config.EnvName) + } + + if val == "" { + return nil + } + + config.OverrideFunc(val, overrides) + + return nil + }, + } +} + +var CannonOverrides = []CannonOverride{ + createCannonOverride(CannonOverrideConfig{ + FlagName: "cannon-xatu-output-authorization", + EnvName: "CANNON_XATU_OUTPUT_AUTHORIZATION", + Description: "sets the authorization secret for all xatu outputs", + OverrideFunc: func(val string, overrides *cannon.Override) { + overrides.XatuOutputAuth.Enabled = true + overrides.XatuOutputAuth.Value = val + }, + }), + createCannonOverride(CannonOverrideConfig{ + FlagName: "cannon-xatu-coordinator-authorization", + EnvName: "CANNON_XATU_COORDINATOR_AUTHORIZATION", + Description: "sets the authorization secret for the xatu coordinator", + OverrideFunc: func(val string, overrides *cannon.Override) { + overrides.XatuCoordinatorAuth.Enabled = true + overrides.XatuCoordinatorAuth.Value = val + }, + }), + createCannonOverride(CannonOverrideConfig{ + FlagName: "cannon-beacon-node-url", + EnvName: "CANNON_BEACON_NODE_URL", + Description: "sets the beacon node url", + OverrideFunc: func(val string, overrides *cannon.Override) { + overrides.BeaconNodeURL.Enabled = true + overrides.BeaconNodeURL.Value = val + }, + }), + createCannonOverride(CannonOverrideConfig{ + FlagName: "cannon-beacon-node-authorization-header", + EnvName: "CANNON_BEACON_NODE_AUTHORIZATION_HEADER", + Description: "sets the beacon node authorization header", + OverrideFunc: func(val string, overrides *cannon.Override) { + overrides.BeaconNodeAuthorizationHeader.Enabled = true + overrides.BeaconNodeAuthorizationHeader.Value = val + }, + }), +} + // cannonCmd represents the cannon command var cannonCmd = &cobra.Command{ Use: "cannon", @@ -32,7 +110,15 @@ var cannonCmd = &cobra.Command{ log.WithField("location", cannonCfgFile).Info("Loaded config") - cannon, err := cannon.New(cmd.Context(), log, config) + overrides := &cannon.Override{} + + for _, override := range CannonOverrides { + if errr := override.Setter(cmd, overrides); errr != nil { + log.Fatal(errr) + } + } + + cannon, err := cannon.New(cmd.Context(), log, config, overrides) if err != nil { log.Fatal(err) } @@ -49,6 +135,10 @@ func init() { rootCmd.AddCommand(cannonCmd) cannonCmd.Flags().StringVar(&cannonCfgFile, "config", "cannon.yaml", "config file (default is cannon.yaml)") + + for _, override := range CannonOverrides { + override.FlagHelper(cannonCmd) + } } func loadcannonConfigFromFile(file string) (*cannon.Config, error) { diff --git a/cmd/server.go b/cmd/server.go index 66ec30d0..6a5191ed 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -13,12 +13,78 @@ var ( serverCfgFile string ) +type ServerOverride struct { + FlagHelper func(cmd *cobra.Command) + Setter func(cmd *cobra.Command, overrides *server.Override) error +} + +type ServerOverrideConfig struct { + FlagName string + EnvName string + Description string + OverrideFunc func(val string, overrides *server.Override) +} + +func createServerOverride(config ServerOverrideConfig) ServerOverride { + return ServerOverride{ + FlagHelper: func(cmd *cobra.Command) { + cmd.Flags().String(config.FlagName, "", config.Description+` (env: `+config.EnvName+`)`) + }, + Setter: func(cmd *cobra.Command, overrides *server.Override) error { + val := "" + + if cmd.Flags().Changed(config.FlagName) { + val = cmd.Flags().Lookup(config.FlagName).Value.String() + } + + if os.Getenv(config.EnvName) != "" { + val = os.Getenv(config.EnvName) + } + + if val == "" { + return nil + } + + config.OverrideFunc(val, overrides) + + return nil + }, + } +} + +var ServerOverrides = []ServerOverride{ + createServerOverride(ServerOverrideConfig{ + FlagName: "server-event-ingester-basic-auth-username", + EnvName: "SERVER_EVENT_INGESTER_BASIC_AUTH_USERNAME", + Description: "sets the basic auth username for the event ingester", + OverrideFunc: func(val string, overrides *server.Override) { + overrides.EventIngesterBasicAuth.Username = val + }, + }), + createServerOverride(ServerOverrideConfig{ + FlagName: "server-event-ingester-basic-auth-password", + EnvName: "SERVER_EVENT_INGESTER_BASIC_AUTH_PASSWORD", + Description: "sets the basic auth password for the event ingester", + OverrideFunc: func(val string, overrides *server.Override) { + overrides.EventIngesterBasicAuth.Password = val + }, + }), + createServerOverride(ServerOverrideConfig{ + FlagName: "server-coordinator-auth-secret", + EnvName: "SERVER_COORDINATOR_AUTH_SECRET", + Description: "sets the auth secret for the coordinator", + OverrideFunc: func(val string, overrides *server.Override) { + overrides.CoordinatorAuth.AuthSecret = val + }, + }), +} + // serverCmd represents the server command var serverCmd = &cobra.Command{ Use: "server", Short: "Runs Xatu in Server mode.", Long: `Runs Xatu in Server mode, which means it will listen to gRPC requests from - Xatu Sentry nodes and forward the data on to the configured sinks.`, + Xatu modules and forward the data on to the configured sinks.`, Run: func(cmd *cobra.Command, args []string) { initCommon() @@ -31,7 +97,15 @@ var serverCmd = &cobra.Command{ log.WithField("location", serverCfgFile).Info("Loaded config") - server, err := server.NewXatu(cmd.Context(), log, config) + o := &server.Override{} + + for _, override := range ServerOverrides { + if errr := override.Setter(cmd, o); errr != nil { + log.Fatal(errr) + } + } + + server, err := server.NewXatu(cmd.Context(), log, config, o) if err != nil { log.Fatal(err) } @@ -48,6 +122,10 @@ func init() { rootCmd.AddCommand(serverCmd) serverCmd.Flags().StringVar(&serverCfgFile, "config", "server.yaml", "config file (default is server.yaml)") + + for _, override := range ServerOverrides { + override.FlagHelper(serverCmd) + } } func loadServerConfigFromFile(file string) (*server.Config, error) { diff --git a/deploy/local/docker-compose/xatu-cannon.yaml b/deploy/local/docker-compose/xatu-cannon.yaml new file mode 100644 index 00000000..df377a5d --- /dev/null +++ b/deploy/local/docker-compose/xatu-cannon.yaml @@ -0,0 +1,80 @@ +logging: "info" # panic,fatal,warn,info,debug,trace +metricsAddr: ":9097" +pprofAddr: ":6061" # optional. if supplied it enables pprof server + +name: xatu-cannon + +# derivers: +# attesterSlashing: +# enabled: false +# blsToExecutionChange: +# enabled: false +# deposit: +# enabled: false +# withdrawal: +# enabled: false +# executionTransaction: +# enabled: false +# proposerSlashing: +# enabled: false +# voluntaryExit: +# enabled: false +# beaconBlock: +# enabled: true +# beaconBlobSidecar: +# enabled: false +# proposerDuty: +# enabled: true +# elaboratedAttestation: +# enabled: false +# beaconValidators: +# enabled: false +# elaboratedAttestation: +# enabled: false +# beaconCommittee: +# enabled: false + + # blockClassification: + # enabled: false + + + +# Better to use a NTP server close eg. +# time.aws.com - AWS +# time.windows.com - Azure +# time.google.com - GCP +# pool.ntp.org - https://www.pool.ntp.org/zone/@ +ntpServer: time.google.com + +tracing: + enabled: true + endpoint: tempo:4318 + insecure: true + sampling: + rate: 0.1 + +ethereum: + beaconNodeAddress: http://SET_ME:5052 + # overrideNetworkName: mainnet + # blockCacheSize: 10000 + # blockCacheTtl: 3h + # blockPreloadWorkers: 25 + +coordinator: + address: xatu-server:8080 + headers: + Authorization: "Bearer SET_ME" + +outputs: + - name: xatu + type: xatu + config: + address: xatu-server:8080 + tls: false + maxQueueSize: 51200 + batchTimeout: 0.5s + exportTimeout: 30s + maxExportBatchSize: 32 + workers: 50 + headers: + Authorization: "Bearer SET_ME" \ No newline at end of file diff --git a/deploy/local/docker-compose/xatu-server.yaml b/deploy/local/docker-compose/xatu-server.yaml index 331f901f..7a8d56bc 100644 --- a/deploy/local/docker-compose/xatu-server.yaml +++ b/deploy/local/docker-compose/xatu-server.yaml @@ -38,7 +38,7 @@ services: enabled: true # requires persistence to be enabled auth: enabled: false - secret: "super-secret-token" + # secret: "super-secret-token" nodeRecord: maxQueueSize: 51200 batchTimeout: 5s @@ -48,27 +48,27 @@ services: enabled: true clientNameSalt: "example_salt" authorization: - enabled: false - groups: - default: - obscureClientNames: true - users: - user_a: - password: password - shane: - password: warne - restricted: - eventFilter: - eventNames: - - BEACON_API_ETH_V2_BEACON_BLOCK_V2 - - BEACON_API_ETH_V1_EVENTS_BLOB_SIDECAR - - BEACON_API_ETH_V1_EVENTS_BLOCK_V2 - - BEACON_API_ETH_V1_EVENTS_CHAIN_REORG_V2 - - BEACON_API_ETH_V1_EVENTS_FINALIZED_CHECKPOINT_V2 - - BEACON_API_ETH_V1_EVENTS_HEAD_V2 - users: - user_b: - password: password + # enabled: false + # groups: + # default: + # obscureClientNames: true + # users: + # user_a: + # password: password + # shane: + # password: warne + # restricted: + # eventFilter: + # eventNames: + # - BEACON_API_ETH_V2_BEACON_BLOCK_V2 + # - BEACON_API_ETH_V1_EVENTS_BLOB_SIDECAR + # - BEACON_API_ETH_V1_EVENTS_BLOCK_V2 + # - BEACON_API_ETH_V1_EVENTS_CHAIN_REORG_V2 + # - BEACON_API_ETH_V1_EVENTS_FINALIZED_CHECKPOINT_V2 + # - BEACON_API_ETH_V1_EVENTS_HEAD_V2 + # users: + # user_b: + # password: password outputs: - name: general type: http diff --git a/docker-compose.yml b/docker-compose.yml index 6b30fbb0..55ae76a1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -153,7 +153,9 @@ services: ports: - "${XATU_SERVER_ADDRESS:-0.0.0.0}:${XATU_SERVER_PORT:-8080}:8080" environment: - EVENT_INGESTER_AUTHORIZATION_SECRET: ${EVENT_INGESTER_AUTHORIZATION_SECRET} + # SERVER_EVENT_INGESTER_BASIC_AUTH_USERNAME: ${SERVER_EVENT_INGESTER_BASIC_AUTH_USERNAME:-xatu} + # SERVER_EVENT_INGESTER_BASIC_AUTH_PASSWORD: ${SERVER_EVENT_INGESTER_BASIC_AUTH_PASSWORD:-example} + # SERVER_COORDINATOR_AUTH_SECRET: ${SERVER_COORDINATOR_AUTH_SECRET:-super_secret} volumes: - ./deploy/local/docker-compose/xatu-server.yaml:/etc/xatu-server/config.yaml networks: @@ -527,7 +529,27 @@ services: sed 's/\$$HOSTNAME/'"$$BASE_HOSTNAME"'/g' /tmp/nginx.conf > /etc/nginx/nginx.conf && nginx -g 'daemon off;' " - + xatu-cannon: + profiles: + - "cannon" + command: cannon --config /etc/cannon/config.yaml + container_name: xatu-cannon + hostname: xatu-cannon + build: + context: . + dockerfile: Dockerfile + environment: + # Default + CANNON_XATU_COORDINATOR_AUTHORIZATION: ${CANNON_XATU_COORDINATOR_AUTHORIZATION:-Bearer super_secret} + # Default of xatu:example + CANNON_XATU_OUTPUT_AUTHORIZATION: ${CANNON_XATU_OUTPUT_AUTHORIZATION:-Basic eGF0dTpleGFtcGxl} + # Default of http://localhost:5052 + CANNON_BEACON_NODE_URL: ${CANNON_BEACON_NODE_URL:-http://localhost:5052} + volumes: + - ./deploy/local/docker-compose/xatu-cannon.yaml:/etc/cannon/config.yaml + networks: + - xatu-net + networks: xatu-net: driver: bridge diff --git a/pkg/cannon/cannon.go b/pkg/cannon/cannon.go index 0d4be1fd..e4875fe6 100644 --- a/pkg/cannon/cannon.go +++ b/pkg/cannon/cannon.go @@ -27,6 +27,7 @@ import ( "github.com/ethpandaops/xatu/pkg/cannon/iterator" "github.com/ethpandaops/xatu/pkg/observability" "github.com/ethpandaops/xatu/pkg/output" + oxatu "github.com/ethpandaops/xatu/pkg/output/xatu" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/go-co-op/gocron" "github.com/google/uuid" @@ -58,9 +59,11 @@ type Cannon struct { coordinatorClient *coordinator.Client shutdownFuncs []func(ctx context.Context) error + + overrides *Override } -func New(ctx context.Context, log logrus.FieldLogger, config *Config) (*Cannon, error) { +func New(ctx context.Context, log logrus.FieldLogger, config *Config, overrides *Override) (*Cannon, error) { if config == nil { return nil, errors.New("config is required") } @@ -69,6 +72,12 @@ func New(ctx context.Context, log logrus.FieldLogger, config *Config) (*Cannon, return nil, err } + if overrides != nil { + if err := config.ApplyOverrides(overrides, log); err != nil { + return nil, fmt.Errorf("failed to apply overrides: %w", err) + } + } + sinks, err := config.CreateSinks(log) if err != nil { return nil, err @@ -96,9 +105,35 @@ func New(ctx context.Context, log logrus.FieldLogger, config *Config) (*Cannon, eventDerivers: nil, // Derivers are created once the beacon node is ready coordinatorClient: coordinatorClient, shutdownFuncs: []func(ctx context.Context) error{}, + overrides: overrides, }, nil } +func (c *Cannon) ApplyOverrideBeforeStartAfterCreation(ctx context.Context) error { + if c.overrides == nil { + return nil + } + + if c.overrides.XatuOutputAuth.Enabled { + c.log.Info("Overriding output authorization on xatu sinks") + + for _, sink := range c.sinks { + if sink.Type() == string(output.SinkTypeXatu) { + xatuSink, ok := sink.(*oxatu.Xatu) + if !ok { + return perrors.New("failed to assert xatu sink") + } + + c.log.WithField("sink_name", sink.Name()).Info("Overriding xatu output authorization") + + xatuSink.SetAuthorization(c.overrides.XatuOutputAuth.Value) + } + } + } + + return nil +} + func (c *Cannon) Start(ctx context.Context) error { // Start tracing if enabled if c.Config.Tracing.Enabled { @@ -159,6 +194,10 @@ func (c *Cannon) Start(ctx context.Context) error { } } + if err := c.ApplyOverrideBeforeStartAfterCreation(ctx); err != nil { + return fmt.Errorf("failed to apply overrides before start: %w", err) + } + if c.Config.Ethereum.OverrideNetworkName != "" { c.log.WithField("network", c.Config.Ethereum.OverrideNetworkName).Info("Overriding network name") } diff --git a/pkg/cannon/config.go b/pkg/cannon/config.go index bfaeb214..fe70acb5 100644 --- a/pkg/cannon/config.go +++ b/pkg/cannon/config.go @@ -99,3 +99,29 @@ func (c *Config) CreateSinks(log logrus.FieldLogger) ([]output.Sink, error) { return sinks, nil } + +func (c *Config) ApplyOverrides(o *Override, log logrus.FieldLogger) error { + if o == nil { + return nil + } + + if o.BeaconNodeURL.Enabled { + log.Info("Overriding beacon node URL") + + c.Ethereum.BeaconNodeAddress = o.BeaconNodeURL.Value + } + + if o.BeaconNodeAuthorizationHeader.Enabled { + log.Info("Overriding beacon node authorization header") + + c.Ethereum.BeaconNodeHeaders["Authorization"] = o.BeaconNodeAuthorizationHeader.Value + } + + if o.XatuCoordinatorAuth.Enabled { + log.Info("Overriding xatu coordinator authorization") + + c.Coordinator.Headers["Authorization"] = o.XatuCoordinatorAuth.Value + } + + return nil +} diff --git a/pkg/cannon/overrides.go b/pkg/cannon/overrides.go new file mode 100644 index 00000000..95aff6c3 --- /dev/null +++ b/pkg/cannon/overrides.go @@ -0,0 +1,20 @@ +package cannon + +type Override struct { + BeaconNodeURL struct { + Enabled bool + Value string + } + BeaconNodeAuthorizationHeader struct { + Enabled bool + Value string + } + XatuOutputAuth struct { + Enabled bool + Value string + } + XatuCoordinatorAuth struct { + Enabled bool + Value string + } +} diff --git a/pkg/output/xatu/config.go b/pkg/output/xatu/config.go index cf90b4ca..ff968383 100644 --- a/pkg/output/xatu/config.go +++ b/pkg/output/xatu/config.go @@ -6,17 +6,16 @@ import ( ) type Config struct { - Address string `yaml:"address"` - Headers map[string]string `yaml:"headers"` - TLS bool `yaml:"tls" default:"false"` - MaxQueueSize int `yaml:"maxQueueSize" default:"51200"` - BatchTimeout time.Duration `yaml:"batchTimeout" default:"5s"` - ExportTimeout time.Duration `yaml:"exportTimeout" default:"30s"` - MaxExportBatchSize int `yaml:"maxExportBatchSize" default:"512"` - Workers int `yaml:"workers" default:"1"` - Retry RetryConfig `yaml:"retry"` - AuthorizationSecret string `yaml:"authorizationSecret"` - KeepAlive KeepAliveConfig `yaml:"keepAlive"` + Address string `yaml:"address"` + Headers map[string]string `yaml:"headers"` + TLS bool `yaml:"tls" default:"false"` + MaxQueueSize int `yaml:"maxQueueSize" default:"51200"` + BatchTimeout time.Duration `yaml:"batchTimeout" default:"5s"` + ExportTimeout time.Duration `yaml:"exportTimeout" default:"30s"` + MaxExportBatchSize int `yaml:"maxExportBatchSize" default:"512"` + Workers int `yaml:"workers" default:"1"` + Retry RetryConfig `yaml:"retry"` + KeepAlive KeepAliveConfig `yaml:"keepAlive"` } type KeepAliveConfig struct { diff --git a/pkg/server/config.go b/pkg/server/config.go index e8d5c4e4..ce352a76 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -1,11 +1,15 @@ package server import ( + "fmt" + "github.com/ethpandaops/xatu/pkg/observability" "github.com/ethpandaops/xatu/pkg/server/geoip" "github.com/ethpandaops/xatu/pkg/server/persistence" "github.com/ethpandaops/xatu/pkg/server/service" + "github.com/ethpandaops/xatu/pkg/server/service/event-ingester/auth" "github.com/ethpandaops/xatu/pkg/server/store" + "github.com/sirupsen/logrus" ) type Config struct { @@ -65,3 +69,47 @@ func (c *Config) Validate() error { return nil } + +func (c *Config) ApplyOverrides(o *Override, log logrus.FieldLogger) error { + if o == nil { + return nil + } + + if o.EventIngesterBasicAuth.Username != "" || o.EventIngesterBasicAuth.Password != "" { + log.Info("Enabling event ingester basic authentication via override") + + if o.EventIngesterBasicAuth.Password == "" { + return fmt.Errorf("invalid basic auth override configuration: password is required") + } + + if o.EventIngesterBasicAuth.Username == "" { + return fmt.Errorf("invalid basic auth override configuration: username is required") + } + + // Event Ingester + c.Services.EventIngester.Authorization.Enabled = true + + c.Services.EventIngester.Authorization.Groups = make(auth.GroupsConfig) + groupName := "simple" + + c.Services.EventIngester.Authorization.Groups[groupName] = auth.GroupConfig{ + Users: auth.UsersConfig{}, + } + + c.Services.EventIngester.Authorization.Groups[groupName].Users[o.EventIngesterBasicAuth.Username] = auth.UserConfig{ + Password: o.EventIngesterBasicAuth.Password, + } + } + + if o.CoordinatorAuth.Enabled { + log.Info("Enabling coordinator authentication via override") + + // Coordinator + enabled := true + c.Services.Coordinator.Auth.Enabled = &enabled + + c.Services.Coordinator.Auth.Secret = o.CoordinatorAuth.AuthSecret + } + + return nil +} diff --git a/pkg/server/overrides.go b/pkg/server/overrides.go new file mode 100644 index 00000000..bb8b7f67 --- /dev/null +++ b/pkg/server/overrides.go @@ -0,0 +1,14 @@ +package server + +type Override struct { + // EventIngesterBasicAuth configures basic authentication for the event ingester. + EventIngesterBasicAuth struct { + Username string + Password string + } + // CoordinatorAuth configures the authentication for the coordinator. + CoordinatorAuth struct { + Enabled bool + AuthSecret string + } +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 06e2e579..b06d4db7 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -48,14 +48,22 @@ type Xatu struct { clockDrift *time.Duration + overrides *Override + shutdownFuncs []func(ctx context.Context) error } -func NewXatu(ctx context.Context, log logrus.FieldLogger, conf *Config) (*Xatu, error) { +func NewXatu(ctx context.Context, log logrus.FieldLogger, conf *Config, o *Override) (*Xatu, error) { if err := conf.Validate(); err != nil { return nil, err } + if o != nil { + if err := conf.ApplyOverrides(o, log); err != nil { + return nil, fmt.Errorf("failed to apply overrides: %w", err) + } + } + clockDrift := time.Duration(0) var p *persistence.Client @@ -96,6 +104,7 @@ func NewXatu(ctx context.Context, log logrus.FieldLogger, conf *Config) (*Xatu, services: services, clockDrift: &clockDrift, shutdownFuncs: []func(ctx context.Context) error{}, + overrides: o, }, nil } diff --git a/pkg/server/service/event-ingester/ingester.go b/pkg/server/service/event-ingester/ingester.go index 9e002e81..81cd156c 100644 --- a/pkg/server/service/event-ingester/ingester.go +++ b/pkg/server/service/event-ingester/ingester.go @@ -34,13 +34,15 @@ type Ingester struct { } func NewIngester(ctx context.Context, log logrus.FieldLogger, conf *Config, clockDrift *time.Duration, geoipProvider geoip.Provider, cache store.Cache) (*Ingester, error) { + log = log.WithField("server/module", ServiceType) + a, err := auth.NewAuthorization(log, conf.Authorization) if err != nil { return nil, err } e := &Ingester{ - log: log.WithField("server/module", ServiceType), + log: log, config: conf, auth: a, handler: NewHandler(log, clockDrift, geoipProvider, cache, conf.ClientNameSalt),