Use volume ID as a prefix if the bucket is fixed in the storage class

With this, each volume will get its own prefix within the bucket if it
is configured in the storage class. This also ensures backwards
compatibility with older volumes that have been created in earlier
versions of csi-s3.
This commit is contained in:
Cyrill Troxler 2021-04-05 15:07:16 +02:00
parent 1b154cd051
commit 9ee2e2c977
17 changed files with 195 additions and 117 deletions
pkg/s3

View file

@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/url"
"path"
"github.com/golang/glog"
"github.com/minio/minio-go/v7"
@ -32,12 +33,13 @@ type Config struct {
Mounter string
}
type Bucket struct {
Name string
Mounter string
FSPath string
CapacityBytes int64
CreatedByCsi bool
type FSMeta struct {
BucketName string `json:"Name"`
Prefix string `json:"Prefix"`
Mounter string `json:"Mounter"`
FSPath string `json:"FSPath"`
CapacityBytes int64 `json:"CapacityBytes"`
CreatedByCsi bool `json:"CreatedByCsi"`
}
func NewClient(cfg *Config) (*s3Client, error) {
@ -93,17 +95,20 @@ func (client *s3Client) CreatePrefix(bucketName string, prefix string) error {
}
func (client *s3Client) RemovePrefix(bucketName string, prefix string) error {
if err := client.removeObjects(bucketName, prefix); err != nil {
return err
}
return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
}
func (client *s3Client) RemoveBucket(bucketName string) error {
if err := client.emptyBucket(bucketName); err != nil {
if err := client.removeObjects(bucketName, ""); err != nil {
return err
}
return client.minio.RemoveBucket(client.ctx, bucketName)
}
func (client *s3Client) emptyBucket(bucketName string) error {
func (client *s3Client) removeObjects(bucketName, prefix string) error {
objectsCh := make(chan minio.ObjectInfo)
var listErr error
@ -117,7 +122,7 @@ func (client *s3Client) emptyBucket(bucketName string) error {
for object := range client.minio.ListObjects(
client.ctx,
bucketName,
minio.ListObjectsOptions{Prefix: "", Recursive: true}) {
minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) {
if object.Err != nil {
listErr = object.Err
return
@ -148,31 +153,33 @@ func (client *s3Client) emptyBucket(bucketName string) error {
return nil
}
func (client *s3Client) SetBucket(bucket *Bucket) error {
func (client *s3Client) SetFSMeta(meta *FSMeta) error {
b := new(bytes.Buffer)
json.NewEncoder(b).Encode(bucket)
json.NewEncoder(b).Encode(meta)
opts := minio.PutObjectOptions{ContentType: "application/json"}
_, err := client.minio.PutObject(client.ctx, bucket.Name, metadataName, b, int64(b.Len()), opts)
_, err := client.minio.PutObject(
client.ctx, meta.BucketName, path.Join(meta.Prefix, metadataName), b, int64(b.Len()), opts,
)
return err
}
func (client *s3Client) GetBucket(bucketName string) (*Bucket, error) {
func (client *s3Client) GetFSMeta(bucketName, prefix string) (*FSMeta, error) {
opts := minio.GetObjectOptions{}
obj, err := client.minio.GetObject(client.ctx, bucketName, metadataName, opts)
obj, err := client.minio.GetObject(client.ctx, bucketName, path.Join(prefix, metadataName), opts)
if err != nil {
return &Bucket{}, err
return &FSMeta{}, err
}
objInfo, err := obj.Stat()
if err != nil {
return &Bucket{}, err
return &FSMeta{}, err
}
b := make([]byte, objInfo.Size)
_, err = obj.Read(b)
if err != nil && err != io.EOF {
return &Bucket{}, err
return &FSMeta{}, err
}
var meta Bucket
var meta FSMeta
err = json.Unmarshal(b, &meta)
return &meta, err
}