From 9ee2e2c9770e205beb916bb81696d74dbaf73908 Mon Sep 17 00:00:00 2001
From: Cyrill Troxler <cyrilltroxler@gmail.com>
Date: Mon, 5 Apr 2021 15:07:16 +0200
Subject: [PATCH] Use volume ID as a prefix if the bucket is fixed in the
 storage class

With this, each volume will get its own prefix within the bucket if it
is configured in the storage class. This also ensures backwards
compatibility with older volumes that have been created in earlier
versions of csi-s3.
---
 README.md                                     | 29 ++++--
 deploy/kubernetes/csi-s3.yaml                 |  3 +-
 deploy/kubernetes/{ => examples}/pod.yaml     |  0
 deploy/kubernetes/{ => examples}/pvc.yaml     |  0
 .../{ => examples}/storageclass.yaml          |  2 +
 deploy/kubernetes/provisioner.yaml            |  3 +-
 deploy/kubernetes/secret.yaml.sample          | 12 ---
 pkg/driver/controllerserver.go                | 98 +++++++++++++------
 pkg/driver/driver.go                          |  7 --
 pkg/driver/driver_suite_test.go               | 30 ++++++
 pkg/driver/nodeserver.go                      | 14 +--
 pkg/mounter/goofys.go                         |  9 +-
 pkg/mounter/mounter.go                        | 16 +--
 pkg/mounter/rclone.go                         |  9 +-
 pkg/mounter/s3backer.go                       | 26 ++---
 pkg/mounter/s3fs.go                           | 11 ++-
 pkg/s3/client.go                              | 43 ++++----
 17 files changed, 195 insertions(+), 117 deletions(-)
 rename deploy/kubernetes/{ => examples}/pod.yaml (100%)
 rename deploy/kubernetes/{ => examples}/pvc.yaml (100%)
 rename deploy/kubernetes/{ => examples}/storageclass.yaml (90%)
 delete mode 100644 deploy/kubernetes/secret.yaml.sample

diff --git a/README.md b/README.md
index 546de1c..4e8fc0b 100644
--- a/README.md
+++ b/README.md
@@ -47,7 +47,7 @@ kubectl create -f csi-s3.yaml
 ### 3. Create the storage class
 
 ```bash
-kubectl create -f storageclass.yaml
+kubectl create -f examples/storageclass.yaml
 ```
 
 ### 4. Test the S3 driver
@@ -55,10 +55,10 @@ kubectl create -f storageclass.yaml
 1. Create a pvc using the new storage class:
 
 ```bash
-kubectl create -f pvc.yaml
+kubectl create -f examples/pvc.yaml
 ```
 
-2. Check if the PVC has been bound:
+1. Check if the PVC has been bound:
 
 ```bash
 $ kubectl get pvc csi-s3-pvc
@@ -66,15 +66,15 @@ NAME         STATUS    VOLUME                                     CAPACITY   ACC
 csi-s3-pvc   Bound     pvc-c5d4634f-8507-11e8-9f33-0e243832354b   5Gi        RWO            csi-s3         9s
 ```
 
-3. Create a test pod which mounts your volume:
+1. Create a test pod which mounts your volume:
 
 ```bash
-kubectl create -f pod.yaml
+kubectl create -f examples/pod.yaml
 ```
 
 If the pod can start, everything should be working.
 
-4. Test the mount
+1. Test the mount
 
 ```bash
 $ kubectl exec -ti csi-s3-test-nginx bash
@@ -87,6 +87,23 @@ If something does not work as expected, check the troubleshooting section below.
 
 ## Additional configuration
 
+### Bucket
+
+By default, csi-s3 will create a new bucket per volume. The bucket name will match the volume ID. If you want your volumes to live in a precreated bucket, you can simply specify the bucket in the storage class parameters:
+
+```yaml
+kind: StorageClass
+apiVersion: storage.k8s.io/v1
+metadata:
+  name: csi-s3-existing-bucket
+provisioner: ch.ctrox.csi.s3-driver
+parameters:
+  mounter: rclone
+  bucket: some-existing-bucket-name
+```
+
+If the bucket is specified, it will still be created if it does not exist on the backend. Every volume will get its own prefix within the bucket, matching the volume ID. When a volume is deleted, only its prefix within the bucket is removed; the bucket itself is left intact.
+
 ### Mounter
 
 As S3 is not a real file system there are some limitations to consider here. Depending on what mounter you are using, you will have different levels of POSIX compability. Also depending on what S3 storage backend you are using there are not always [consistency guarantees](https://github.com/gaul/are-we-consistent-yet#observed-consistency).
diff --git a/deploy/kubernetes/csi-s3.yaml b/deploy/kubernetes/csi-s3.yaml
index 8532696..27c2e34 100644
--- a/deploy/kubernetes/csi-s3.yaml
+++ b/deploy/kubernetes/csi-s3.yaml
@@ -81,7 +81,8 @@ spec:
             capabilities:
               add: ["SYS_ADMIN"]
             allowPrivilegeEscalation: true
-          image: ctrox/csi-s3:v1.1.1
+          image: ctrox/csi-s3:dev
+          imagePullPolicy: Never
           args:
             - "--endpoint=$(CSI_ENDPOINT)"
             - "--nodeid=$(NODE_ID)"
diff --git a/deploy/kubernetes/pod.yaml b/deploy/kubernetes/examples/pod.yaml
similarity index 100%
rename from deploy/kubernetes/pod.yaml
rename to deploy/kubernetes/examples/pod.yaml
diff --git a/deploy/kubernetes/pvc.yaml b/deploy/kubernetes/examples/pvc.yaml
similarity index 100%
rename from deploy/kubernetes/pvc.yaml
rename to deploy/kubernetes/examples/pvc.yaml
diff --git a/deploy/kubernetes/storageclass.yaml b/deploy/kubernetes/examples/storageclass.yaml
similarity index 90%
rename from deploy/kubernetes/storageclass.yaml
rename to deploy/kubernetes/examples/storageclass.yaml
index ad6b39c..45e46f3 100644
--- a/deploy/kubernetes/storageclass.yaml
+++ b/deploy/kubernetes/examples/storageclass.yaml
@@ -8,6 +8,8 @@ parameters:
   # specify which mounter to use
   # can be set to rclone, s3fs, goofys or s3backer
   mounter: rclone
+  # to use an existing bucket, specify it here:
+  # bucket: some-existing-bucket
   csi.storage.k8s.io/provisioner-secret-name: csi-s3-secret
   csi.storage.k8s.io/provisioner-secret-namespace: kube-system
   csi.storage.k8s.io/controller-publish-secret-name: csi-s3-secret
diff --git a/deploy/kubernetes/provisioner.yaml b/deploy/kubernetes/provisioner.yaml
index 9aa922d..d951496 100644
--- a/deploy/kubernetes/provisioner.yaml
+++ b/deploy/kubernetes/provisioner.yaml
@@ -87,7 +87,8 @@ spec:
             - name: socket-dir
               mountPath: /var/lib/kubelet/plugins/ch.ctrox.csi.s3-driver
         - name: csi-s3
-          image: ctrox/csi-s3:v1.1.1
+          image: ctrox/csi-s3:dev
+          imagePullPolicy: Never
           args:
             - "--endpoint=$(CSI_ENDPOINT)"
             - "--nodeid=$(NODE_ID)"
diff --git a/deploy/kubernetes/secret.yaml.sample b/deploy/kubernetes/secret.yaml.sample
deleted file mode 100644
index e3e82c8..0000000
--- a/deploy/kubernetes/secret.yaml.sample
+++ /dev/null
@@ -1,12 +0,0 @@
-apiVersion: v1
-kind: Secret
-metadata:
-  namespace: kube-system
-  name: csi-s3-secret
-stringData:
-  accessKeyID: <YOUR_ACCESS_KEY_ID>
-  secretAccessKey: <YOUR_SECRET_ACCES_KEY>
-  # For AWS set it to "https://s3.<region>.amazonaws.com"
-  endpoint: https://s3.eu-central-1.amazonaws.com
-  # If not on S3, set it to ""
-  region: <S3_REGION>
diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go
index b866b41..9a1b5b0 100644
--- a/pkg/driver/controllerserver.go
+++ b/pkg/driver/controllerserver.go
@@ -21,6 +21,7 @@ import (
 	"encoding/hex"
 	"fmt"
 	"io"
+	"path"
 	"strings"
 
 	"github.com/ctrox/csi-s3/pkg/mounter"
@@ -39,15 +40,21 @@ type controllerServer struct {
 }
 
 const (
-	defaultFsPrefix = "csi-fs"
+	defaultFsPath = "csi-fs"
 )
 
 func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
 	params := req.GetParameters()
 
 	volumeID := sanitizeVolumeID(req.GetName())
-	if bucketName, bucketExists := params[mounter.BucketKey]; bucketExists {
-		volumeID = sanitizeVolumeID(bucketName)
+	bucketName := volumeID
+	prefix := ""
+
+	// check if bucket name is overridden
+	if nameOverride, ok := params[mounter.BucketKey]; ok {
+		bucketName = nameOverride
+		prefix = volumeID
+		volumeID = path.Join(bucketName, prefix)
 	}
 
 	if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
@@ -72,53 +79,54 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
 	}
-	exists, err := client.BucketExists(volumeID)
+	exists, err := client.BucketExists(bucketName)
 	if err != nil {
 		return nil, fmt.Errorf("failed to check if bucket %s exists: %v", volumeID, err)
 	}
-	var b *s3.Bucket
+	var meta *s3.FSMeta
 	if exists {
-		b, err = client.GetBucket(volumeID)
+		meta, err = client.GetFSMeta(bucketName, prefix)
 
 		if err != nil {
 			glog.Warningf("Bucket %s exists, but failed to get its metadata: %v", volumeID, err)
-			b = &s3.Bucket{
-				Name:          volumeID,
+			meta = &s3.FSMeta{
+				BucketName:    bucketName,
+				Prefix:        prefix,
 				Mounter:       mounter,
 				CapacityBytes: capacityBytes,
-				FSPath:        "",
+				FSPath:        defaultFsPath,
 				CreatedByCsi:  false,
 			}
 		} else {
 			// Check if volume capacity requested is bigger than the already existing capacity
-			if capacityBytes > b.CapacityBytes {
-				return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", volumeID))
+			if capacityBytes > meta.CapacityBytes {
+				return nil, status.Error(
+					codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", volumeID),
+				)
 			}
-			b.Mounter = mounter
+			meta.Mounter = mounter
 		}
 	} else {
-		if err = client.CreateBucket(volumeID); err != nil {
-			return nil, fmt.Errorf("failed to create volume %s: %v", volumeID, err)
+		if err = client.CreateBucket(bucketName); err != nil {
+			return nil, fmt.Errorf("failed to create bucket %s: %v", bucketName, err)
 		}
-		if err = client.CreatePrefix(volumeID, defaultFsPrefix); err != nil {
-			return nil, fmt.Errorf("failed to create prefix %s: %v", defaultFsPrefix, err)
+		if err = client.CreatePrefix(bucketName, path.Join(prefix, defaultFsPath)); err != nil {
+			return nil, fmt.Errorf("failed to create prefix %s: %v", path.Join(prefix, defaultFsPath), err)
 		}
-		b = &s3.Bucket{
-			Name:          volumeID,
+		meta = &s3.FSMeta{
+			BucketName:    bucketName,
+			Prefix:        prefix,
 			Mounter:       mounter,
 			CapacityBytes: capacityBytes,
-			FSPath:        defaultFsPrefix,
+			FSPath:        defaultFsPath,
 			CreatedByCsi:  !exists,
 		}
 	}
-	if err := client.SetBucket(b); err != nil {
-		return nil, fmt.Errorf("Error setting bucket metadata: %v", err)
+	if err := client.SetFSMeta(meta); err != nil {
+		return nil, fmt.Errorf("error setting bucket metadata: %w", err)
 	}
 
 	glog.V(4).Infof("create volume %s", volumeID)
-	s3Vol := s3Volume{}
-	s3Vol.VolName = volumeID
-	s3Vol.VolID = volumeID
 	return &csi.CreateVolumeResponse{
 		Volume: &csi.Volume{
 			VolumeId:      volumeID,
@@ -130,6 +138,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 
 func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
 	volumeID := req.GetVolumeId()
+	bucketName, prefix := volumeIDToBucketPrefix(volumeID)
 
 	// Check arguments
 	if len(volumeID) == 0 {
@@ -146,17 +155,22 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
 	}
-	exists, err := client.BucketExists(volumeID)
+	exists, err := client.BucketExists(bucketName)
 	if err != nil {
 		return nil, err
 	}
 	if exists {
-		b, err := client.GetBucket(volumeID)
+		meta, err := client.GetFSMeta(bucketName, prefix)
 		if err != nil {
-			return nil, fmt.Errorf("Failed to get metadata of buckect %s", volumeID)
+			return nil, fmt.Errorf("failed to get metadata of bucket %s", volumeID)
 		}
-		if b.CreatedByCsi {
-			if err := client.RemoveBucket(volumeID); err != nil {
+		if prefix != "" {
+			if err := client.RemovePrefix(bucketName, prefix); err != nil {
+				return nil, fmt.Errorf("unable to remove prefix: %w", err)
+			}
+		}
+		if meta.CreatedByCsi {
+			if err := client.RemoveBucket(bucketName); err != nil {
 				glog.V(3).Infof("Failed to remove volume %s: %v", volumeID, err)
 				return nil, err
 			}
@@ -180,18 +194,25 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
 	if req.GetVolumeCapabilities() == nil {
 		return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request")
 	}
+	bucketName, prefix := volumeIDToBucketPrefix(req.GetVolumeId())
 
 	s3, err := s3.NewClientFromSecret(req.GetSecrets())
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
 	}
-	exists, err := s3.BucketExists(req.GetVolumeId())
+	exists, err := s3.BucketExists(bucketName)
 	if err != nil {
 		return nil, err
 	}
+
 	if !exists {
-		// return an error if the volume requested does not exist
-		return nil, status.Error(codes.NotFound, fmt.Sprintf("Volume with id %s does not exist", req.GetVolumeId()))
+		// return an error if the bucket of the requested volume does not exist
+		return nil, status.Error(codes.NotFound, fmt.Sprintf("bucket of volume with id %s does not exist", req.GetVolumeId()))
+	}
+
+	if _, err := s3.GetFSMeta(bucketName, prefix); err != nil {
+		// return an error if the fsmeta of the requested volume does not exist
+		return nil, status.Error(codes.NotFound, fmt.Sprintf("fsmeta of volume with id %s does not exist", req.GetVolumeId()))
 	}
 
 	// We currently only support RWO
@@ -229,3 +250,16 @@ func sanitizeVolumeID(volumeID string) string {
 	}
 	return volumeID
 }
+
+// volumeIDToBucketPrefix returns the bucket name and prefix based on the volumeID.
+// Prefix is empty if volumeID does not have a slash in the name.
+func volumeIDToBucketPrefix(volumeID string) (string, string) {
+	// if the volumeID has a slash in it, this volume is
+	// stored under a certain prefix within the bucket.
+	splitVolumeID := strings.Split(volumeID, "/")
+	if len(splitVolumeID) > 1 {
+		return splitVolumeID[0], splitVolumeID[1]
+	}
+
+	return volumeID, ""
+}
diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go
index badc955..1feeead 100644
--- a/pkg/driver/driver.go
+++ b/pkg/driver/driver.go
@@ -32,13 +32,6 @@ type driver struct {
 	cs  *controllerServer
 }
 
-type s3Volume struct {
-	VolName string `json:"volName"`
-	VolID   string `json:"volID"`
-	VolSize int64  `json:"volSize"`
-	VolPath string `json:"volPath"`
-}
-
 var (
 	vendorVersion = "v1.1.1"
 	driverName    = "ch.ctrox.csi.s3-driver"
diff --git a/pkg/driver/driver_suite_test.go b/pkg/driver/driver_suite_test.go
index 156c9d5..ccd1ee2 100644
--- a/pkg/driver/driver_suite_test.go
+++ b/pkg/driver/driver_suite_test.go
@@ -32,6 +32,33 @@ var _ = Describe("S3Driver", func() {
 				StagingPath: os.TempDir() + "/goofys-staging",
 				Address:     csiEndpoint,
 				SecretsFile: "../../test/secret.yaml",
+				TestVolumeParameters: map[string]string{
+					"mounter": "goofys",
+					"bucket":  "testbucket0",
+				},
+			}
+			sanity.GinkgoTest(sanityCfg)
+		})
+	})
+
+	Context("goofys-no-bucket", func() {
+		socket := "/tmp/csi-goofys-no-bucket.sock"
+		csiEndpoint := "unix://" + socket
+		if err := os.Remove(socket); err != nil && !os.IsNotExist(err) {
+			Expect(err).NotTo(HaveOccurred())
+		}
+		driver, err := driver.New("test-node", csiEndpoint)
+		if err != nil {
+			log.Fatal(err)
+		}
+		go driver.Run()
+
+		Describe("CSI sanity", func() {
+			sanityCfg := &sanity.Config{
+				TargetPath:  os.TempDir() + "/goofys-no-bucket-target",
+				StagingPath: os.TempDir() + "/goofys-no-bucket-staging",
+				Address:     csiEndpoint,
+				SecretsFile: "../../test/secret.yaml",
 				TestVolumeParameters: map[string]string{
 					"mounter": "goofys",
 				},
@@ -60,6 +87,7 @@ var _ = Describe("S3Driver", func() {
 				SecretsFile: "../../test/secret.yaml",
 				TestVolumeParameters: map[string]string{
 					"mounter": "s3fs",
+					"bucket":  "testbucket1",
 				},
 			}
 			sanity.GinkgoTest(sanityCfg)
@@ -89,6 +117,7 @@ var _ = Describe("S3Driver", func() {
 				SecretsFile: "../../test/secret.yaml",
 				TestVolumeParameters: map[string]string{
 					"mounter": "s3backer",
+					"bucket":  "testbucket2",
 				},
 			}
 			sanity.GinkgoTest(sanityCfg)
@@ -116,6 +145,7 @@ var _ = Describe("S3Driver", func() {
 				SecretsFile: "../../test/secret.yaml",
 				TestVolumeParameters: map[string]string{
 					"mounter": "rclone",
+					"bucket":  "testbucket3",
 				},
 			}
 			sanity.GinkgoTest(sanityCfg)
diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go
index b5969e6..e6e733d 100644
--- a/pkg/driver/nodeserver.go
+++ b/pkg/driver/nodeserver.go
@@ -41,6 +41,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 	volumeID := req.GetVolumeId()
 	targetPath := req.GetTargetPath()
 	stagingTargetPath := req.GetStagingTargetPath()
+	bucketName, prefix := volumeIDToBucketPrefix(volumeID)
 
 	// Check arguments
 	if req.GetVolumeCapability() == nil {
@@ -82,12 +83,12 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
 	}
-	b, err := s3.GetBucket(volumeID)
+	meta, err := s3.GetFSMeta(bucketName, prefix)
 	if err != nil {
 		return nil, err
 	}
 
-	mounter, err := mounter.New(b, s3.Config)
+	mounter, err := mounter.New(meta, s3.Config)
 	if err != nil {
 		return nil, err
 	}
@@ -95,7 +96,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 		return nil, err
 	}
 
-	glog.V(4).Infof("s3: bucket %s successfuly mounted to %s", b.Name, targetPath)
+	glog.V(4).Infof("s3: volume %s successfully mounted to %s", volumeID, targetPath)
 
 	return &csi.NodePublishVolumeResponse{}, nil
 }
@@ -115,7 +116,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
 	if err := mounter.FuseUnmount(targetPath); err != nil {
 		return nil, status.Error(codes.Internal, err.Error())
 	}
-	glog.V(4).Infof("s3: bucket %s has been unmounted.", volumeID)
+	glog.V(4).Infof("s3: volume %s has been unmounted.", volumeID)
 
 	return &csi.NodeUnpublishVolumeResponse{}, nil
 }
@@ -123,6 +124,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
 func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
 	volumeID := req.GetVolumeId()
 	stagingTargetPath := req.GetStagingTargetPath()
+	bucketName, prefix := volumeIDToBucketPrefix(volumeID)
 
 	// Check arguments
 	if len(volumeID) == 0 {
@@ -148,11 +150,11 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
 	}
-	b, err := client.GetBucket(volumeID)
+	meta, err := client.GetFSMeta(bucketName, prefix)
 	if err != nil {
 		return nil, err
 	}
-	mounter, err := mounter.New(b, client.Config)
+	mounter, err := mounter.New(meta, client.Config)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/mounter/goofys.go b/pkg/mounter/goofys.go
index 054c912..e7bccc8 100644
--- a/pkg/mounter/goofys.go
+++ b/pkg/mounter/goofys.go
@@ -3,6 +3,7 @@ package mounter
 import (
 	"fmt"
 	"os"
+	"path"
 
 	"context"
 
@@ -17,21 +18,21 @@ const (
 
 // Implements Mounter
 type goofysMounter struct {
-	bucket          *s3.Bucket
+	meta            *s3.FSMeta
 	endpoint        string
 	region          string
 	accessKeyID     string
 	secretAccessKey string
 }
 
-func newGoofysMounter(b *s3.Bucket, cfg *s3.Config) (Mounter, error) {
+func newGoofysMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
 	region := cfg.Region
 	// if endpoint is set we need a default region
 	if region == "" && cfg.Endpoint != "" {
 		region = defaultRegion
 	}
 	return &goofysMounter{
-		bucket:          b,
+		meta:            meta,
 		endpoint:        cfg.Endpoint,
 		region:          region,
 		accessKeyID:     cfg.AccessKeyID,
@@ -61,7 +62,7 @@ func (goofys *goofysMounter) Mount(source string, target string) error {
 
 	os.Setenv("AWS_ACCESS_KEY_ID", goofys.accessKeyID)
 	os.Setenv("AWS_SECRET_ACCESS_KEY", goofys.secretAccessKey)
-	fullPath := fmt.Sprintf("%s:%s", goofys.bucket.Name, goofys.bucket.FSPath)
+	fullPath := fmt.Sprintf("%s:%s", goofys.meta.BucketName, path.Join(goofys.meta.Prefix, goofys.meta.FSPath))
 
 	_, _, err := goofysApi.Mount(context.Background(), fullPath, goofysCfg)
 
diff --git a/pkg/mounter/mounter.go b/pkg/mounter/mounter.go
index f12ee6a..5e0f45c 100644
--- a/pkg/mounter/mounter.go
+++ b/pkg/mounter/mounter.go
@@ -34,28 +34,28 @@ const (
 )
 
 // New returns a new mounter depending on the mounterType parameter
-func New(bucket *s3.Bucket, cfg *s3.Config) (Mounter, error) {
-	mounter := bucket.Mounter
+func New(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
+	mounter := meta.Mounter
 	// Fall back to mounterType in cfg
-	if len(bucket.Mounter) == 0 {
+	if len(meta.Mounter) == 0 {
 		mounter = cfg.Mounter
 	}
 	switch mounter {
 	case s3fsMounterType:
-		return newS3fsMounter(bucket, cfg)
+		return newS3fsMounter(meta, cfg)
 
 	case goofysMounterType:
-		return newGoofysMounter(bucket, cfg)
+		return newGoofysMounter(meta, cfg)
 
 	case s3backerMounterType:
-		return newS3backerMounter(bucket, cfg)
+		return newS3backerMounter(meta, cfg)
 
 	case rcloneMounterType:
-		return newRcloneMounter(bucket, cfg)
+		return newRcloneMounter(meta, cfg)
 
 	default:
 		// default to s3backer
-		return newS3backerMounter(bucket, cfg)
+		return newS3backerMounter(meta, cfg)
 	}
 }
 
diff --git a/pkg/mounter/rclone.go b/pkg/mounter/rclone.go
index 51ad842..038c6aa 100644
--- a/pkg/mounter/rclone.go
+++ b/pkg/mounter/rclone.go
@@ -3,13 +3,14 @@ package mounter
 import (
 	"fmt"
 	"os"
+	"path"
 
 	"github.com/ctrox/csi-s3/pkg/s3"
 )
 
 // Implements Mounter
 type rcloneMounter struct {
-	bucket          *s3.Bucket
+	meta            *s3.FSMeta
 	url             string
 	region          string
 	accessKeyID     string
@@ -20,9 +21,9 @@ const (
 	rcloneCmd = "rclone"
 )
 
-func newRcloneMounter(b *s3.Bucket, cfg *s3.Config) (Mounter, error) {
+func newRcloneMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
 	return &rcloneMounter{
-		bucket:          b,
+		meta:            meta,
 		url:             cfg.Endpoint,
 		region:          cfg.Region,
 		accessKeyID:     cfg.AccessKeyID,
@@ -41,7 +42,7 @@ func (rclone *rcloneMounter) Unstage(stageTarget string) error {
 func (rclone *rcloneMounter) Mount(source string, target string) error {
 	args := []string{
 		"mount",
-		fmt.Sprintf(":s3:%s/%s", rclone.bucket.Name, rclone.bucket.FSPath),
+		fmt.Sprintf(":s3:%s", path.Join(rclone.meta.BucketName, rclone.meta.Prefix, rclone.meta.FSPath)),
 		fmt.Sprintf("%s", target),
 		"--daemon",
 		"--s3-provider=AWS",
diff --git a/pkg/mounter/s3backer.go b/pkg/mounter/s3backer.go
index 04e5172..c70679d 100644
--- a/pkg/mounter/s3backer.go
+++ b/pkg/mounter/s3backer.go
@@ -14,7 +14,7 @@ import (
 
 // Implements Mounter
 type s3backerMounter struct {
-	bucket          *s3.Bucket
+	meta            *s3.FSMeta
 	url             string
 	region          string
 	accessKeyID     string
@@ -33,18 +33,18 @@ const (
 	S3backerLoopDevice = "/dev/loop0"
 )
 
-func newS3backerMounter(bucket *s3.Bucket, cfg *s3.Config) (Mounter, error) {
+func newS3backerMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
 	url, err := url.Parse(cfg.Endpoint)
 	if err != nil {
 		return nil, err
 	}
-	url.Path = path.Join(url.Path, bucket.Name, bucket.FSPath)
+	url.Path = path.Join(url.Path, meta.BucketName, meta.Prefix, meta.FSPath)
 	// s3backer cannot work with 0 size volumes
-	if bucket.CapacityBytes == 0 {
-		bucket.CapacityBytes = s3backerDefaultSize
+	if meta.CapacityBytes == 0 {
+		meta.CapacityBytes = s3backerDefaultSize
 	}
 	s3backer := &s3backerMounter{
-		bucket:          bucket,
+		meta:            meta,
 		url:             cfg.Endpoint,
 		region:          cfg.Region,
 		accessKeyID:     cfg.AccessKeyID,
@@ -56,7 +56,7 @@ func newS3backerMounter(bucket *s3.Bucket, cfg *s3.Config) (Mounter, error) {
 }
 
 func (s3backer *s3backerMounter) String() string {
-	return s3backer.bucket.Name
+	return path.Join(s3backer.meta.BucketName, s3backer.meta.Prefix)
 }
 
 func (s3backer *s3backerMounter) Stage(stageTarget string) error {
@@ -94,14 +94,14 @@ func (s3backer *s3backerMounter) Mount(source string, target string) error {
 	return nil
 }
 
-func (s3backer *s3backerMounter) mountInit(path string) error {
+func (s3backer *s3backerMounter) mountInit(p string) error {
 	args := []string{
 		fmt.Sprintf("--blockSize=%s", s3backerBlockSize),
-		fmt.Sprintf("--size=%v", s3backer.bucket.CapacityBytes),
-		fmt.Sprintf("--prefix=%s/", s3backer.bucket.FSPath),
+		fmt.Sprintf("--size=%v", s3backer.meta.CapacityBytes),
+		fmt.Sprintf("--prefix=%s/", path.Join(s3backer.meta.Prefix, s3backer.meta.FSPath)),
 		"--listBlocks",
-		s3backer.bucket.Name,
-		path,
+		s3backer.meta.BucketName,
+		p,
 	}
 	if s3backer.region != "" {
 		args = append(args, fmt.Sprintf("--region=%s", s3backer.region))
@@ -114,7 +114,7 @@ func (s3backer *s3backerMounter) mountInit(path string) error {
 		args = append(args, "--ssl")
 	}
 
-	return fuseMount(path, s3backerCmd, args)
+	return fuseMount(p, s3backerCmd, args)
 }
 
 func (s3backer *s3backerMounter) writePasswd() error {
diff --git a/pkg/mounter/s3fs.go b/pkg/mounter/s3fs.go
index b916e52..9649388 100644
--- a/pkg/mounter/s3fs.go
+++ b/pkg/mounter/s3fs.go
@@ -3,13 +3,14 @@ package mounter
 import (
 	"fmt"
 	"os"
+	"path"
 
 	"github.com/ctrox/csi-s3/pkg/s3"
 )
 
 // Implements Mounter
 type s3fsMounter struct {
-	bucket        *s3.Bucket
+	meta          *s3.FSMeta
 	url           string
 	region        string
 	pwFileContent string
@@ -19,9 +20,9 @@ const (
 	s3fsCmd = "s3fs"
 )
 
-func newS3fsMounter(b *s3.Bucket, cfg *s3.Config) (Mounter, error) {
+func newS3fsMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
 	return &s3fsMounter{
-		bucket:        b,
+		meta:          meta,
 		url:           cfg.Endpoint,
 		region:        cfg.Region,
 		pwFileContent: cfg.AccessKeyID + ":" + cfg.SecretAccessKey,
@@ -41,8 +42,8 @@ func (s3fs *s3fsMounter) Mount(source string, target string) error {
 		return err
 	}
 	args := []string{
-		fmt.Sprintf("%s:/%s", s3fs.bucket.Name, s3fs.bucket.FSPath),
-		fmt.Sprintf("%s", target),
+		fmt.Sprintf("%s:/%s", s3fs.meta.BucketName, path.Join(s3fs.meta.Prefix, s3fs.meta.FSPath)),
+		target,
 		"-o", "use_path_request_style",
 		"-o", fmt.Sprintf("url=%s", s3fs.url),
 		"-o", fmt.Sprintf("endpoint=%s", s3fs.region),
diff --git a/pkg/s3/client.go b/pkg/s3/client.go
index 1411aa1..653347e 100644
--- a/pkg/s3/client.go
+++ b/pkg/s3/client.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"io"
 	"net/url"
+	"path"
 
 	"github.com/golang/glog"
 	"github.com/minio/minio-go/v7"
@@ -32,12 +33,13 @@ type Config struct {
 	Mounter         string
 }
 
-type Bucket struct {
-	Name          string
-	Mounter       string
-	FSPath        string
-	CapacityBytes int64
-	CreatedByCsi  bool
+type FSMeta struct {
+	BucketName    string `json:"Name"`
+	Prefix        string `json:"Prefix"`
+	Mounter       string `json:"Mounter"`
+	FSPath        string `json:"FSPath"`
+	CapacityBytes int64  `json:"CapacityBytes"`
+	CreatedByCsi  bool   `json:"CreatedByCsi"`
 }
 
 func NewClient(cfg *Config) (*s3Client, error) {
@@ -93,17 +95,20 @@ func (client *s3Client) CreatePrefix(bucketName string, prefix string) error {
 }
 
 func (client *s3Client) RemovePrefix(bucketName string, prefix string) error {
+	if err := client.removeObjects(bucketName, prefix); err != nil {
+		return err
+	}
 	return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
 }
 
 func (client *s3Client) RemoveBucket(bucketName string) error {
-	if err := client.emptyBucket(bucketName); err != nil {
+	if err := client.removeObjects(bucketName, ""); err != nil {
 		return err
 	}
 	return client.minio.RemoveBucket(client.ctx, bucketName)
 }
 
-func (client *s3Client) emptyBucket(bucketName string) error {
+func (client *s3Client) removeObjects(bucketName, prefix string) error {
 	objectsCh := make(chan minio.ObjectInfo)
 	var listErr error
 
@@ -117,7 +122,7 @@ func (client *s3Client) emptyBucket(bucketName string) error {
 		for object := range client.minio.ListObjects(
 			client.ctx,
 			bucketName,
-			minio.ListObjectsOptions{Prefix: "", Recursive: true}) {
+			minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) {
 			if object.Err != nil {
 				listErr = object.Err
 				return
@@ -148,31 +153,33 @@ func (client *s3Client) emptyBucket(bucketName string) error {
 	return nil
 }
 
-func (client *s3Client) SetBucket(bucket *Bucket) error {
+func (client *s3Client) SetFSMeta(meta *FSMeta) error {
 	b := new(bytes.Buffer)
-	json.NewEncoder(b).Encode(bucket)
+	json.NewEncoder(b).Encode(meta)
 	opts := minio.PutObjectOptions{ContentType: "application/json"}
-	_, err := client.minio.PutObject(client.ctx, bucket.Name, metadataName, b, int64(b.Len()), opts)
+	_, err := client.minio.PutObject(
+		client.ctx, meta.BucketName, path.Join(meta.Prefix, metadataName), b, int64(b.Len()), opts,
+	)
 	return err
 }
 
-func (client *s3Client) GetBucket(bucketName string) (*Bucket, error) {
+func (client *s3Client) GetFSMeta(bucketName, prefix string) (*FSMeta, error) {
 	opts := minio.GetObjectOptions{}
-	obj, err := client.minio.GetObject(client.ctx, bucketName, metadataName, opts)
+	obj, err := client.minio.GetObject(client.ctx, bucketName, path.Join(prefix, metadataName), opts)
 	if err != nil {
-		return &Bucket{}, err
+		return &FSMeta{}, err
 	}
 	objInfo, err := obj.Stat()
 	if err != nil {
-		return &Bucket{}, err
+		return &FSMeta{}, err
 	}
 	b := make([]byte, objInfo.Size)
 	_, err = obj.Read(b)
 
 	if err != nil && err != io.EOF {
-		return &Bucket{}, err
+		return &FSMeta{}, err
 	}
-	var meta Bucket
+	var meta FSMeta
 	err = json.Unmarshal(b, &meta)
 	return &meta, err
 }