Use volume ID as a prefix if the bucket is fixed in the storage class

With this, each volume will get its own prefix within the bucket if it
is configured in the storage class. This also ensures backwards
compatibility with older volumes that have been created in earlier
versions of csi-s3.
This commit is contained in:
Cyrill Troxler 2021-04-05 15:07:16 +02:00
parent 68ef284b9b
commit 26cb1d95e8
17 changed files with 195 additions and 105 deletions

View file

@ -21,6 +21,7 @@ import (
"encoding/hex"
"fmt"
"io"
"path"
"strings"
"github.com/ctrox/csi-s3/pkg/mounter"
@ -39,15 +40,21 @@ type controllerServer struct {
}
const (
defaultFsPrefix = "csi-fs"
defaultFsPath = "csi-fs"
)
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
params := req.GetParameters()
volumeID := sanitizeVolumeID(req.GetName())
if bucketName, bucketExists := params[mounter.BucketKey]; bucketExists {
volumeID = sanitizeVolumeID(bucketName)
bucketName := volumeID
prefix := ""
// check if bucket name is overridden
if nameOverride, ok := params[mounter.BucketKey]; ok {
bucketName = nameOverride
prefix = volumeID
volumeID = path.Join(bucketName, prefix)
}
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
@ -72,53 +79,54 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
exists, err := client.BucketExists(volumeID)
exists, err := client.BucketExists(bucketName)
if err != nil {
return nil, fmt.Errorf("failed to check if bucket %s exists: %v", volumeID, err)
}
var b *s3.Bucket
var meta *s3.FSMeta
if exists {
b, err = client.GetBucket(volumeID)
meta, err = client.GetFSMeta(bucketName, prefix)
if err != nil {
glog.Warningf("Bucket %s exists, but failed to get its metadata: %v", volumeID, err)
b = &s3.Bucket{
Name: volumeID,
meta = &s3.FSMeta{
BucketName: bucketName,
Prefix: prefix,
Mounter: mounter,
CapacityBytes: capacityBytes,
FSPath: "",
FSPath: defaultFsPath,
CreatedByCsi: false,
}
} else {
// Check if volume capacity requested is bigger than the already existing capacity
if capacityBytes > b.CapacityBytes {
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", volumeID))
if capacityBytes > meta.CapacityBytes {
return nil, status.Error(
codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", volumeID),
)
}
b.Mounter = mounter
meta.Mounter = mounter
}
} else {
if err = client.CreateBucket(volumeID); err != nil {
return nil, fmt.Errorf("failed to create volume %s: %v", volumeID, err)
if err = client.CreateBucket(bucketName); err != nil {
return nil, fmt.Errorf("failed to create bucket %s: %v", bucketName, err)
}
if err = client.CreatePrefix(volumeID, defaultFsPrefix); err != nil {
return nil, fmt.Errorf("failed to create prefix %s: %v", defaultFsPrefix, err)
if err = client.CreatePrefix(bucketName, path.Join(prefix, defaultFsPath)); err != nil {
return nil, fmt.Errorf("failed to create prefix %s: %v", path.Join(prefix, defaultFsPath), err)
}
b = &s3.Bucket{
Name: volumeID,
meta = &s3.FSMeta{
BucketName: bucketName,
Prefix: prefix,
Mounter: mounter,
CapacityBytes: capacityBytes,
FSPath: defaultFsPrefix,
FSPath: defaultFsPath,
CreatedByCsi: !exists,
}
}
if err := client.SetBucket(b); err != nil {
return nil, fmt.Errorf("Error setting bucket metadata: %v", err)
if err := client.SetFSMeta(meta); err != nil {
return nil, fmt.Errorf("error setting bucket metadata: %w", err)
}
glog.V(4).Infof("create volume %s", volumeID)
s3Vol := s3Volume{}
s3Vol.VolName = volumeID
s3Vol.VolID = volumeID
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: volumeID,
@ -130,6 +138,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
volumeID := req.GetVolumeId()
bucketName, prefix := volumeIDToBucketPrefix(volumeID)
// Check arguments
if len(volumeID) == 0 {
@ -146,17 +155,22 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
exists, err := client.BucketExists(volumeID)
exists, err := client.BucketExists(bucketName)
if err != nil {
return nil, err
}
if exists {
b, err := client.GetBucket(volumeID)
meta, err := client.GetFSMeta(bucketName, prefix)
if err != nil {
return nil, fmt.Errorf("Failed to get metadata of buckect %s", volumeID)
return nil, fmt.Errorf("failed to get metadata of buckect %s", volumeID)
}
if b.CreatedByCsi {
if err := client.RemoveBucket(volumeID); err != nil {
if prefix != "" {
if err := client.RemovePrefix(bucketName, prefix); err != nil {
return nil, fmt.Errorf("unable to remove prefix: %w", err)
}
}
if meta.CreatedByCsi {
if err := client.RemoveBucket(bucketName); err != nil {
glog.V(3).Infof("Failed to remove volume %s: %v", volumeID, err)
return nil, err
}
@ -180,18 +194,25 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
if req.GetVolumeCapabilities() == nil {
return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request")
}
bucketName, prefix := volumeIDToBucketPrefix(req.GetVolumeId())
s3, err := s3.NewClientFromSecret(req.GetSecrets())
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
exists, err := s3.BucketExists(req.GetVolumeId())
exists, err := s3.BucketExists(bucketName)
if err != nil {
return nil, err
}
if !exists {
// return an error if the volume requested does not exist
return nil, status.Error(codes.NotFound, fmt.Sprintf("Volume with id %s does not exist", req.GetVolumeId()))
// return an error if the bucket of the requested volume does not exist
return nil, status.Error(codes.NotFound, fmt.Sprintf("bucket of volume with id %s does not exist", req.GetVolumeId()))
}
if _, err := s3.GetFSMeta(bucketName, prefix); err != nil {
// return an error if the fsmeta of the requested volume does not exist
return nil, status.Error(codes.NotFound, fmt.Sprintf("fsmeta of volume with id %s does not exist", req.GetVolumeId()))
}
// We currently only support RWO
@ -229,3 +250,16 @@ func sanitizeVolumeID(volumeID string) string {
}
return volumeID
}
// volumeIDToBucketPrefix returns the bucket name and prefix based on the volumeID.
// Prefix is empty if volumeID does not have a slash in the name.
func volumeIDToBucketPrefix(volumeID string) (string, string) {
// if the volumeID has a slash in it, this volume is
// stored under a certain prefix within the bucket.
splitVolumeID := strings.Split(volumeID, "/")
if len(splitVolumeID) > 1 {
return splitVolumeID[0], splitVolumeID[1]
}
return volumeID, ""
}

View file

@ -32,13 +32,6 @@ type driver struct {
cs *controllerServer
}
type s3Volume struct {
VolName string `json:"volName"`
VolID string `json:"volID"`
VolSize int64 `json:"volSize"`
VolPath string `json:"volPath"`
}
var (
vendorVersion = "v1.1.1"
driverName = "ch.ctrox.csi.s3-driver"

View file

@ -32,6 +32,33 @@ var _ = Describe("S3Driver", func() {
StagingPath: os.TempDir() + "/goofys-staging",
Address: csiEndpoint,
SecretsFile: "../../test/secret.yaml",
TestVolumeParameters: map[string]string{
"mounter": "goofys",
"bucket": "testbucket0",
},
}
sanity.GinkgoTest(sanityCfg)
})
})
Context("goofys-no-bucket", func() {
socket := "/tmp/csi-goofys-no-bucket.sock"
csiEndpoint := "unix://" + socket
if err := os.Remove(socket); err != nil && !os.IsNotExist(err) {
Expect(err).NotTo(HaveOccurred())
}
driver, err := driver.New("test-node", csiEndpoint)
if err != nil {
log.Fatal(err)
}
go driver.Run()
Describe("CSI sanity", func() {
sanityCfg := &sanity.Config{
TargetPath: os.TempDir() + "/goofys-no-bucket-target",
StagingPath: os.TempDir() + "/goofys-no-bucket-staging",
Address: csiEndpoint,
SecretsFile: "../../test/secret.yaml",
TestVolumeParameters: map[string]string{
"mounter": "goofys",
},
@ -60,6 +87,7 @@ var _ = Describe("S3Driver", func() {
SecretsFile: "../../test/secret.yaml",
TestVolumeParameters: map[string]string{
"mounter": "s3fs",
"bucket": "testbucket1",
},
}
sanity.GinkgoTest(sanityCfg)
@ -89,6 +117,7 @@ var _ = Describe("S3Driver", func() {
SecretsFile: "../../test/secret.yaml",
TestVolumeParameters: map[string]string{
"mounter": "s3backer",
"bucket": "testbucket2",
},
}
sanity.GinkgoTest(sanityCfg)
@ -116,6 +145,7 @@ var _ = Describe("S3Driver", func() {
SecretsFile: "../../test/secret.yaml",
TestVolumeParameters: map[string]string{
"mounter": "rclone",
"bucket": "testbucket3",
},
}
sanity.GinkgoTest(sanityCfg)

View file

@ -41,6 +41,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()
stagingTargetPath := req.GetStagingTargetPath()
bucketName, prefix := volumeIDToBucketPrefix(volumeID)
// Check arguments
if req.GetVolumeCapability() == nil {
@ -82,12 +83,12 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
b, err := s3.GetBucket(volumeID)
meta, err := s3.GetFSMeta(bucketName, prefix)
if err != nil {
return nil, err
}
mounter, err := mounter.New(b, s3.Config)
mounter, err := mounter.New(meta, s3.Config)
if err != nil {
return nil, err
}
@ -95,7 +96,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, err
}
glog.V(4).Infof("s3: bucket %s successfuly mounted to %s", b.Name, targetPath)
glog.V(4).Infof("s3: volume %s successfuly mounted to %s", volumeID, targetPath)
return &csi.NodePublishVolumeResponse{}, nil
}
@ -115,7 +116,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
if err := mounter.FuseUnmount(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
glog.V(4).Infof("s3: bucket %s has been unmounted.", volumeID)
glog.V(4).Infof("s3: volume %s has been unmounted.", volumeID)
return &csi.NodeUnpublishVolumeResponse{}, nil
}
@ -123,6 +124,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
volumeID := req.GetVolumeId()
stagingTargetPath := req.GetStagingTargetPath()
bucketName, prefix := volumeIDToBucketPrefix(volumeID)
// Check arguments
if len(volumeID) == 0 {
@ -148,11 +150,11 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
b, err := client.GetBucket(volumeID)
meta, err := client.GetFSMeta(bucketName, prefix)
if err != nil {
return nil, err
}
mounter, err := mounter.New(b, client.Config)
mounter, err := mounter.New(meta, client.Config)
if err != nil {
return nil, err
}