From 2089c400019165516c6ec3983ea882e678a0a909 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 16 Jul 2021 22:16:37 +0300 Subject: [PATCH] Do not store additional metadata in .metadata.json in the bucket itself --- pkg/driver/controllerserver.go | 67 +++++----------------------------- pkg/driver/nodeserver.go | 47 ++++++++++++++++-------- pkg/s3/client.go | 34 ----------------- 3 files changed, 42 insertions(+), 106 deletions(-) diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go index 9d120d5..e332570 100644 --- a/pkg/driver/controllerserver.go +++ b/pkg/driver/controllerserver.go @@ -22,7 +22,6 @@ import ( "fmt" "io" "path" - "regexp" "strings" "github.com/ctrox/csi-s3/pkg/mounter" @@ -43,24 +42,9 @@ type controllerServer struct { func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { params := req.GetParameters() capacityBytes := int64(req.GetCapacityRange().GetRequiredBytes()) - mounterType := params[mounter.TypeKey] volumeID := sanitizeVolumeID(req.GetName()) bucketName := volumeID prefix := "" - mountOptions := make([]string, 0) - mountOptStr := params[mounter.OptionsKey] - if mountOptStr != "" { - re, _ := regexp.Compile(`([^\s"]+|"([^"\\]+|\\")*")+`) - re2, _ := regexp.Compile(`"([^"\\]+|\\")*"`) - re3, _ := regexp.Compile(`\\(.)`) - for _, opt := range re.FindAll([]byte(mountOptStr), -1) { - // Unquote options - opt = re2.ReplaceAllFunc(opt, func(q []byte) []byte { - return re3.ReplaceAll(q[1 : len(q)-1], []byte("$1")) - }) - mountOptions = append(mountOptions, string(opt)) - } - } // check if bucket name is overridden if nameOverride, ok := params[mounter.BucketKey]; ok { @@ -84,14 +68,6 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol glog.V(4).Infof("Got a request to create volume %s", volumeID) - meta := &s3.FSMeta{ - BucketName: bucketName, - Prefix: prefix, - Mounter: mounterType, - MountOptions: mountOptions, - CapacityBytes: capacityBytes, - } - client, err := s3.NewClientFromSecret(req.GetSecrets()) if err != nil { return nil, fmt.Errorf("failed to initialize S3 client: %s", err) @@ -102,18 +78,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return nil, fmt.Errorf("failed to check if bucket %s exists: %v", volumeID, err) } - if exists { - // get meta, ignore errors as it could just mean meta does not exist yet - m, err := client.GetFSMeta(bucketName, prefix) - if err == nil { - // Check if volume capacity requested is bigger than the already existing capacity - if capacityBytes > m.CapacityBytes { - return nil, status.Error( - codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", volumeID), - ) - } - } - } else { + if !exists { if err = client.CreateBucket(bucketName); err != nil { return nil, fmt.Errorf("failed to create bucket %s: %v", bucketName, err) } @@ -123,16 +88,19 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return nil, fmt.Errorf("failed to create prefix %s: %v", prefix, err) } - if err := client.SetFSMeta(meta); err != nil { - return nil, fmt.Errorf("error setting bucket metadata: %w", err) - } - glog.V(4).Infof("create volume %s", volumeID) + // DeleteVolume lacks VolumeContext, but publish&unpublish requests have it, + // so we don't need to store additional metadata anywhere + context := make(map[string]string) + for k, v := range params { + context[k] = v + } + context["capacity"] = fmt.Sprintf("%v", capacityBytes) return &csi.CreateVolumeResponse{ Volume: &csi.Volume{ VolumeId: volumeID, CapacityBytes: capacityBytes, - VolumeContext: req.GetParameters(), + VolumeContext: context, }, }, nil } @@ -140,7 +108,6 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { volumeID := req.GetVolumeId() bucketName, prefix := volumeIDToBucketPrefix(volumeID) - var meta *s3.FSMeta // Check arguments if len(volumeID) == 0 { @@ -158,11 +125,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return nil, fmt.Errorf("failed to initialize S3 client: %s", err) } - if meta, err = client.GetFSMeta(bucketName, prefix); err != nil { - glog.V(5).Infof("FSMeta of volume %s does not exist, ignoring delete request", volumeID) - return &csi.DeleteVolumeResponse{}, nil - } - var deleteErr error if prefix == "" { // prefix is empty, we delete the whole bucket @@ -178,10 +140,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } if deleteErr != nil { - glog.Warning("remove volume failed, will ensure fsmeta exists to avoid losing control over volume") - if err := client.SetFSMeta(meta); err != nil { - glog.Error(err) - } return nil, deleteErr } @@ -196,7 +154,7 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req if req.GetVolumeCapabilities() == nil { return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request") } - bucketName, prefix := volumeIDToBucketPrefix(req.GetVolumeId()) + bucketName, _ := volumeIDToBucketPrefix(req.GetVolumeId()) client, err := s3.NewClientFromSecret(req.GetSecrets()) if err != nil { @@ -212,11 +170,6 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req return nil, status.Error(codes.NotFound, fmt.Sprintf("bucket of volume with id %s does not exist", req.GetVolumeId())) } - if _, err := client.GetFSMeta(bucketName, prefix); err != nil { - // return an error if the fsmeta of the requested volume does not exist - return nil, status.Error(codes.NotFound, fmt.Sprintf("fsmeta of volume with id %s does not exist", req.GetVolumeId())) - } - // We currently only support RWO supportedAccessMode := &csi.VolumeCapability_AccessMode{ Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go index e6e733d..4ad76a2 100644 --- a/pkg/driver/nodeserver.go +++ b/pkg/driver/nodeserver.go @@ -19,6 +19,8 @@ package driver import ( "fmt" "os" + "regexp" + "strconv" "github.com/ctrox/csi-s3/pkg/mounter" "github.com/ctrox/csi-s3/pkg/s3" @@ -37,6 +39,31 @@ type nodeServer struct { *csicommon.DefaultNodeServer } +func getMeta(bucketName, prefix string, context map[string]string) *s3.FSMeta { + mountOptions := make([]string, 0) + mountOptStr := context[mounter.OptionsKey] + if mountOptStr != "" { + re, _ := regexp.Compile(`([^\s"]+|"([^"\\]+|\\")*")+`) + re2, _ := regexp.Compile(`"([^"\\]+|\\")*"`) + re3, _ := regexp.Compile(`\\(.)`) + for _, opt := range re.FindAll([]byte(mountOptStr), -1) { + // Unquote options + opt = re2.ReplaceAllFunc(opt, func(q []byte) []byte { + return re3.ReplaceAll(q[1 : len(q)-1], []byte("$1")) + }) + mountOptions = append(mountOptions, string(opt)) + } + } + capacity, _ := strconv.ParseInt(context["capacity"], 10, 64) + return &s3.FSMeta{ + BucketName: bucketName, + Prefix: prefix, + Mounter: context[mounter.TypeKey], + MountOptions: mountOptions, + CapacityBytes: capacity, + } +} + func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { volumeID := req.GetVolumeId() targetPath := req.GetTargetPath() @@ -65,29 +92,21 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis return &csi.NodePublishVolumeResponse{}, nil } - deviceID := "" - if req.GetPublishContext() != nil { - deviceID = req.GetPublishContext()[deviceID] - } - // TODO: Implement readOnly & mountFlags readOnly := req.GetReadonly() // TODO: check if attrib is correct with context. attrib := req.GetVolumeContext() mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags() - glog.V(4).Infof("target %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n", - targetPath, deviceID, readOnly, volumeID, attrib, mountFlags) + glog.V(4).Infof("target %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n", + targetPath, readOnly, volumeID, attrib, mountFlags) s3, err := s3.NewClientFromSecret(req.GetSecrets()) if err != nil { return nil, fmt.Errorf("failed to initialize S3 client: %s", err) } - meta, err := s3.GetFSMeta(bucketName, prefix) - if err != nil { - return nil, err - } + meta := getMeta(bucketName, prefix, req.VolumeContext) mounter, err := mounter.New(meta, s3.Config) if err != nil { return nil, err @@ -150,10 +169,8 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol if err != nil { return nil, fmt.Errorf("failed to initialize S3 client: %s", err) } - meta, err := client.GetFSMeta(bucketName, prefix) - if err != nil { - return nil, err - } + + meta := getMeta(bucketName, prefix, req.VolumeContext) mounter, err := mounter.New(meta, client.Config) if err != nil { return nil, err diff --git a/pkg/s3/client.go b/pkg/s3/client.go index 340bde6..17976a3 100644 --- a/pkg/s3/client.go +++ b/pkg/s3/client.go @@ -3,11 +3,8 @@ package s3 import ( "bytes" "context" - "encoding/json" "fmt" - "io" "net/url" - "path" "github.com/golang/glog" "github.com/minio/minio-go/v7" @@ -221,34 +218,3 @@ func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error { return nil } - -func (client *s3Client) SetFSMeta(meta *FSMeta) error { - b := new(bytes.Buffer) - json.NewEncoder(b).Encode(meta) - opts := minio.PutObjectOptions{ContentType: "application/json"} - _, err := client.minio.PutObject( - client.ctx, meta.BucketName, path.Join(meta.Prefix, metadataName), b, int64(b.Len()), opts, - ) - return err -} - -func (client *s3Client) GetFSMeta(bucketName, prefix string) (*FSMeta, error) { - opts := minio.GetObjectOptions{} - obj, err := client.minio.GetObject(client.ctx, bucketName, path.Join(prefix, metadataName), opts) - if err != nil { - return &FSMeta{}, err - } - objInfo, err := obj.Stat() - if err != nil { - return &FSMeta{}, err - } - b := make([]byte, objInfo.Size) - _, err = obj.Read(b) - - if err != nil && err != io.EOF { - return &FSMeta{}, err - } - var meta FSMeta - err = json.Unmarshal(b, &meta) - return &meta, err -}