Skip to content

Commit

Permalink
implement a locking version of the dual write workflow
Browse files Browse the repository at this point in the history
this also removes the failpoint library in favor of a quick local
version that doesn't require transforming the codebase.
  • Loading branch information
ecordell committed Sep 1, 2023
1 parent 3be6842 commit e9c8ce0
Show file tree
Hide file tree
Showing 12 changed files with 634 additions and 336 deletions.
5 changes: 3 additions & 2 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ import (
)

var (
testEnv *envtest.Environment
testEnv *envtest.Environment
proxySrv *proxy.Server

// adminUser is configured for the un-proxied apiserver
adminUser *envtest.AuthenticatedUser
Expand Down Expand Up @@ -107,7 +108,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
opts.SecureServing.BindAddress = net.ParseIP("127.0.0.1")
opts.Authentication.BuiltInOptions.ClientCert.ClientCA = clientCA.Path()
Expect(opts.Complete(ctx)).To(Succeed())
proxySrv, err := proxy.NewServer(ctx, *opts)
proxySrv, err = proxy.NewServer(ctx, *opts)
Expect(err).To(Succeed())

ctx, cancel := context.WithCancel(context.Background())
Expand Down
285 changes: 187 additions & 98 deletions e2e/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,25 @@ package e2e

import (
"context"
"fmt"
"sync"

v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/pingcap/failpoint"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

"github.com/authzed/spicedb-kubeapi-proxy/pkg/failpoints"
"github.com/authzed/spicedb-kubeapi-proxy/pkg/proxy"
)

var _ = Describe("Proxy", func() {
Describe("with two users", func() {
When("there are two users", func() {
var paulClient, chaniClient, adminClient kubernetes.Interface
var paulNamespace, chaniNamespace string

Expand All @@ -46,125 +49,211 @@ var _ = Describe("Proxy", func() {
})

AfterEach(func(ctx context.Context) {
_ = adminClient.CoreV1().Namespaces().Delete(ctx, paulNamespace, metav1.DeleteOptions{})
_ = adminClient.CoreV1().Namespaces().Delete(ctx, chaniNamespace, metav1.DeleteOptions{})
})
orphan := metav1.DeletePropagationOrphan
_ = adminClient.CoreV1().Namespaces().Delete(ctx, paulNamespace, metav1.DeleteOptions{PropagationPolicy: &orphan})
_ = adminClient.CoreV1().Namespaces().Delete(ctx, chaniNamespace, metav1.DeleteOptions{PropagationPolicy: &orphan})

It("doesn't show users namespaces the other has created", func(ctx context.Context) {
// not created yet, neither can access
_, err := paulClient.CoreV1().Namespaces().Get(ctx, paulNamespace, metav1.GetOptions{})
Expect(k8serrors.IsNotFound(err)).To(BeTrue())
_, err = chaniClient.CoreV1().Namespaces().Get(ctx, chaniNamespace, metav1.GetOptions{})
Expect(k8serrors.IsNotFound(err)).To(BeTrue())
// ensure there are no remaining locks
Expect(len(GetAllTuples(ctx, &v1.RelationshipFilter{
ResourceType: "lock",
OptionalRelation: "workflow",
OptionalSubjectFilter: &v1.SubjectFilter{SubjectType: "workflow"},
}))).To(BeZero())
})

// each creates their respective namespace
_, err = paulClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: paulNamespace},
CreateNamespace := func(ctx context.Context, client kubernetes.Interface, namespace string) error {
_, err := client.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: namespace},
}, metav1.CreateOptions{})
return err
}
GetNamespace := func(ctx context.Context, client kubernetes.Interface, namespace string) error {
_, err := client.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{})
return err
}
ListNamespaces := func(ctx context.Context, client kubernetes.Interface) []string {
visibleNamespaces, err := client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
Expect(err).To(Succeed())
return lo.Map(visibleNamespaces.Items, func(item corev1.Namespace, index int) string {
return item.Name
})
}

_, err = chaniClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: chaniNamespace},
}, metav1.CreateOptions{})
Expect(err).To(Succeed())
JustBeforeEach(func(ctx context.Context) {
// before every test, assert no access
Expect(k8serrors.IsNotFound(GetNamespace(ctx, paulClient, paulNamespace))).To(BeTrue())
Expect(k8serrors.IsNotFound(GetNamespace(ctx, paulClient, chaniNamespace))).To(BeTrue())
Expect(k8serrors.IsNotFound(GetNamespace(ctx, chaniClient, paulNamespace))).To(BeTrue())
Expect(k8serrors.IsNotFound(GetNamespace(ctx, chaniClient, chaniNamespace))).To(BeTrue())
})

// each can get their respective namespace
_, err = paulClient.CoreV1().Namespaces().Get(ctx, paulNamespace, metav1.GetOptions{})
Expect(err).To(Succeed())
_, err = chaniClient.CoreV1().Namespaces().Get(ctx, chaniNamespace, metav1.GetOptions{})
Expect(err).To(Succeed())
AssertDualWriteBehavior := func() {
It("doesn't show users namespaces the other has created", func(ctx context.Context) {
// each creates their respective namespace
Expect(CreateNamespace(ctx, paulClient, paulNamespace)).To(Succeed())
Expect(CreateNamespace(ctx, chaniClient, chaniNamespace)).To(Succeed())

// neither can get each other's namespace
out, err := paulClient.CoreV1().Namespaces().Get(ctx, chaniNamespace, metav1.GetOptions{})
fmt.Println(out)
Expect(k8serrors.IsNotFound(err)).To(BeTrue())
_, err = chaniClient.CoreV1().Namespaces().Get(ctx, paulNamespace, metav1.GetOptions{})
Expect(k8serrors.IsNotFound(err)).To(BeTrue())
// each can get their respective namespace
Expect(GetNamespace(ctx, paulClient, paulNamespace)).To(Succeed())
Expect(GetNamespace(ctx, chaniClient, chaniNamespace)).To(Succeed())

// neither can see each other's namespace in the list
paulVisibleNamespaces, err := paulClient.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
Expect(err).To(Succeed())
paulVisibleNamespaceNames := lo.Map(paulVisibleNamespaces.Items, func(item corev1.Namespace, index int) string {
return item.Name
// neither can get each other's namespace
Expect(k8serrors.IsNotFound(GetNamespace(ctx, paulClient, chaniNamespace))).To(BeTrue())
Expect(k8serrors.IsNotFound(GetNamespace(ctx, chaniClient, paulNamespace))).To(BeTrue())

// neither can see each other's namespace in the list
paulList := ListNamespaces(ctx, paulClient)
chaniList := ListNamespaces(ctx, chaniClient)
Expect(paulList).ToNot(ContainElement(chaniNamespace))
Expect(paulList).To(ContainElement(paulNamespace))
Expect(chaniList).ToNot(ContainElement(paulNamespace))
Expect(chaniList).To(ContainElement(chaniNamespace))
})
Expect(paulVisibleNamespaceNames).To(ContainElement(paulNamespace))

chaniVisibleNamespaces, err := chaniClient.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
Expect(err).To(Succeed())
chaniVisibleNamespaceNames := lo.Map(chaniVisibleNamespaces.Items, func(item corev1.Namespace, index int) string {
return item.Name
It("cleans up dual writes on kube failures", func(ctx context.Context) {
// paul creates his namespace
Expect(CreateNamespace(ctx, paulClient, paulNamespace)).To(Succeed())

// make kube write fail for chani's namespace, spicedb write will have
// succeeded
if proxySrv.LockMode == proxy.LockingWriteToSpiceDBAndKube {
// the locking version retries if the connection fails
failpoints.EnableFailPoint("panicKubeWrite", proxy.MaxKubeAttempts+1)
} else {
failpoints.EnableFailPoint("panicKubeWrite", 1)
}
Expect(CreateNamespace(ctx, chaniClient, chaniNamespace)).ToNot(BeNil())

// paul creates chani's namespace
Expect(CreateNamespace(ctx, paulClient, chaniNamespace)).To(Succeed())

// paul can get get both namespaces
Expect(GetNamespace(ctx, paulClient, paulNamespace)).To(Succeed())
Expect(GetNamespace(ctx, paulClient, chaniNamespace)).To(Succeed())

// chani can't get her namespace - this indicates the spicedb write was rolled back
// from the failed dual write above
Expect(k8serrors.IsNotFound(GetNamespace(ctx, chaniClient, chaniNamespace))).To(BeTrue())
})
Expect(chaniVisibleNamespaceNames).To(ContainElement(chaniNamespace))
})

It("cleans up dual writes on errors", func(ctx context.Context) {
// not created yet, neither can access
_, err := paulClient.CoreV1().Namespaces().Get(ctx, paulNamespace, metav1.GetOptions{})
Expect(k8serrors.IsNotFound(err)).To(BeTrue())
_, err = chaniClient.CoreV1().Namespaces().Get(ctx, chaniNamespace, metav1.GetOptions{})
Expect(k8serrors.IsNotFound(err)).To(BeTrue())
It("recovers dual writes when kube write succeeds but crashes", func(ctx context.Context) {
// paul creates his namespace
Expect(CreateNamespace(ctx, paulClient, paulNamespace)).To(Succeed())

// paul creates his namespace
_, err = paulClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: paulNamespace},
}, metav1.CreateOptions{})
Expect(err).To(Succeed())
// make kube write succeed, but crash process before it can be recorded
failpoints.EnableFailPoint("panicKubeReadResp", 1)
Expect(CreateNamespace(ctx, chaniClient, chaniNamespace)).ToNot(BeNil())

// make kube write fail
err = failpoint.Enable("github.com/authzed/spicedb-kubeapi-proxy/pkg/proxy/panicKubeWrite", "panic")
Expect(err).To(Succeed())
// Chani can get her namespace - the workflow has resolve the write
// Pessimistic locking retried the kube request and got an "already exists" err
// Optimistic locking checked kube and saw that the object already existed
Expect(GetNamespace(ctx, chaniClient, chaniNamespace)).To(Succeed())
})

_, err = chaniClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: chaniNamespace},
}, metav1.CreateOptions{})
Expect(err).ToNot(BeNil())
It("recovers dual writes when spicedb write failures", func(ctx context.Context) {
// paul creates his namespace
Expect(CreateNamespace(ctx, paulClient, paulNamespace)).To(Succeed())

// paul creates chani's namespace
Expect(failpoint.Disable("github.com/authzed/spicedb-kubeapi-proxy/pkg/proxy/panicKubeWrite")).To(Succeed())
_, err = paulClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: chaniNamespace},
}, metav1.CreateOptions{})
Expect(err).To(Succeed())
// make spicedb write crash on chani's namespace write
failpoints.EnableFailPoint("panicWriteSpiceDB", 1)
Expect(CreateNamespace(ctx, chaniClient, chaniNamespace)).ToNot(BeNil())

// paul can get get both namespaces
_, err = paulClient.CoreV1().Namespaces().Get(ctx, paulNamespace, metav1.GetOptions{})
Expect(err).To(Succeed())
_, err = paulClient.CoreV1().Namespaces().Get(ctx, chaniNamespace, metav1.GetOptions{})
Expect(err).To(Succeed())
// paul creates chani's namespace so that the namespace exists
Expect(CreateNamespace(ctx, paulClient, chaniNamespace)).To(Succeed())

// check that chani can't get her namespace, indirectly showing
// that the spicedb write was rolled back
Expect(k8serrors.IsNotFound(GetNamespace(ctx, chaniClient, chaniNamespace))).To(BeTrue())

// chani can't get her namespace - this indicates the spicedb write was rolled back
// from the failed dual write above
_, err = chaniClient.CoreV1().Namespaces().Get(ctx, chaniNamespace, metav1.GetOptions{})
Expect(k8serrors.IsNotFound(err)).To(BeTrue())
})
// confirm the relationship doesn't exist
Expect(len(GetAllTuples(ctx, &v1.RelationshipFilter{
ResourceType: "namespace",
OptionalResourceId: chaniNamespace,
OptionalRelation: "creator",
OptionalSubjectFilter: &v1.SubjectFilter{SubjectType: "user", OptionalSubjectId: "chani"},
}))).To(BeZero())
})

It("recovers dual writes when it crashes", func(ctx context.Context) {
// not created yet, neither can access
_, err := paulClient.CoreV1().Namespaces().Get(ctx, paulNamespace, metav1.GetOptions{})
Expect(k8serrors.IsNotFound(err)).To(BeTrue())
_, err = chaniClient.CoreV1().Namespaces().Get(ctx, chaniNamespace, metav1.GetOptions{})
Expect(k8serrors.IsNotFound(err)).To(BeTrue())
It("recovers dual writes when spicedb write succeeds but crashes", func(ctx context.Context) {
// paul creates his namespace
Expect(CreateNamespace(ctx, paulClient, paulNamespace)).To(Succeed())

// paul creates his namespace
_, err = paulClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: paulNamespace},
}, metav1.CreateOptions{})
Expect(err).To(Succeed())
// make spicedb write crash on chani's namespace write
failpoints.EnableFailPoint("panicSpiceDBReadResp", 1)
err := CreateNamespace(ctx, chaniClient, chaniNamespace)
Expect(err).ToNot(BeNil())
// pessimistic locking reports a conflict, optimistic locking reports already exists
Expect(k8serrors.IsConflict(err) || k8serrors.IsAlreadyExists(err)).To(BeTrue())

// make kube write succeed, but crash process before it can be recorded
err = failpoint.Enable("github.com/authzed/spicedb-kubeapi-proxy/pkg/proxy/panicKubeReadResp", "panic")
Expect(err).To(Succeed())
// paul creates chani's namespace so that the namespace exists
Expect(CreateNamespace(ctx, paulClient, chaniNamespace)).To(Succeed())

_, err = chaniClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: chaniNamespace},
}, metav1.CreateOptions{})
Expect(err).ToNot(BeNil())
// check that chani can't get her namespace, indirectly showing
// that the spicedb write was rolled back
Expect(k8serrors.IsNotFound(GetNamespace(ctx, chaniClient, chaniNamespace))).To(BeTrue())

Expect(failpoint.Disable("github.com/authzed/spicedb-kubeapi-proxy/pkg/proxy/panicKubeReadResp")).To(Succeed())
// confirm the relationship doesn't exist
Expect(len(GetAllTuples(ctx, &v1.RelationshipFilter{
ResourceType: "namespace",
OptionalResourceId: chaniNamespace,
OptionalRelation: "creator",
OptionalSubjectFilter: &v1.SubjectFilter{SubjectType: "user", OptionalSubjectId: "chani"},
}))).To(BeZero())
})

// Chani can get her namespace
_, err = chaniClient.CoreV1().Namespaces().Get(ctx, chaniNamespace, metav1.GetOptions{})
Expect(err).To(Succeed())
It("ensures only one write at a time happens for a given object", func(ctx context.Context) {
// both attempt to create the namespace
start := make(chan struct{})
errs := make(chan error, 2)

var wg sync.WaitGroup
wg.Add(2)

// in theory, these two requests could be run serially, but in
// practice they seem to always actually run in parallel as
// intended.
go func() {
defer GinkgoRecover()
<-start
errs <- CreateNamespace(ctx, paulClient, paulNamespace)
wg.Done()
}()
go func() {
defer GinkgoRecover()
<-start
errs <- CreateNamespace(ctx, chaniClient, paulNamespace)
wg.Done()
}()
start <- struct{}{}
start <- struct{}{}
wg.Wait()
close(errs)

allErrs := make([]error, 0)
for err := range errs {
if err != nil {
allErrs = append(allErrs, err)
}
}
Expect(len(allErrs)).ToNot(BeZero())
Expect(k8serrors.IsConflict(allErrs[0]) || // pessimistic lock
k8serrors.IsAlreadyExists(allErrs[0]), // optimistic lock
).To(BeTrue())
})
}

When("optimistic locking is used", func() {
BeforeEach(func() {
proxySrv.LockMode = proxy.OptimisticWriteToSpiceDBAndKube
})
AssertDualWriteBehavior()
})

When("pessimistic locking is used", func() {
BeforeEach(func() {
proxySrv.LockMode = proxy.LockingWriteToSpiceDBAndKube
})
AssertDualWriteBehavior()
})
})
})
36 changes: 36 additions & 0 deletions e2e/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
//go:build e2e

package e2e

import (
"context"
"io"

v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/authzed/spicedb/pkg/tuple"
. "github.com/onsi/gomega"
"github.com/samber/lo"
)

// GetAllTuples collects all tuples matching the filter from SpiceDB
func GetAllTuples(ctx context.Context, filter *v1.RelationshipFilter) []*v1.ReadRelationshipsResponse {
client, err := proxySrv.SpiceDBClient.ReadRelationships(ctx, &v1.ReadRelationshipsRequest{
Consistency: &v1.Consistency{Requirement: &v1.Consistency_FullyConsistent{FullyConsistent: true}},
RelationshipFilter: filter,
})
Expect(err).To(Succeed())
results := make([]*v1.ReadRelationshipsResponse, 0)
for resp, err := client.Recv(); err != io.EOF; resp, err = client.Recv() {
Expect(err).To(Succeed())
results = append(results, resp)
}
return results
}

// RelRespToStrings converts a slice of *v1.ReadRelationshipsResponse to a slice
// of tuple strings.
func RelRespToStrings(relResps []*v1.ReadRelationshipsResponse) []string {
return lo.Map(relResps, func(item *v1.ReadRelationshipsResponse, _ int) string {
return tuple.MustRelString(item.Relationship)
})
}
Loading

0 comments on commit e9c8ce0

Please sign in to comment.