From dc987329196f8542557e382b69a646078b32c97a Mon Sep 17 00:00:00 2001
From: boxjan <boxjan@outlook.com>
Date: Thu, 8 Apr 2021 00:06:56 +0800
Subject: [PATCH] Fix problems: * use rclone 1.54.1 * add EnsureFSMetaExist func
 * fix the error check in removeObjects * add removeObjectsOneByOne as a
 fallback when removeObjects fails * remove duplicated keys in yaml

---
 cmd/s3driver/Dockerfile            |   2 +-
 cmd/s3driver/Dockerfile.full       |   2 +-
 deploy/kubernetes/attacher.yaml    |   3 -
 deploy/kubernetes/csi-s3.yaml      |   3 +-
 deploy/kubernetes/provisioner.yaml |   3 -
 pkg/driver/controllerserver.go     |  27 +++++---
 pkg/s3/client.go                   | 106 +++++++++++++++++++++++++----
 7 files changed, 115 insertions(+), 31 deletions(-)

diff --git a/cmd/s3driver/Dockerfile b/cmd/s3driver/Dockerfile
index 87fea7f..ec9e2ea 100644
--- a/cmd/s3driver/Dockerfile
+++ b/cmd/s3driver/Dockerfile
@@ -17,7 +17,7 @@ RUN apt-get update && \
   rm -rf /var/lib/apt/lists/*
 
 # install rclone
-ARG RCLONE_VERSION=v1.47.0
+ARG RCLONE_VERSION=v1.54.1
 RUN cd /tmp \
   && curl -O https://downloads.rclone.org/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-linux-amd64.zip \
   && unzip /tmp/rclone-${RCLONE_VERSION}-linux-amd64.zip \
diff --git a/cmd/s3driver/Dockerfile.full b/cmd/s3driver/Dockerfile.full
index a8da20d..7dae6ab 100644
--- a/cmd/s3driver/Dockerfile.full
+++ b/cmd/s3driver/Dockerfile.full
@@ -45,7 +45,7 @@ RUN apt-get update && \
   rm -rf /var/lib/apt/lists/*
 
 # install rclone
-ARG RCLONE_VERSION=v1.47.0
+ARG RCLONE_VERSION=v1.54.1
 RUN cd /tmp \
   && curl -O https://downloads.rclone.org/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-linux-amd64.zip \
   && unzip /tmp/rclone-${RCLONE_VERSION}-linux-amd64.zip \
diff --git a/deploy/kubernetes/attacher.yaml b/deploy/kubernetes/attacher.yaml
index bccc7f8..e587d21 100644
--- a/deploy/kubernetes/attacher.yaml
+++ b/deploy/kubernetes/attacher.yaml
@@ -59,9 +59,6 @@ metadata:
   name: csi-attacher-s3
   namespace: kube-system
 spec:
-  selector:
-    matchLabels:
-      app: "csi-attacher-s3"
   serviceName: "csi-attacher-s3"
   replicas: 1
   selector:
diff --git a/deploy/kubernetes/csi-s3.yaml b/deploy/kubernetes/csi-s3.yaml
index 8f2b376..1482cac 100644
--- a/deploy/kubernetes/csi-s3.yaml
+++ b/deploy/kubernetes/csi-s3.yaml
@@ -82,7 +82,7 @@ spec:
               add: ["SYS_ADMIN"]
             allowPrivilegeEscalation: true
           image: ctrox/csi-s3:v1.2.0-rc.1
-          imagePullPolicy: Never
+          imagePullPolicy: "Always"
           args:
             - "--endpoint=$(CSI_ENDPOINT)"
             - "--nodeid=$(NODE_ID)"
@@ -94,7 +94,6 @@ spec:
               valueFrom:
                 fieldRef:
                   fieldPath: spec.nodeName
-          imagePullPolicy: "Always"
           volumeMounts:
             - name: plugin-dir
               mountPath: /csi
diff --git a/deploy/kubernetes/provisioner.yaml b/deploy/kubernetes/provisioner.yaml
index b543458..40c9e71 100644
--- a/deploy/kubernetes/provisioner.yaml
+++ b/deploy/kubernetes/provisioner.yaml
@@ -58,9 +58,6 @@ metadata:
   name: csi-provisioner-s3
   namespace: kube-system
 spec:
-  selector:
-    matchLabels:
-      app: "csi-provisioner-s3"
   serviceName: "csi-provisioner-s3"
   replicas: 1
   selector:
diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go
index a93160c..19a3f30 100644
--- a/pkg/driver/controllerserver.go
+++ b/pkg/driver/controllerserver.go
@@ -129,6 +129,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
 	volumeID := req.GetVolumeId()
 	bucketName, prefix := volumeIDToBucketPrefix(volumeID)
+	var meta *s3.FSMeta
 
 	// Check arguments
 	if len(volumeID) == 0 {
@@ -146,25 +147,33 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 		return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
 	}
 
-	if _, err := client.GetFSMeta(bucketName, prefix); err != nil {
+	if meta, err = client.GetFSMeta(bucketName, prefix); err != nil {
 		glog.V(5).Infof("FSMeta of volume %s does not exist, ignoring delete request", volumeID)
 		return &csi.DeleteVolumeResponse{}, nil
 	}
 
+	var deleteErr error
 	if prefix == "" {
 		// prefix is empty, we delete the whole bucket
 		if err := client.RemoveBucket(bucketName); err != nil {
-			glog.V(3).Infof("Failed to remove volume %s: %v", volumeID, err)
-			return nil, err
+			deleteErr = err
 		}
 		glog.V(4).Infof("Bucket %s removed", bucketName)
 	} else {
 		if err := client.RemovePrefix(bucketName, prefix); err != nil {
-			return nil, fmt.Errorf("unable to remove prefix: %w", err)
+			deleteErr = fmt.Errorf("unable to remove prefix: %w", err)
 		}
 		glog.V(4).Infof("Prefix %s removed", prefix)
 	}
 
+	if deleteErr != nil {
+		glog.Warning("remove volume failed, will ensure fsmeta exist, or may will lost control")
+		if err := client.EnsureFSMetaExist(meta, bucketName, prefix); err != nil {
+			glog.Error(err)
+		}
+		return nil, deleteErr
+	}
+
 	return &csi.DeleteVolumeResponse{}, nil
 }
 
@@ -178,11 +187,11 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
 	}
 	bucketName, prefix := volumeIDToBucketPrefix(req.GetVolumeId())
 
-	s3, err := s3.NewClientFromSecret(req.GetSecrets())
+	client, err := s3.NewClientFromSecret(req.GetSecrets())
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
 	}
-	exists, err := s3.BucketExists(bucketName)
+	exists, err := client.BucketExists(bucketName)
 	if err != nil {
 		return nil, err
 	}
@@ -192,7 +201,7 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
 		return nil, status.Error(codes.NotFound, fmt.Sprintf("bucket of volume with id %s does not exist", req.GetVolumeId()))
 	}
 
-	if _, err := s3.GetFSMeta(bucketName, prefix); err != nil {
+	if _, err := client.GetFSMeta(bucketName, prefix); err != nil {
 		// return an error if the fsmeta of the requested volume does not exist
 		return nil, status.Error(codes.NotFound, fmt.Sprintf("fsmeta of volume with id %s does not exist", req.GetVolumeId()))
 	}
@@ -202,8 +211,8 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
 		Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
 	}
 
-	for _, cap := range req.VolumeCapabilities {
-		if cap.GetAccessMode().GetMode() != supportedAccessMode.GetMode() {
+	for _, capability := range req.VolumeCapabilities {
+		if capability.GetAccessMode().GetMode() != supportedAccessMode.GetMode() {
 			return &csi.ValidateVolumeCapabilitiesResponse{Message: "Only single node writer is supported"}, nil
 		}
 	}
diff --git a/pkg/s3/client.go b/pkg/s3/client.go
index 6e7dadb..a6a0302 100644
--- a/pkg/s3/client.go
+++ b/pkg/s3/client.go
@@ -94,17 +94,35 @@ func (client *s3Client) CreatePrefix(bucketName string, prefix string) error {
 }
 
 func (client *s3Client) RemovePrefix(bucketName string, prefix string) error {
-	if err := client.removeObjects(bucketName, prefix); err != nil {
-		return err
+	var err error
+
+	if err = client.removeObjects(bucketName, prefix); err == nil {
+		return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
 	}
-	return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
+
+	glog.Warning("removeObjects failed with: ", err, ", will try removeObjectsOneByOne")
+
+	if err = client.removeObjectsOneByOne(bucketName, prefix); err == nil {
+		return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
+	}
+
+	return err
 }
 
 func (client *s3Client) RemoveBucket(bucketName string) error {
-	if err := client.removeObjects(bucketName, ""); err != nil {
-		return err
+	var err error
+
+	if err = client.removeObjects(bucketName, ""); err == nil {
+		return client.minio.RemoveBucket(client.ctx, bucketName)
 	}
-	return client.minio.RemoveBucket(client.ctx, bucketName)
+
+	glog.Warning("removeObjects failed with: ", err, ", will try removeObjectsOneByOne")
+
+	if err = client.removeObjectsOneByOne(bucketName, ""); err == nil {
+		return client.minio.RemoveBucket(client.ctx, bucketName)
+	}
+
+	return err
 }
 
 func (client *s3Client) removeObjects(bucketName, prefix string) error {
@@ -114,10 +132,6 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error {
 	go func() {
 		defer close(objectsCh)
 
-		doneCh := make(chan struct{})
-
-		defer close(doneCh)
-
 		for object := range client.minio.ListObjects(
 			client.ctx,
 			bucketName,
@@ -141,10 +155,14 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error {
 			GovernanceBypass: true,
 		}
 		errorCh := client.minio.RemoveObjects(client.ctx, bucketName, objectsCh, opts)
+		haveErrWhenRemoveObjects := false
 		for e := range errorCh {
-			glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
+			if e.Err.Error() != "EOF" {
+				glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
+				haveErrWhenRemoveObjects = true
+			}
 		}
-		if len(errorCh) != 0 {
+		if haveErrWhenRemoveObjects {
 			return fmt.Errorf("Failed to remove all objects of bucket %s", bucketName)
 		}
 	}
@@ -152,6 +170,60 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error {
 	return nil
 }
 
+// removeObjectsOneByOne deletes every object below prefix in bucketName with
+// one RemoveObject call per object. RemovePrefix and RemoveBucket fall back
+// to it when the batched removeObjects call reports a failure.
+//
+// Listing and deleting both run in the calling goroutine, so a listing error
+// is always seen and returned before the caller moves on to deleting the
+// prefix object or the bucket. A failed delete is logged and the loop keeps
+// going; such failures are reported together as one summary error once the
+// listing is exhausted.
+func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error {
+	haveErrWhenRemoveObjects := false
+
+	for object := range client.minio.ListObjects(
+		client.ctx,
+		bucketName,
+		minio.ListObjectsOptions{Prefix: prefix, Recursive: true},
+	) {
+		if object.Err != nil {
+			// NOTE(review): returning here stops draining the listing
+			// channel; this assumes the lister sends nothing further after
+			// an error item — confirm against the minio-go version in use.
+			glog.Error("Error listing objects", object.Err)
+			return object.Err
+		}
+
+		err := client.minio.RemoveObject(
+			client.ctx,
+			bucketName,
+			object.Key,
+			minio.RemoveObjectOptions{VersionID: object.VersionID},
+		)
+		if err == nil {
+			continue
+		}
+
+		// An error whose text is exactly "EOF" is not counted as a failure,
+		// mirroring the filter removeObjects applies to its batch results.
+		if err.Error() != "EOF" {
+			glog.Errorf("Failed to remove object %s, error: %s", object.Key, err)
+			haveErrWhenRemoveObjects = true
+		}
+	}
+
+	if haveErrWhenRemoveObjects {
+		return fmt.Errorf(
+			"Failed to remove all objects of path %s/%s",
+			bucketName,
+			prefix,
+		)
+	}
+
+	return nil
+}
+
 func (client *s3Client) SetFSMeta(meta *FSMeta) error {
 	b := new(bytes.Buffer)
 	json.NewEncoder(b).Encode(meta)
@@ -182,3 +254,30 @@ func (client *s3Client) GetFSMeta(bucketName, prefix string) (*FSMeta, error) {
 	err = json.Unmarshal(b, &meta)
 	return &meta, err
 }
+
+// EnsureFSMetaExist checks that the FSMeta of bucketName/prefix can still be
+// read and, when GetFSMeta fails, writes meta back with SetFSMeta. DeleteVolume
+// calls it after a failed removal, because it ignores delete requests for
+// volumes whose FSMeta cannot be read.
+//
+// It returns nil when the metadata is readable or was rewritten, and an error
+// when meta is nil or SetFSMeta fails.
+//
+// NOTE(review): SetFSMeta receives only meta, so the write location presumably
+// comes from meta's own fields rather than bucketName/prefix — confirm.
+func (client *s3Client) EnsureFSMetaExist(meta *FSMeta, bucketName, prefix string) error {
+	_, getErr := client.GetFSMeta(bucketName, prefix)
+	if getErr == nil {
+		return nil
+	}
+	if meta == nil {
+		return fmt.Errorf("%s/%s get meta failed with: %s, and no meta was given to restore", bucketName, prefix, getErr)
+	}
+
+	glog.Warningf("%s/%s get meta failed with: %s, will set meta of it", bucketName, prefix, getErr)
+	// Report the SetFSMeta failure itself; the GetFSMeta failure is already logged above.
+	if setErr := client.SetFSMeta(meta); setErr != nil {
+		return fmt.Errorf("%s/%s set meta failed with: %w, may will lost control of it", bucketName, prefix, setErr)
+	}
+	return nil
+}