From 07c3e2eb6e3090acdb7fa47d8127f1af5e623b32 Mon Sep 17 00:00:00 2001
From: Vitaliy Filippov <vitalif@yourcmc.ru>
Date: Fri, 16 Jul 2021 22:16:37 +0300
Subject: [PATCH] Do not store additional metadata in .metadata.json in the
 bucket itself

---
 pkg/driver/controllerserver.go | 67 +++++-----------------------------
 pkg/driver/nodeserver.go       | 47 ++++++++++++++++--------
 pkg/s3/client.go               | 34 -----------------
 3 files changed, 42 insertions(+), 106 deletions(-)

diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go
index 9d120d5..e332570 100644
--- a/pkg/driver/controllerserver.go
+++ b/pkg/driver/controllerserver.go
@@ -22,7 +22,6 @@ import (
 	"fmt"
 	"io"
 	"path"
-	"regexp"
 	"strings"
 
 	"github.com/ctrox/csi-s3/pkg/mounter"
@@ -43,24 +42,9 @@ type controllerServer struct {
 func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
 	params := req.GetParameters()
 	capacityBytes := int64(req.GetCapacityRange().GetRequiredBytes())
-	mounterType := params[mounter.TypeKey]
 	volumeID := sanitizeVolumeID(req.GetName())
 	bucketName := volumeID
 	prefix := ""
-	mountOptions := make([]string, 0)
-	mountOptStr := params[mounter.OptionsKey]
-	if mountOptStr != "" {
-		re, _ := regexp.Compile(`([^\s"]+|"([^"\\]+|\\")*")+`)
-		re2, _ := regexp.Compile(`"([^"\\]+|\\")*"`)
-		re3, _ := regexp.Compile(`\\(.)`)
-		for _, opt := range re.FindAll([]byte(mountOptStr), -1) {
-			// Unquote options
-			opt = re2.ReplaceAllFunc(opt, func(q []byte) []byte {
-				return re3.ReplaceAll(q[1 : len(q)-1], []byte("$1"))
-			})
-			mountOptions = append(mountOptions, string(opt))
-		}
-	}
 
 	// check if bucket name is overridden
 	if nameOverride, ok := params[mounter.BucketKey]; ok {
@@ -84,14 +68,6 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 
 	glog.V(4).Infof("Got a request to create volume %s", volumeID)
 
-	meta := &s3.FSMeta{
-		BucketName:    bucketName,
-		Prefix:        prefix,
-		Mounter:       mounterType,
-		MountOptions:  mountOptions,
-		CapacityBytes: capacityBytes,
-	}
-
 	client, err := s3.NewClientFromSecret(req.GetSecrets())
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
@@ -102,18 +78,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 		return nil, fmt.Errorf("failed to check if bucket %s exists: %v", volumeID, err)
 	}
 
-	if exists {
-		// get meta, ignore errors as it could just mean meta does not exist yet
-		m, err := client.GetFSMeta(bucketName, prefix)
-		if err == nil {
-			// Check if volume capacity requested is bigger than the already existing capacity
-			if capacityBytes > m.CapacityBytes {
-				return nil, status.Error(
-					codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", volumeID),
-				)
-			}
-		}
-	} else {
+	if !exists {
 		if err = client.CreateBucket(bucketName); err != nil {
 			return nil, fmt.Errorf("failed to create bucket %s: %v", bucketName, err)
 		}
@@ -123,16 +88,19 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 		return nil, fmt.Errorf("failed to create prefix %s: %v", prefix, err)
 	}
 
-	if err := client.SetFSMeta(meta); err != nil {
-		return nil, fmt.Errorf("error setting bucket metadata: %w", err)
-	}
-
 	glog.V(4).Infof("create volume %s", volumeID)
+	// DeleteVolume lacks VolumeContext, but publish&unpublish requests have it,
+	// so we don't need to store additional metadata anywhere
+	context := make(map[string]string)
+	for k, v := range params {
+		context[k] = v
+	}
+	context["capacity"] = fmt.Sprintf("%v", capacityBytes)
 	return &csi.CreateVolumeResponse{
 		Volume: &csi.Volume{
 			VolumeId:      volumeID,
 			CapacityBytes: capacityBytes,
-			VolumeContext: req.GetParameters(),
+			VolumeContext: context,
 		},
 	}, nil
 }
@@ -140,7 +108,6 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
 	volumeID := req.GetVolumeId()
 	bucketName, prefix := volumeIDToBucketPrefix(volumeID)
-	var meta *s3.FSMeta
 
 	// Check arguments
 	if len(volumeID) == 0 {
@@ -158,11 +125,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 		return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
 	}
 
-	if meta, err = client.GetFSMeta(bucketName, prefix); err != nil {
-		glog.V(5).Infof("FSMeta of volume %s does not exist, ignoring delete request", volumeID)
-		return &csi.DeleteVolumeResponse{}, nil
-	}
-
 	var deleteErr error
 	if prefix == "" {
 		// prefix is empty, we delete the whole bucket
@@ -178,10 +140,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 	}
 
 	if deleteErr != nil {
-		glog.Warning("remove volume failed, will ensure fsmeta exists to avoid losing control over volume")
-		if err := client.SetFSMeta(meta); err != nil {
-			glog.Error(err)
-		}
 		return nil, deleteErr
 	}
 
@@ -196,7 +154,7 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
 	if req.GetVolumeCapabilities() == nil {
 		return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request")
 	}
-	bucketName, prefix := volumeIDToBucketPrefix(req.GetVolumeId())
+	bucketName, _ := volumeIDToBucketPrefix(req.GetVolumeId())
 
 	client, err := s3.NewClientFromSecret(req.GetSecrets())
 	if err != nil {
@@ -212,11 +170,6 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
 		return nil, status.Error(codes.NotFound, fmt.Sprintf("bucket of volume with id %s does not exist", req.GetVolumeId()))
 	}
 
-	if _, err := client.GetFSMeta(bucketName, prefix); err != nil {
-		// return an error if the fsmeta of the requested volume does not exist
-		return nil, status.Error(codes.NotFound, fmt.Sprintf("fsmeta of volume with id %s does not exist", req.GetVolumeId()))
-	}
-
 	// We currently only support RWO
 	supportedAccessMode := &csi.VolumeCapability_AccessMode{
 		Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go
index e6e733d..4ad76a2 100644
--- a/pkg/driver/nodeserver.go
+++ b/pkg/driver/nodeserver.go
@@ -19,6 +19,8 @@ package driver
 import (
 	"fmt"
 	"os"
+	"regexp"
+	"strconv"
 
 	"github.com/ctrox/csi-s3/pkg/mounter"
 	"github.com/ctrox/csi-s3/pkg/s3"
@@ -37,6 +39,31 @@ type nodeServer struct {
 	*csicommon.DefaultNodeServer
 }
 
+func getMeta(bucketName, prefix string, context map[string]string) *s3.FSMeta {
+	mountOptions := make([]string, 0)
+	mountOptStr := context[mounter.OptionsKey]
+	if mountOptStr != "" {
+		re, _ := regexp.Compile(`([^\s"]+|"([^"\\]+|\\")*")+`)
+		re2, _ := regexp.Compile(`"([^"\\]+|\\")*"`)
+		re3, _ := regexp.Compile(`\\(.)`)
+		for _, opt := range re.FindAll([]byte(mountOptStr), -1) {
+			// Unquote options
+			opt = re2.ReplaceAllFunc(opt, func(q []byte) []byte {
+				return re3.ReplaceAll(q[1 : len(q)-1], []byte("$1"))
+			})
+			mountOptions = append(mountOptions, string(opt))
+		}
+	}
+	capacity, _ := strconv.ParseInt(context["capacity"], 10, 64)
+	return &s3.FSMeta{
+		BucketName:    bucketName,
+		Prefix:        prefix,
+		Mounter:       context[mounter.TypeKey],
+		MountOptions:  mountOptions,
+		CapacityBytes: capacity,
+	}
+}
+
 func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
 	volumeID := req.GetVolumeId()
 	targetPath := req.GetTargetPath()
@@ -65,29 +92,21 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 		return &csi.NodePublishVolumeResponse{}, nil
 	}
 
-	deviceID := ""
-	if req.GetPublishContext() != nil {
-		deviceID = req.GetPublishContext()[deviceID]
-	}
-
 	// TODO: Implement readOnly & mountFlags
 	readOnly := req.GetReadonly()
 	// TODO: check if attrib is correct with context.
 	attrib := req.GetVolumeContext()
 	mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()
 
-	glog.V(4).Infof("target %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n",
-		targetPath, deviceID, readOnly, volumeID, attrib, mountFlags)
+	glog.V(4).Infof("target %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n",
+		targetPath, readOnly, volumeID, attrib, mountFlags)
 
 	s3, err := s3.NewClientFromSecret(req.GetSecrets())
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
 	}
-	meta, err := s3.GetFSMeta(bucketName, prefix)
-	if err != nil {
-		return nil, err
-	}
 
+	meta := getMeta(bucketName, prefix, req.VolumeContext)
 	mounter, err := mounter.New(meta, s3.Config)
 	if err != nil {
 		return nil, err
@@ -150,10 +169,8 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
 	}
-	meta, err := client.GetFSMeta(bucketName, prefix)
-	if err != nil {
-		return nil, err
-	}
+
+	meta := getMeta(bucketName, prefix, req.VolumeContext)
 	mounter, err := mounter.New(meta, client.Config)
 	if err != nil {
 		return nil, err
diff --git a/pkg/s3/client.go b/pkg/s3/client.go
index 340bde6..17976a3 100644
--- a/pkg/s3/client.go
+++ b/pkg/s3/client.go
@@ -3,11 +3,8 @@ package s3
 import (
 	"bytes"
 	"context"
-	"encoding/json"
 	"fmt"
-	"io"
 	"net/url"
-	"path"
 
 	"github.com/golang/glog"
 	"github.com/minio/minio-go/v7"
@@ -221,34 +218,3 @@ func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error {
 
 	return nil
 }
-
-func (client *s3Client) SetFSMeta(meta *FSMeta) error {
-	b := new(bytes.Buffer)
-	json.NewEncoder(b).Encode(meta)
-	opts := minio.PutObjectOptions{ContentType: "application/json"}
-	_, err := client.minio.PutObject(
-		client.ctx, meta.BucketName, path.Join(meta.Prefix, metadataName), b, int64(b.Len()), opts,
-	)
-	return err
-}
-
-func (client *s3Client) GetFSMeta(bucketName, prefix string) (*FSMeta, error) {
-	opts := minio.GetObjectOptions{}
-	obj, err := client.minio.GetObject(client.ctx, bucketName, path.Join(prefix, metadataName), opts)
-	if err != nil {
-		return &FSMeta{}, err
-	}
-	objInfo, err := obj.Stat()
-	if err != nil {
-		return &FSMeta{}, err
-	}
-	b := make([]byte, objInfo.Size)
-	_, err = obj.Read(b)
-
-	if err != nil && err != io.EOF {
-		return &FSMeta{}, err
-	}
-	var meta FSMeta
-	err = json.Unmarshal(b, &meta)
-	return &meta, err
-}