Skip to content

Commit

Permalink
[WIP] refactor common attributes and journalling in struct groupObject
Browse files Browse the repository at this point in the history
  • Loading branch information
nixpanic committed Mar 18, 2024
1 parent e9cb542 commit 65c17e7
Show file tree
Hide file tree
Showing 7 changed files with 330 additions and 270 deletions.
14 changes: 6 additions & 8 deletions internal/rbd/group_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func getVolumesForGroup(ctx context.Context, volumeIDs []string, secrets map[str
for i, id := range volumeIDs {
volume, err := GenVolFromVolID(ctx, id, creds, secrets)
if err != nil {
return nil, err
return nil, err
}

volumes[i] = volume
Expand Down Expand Up @@ -142,11 +142,6 @@ func (cs *ControllerServer) CreateVolumeGroupSnapshot(ctx context.Context, req *
// 5. remove all rbd-images from the RBDVolumeGroup
// 6. return the RBDVolumeGroup-name and list of snapshots

config, err := getCephConfig(ctx, req.GetParameters(), req.GetSecrets())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

volumes, err := getVolumesForGroup(ctx, req.GetSourceVolumeIds(), req.GetSecrets())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
Expand All @@ -155,6 +150,11 @@ func (cs *ControllerServer) CreateVolumeGroupSnapshot(ctx context.Context, req *
defer v.Destroy()
}

config, err := getCephConfig(ctx, req.GetParameters(), req.GetSecrets())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

group, err := initVolumeGroup(ctx, config, req.GetName(), req.GetSecrets())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
Expand Down Expand Up @@ -219,8 +219,6 @@ func (cs *ControllerServer) DeleteVolumeGroupSnapshot(ctx context.Context, req *
return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
}

// TODO
// sortof optional, only used for static/pre-provisioned VolumeGroupSnapshots
func (cs *ControllerServer) GetVolumeGroupSnapshot(ctx context.Context, req *csi.GetVolumeGroupSnapshotRequest) (*csi.GetVolumeGroupSnapshotResponse, error) {
snapshot, err := rbd_group.GetVolumeGroupSnapshot(ctx, req.GetGroupSnapshotId(), req.GetSecrets())
if err != nil {
Expand Down
167 changes: 167 additions & 0 deletions internal/rbd_group/journal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
Copyright 2024 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rbd_group

import (
"context"
"errors"

"github.com/ceph/go-ceph/rados"

"github.com/ceph/ceph-csi/internal/journal"
"github.com/ceph/ceph-csi/internal/util"
)

type groupObject struct {
clusterID string

credentials *util.Credentials
secrets map[string]string
monitors string
pool string

// temporary connection attributes
conn *util.ClusterConnection
ioctx *rados.IOContext

// journalling related attributes
journal journal.VolumeGroupJournal
journalPool string

// id is a unique value for this volume group in the Ceph cluster, it
// is used to find the group in the journal.
id string

// name is used in RBD API calls as the name of this object
name string
}

func (obj *groupObject) Destroy(ctx context.Context) {
if obj.journal != nil {
obj.journal.Destroy()
obj.journal = nil
}

if obj.credentials != nil {
obj.credentials.DeleteCredentials()
obj.credentials = nil
}
}

func (obj *groupObject) resolveByID(ctx context.Context, id string, secrets map[string]string) error {
csiID := util.CSIIdentifier{}

err := csiID.DecomposeCSIID(id)
if err != nil {
return err
}

mons, _, err := util.GetMonsAndClusterID(ctx, csiID.ClusterID, false)
if err != nil {
return err
}

namespace, err := util.GetRadosNamespace(util.CsiConfigFile, csiID.ClusterID)
if err != nil {
return err
}

obj.clusterID = csiID.ClusterID
obj.monitors = mons
obj.secrets = secrets
obj.id = id

obj.credentials, err = util.NewUserCredentials(secrets)
if err != nil {
return err
}
defer func() {
if err != nil {
obj.Destroy(ctx)
}
}()

pool, err := util.GetPoolName(mons, obj.credentials, csiID.LocationID)
if err != nil {
return err
}

err = obj.SetJournalNamespace(ctx, pool, namespace)
if err != nil {
return err
}

return nil
}

// SetMonitors connects to the Ceph cluster.
func (obj *groupObject) SetMonitors(ctx context.Context, monitors string) error {
conn := &util.ClusterConnection{}
err := conn.Connect(monitors, obj.credentials)
if err != nil {
return err
}

obj.conn = conn
obj.monitors = monitors

return nil
}

// SetPool uses the connection to the Ceph cluster to create an IOContext to
// the pool.
func (obj *groupObject) SetPool(ctx context.Context, pool string) error {
if obj.conn == nil {
return ErrRBDGroupNotConnected
}

ioctx, err := obj.conn.GetIoctx(pool)
if err != nil {
return err
}

obj.pool = pool
obj.ioctx = ioctx

return nil
}

func (obj *groupObject) SetJournalNamespace(ctx context.Context, pool, namespace string) error {
if obj.conn == nil {
return ErrRBDGroupNotConnected
}

vgj := journal.NewCSIVolumeGroupJournal(groupSuffix)
vgj.SetNamespace(namespace)
err := vgj.Connect(obj.monitors, namespace, obj.credentials)
if err != nil {
return err
}

obj.journal = vgj
obj.journalPool = pool

return nil
}

func (obj *groupObject) GetID(ctx context.Context) (string, error) {
if obj.id == "" {
return "", errors.New("BUG: ID is no set")
}

return obj.id, nil
}
75 changes: 48 additions & 27 deletions internal/rbd_group/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,53 +19,78 @@ package rbd_group
import (
"context"
"fmt"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"

types "github.com/ceph/ceph-csi/internal/rbd_types"
)

// verify that rbdSnapshot type implements the Snapshot interface
var _ types.Snapshot = &rbdGroupSnapshot{}
var _ types.Snapshot = &groupSnapshot{}

// rbdGroupSnapshot describes a single snapshot that was taken as part of a group.
type rbdGroupSnapshot struct {
parent types.Volume
snapName string
snapID uint64 // not needed now, may be used for cloning in the future
// groupSnapshot describes a single snapshot that was taken as part of a group.
type groupSnapshot struct {
*groupObject

// parent is the parent RBD-image that was snapshotted
parent types.Volume

// snapID is the (RBD) ID of the snapshot, may be used for cloning in the future
snapID uint64

// group is the optional value for a VolumeGroup that was used for
group types.VolumeGroup
group *volumeGroup
}

func newGroupSnapshot(group, name string, snapID uint64) types.Snapshot {
return &rbdGroupSnapshot{
//groupName: group,
snapName: name,
snapID: snapID,
func newGroupSnapshot(group *volumeGroup, parent types.Volume, name string, snapID uint64) types.Snapshot {
gs := &groupSnapshot{
groupObject: &groupObject{
name: name,
},
parent: parent,
snapID: snapID,
group: group,
}

return gs
}

func (rgs *rbdGroupSnapshot) Destroy(ctx context.Context) {
// nothing to do yet
// GetSnapshot returns a Snapshot by the given id.
func GetSnapshot(ctx context.Context, id string, secrets map[string]string) (types.Snapshot, error) {
// TODO: use the journal to resolve the groupSnapshot by ID

gs := &groupSnapshot{}
err := gs.resolveByID(ctx, id, secrets)
if err != nil {
return nil, err
}

// TODO: resolve more attributes from the journal
gs.parent = nil
gs.group = nil

return gs, nil
}

// String returns the image-spec of the snapshot.
func (rgs *rbdGroupSnapshot) String() string {
return fmt.Sprintf("%s@%s", rgs.parent, rgs.snapName)
func (gs *groupSnapshot) String() string {
return fmt.Sprintf("%s@%s", gs.parent, gs.name)
}

func (rgs *rbdGroupSnapshot) GetCreationTime(ctx context.Context) (*time.Time, error) {
return nil, nil
func (gs *groupSnapshot) Destroy(ctx context.Context) {
// nothing to do yet

gs.groupObject.Destroy(ctx)
}

func (rgs *rbdGroupSnapshot) GetReadyToUse(ctx context.Context) (bool, error) {
return false, nil
func (gs *groupSnapshot) Delete(ctx context.Context) error {
// TODO: fail in case the parent group still exists
// TODO: remove object from the journal
return nil
}

func (rgs *rbdGroupSnapshot) ToCSISnapshot(ctx context.Context) (*csi.Snapshot, error) {
parentID, err := rgs.parent.GetID(ctx)
func (gs *groupSnapshot) ToCSISnapshot(ctx context.Context) (*csi.Snapshot, error) {
parentID, err := gs.parent.GetID(ctx)
if err != nil {
return nil, err
}
Expand All @@ -79,7 +104,3 @@ func (rgs *rbdGroupSnapshot) ToCSISnapshot(ctx context.Context) (*csi.Snapshot,
GroupSnapshotId: parentID,
}, nil
}

func (rgs *rbdGroupSnapshot) Map(ctx context.Context) (string, error) {
return "/dev/rbd123", nil
}
Loading

0 comments on commit 65c17e7

Please sign in to comment.