Merge branch 'bucket-prefixes'

This commit is contained in:
Cyrill Troxler 2021-04-05 15:10:18 +02:00
commit 9eaf79c577
17 changed files with 195 additions and 105 deletions

View file

@ -47,7 +47,7 @@ kubectl create -f csi-s3.yaml
### 3. Create the storage class
```bash
kubectl create -f storageclass.yaml
kubectl create -f examples/storageclass.yaml
```
### 4. Test the S3 driver
@ -55,10 +55,10 @@ kubectl create -f storageclass.yaml
1. Create a pvc using the new storage class:
```bash
kubectl create -f pvc.yaml
kubectl create -f examples/pvc.yaml
```
2. Check if the PVC has been bound:
1. Check if the PVC has been bound:
```bash
$ kubectl get pvc csi-s3-pvc
@ -66,15 +66,15 @@ NAME STATUS VOLUME CAPACITY ACC
csi-s3-pvc Bound pvc-c5d4634f-8507-11e8-9f33-0e243832354b 5Gi RWO csi-s3 9s
```
3. Create a test pod which mounts your volume:
1. Create a test pod which mounts your volume:
```bash
kubectl create -f pod.yaml
kubectl create -f examples/pod.yaml
```
If the pod can start, everything should be working.
4. Test the mount
1. Test the mount
```bash
$ kubectl exec -ti csi-s3-test-nginx bash
@ -87,6 +87,23 @@ If something does not work as expected, check the troubleshooting section below.
## Additional configuration
### Bucket
By default, csi-s3 will create a new bucket per volume. The bucket name will match that of the volume ID. If you want your volumes to live in a precreated bucket, you can simply specify the bucket in the storage class parameters:
```yaml
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
name: csi-s3-existing-bucket
provisioner: ch.ctrox.csi.s3-driver
parameters:
mounter: rclone
bucket: some-existing-bucket-name
```
If the bucket is specified, it will still be created if it does not exist on the backend. Every volume will get its own prefix within the bucket which matches the volume ID. When deleting a volume, also just the prefix will be deleted.
### Mounter
As S3 is not a real file system there are some limitations to consider here. Depending on what mounter you are using, you will have different levels of POSIX compability. Also depending on what S3 storage backend you are using there are not always [consistency guarantees](https://github.com/gaul/are-we-consistent-yet#observed-consistency).

View file

@ -81,7 +81,8 @@ spec:
capabilities:
add: ["SYS_ADMIN"]
allowPrivilegeEscalation: true
image: ctrox/csi-s3:v1.1.1
image: ctrox/csi-s3:dev
imagePullPolicy: Never
args:
- "--endpoint=$(CSI_ENDPOINT)"
- "--nodeid=$(NODE_ID)"

View file

@ -8,6 +8,8 @@ parameters:
# specify which mounter to use
# can be set to rclone, s3fs, goofys or s3backer
mounter: rclone
# to use an existing bucket, specify it here:
# bucket: some-existing-bucket
csi.storage.k8s.io/provisioner-secret-name: csi-s3-secret
csi.storage.k8s.io/provisioner-secret-namespace: kube-system
csi.storage.k8s.io/controller-publish-secret-name: csi-s3-secret

View file

@ -87,7 +87,8 @@ spec:
- name: socket-dir
mountPath: /var/lib/kubelet/plugins/ch.ctrox.csi.s3-driver
- name: csi-s3
image: ctrox/csi-s3:v1.1.1
image: ctrox/csi-s3:dev
imagePullPolicy: Never
args:
- "--endpoint=$(CSI_ENDPOINT)"
- "--nodeid=$(NODE_ID)"

View file

@ -21,6 +21,7 @@ import (
"encoding/hex"
"fmt"
"io"
"path"
"strings"
"github.com/ctrox/csi-s3/pkg/mounter"
@ -39,15 +40,21 @@ type controllerServer struct {
}
const (
defaultFsPrefix = "csi-fs"
defaultFsPath = "csi-fs"
)
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
params := req.GetParameters()
volumeID := sanitizeVolumeID(req.GetName())
if bucketName, bucketExists := params[mounter.BucketKey]; bucketExists {
volumeID = sanitizeVolumeID(bucketName)
bucketName := volumeID
prefix := ""
// check if bucket name is overridden
if nameOverride, ok := params[mounter.BucketKey]; ok {
bucketName = nameOverride
prefix = volumeID
volumeID = path.Join(bucketName, prefix)
}
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
@ -72,53 +79,54 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
exists, err := client.BucketExists(volumeID)
exists, err := client.BucketExists(bucketName)
if err != nil {
return nil, fmt.Errorf("failed to check if bucket %s exists: %v", volumeID, err)
}
var b *s3.Bucket
var meta *s3.FSMeta
if exists {
b, err = client.GetBucket(volumeID)
meta, err = client.GetFSMeta(bucketName, prefix)
if err != nil {
glog.Warningf("Bucket %s exists, but failed to get its metadata: %v", volumeID, err)
b = &s3.Bucket{
Name: volumeID,
meta = &s3.FSMeta{
BucketName: bucketName,
Prefix: prefix,
Mounter: mounter,
CapacityBytes: capacityBytes,
FSPath: "",
FSPath: defaultFsPath,
CreatedByCsi: false,
}
} else {
// Check if volume capacity requested is bigger than the already existing capacity
if capacityBytes > b.CapacityBytes {
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", volumeID))
if capacityBytes > meta.CapacityBytes {
return nil, status.Error(
codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", volumeID),
)
}
b.Mounter = mounter
meta.Mounter = mounter
}
} else {
if err = client.CreateBucket(volumeID); err != nil {
return nil, fmt.Errorf("failed to create volume %s: %v", volumeID, err)
if err = client.CreateBucket(bucketName); err != nil {
return nil, fmt.Errorf("failed to create bucket %s: %v", bucketName, err)
}
if err = client.CreatePrefix(volumeID, defaultFsPrefix); err != nil {
return nil, fmt.Errorf("failed to create prefix %s: %v", defaultFsPrefix, err)
if err = client.CreatePrefix(bucketName, path.Join(prefix, defaultFsPath)); err != nil {
return nil, fmt.Errorf("failed to create prefix %s: %v", path.Join(prefix, defaultFsPath), err)
}
b = &s3.Bucket{
Name: volumeID,
meta = &s3.FSMeta{
BucketName: bucketName,
Prefix: prefix,
Mounter: mounter,
CapacityBytes: capacityBytes,
FSPath: defaultFsPrefix,
FSPath: defaultFsPath,
CreatedByCsi: !exists,
}
}
if err := client.SetBucket(b); err != nil {
return nil, fmt.Errorf("Error setting bucket metadata: %v", err)
if err := client.SetFSMeta(meta); err != nil {
return nil, fmt.Errorf("error setting bucket metadata: %w", err)
}
glog.V(4).Infof("create volume %s", volumeID)
s3Vol := s3Volume{}
s3Vol.VolName = volumeID
s3Vol.VolID = volumeID
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: volumeID,
@ -130,6 +138,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
volumeID := req.GetVolumeId()
bucketName, prefix := volumeIDToBucketPrefix(volumeID)
// Check arguments
if len(volumeID) == 0 {
@ -146,17 +155,22 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
exists, err := client.BucketExists(volumeID)
exists, err := client.BucketExists(bucketName)
if err != nil {
return nil, err
}
if exists {
b, err := client.GetBucket(volumeID)
meta, err := client.GetFSMeta(bucketName, prefix)
if err != nil {
return nil, fmt.Errorf("Failed to get metadata of buckect %s", volumeID)
return nil, fmt.Errorf("failed to get metadata of buckect %s", volumeID)
}
if b.CreatedByCsi {
if err := client.RemoveBucket(volumeID); err != nil {
if prefix != "" {
if err := client.RemovePrefix(bucketName, prefix); err != nil {
return nil, fmt.Errorf("unable to remove prefix: %w", err)
}
}
if meta.CreatedByCsi {
if err := client.RemoveBucket(bucketName); err != nil {
glog.V(3).Infof("Failed to remove volume %s: %v", volumeID, err)
return nil, err
}
@ -180,18 +194,25 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
if req.GetVolumeCapabilities() == nil {
return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request")
}
bucketName, prefix := volumeIDToBucketPrefix(req.GetVolumeId())
s3, err := s3.NewClientFromSecret(req.GetSecrets())
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
exists, err := s3.BucketExists(req.GetVolumeId())
exists, err := s3.BucketExists(bucketName)
if err != nil {
return nil, err
}
if !exists {
// return an error if the volume requested does not exist
return nil, status.Error(codes.NotFound, fmt.Sprintf("Volume with id %s does not exist", req.GetVolumeId()))
// return an error if the bucket of the requested volume does not exist
return nil, status.Error(codes.NotFound, fmt.Sprintf("bucket of volume with id %s does not exist", req.GetVolumeId()))
}
if _, err := s3.GetFSMeta(bucketName, prefix); err != nil {
// return an error if the fsmeta of the requested volume does not exist
return nil, status.Error(codes.NotFound, fmt.Sprintf("fsmeta of volume with id %s does not exist", req.GetVolumeId()))
}
// We currently only support RWO
@ -229,3 +250,16 @@ func sanitizeVolumeID(volumeID string) string {
}
return volumeID
}
// volumeIDToBucketPrefix returns the bucket name and prefix based on the volumeID.
// Prefix is empty if volumeID does not have a slash in the name.
func volumeIDToBucketPrefix(volumeID string) (string, string) {
// if the volumeID has a slash in it, this volume is
// stored under a certain prefix within the bucket.
splitVolumeID := strings.Split(volumeID, "/")
if len(splitVolumeID) > 1 {
return splitVolumeID[0], splitVolumeID[1]
}
return volumeID, ""
}

View file

@ -32,13 +32,6 @@ type driver struct {
cs *controllerServer
}
type s3Volume struct {
VolName string `json:"volName"`
VolID string `json:"volID"`
VolSize int64 `json:"volSize"`
VolPath string `json:"volPath"`
}
var (
vendorVersion = "v1.1.1"
driverName = "ch.ctrox.csi.s3-driver"

View file

@ -32,6 +32,33 @@ var _ = Describe("S3Driver", func() {
StagingPath: os.TempDir() + "/goofys-staging",
Address: csiEndpoint,
SecretsFile: "../../test/secret.yaml",
TestVolumeParameters: map[string]string{
"mounter": "goofys",
"bucket": "testbucket0",
},
}
sanity.GinkgoTest(sanityCfg)
})
})
Context("goofys-no-bucket", func() {
socket := "/tmp/csi-goofys-no-bucket.sock"
csiEndpoint := "unix://" + socket
if err := os.Remove(socket); err != nil && !os.IsNotExist(err) {
Expect(err).NotTo(HaveOccurred())
}
driver, err := driver.New("test-node", csiEndpoint)
if err != nil {
log.Fatal(err)
}
go driver.Run()
Describe("CSI sanity", func() {
sanityCfg := &sanity.Config{
TargetPath: os.TempDir() + "/goofys-no-bucket-target",
StagingPath: os.TempDir() + "/goofys-no-bucket-staging",
Address: csiEndpoint,
SecretsFile: "../../test/secret.yaml",
TestVolumeParameters: map[string]string{
"mounter": "goofys",
},
@ -60,6 +87,7 @@ var _ = Describe("S3Driver", func() {
SecretsFile: "../../test/secret.yaml",
TestVolumeParameters: map[string]string{
"mounter": "s3fs",
"bucket": "testbucket1",
},
}
sanity.GinkgoTest(sanityCfg)
@ -89,6 +117,7 @@ var _ = Describe("S3Driver", func() {
SecretsFile: "../../test/secret.yaml",
TestVolumeParameters: map[string]string{
"mounter": "s3backer",
"bucket": "testbucket2",
},
}
sanity.GinkgoTest(sanityCfg)
@ -116,6 +145,7 @@ var _ = Describe("S3Driver", func() {
SecretsFile: "../../test/secret.yaml",
TestVolumeParameters: map[string]string{
"mounter": "rclone",
"bucket": "testbucket3",
},
}
sanity.GinkgoTest(sanityCfg)

View file

@ -41,6 +41,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()
stagingTargetPath := req.GetStagingTargetPath()
bucketName, prefix := volumeIDToBucketPrefix(volumeID)
// Check arguments
if req.GetVolumeCapability() == nil {
@ -82,12 +83,12 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
b, err := s3.GetBucket(volumeID)
meta, err := s3.GetFSMeta(bucketName, prefix)
if err != nil {
return nil, err
}
mounter, err := mounter.New(b, s3.Config)
mounter, err := mounter.New(meta, s3.Config)
if err != nil {
return nil, err
}
@ -95,7 +96,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, err
}
glog.V(4).Infof("s3: bucket %s successfuly mounted to %s", b.Name, targetPath)
glog.V(4).Infof("s3: volume %s successfuly mounted to %s", volumeID, targetPath)
return &csi.NodePublishVolumeResponse{}, nil
}
@ -115,7 +116,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
if err := mounter.FuseUnmount(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
glog.V(4).Infof("s3: bucket %s has been unmounted.", volumeID)
glog.V(4).Infof("s3: volume %s has been unmounted.", volumeID)
return &csi.NodeUnpublishVolumeResponse{}, nil
}
@ -123,6 +124,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
volumeID := req.GetVolumeId()
stagingTargetPath := req.GetStagingTargetPath()
bucketName, prefix := volumeIDToBucketPrefix(volumeID)
// Check arguments
if len(volumeID) == 0 {
@ -148,11 +150,11 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
b, err := client.GetBucket(volumeID)
meta, err := client.GetFSMeta(bucketName, prefix)
if err != nil {
return nil, err
}
mounter, err := mounter.New(b, client.Config)
mounter, err := mounter.New(meta, client.Config)
if err != nil {
return nil, err
}

View file

@ -3,6 +3,7 @@ package mounter
import (
"fmt"
"os"
"path"
"context"
@ -17,21 +18,21 @@ const (
// Implements Mounter
type goofysMounter struct {
bucket *s3.Bucket
meta *s3.FSMeta
endpoint string
region string
accessKeyID string
secretAccessKey string
}
func newGoofysMounter(b *s3.Bucket, cfg *s3.Config) (Mounter, error) {
func newGoofysMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
region := cfg.Region
// if endpoint is set we need a default region
if region == "" && cfg.Endpoint != "" {
region = defaultRegion
}
return &goofysMounter{
bucket: b,
meta: meta,
endpoint: cfg.Endpoint,
region: region,
accessKeyID: cfg.AccessKeyID,
@ -61,7 +62,7 @@ func (goofys *goofysMounter) Mount(source string, target string) error {
os.Setenv("AWS_ACCESS_KEY_ID", goofys.accessKeyID)
os.Setenv("AWS_SECRET_ACCESS_KEY", goofys.secretAccessKey)
fullPath := fmt.Sprintf("%s:%s", goofys.bucket.Name, goofys.bucket.FSPath)
fullPath := fmt.Sprintf("%s:%s", goofys.meta.BucketName, path.Join(goofys.meta.Prefix, goofys.meta.FSPath))
_, _, err := goofysApi.Mount(context.Background(), fullPath, goofysCfg)

View file

@ -34,28 +34,28 @@ const (
)
// New returns a new mounter depending on the mounterType parameter
func New(bucket *s3.Bucket, cfg *s3.Config) (Mounter, error) {
mounter := bucket.Mounter
func New(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
mounter := meta.Mounter
// Fall back to mounterType in cfg
if len(bucket.Mounter) == 0 {
if len(meta.Mounter) == 0 {
mounter = cfg.Mounter
}
switch mounter {
case s3fsMounterType:
return newS3fsMounter(bucket, cfg)
return newS3fsMounter(meta, cfg)
case goofysMounterType:
return newGoofysMounter(bucket, cfg)
return newGoofysMounter(meta, cfg)
case s3backerMounterType:
return newS3backerMounter(bucket, cfg)
return newS3backerMounter(meta, cfg)
case rcloneMounterType:
return newRcloneMounter(bucket, cfg)
return newRcloneMounter(meta, cfg)
default:
// default to s3backer
return newS3backerMounter(bucket, cfg)
return newS3backerMounter(meta, cfg)
}
}

View file

@ -3,13 +3,14 @@ package mounter
import (
"fmt"
"os"
"path"
"github.com/ctrox/csi-s3/pkg/s3"
)
// Implements Mounter
type rcloneMounter struct {
bucket *s3.Bucket
meta *s3.FSMeta
url string
region string
accessKeyID string
@ -20,9 +21,9 @@ const (
rcloneCmd = "rclone"
)
func newRcloneMounter(b *s3.Bucket, cfg *s3.Config) (Mounter, error) {
func newRcloneMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
return &rcloneMounter{
bucket: b,
meta: meta,
url: cfg.Endpoint,
region: cfg.Region,
accessKeyID: cfg.AccessKeyID,
@ -41,7 +42,7 @@ func (rclone *rcloneMounter) Unstage(stageTarget string) error {
func (rclone *rcloneMounter) Mount(source string, target string) error {
args := []string{
"mount",
fmt.Sprintf(":s3:%s/%s", rclone.bucket.Name, rclone.bucket.FSPath),
fmt.Sprintf(":s3:%s", path.Join(rclone.meta.BucketName, rclone.meta.Prefix, rclone.meta.FSPath)),
fmt.Sprintf("%s", target),
"--daemon",
"--s3-provider=AWS",

View file

@ -14,7 +14,7 @@ import (
// Implements Mounter
type s3backerMounter struct {
bucket *s3.Bucket
meta *s3.FSMeta
url string
region string
accessKeyID string
@ -33,18 +33,18 @@ const (
S3backerLoopDevice = "/dev/loop0"
)
func newS3backerMounter(bucket *s3.Bucket, cfg *s3.Config) (Mounter, error) {
func newS3backerMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
url, err := url.Parse(cfg.Endpoint)
if err != nil {
return nil, err
}
url.Path = path.Join(url.Path, bucket.Name, bucket.FSPath)
url.Path = path.Join(url.Path, meta.BucketName, meta.Prefix, meta.FSPath)
// s3backer cannot work with 0 size volumes
if bucket.CapacityBytes == 0 {
bucket.CapacityBytes = s3backerDefaultSize
if meta.CapacityBytes == 0 {
meta.CapacityBytes = s3backerDefaultSize
}
s3backer := &s3backerMounter{
bucket: bucket,
meta: meta,
url: cfg.Endpoint,
region: cfg.Region,
accessKeyID: cfg.AccessKeyID,
@ -56,7 +56,7 @@ func newS3backerMounter(bucket *s3.Bucket, cfg *s3.Config) (Mounter, error) {
}
func (s3backer *s3backerMounter) String() string {
return s3backer.bucket.Name
return path.Join(s3backer.meta.BucketName, s3backer.meta.Prefix)
}
func (s3backer *s3backerMounter) Stage(stageTarget string) error {
@ -94,14 +94,14 @@ func (s3backer *s3backerMounter) Mount(source string, target string) error {
return nil
}
func (s3backer *s3backerMounter) mountInit(path string) error {
func (s3backer *s3backerMounter) mountInit(p string) error {
args := []string{
fmt.Sprintf("--blockSize=%s", s3backerBlockSize),
fmt.Sprintf("--size=%v", s3backer.bucket.CapacityBytes),
fmt.Sprintf("--prefix=%s/", s3backer.bucket.FSPath),
fmt.Sprintf("--size=%v", s3backer.meta.CapacityBytes),
fmt.Sprintf("--prefix=%s/", path.Join(s3backer.meta.Prefix, s3backer.meta.FSPath)),
"--listBlocks",
s3backer.bucket.Name,
path,
s3backer.meta.BucketName,
p,
}
if s3backer.region != "" {
args = append(args, fmt.Sprintf("--region=%s", s3backer.region))
@ -114,7 +114,7 @@ func (s3backer *s3backerMounter) mountInit(path string) error {
args = append(args, "--ssl")
}
return fuseMount(path, s3backerCmd, args)
return fuseMount(p, s3backerCmd, args)
}
func (s3backer *s3backerMounter) writePasswd() error {

View file

@ -3,13 +3,14 @@ package mounter
import (
"fmt"
"os"
"path"
"github.com/ctrox/csi-s3/pkg/s3"
)
// Implements Mounter
type s3fsMounter struct {
bucket *s3.Bucket
meta *s3.FSMeta
url string
region string
pwFileContent string
@ -19,9 +20,9 @@ const (
s3fsCmd = "s3fs"
)
func newS3fsMounter(b *s3.Bucket, cfg *s3.Config) (Mounter, error) {
func newS3fsMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
return &s3fsMounter{
bucket: b,
meta: meta,
url: cfg.Endpoint,
region: cfg.Region,
pwFileContent: cfg.AccessKeyID + ":" + cfg.SecretAccessKey,
@ -41,8 +42,8 @@ func (s3fs *s3fsMounter) Mount(source string, target string) error {
return err
}
args := []string{
fmt.Sprintf("%s:/%s", s3fs.bucket.Name, s3fs.bucket.FSPath),
fmt.Sprintf("%s", target),
fmt.Sprintf("%s:/%s", s3fs.meta.BucketName, path.Join(s3fs.meta.Prefix, s3fs.meta.FSPath)),
target,
"-o", "use_path_request_style",
"-o", fmt.Sprintf("url=%s", s3fs.url),
"-o", fmt.Sprintf("endpoint=%s", s3fs.region),

View file

@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/url"
"path"
"github.com/golang/glog"
"github.com/minio/minio-go/v7"
@ -32,12 +33,13 @@ type Config struct {
Mounter string
}
type Bucket struct {
Name string
Mounter string
FSPath string
CapacityBytes int64
CreatedByCsi bool
type FSMeta struct {
BucketName string `json:"Name"`
Prefix string `json:"Prefix"`
Mounter string `json:"Mounter"`
FSPath string `json:"FSPath"`
CapacityBytes int64 `json:"CapacityBytes"`
CreatedByCsi bool `json:"CreatedByCsi"`
}
func NewClient(cfg *Config) (*s3Client, error) {
@ -93,17 +95,20 @@ func (client *s3Client) CreatePrefix(bucketName string, prefix string) error {
}
func (client *s3Client) RemovePrefix(bucketName string, prefix string) error {
if err := client.removeObjects(bucketName, prefix); err != nil {
return err
}
return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
}
func (client *s3Client) RemoveBucket(bucketName string) error {
if err := client.emptyBucket(bucketName); err != nil {
if err := client.removeObjects(bucketName, ""); err != nil {
return err
}
return client.minio.RemoveBucket(client.ctx, bucketName)
}
func (client *s3Client) emptyBucket(bucketName string) error {
func (client *s3Client) removeObjects(bucketName, prefix string) error {
objectsCh := make(chan minio.ObjectInfo)
var listErr error
@ -117,7 +122,7 @@ func (client *s3Client) emptyBucket(bucketName string) error {
for object := range client.minio.ListObjects(
client.ctx,
bucketName,
minio.ListObjectsOptions{Prefix: "", Recursive: true}) {
minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) {
if object.Err != nil {
listErr = object.Err
return
@ -148,31 +153,33 @@ func (client *s3Client) emptyBucket(bucketName string) error {
return nil
}
func (client *s3Client) SetBucket(bucket *Bucket) error {
func (client *s3Client) SetFSMeta(meta *FSMeta) error {
b := new(bytes.Buffer)
json.NewEncoder(b).Encode(bucket)
json.NewEncoder(b).Encode(meta)
opts := minio.PutObjectOptions{ContentType: "application/json"}
_, err := client.minio.PutObject(client.ctx, bucket.Name, metadataName, b, int64(b.Len()), opts)
_, err := client.minio.PutObject(
client.ctx, meta.BucketName, path.Join(meta.Prefix, metadataName), b, int64(b.Len()), opts,
)
return err
}
func (client *s3Client) GetBucket(bucketName string) (*Bucket, error) {
func (client *s3Client) GetFSMeta(bucketName, prefix string) (*FSMeta, error) {
opts := minio.GetObjectOptions{}
obj, err := client.minio.GetObject(client.ctx, bucketName, metadataName, opts)
obj, err := client.minio.GetObject(client.ctx, bucketName, path.Join(prefix, metadataName), opts)
if err != nil {
return &Bucket{}, err
return &FSMeta{}, err
}
objInfo, err := obj.Stat()
if err != nil {
return &Bucket{}, err
return &FSMeta{}, err
}
b := make([]byte, objInfo.Size)
_, err = obj.Read(b)
if err != nil && err != io.EOF {
return &Bucket{}, err
return &FSMeta{}, err
}
var meta Bucket
var meta FSMeta
err = json.Unmarshal(b, &meta)
return &meta, err
}