Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote SpiceDB Support #24

Merged
merged 1 commit into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ proxy.key
*__failpoint_*.go
*.go__failpoint*
*.sqlite
magefiles/mage_output_file.go
2 changes: 2 additions & 0 deletions deploy/proxy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ spec:
containers:
- args:
- --secure-port=8443
- --spicedb-endpoint
- embedded://
- --backend-kubeconfig
- /opt/proxy/kubeconfig
- --cert-dir
Expand Down
1 change: 1 addition & 0 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
opts := proxy.NewOptions()
opts.BackendConfig = backendCfg
opts.SecureServing.BindPort = port
opts.SpiceDBEndpoint = proxy.EmbeddedSpiceDBEndpoint
opts.SecureServing.BindAddress = net.ParseIP("127.0.0.1")
opts.Authentication.BuiltInOptions.ClientCert.ClientCA = clientCA.Path()
Expect(opts.Complete(ctx)).To(Succeed())
Expand Down
2 changes: 1 addition & 1 deletion e2e/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

// GetAllTuples collects all tuples matching the filter from SpiceDB
func GetAllTuples(ctx context.Context, filter *v1.RelationshipFilter) []*v1.ReadRelationshipsResponse {
client, err := proxySrv.SpiceDBClient.ReadRelationships(ctx, &v1.ReadRelationshipsRequest{
client, err := proxySrv.PermissionClient().ReadRelationships(ctx, &v1.ReadRelationshipsRequest{
Consistency: &v1.Consistency{Requirement: &v1.Consistency_FullyConsistent{FullyConsistent: true}},
RelationshipFilter: filter,
})
Expand Down
22 changes: 16 additions & 6 deletions pkg/proxy/authz.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,15 @@ import (
"k8s.io/apiserver/pkg/endpoints/request"
)

func (s *Server) WithAuthorization(handler, failed http.Handler, watchClient v1.WatchServiceClient, taskHubClient backend.TaskHubClient) http.Handler {
func WithAuthorization(handler, failed http.Handler, permissionsClient v1.PermissionsServiceClient, watchClient v1.WatchServiceClient, taskHubClient backend.TaskHubClient, lockMode *string) (http.Handler, error) {
if *lockMode == "" {
return nil, fmt.Errorf("lock mode is undefined")
}

if !(*lockMode == PessimisticWriteToSpiceDBAndKube || *lockMode == OptimisticWriteToSpiceDBAndKube) {
return nil, fmt.Errorf("unexpected lock mode: %s", *lockMode)
}

return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -67,7 +75,7 @@ func (s *Server) WithAuthorization(handler, failed http.Handler, watchClient v1.
return
}

id, err := taskHubClient.ScheduleNewOrchestration(ctx, s.LockMode, api.WithInput(CreateObjInput{
id, err := taskHubClient.ScheduleNewOrchestration(ctx, *lockMode, api.WithInput(CreateObjInput{
RequestInfo: requestInfo,
UserInfo: userInfo.(*user.DefaultInfo),
ObjectMeta: &pom.ObjectMeta,
Expand Down Expand Up @@ -130,7 +138,7 @@ func (s *Server) WithAuthorization(handler, failed http.Handler, watchClient v1.
requestInfo.APIGroup == "" &&
requestInfo.Name != "" {
go func() {
cr, err := s.SpiceDBClient.CheckPermission(ctx, &v1.CheckPermissionRequest{
cr, err := permissionsClient.CheckPermission(ctx, &v1.CheckPermissionRequest{
Consistency: &v1.Consistency{
Requirement: &v1.Consistency_MinimizeLatency{MinimizeLatency: true},
},
Expand Down Expand Up @@ -173,7 +181,7 @@ func (s *Server) WithAuthorization(handler, failed http.Handler, watchClient v1.
requestInfo.APIGroup == "" {

go func() {
lr, err := s.SpiceDBClient.LookupResources(ctx, &v1.LookupResourcesRequest{
lr, err := permissionsClient.LookupResources(ctx, &v1.LookupResourcesRequest{
Consistency: &v1.Consistency{
Requirement: &v1.Consistency_MinimizeLatency{MinimizeLatency: true},
},
Expand Down Expand Up @@ -246,7 +254,7 @@ func (s *Server) WithAuthorization(handler, failed http.Handler, watchClient v1.
for _, u := range resp.Updates {
if u.Operation == v1.RelationshipUpdate_OPERATION_TOUCH || u.Operation == v1.RelationshipUpdate_OPERATION_CREATE {
// do a check
cr, err := s.SpiceDBClient.CheckPermission(ctx, &v1.CheckPermissionRequest{
cr, err := permissionsClient.CheckPermission(ctx, &v1.CheckPermissionRequest{
Consistency: &v1.Consistency{
Requirement: &v1.Consistency_FullyConsistent{FullyConsistent: true},
// TODO
Expand Down Expand Up @@ -284,7 +292,7 @@ func (s *Server) WithAuthorization(handler, failed http.Handler, watchClient v1.
req = req.WithContext(WithAuthzData(req.Context(), &authzData))

handler.ServeHTTP(w, req)
})
}), nil
}

type requestAuthzData int
Expand Down Expand Up @@ -420,6 +428,8 @@ func (d *AuthzData) FilterResp(resp *http.Response) error {
filtered, err = d.FilterObject(pom.ToPartialObjectMetadata(), body)
}

// FIXME we are not returning the err here, and if added, no e2e test passes because "FilterObject"
// returns "unauthorized" error
if err != nil {
fmt.Println(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/proxy/durable_activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ func (s *Server) WriteToSpiceDB(ctx task.ActivityContext) (any, error) {
return nil, err
}
failpoints.FailPoint("panicWriteSpiceDB")
out, err := s.SpiceDBClient.WriteRelationships(ctx.Context(), &req)
out, err := s.PermissionClient().WriteRelationships(ctx.Context(), &req)
failpoints.FailPoint("panicSpiceDBReadResp")
return out, err
}

// WriteToKube
// WriteToKube peforms a Kube API Server POST, specified in a KubeReqInput propagated via the task.ActivityContext arg
func (s *Server) WriteToKube(ctx task.ActivityContext) (any, error) {
var req KubeReqInput
if err := ctx.GetInput(&req); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/proxy/durable_workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
lockRelationName = "workflow"
workflowResourceType = "workflow"
MaxKubeAttempts = 5
DefaultLockMode = PessimisticWriteToSpiceDBAndKube
OptimisticWriteToSpiceDBAndKube = "OptimisticWriteToSpiceDBAndKube"
PessimisticWriteToSpiceDBAndKube = "PessimisticWriteToSpiceDBAndKube"
WriteToSpiceDB = "WriteToSpiceDBActivity"
Expand Down
90 changes: 82 additions & 8 deletions pkg/proxy/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,36 @@ package proxy
import (
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"time"

v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/authzed/grpcutil"
"github.com/authzed/spicedb/pkg/cmd/server"
"github.com/spf13/pflag"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials/insecure"
genericapiserver "k8s.io/apiserver/pkg/server"
apiserveroptions "k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/component-base/logs"
logsv1 "k8s.io/component-base/logs/api/v1"

apiserveroptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/klog/v2"

"github.com/authzed/spicedb-kubeapi-proxy/pkg/spicedb"
)

const (
defaultDurableTaskDatabasePath = "/tmp/dtx.sqlite"
EmbeddedSpiceDBEndpoint = "embedded://"
)

type Options struct {
SecureServing apiserveroptions.SecureServingOptionsWithLoopback
Authentication Authentication
Expand All @@ -35,8 +47,16 @@ type Options struct {
ServingInfo *genericapiserver.SecureServingInfo
AdditionalAuthEnabled bool

SpicedbServer server.RunnableServer
SpiceDBClient any
WatchClient v1.WatchServiceClient
PermissionsClient v1.PermissionsServiceClient
SpiceDBEndpoint string
EmbeddedSpiceDB server.RunnableServer
insecure bool
skipVerifyCA bool
token string

DurableTaskDatabasePath string
LockMode string
}

func NewOptions() *Options {
Expand All @@ -55,7 +75,12 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
o.SecureServing.AddFlags(fs)
o.Authentication.AddFlags(fs)
logsv1.AddFlags(o.Logs, fs)
fs.StringVar(&o.DurableTaskDatabasePath, "durabletask-database-path", defaultDurableTaskDatabasePath, "Path for the file representing the SQLite database used for the durable task engine.")
fs.StringVar(&o.BackendKubeconfigPath, "backend-kubeconfig", o.BackendKubeconfigPath, "The path to the kubeconfig to proxy connections to. It should authenticate the user with cluster-admin permission.")
fs.StringVar(&o.SpiceDBEndpoint, "spicedb-endpoint", "localhost:50051", "Defines the endpoint endpoint to the SpiceDB authorizing proxy operations. if embedded:// is specified, an in memory ephemeral instance created.")
fs.BoolVar(&o.insecure, "spicedb-insecure", false, "If set to true uses the insecure transport configuration for gRPC. Set to false by default.")
fs.BoolVar(&o.skipVerifyCA, "spicedb-skip-verify-ca", false, "If set to true backend certificate trust chain is not verified. Set to false by default.")
fs.StringVar(&o.token, "spicedb-token", "", "specifies the preshared key to use with the remote SpiceDB")
}

func (o *Options) Complete(ctx context.Context) error {
Expand All @@ -69,14 +94,15 @@ func (o *Options) Complete(ctx context.Context) error {
if !filepath.IsAbs(o.BackendKubeconfigPath) {
pwd, err := os.Getwd()
if err != nil {
return err
return fmt.Errorf("couldn't load kubeconfig: %w", err)
}
o.BackendKubeconfigPath = filepath.Join(pwd, o.BackendKubeconfigPath)
}
o.BackendConfig, err = clientcmd.LoadFromFile(o.BackendKubeconfigPath)
if err != nil {
return err
return fmt.Errorf("couldn't load kubeconfig: %w", err)
}
klog.FromContext(ctx).WithValues("kubeconfig", o.BackendKubeconfigPath).Error(err, "loaded backend kube config")
}

if !filepath.IsAbs(o.SecureServing.ServerCert.CertDirectory) {
Expand All @@ -97,11 +123,59 @@ func (o *Options) Complete(ctx context.Context) error {

o.AdditionalAuthEnabled = o.Authentication.AdditionalAuthEnabled()

o.SpicedbServer, err = spicedb.NewServer(ctx)
spicedbURl, err := url.Parse(o.SpiceDBEndpoint)
if err != nil {
return err
return fmt.Errorf("unable to parse SpiceDB endpoint URL: %w", err)
}

var conn *grpc.ClientConn
if spicedbURl.Scheme == "embedded" {
klog.FromContext(ctx).WithValues("spicedb-endpoint", o.SpiceDBEndpoint).Info("using embedded SpiceDB")
o.EmbeddedSpiceDB, err = spicedb.NewServer(ctx)
if err != nil {
return fmt.Errorf("unable to stand up embedded SpiceDB: %w", err)
}

conn, err = o.EmbeddedSpiceDB.GRPCDialContext(ctx, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("unable to open gRPC connection with embedded SpiceDB: %w", err)
}
} else {
klog.FromContext(ctx).WithValues("spicedb-endpoint", o.SpiceDBEndpoint).
WithValues("spicedb-insecure", o.insecure).
WithValues("spicedb-skip-verify-ca", o.skipVerifyCA).
Info("using remote SpiceDB")
var opts []grpc.DialOption
if o.insecure {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
opts = append(opts, grpcutil.WithInsecureBearerToken(o.token))
} else {
opts = append(opts, grpcutil.WithBearerToken(o.token))
verification := grpcutil.VerifyCA
if o.skipVerifyCA {
verification = grpcutil.SkipVerifyCA
}

certs, err := grpcutil.WithSystemCerts(verification)
if err != nil {
return fmt.Errorf("unable to load system certificates: %w", err)
}

opts = append(opts, certs)
}
opts = append(opts, grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoff.DefaultConfig}))

timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
conn, err = grpc.DialContext(timeoutCtx, o.SpiceDBEndpoint, opts...)
if err != nil {
return fmt.Errorf("unable to open gRPC connection to remote SpiceDB at %s: %w", o.SpiceDBEndpoint, err)
}
}

o.PermissionsClient = v1.NewPermissionsServiceClient(conn)
o.WatchClient = v1.NewWatchServiceClient(conn)

return nil
}

Expand Down
Loading
Loading