From dc987329196f8542557e382b69a646078b32c97a Mon Sep 17 00:00:00 2001
From: boxjan <boxjan@outlook.com>
Date: Thu, 8 Apr 2021 00:06:56 +0800
Subject: [PATCH 1/3] fix problem * rclone use 1.54.1 * add EnsureFSMetaExist
 func * fix removeObjects have err check * add removeObjectsOneByOne, when
 removeObjects failed, will try to use it * fix repeat in yaml

---
 cmd/s3driver/Dockerfile            |   2 +-
 cmd/s3driver/Dockerfile.full       |   2 +-
 deploy/kubernetes/attacher.yaml    |   3 -
 deploy/kubernetes/csi-s3.yaml      |   3 +-
 deploy/kubernetes/provisioner.yaml |   3 -
 pkg/driver/controllerserver.go     |  27 +++++---
 pkg/s3/client.go                   | 106 +++++++++++++++++++++++++----
 7 files changed, 115 insertions(+), 31 deletions(-)

diff --git a/cmd/s3driver/Dockerfile b/cmd/s3driver/Dockerfile
index 87fea7f..ec9e2ea 100644
--- a/cmd/s3driver/Dockerfile
+++ b/cmd/s3driver/Dockerfile
@@ -17,7 +17,7 @@ RUN apt-get update && \
   rm -rf /var/lib/apt/lists/*
 
 # install rclone
-ARG RCLONE_VERSION=v1.47.0
+ARG RCLONE_VERSION=v1.54.1
 RUN cd /tmp \
   && curl -O https://downloads.rclone.org/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-linux-amd64.zip \
   && unzip /tmp/rclone-${RCLONE_VERSION}-linux-amd64.zip \
diff --git a/cmd/s3driver/Dockerfile.full b/cmd/s3driver/Dockerfile.full
index a8da20d..7dae6ab 100644
--- a/cmd/s3driver/Dockerfile.full
+++ b/cmd/s3driver/Dockerfile.full
@@ -45,7 +45,7 @@ RUN apt-get update && \
   rm -rf /var/lib/apt/lists/*
 
 # install rclone
-ARG RCLONE_VERSION=v1.47.0
+ARG RCLONE_VERSION=v1.54.1
 RUN cd /tmp \
   && curl -O https://downloads.rclone.org/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-linux-amd64.zip \
   && unzip /tmp/rclone-${RCLONE_VERSION}-linux-amd64.zip \
diff --git a/deploy/kubernetes/attacher.yaml b/deploy/kubernetes/attacher.yaml
index bccc7f8..e587d21 100644
--- a/deploy/kubernetes/attacher.yaml
+++ b/deploy/kubernetes/attacher.yaml
@@ -59,9 +59,6 @@ metadata:
   name: csi-attacher-s3
   namespace: kube-system
 spec:
-  selector:
-    matchLabels:
-      app: "csi-attacher-s3"
   serviceName: "csi-attacher-s3"
   replicas: 1
   selector:
diff --git a/deploy/kubernetes/csi-s3.yaml b/deploy/kubernetes/csi-s3.yaml
index 8f2b376..1482cac 100644
--- a/deploy/kubernetes/csi-s3.yaml
+++ b/deploy/kubernetes/csi-s3.yaml
@@ -82,7 +82,7 @@ spec:
               add: ["SYS_ADMIN"]
             allowPrivilegeEscalation: true
           image: ctrox/csi-s3:v1.2.0-rc.1
-          imagePullPolicy: Never
+          imagePullPolicy: "Always"
           args:
             - "--endpoint=$(CSI_ENDPOINT)"
             - "--nodeid=$(NODE_ID)"
@@ -94,7 +94,6 @@ spec:
               valueFrom:
                 fieldRef:
                   fieldPath: spec.nodeName
-          imagePullPolicy: "Always"
           volumeMounts:
             - name: plugin-dir
               mountPath: /csi
diff --git a/deploy/kubernetes/provisioner.yaml b/deploy/kubernetes/provisioner.yaml
index b543458..40c9e71 100644
--- a/deploy/kubernetes/provisioner.yaml
+++ b/deploy/kubernetes/provisioner.yaml
@@ -58,9 +58,6 @@ metadata:
   name: csi-provisioner-s3
   namespace: kube-system
 spec:
-  selector:
-    matchLabels:
-      app: "csi-provisioner-s3"
   serviceName: "csi-provisioner-s3"
   replicas: 1
   selector:
diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go
index a93160c..19a3f30 100644
--- a/pkg/driver/controllerserver.go
+++ b/pkg/driver/controllerserver.go
@@ -129,6 +129,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
 	volumeID := req.GetVolumeId()
 	bucketName, prefix := volumeIDToBucketPrefix(volumeID)
+	var meta *s3.FSMeta
 
 	// Check arguments
 	if len(volumeID) == 0 {
@@ -146,25 +147,33 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 		return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
 	}
 
-	if _, err := client.GetFSMeta(bucketName, prefix); err != nil {
+	if meta, err = client.GetFSMeta(bucketName, prefix); err != nil {
 		glog.V(5).Infof("FSMeta of volume %s does not exist, ignoring delete request", volumeID)
 		return &csi.DeleteVolumeResponse{}, nil
 	}
 
+	var deleteErr error
 	if prefix == "" {
 		// prefix is empty, we delete the whole bucket
 		if err := client.RemoveBucket(bucketName); err != nil {
-			glog.V(3).Infof("Failed to remove volume %s: %v", volumeID, err)
-			return nil, err
+			deleteErr = err
 		}
 		glog.V(4).Infof("Bucket %s removed", bucketName)
 	} else {
 		if err := client.RemovePrefix(bucketName, prefix); err != nil {
-			return nil, fmt.Errorf("unable to remove prefix: %w", err)
+			deleteErr = fmt.Errorf("unable to remove prefix: %w", err)
 		}
 		glog.V(4).Infof("Prefix %s removed", prefix)
 	}
 
+	if deleteErr != nil {
+		glog.Warning("remove volume failed, will ensure fsmeta exist, or may will lost control")
+		if err := client.EnsureFSMetaExist(meta, bucketName, prefix); err != nil {
+			glog.Error(err)
+		}
+		return nil, deleteErr
+	}
+
 	return &csi.DeleteVolumeResponse{}, nil
 }
 
@@ -178,11 +187,11 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
 	}
 	bucketName, prefix := volumeIDToBucketPrefix(req.GetVolumeId())
 
-	s3, err := s3.NewClientFromSecret(req.GetSecrets())
+	client, err := s3.NewClientFromSecret(req.GetSecrets())
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
 	}
-	exists, err := s3.BucketExists(bucketName)
+	exists, err := client.BucketExists(bucketName)
 	if err != nil {
 		return nil, err
 	}
@@ -192,7 +201,7 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
 		return nil, status.Error(codes.NotFound, fmt.Sprintf("bucket of volume with id %s does not exist", req.GetVolumeId()))
 	}
 
-	if _, err := s3.GetFSMeta(bucketName, prefix); err != nil {
+	if _, err := client.GetFSMeta(bucketName, prefix); err != nil {
 		// return an error if the fsmeta of the requested volume does not exist
 		return nil, status.Error(codes.NotFound, fmt.Sprintf("fsmeta of volume with id %s does not exist", req.GetVolumeId()))
 	}
@@ -202,8 +211,8 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
 		Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
 	}
 
-	for _, cap := range req.VolumeCapabilities {
-		if cap.GetAccessMode().GetMode() != supportedAccessMode.GetMode() {
+	for _, capability := range req.VolumeCapabilities {
+		if capability.GetAccessMode().GetMode() != supportedAccessMode.GetMode() {
 			return &csi.ValidateVolumeCapabilitiesResponse{Message: "Only single node writer is supported"}, nil
 		}
 	}
diff --git a/pkg/s3/client.go b/pkg/s3/client.go
index 6e7dadb..a6a0302 100644
--- a/pkg/s3/client.go
+++ b/pkg/s3/client.go
@@ -94,17 +94,35 @@ func (client *s3Client) CreatePrefix(bucketName string, prefix string) error {
 }
 
 func (client *s3Client) RemovePrefix(bucketName string, prefix string) error {
-	if err := client.removeObjects(bucketName, prefix); err != nil {
-		return err
+	var err error
+
+	if err = client.removeObjects(bucketName, prefix); err == nil {
+		return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
 	}
-	return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
+
+	glog.Warning("removeObjects failed with: ", err, ", will try removeObjectsOneByOne")
+
+	if err = client.removeObjectsOneByOne(bucketName, prefix); err == nil {
+		return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
+	}
+
+	return err
 }
 
 func (client *s3Client) RemoveBucket(bucketName string) error {
-	if err := client.removeObjects(bucketName, ""); err != nil {
-		return err
+	var err error
+
+	if err = client.removeObjects(bucketName, ""); err == nil {
+		return client.minio.RemoveBucket(client.ctx, bucketName)
 	}
-	return client.minio.RemoveBucket(client.ctx, bucketName)
+
+	glog.Warning("removeObjects failed with: ", err, ", will try removeObjectsOneByOne")
+
+	if err = client.removeObjectsOneByOne(bucketName, ""); err == nil {
+		return client.minio.RemoveBucket(client.ctx, bucketName)
+	}
+
+	return err
 }
 
 func (client *s3Client) removeObjects(bucketName, prefix string) error {
@@ -114,10 +132,6 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error {
 	go func() {
 		defer close(objectsCh)
 
-		doneCh := make(chan struct{})
-
-		defer close(doneCh)
-
 		for object := range client.minio.ListObjects(
 			client.ctx,
 			bucketName,
@@ -141,10 +155,14 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error {
 			GovernanceBypass: true,
 		}
 		errorCh := client.minio.RemoveObjects(client.ctx, bucketName, objectsCh, opts)
+		haveErrWhenRemoveObjects := false
 		for e := range errorCh {
-			glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
+			if e.Err.Error() != "EOF" {
+				glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
+				haveErrWhenRemoveObjects = true
+			}
 		}
-		if len(errorCh) != 0 {
+		if haveErrWhenRemoveObjects {
 			return fmt.Errorf("Failed to remove all objects of bucket %s", bucketName)
 		}
 	}
@@ -152,6 +170,60 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error {
 	return nil
 }
 
+// will delete files one by one without file lock
+func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error {
+	objectsCh := make(chan minio.ObjectInfo, 1)
+	removeErrCh := make(chan minio.RemoveObjectError, 1)
+	var listErr error
+
+	go func() {
+		defer close(objectsCh)
+
+		for object := range client.minio.ListObjects(client.ctx, bucketName,
+			minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) {
+			if object.Err != nil {
+				listErr = object.Err
+				return
+			}
+			objectsCh <- object
+		}
+	}()
+
+	if listErr != nil {
+		glog.Error("Error listing objects", listErr)
+		return listErr
+	}
+
+	go func() {
+		defer close(removeErrCh)
+
+		for object := range objectsCh {
+			err := client.minio.RemoveObject(client.ctx, bucketName, object.Key,
+				minio.RemoveObjectOptions{VersionID: object.VersionID})
+			if err != nil {
+				removeErrCh <- minio.RemoveObjectError{
+					ObjectName: object.Key,
+					VersionID:  object.VersionID,
+					Err:        err,
+				}
+			}
+		}
+	}()
+
+	haveErrWhenRemoveObjects := false
+	for e := range removeErrCh {
+		if e.Err.Error() != "EOF" {
+			glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
+			haveErrWhenRemoveObjects = true
+		}
+	}
+	if haveErrWhenRemoveObjects {
+		return fmt.Errorf("Failed to remove all objects of path %s", bucketName)
+	}
+
+	return nil
+}
+
 func (client *s3Client) SetFSMeta(meta *FSMeta) error {
 	b := new(bytes.Buffer)
 	json.NewEncoder(b).Encode(meta)
@@ -182,3 +254,13 @@ func (client *s3Client) GetFSMeta(bucketName, prefix string) (*FSMeta, error) {
 	err = json.Unmarshal(b, &meta)
 	return &meta, err
 }
+
+func (client *s3Client) EnsureFSMetaExist(meta *FSMeta, bucketName, prefix string) error {
+	if _, err := client.GetFSMeta(bucketName, prefix); err != nil {
+		glog.Warningf("%s/%s get meta failed with: %s, will set meta of it", bucketName, prefix, err)
+		if err := client.SetFSMeta(meta); err != nil {
+			return fmt.Errorf("%s/%s get meta failed with: %s, may will lost control of it", bucketName, prefix, err)
+		}
+	}
+	return nil
+}

From e4116a33dabd1c74ab278e24124729b5328ff9bf Mon Sep 17 00:00:00 2001
From: boxjan <boxjan@outlook.com>
Date: Thu, 8 Apr 2021 22:35:31 +0800
Subject: [PATCH 2/3] fix problem: log format; fs meta;

---
 pkg/driver/controllerserver.go |  4 ++--
 pkg/s3/client.go               | 14 ++------------
 2 files changed, 4 insertions(+), 14 deletions(-)

diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go
index 19a3f30..319c444 100644
--- a/pkg/driver/controllerserver.go
+++ b/pkg/driver/controllerserver.go
@@ -167,8 +167,8 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 	}
 
 	if deleteErr != nil {
-		glog.Warning("remove volume failed, will ensure fsmeta exist, or may will lost control")
-		if err := client.EnsureFSMetaExist(meta, bucketName, prefix); err != nil {
+		glog.Warning("remove volume failed, will ensure fsmeta exists to avoid losing control over volume")
+		if err := client.SetFSMeta(meta); err != nil {
 			glog.Error(err)
 		}
 		return nil, deleteErr
diff --git a/pkg/s3/client.go b/pkg/s3/client.go
index a6a0302..5d06196 100644
--- a/pkg/s3/client.go
+++ b/pkg/s3/client.go
@@ -100,7 +100,7 @@ func (client *s3Client) RemovePrefix(bucketName string, prefix string) error {
 		return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
 	}
 
-	glog.Warning("removeObjects failed with: ", err, ", will try removeObjectsOneByOne")
+	glog.Warningf("removeObjects failed with: %s, will try removeObjectsOneByOne", err)
 
 	if err = client.removeObjectsOneByOne(bucketName, prefix); err == nil {
 		return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
@@ -116,7 +116,7 @@ func (client *s3Client) RemoveBucket(bucketName string) error {
 		return client.minio.RemoveBucket(client.ctx, bucketName)
 	}
 
-	glog.Warning("removeObjects failed with: ", err, ", will try removeObjectsOneByOne")
+	glog.Warningf("removeObjects failed with: %s, will try removeObjectsOneByOne", err)
 
 	if err = client.removeObjectsOneByOne(bucketName, ""); err == nil {
 		return client.minio.RemoveBucket(client.ctx, bucketName)
@@ -254,13 +254,3 @@ func (client *s3Client) GetFSMeta(bucketName, prefix string) (*FSMeta, error) {
 	err = json.Unmarshal(b, &meta)
 	return &meta, err
 }
-
-func (client *s3Client) EnsureFSMetaExist(meta *FSMeta, bucketName, prefix string) error {
-	if _, err := client.GetFSMeta(bucketName, prefix); err != nil {
-		glog.Warningf("%s/%s get meta failed with: %s, will set meta of it", bucketName, prefix, err)
-		if err := client.SetFSMeta(meta); err != nil {
-			return fmt.Errorf("%s/%s get meta failed with: %s, may will lost control of it", bucketName, prefix, err)
-		}
-	}
-	return nil
-}

From e4cdb54679e5726a3aa4ff1e00a4cd3cc557a5dc Mon Sep 17 00:00:00 2001
From: boxjan <boxjan@outlook.com>
Date: Fri, 9 Apr 2021 21:45:52 +0800
Subject: [PATCH 3/3] Although get a EOF error, we should still know it

---
 pkg/s3/client.go | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git a/pkg/s3/client.go b/pkg/s3/client.go
index 5d06196..87d5df1 100644
--- a/pkg/s3/client.go
+++ b/pkg/s3/client.go
@@ -157,10 +157,8 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error {
 		errorCh := client.minio.RemoveObjects(client.ctx, bucketName, objectsCh, opts)
 		haveErrWhenRemoveObjects := false
 		for e := range errorCh {
-			if e.Err.Error() != "EOF" {
-				glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
-				haveErrWhenRemoveObjects = true
-			}
+			glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
+			haveErrWhenRemoveObjects = true
 		}
 		if haveErrWhenRemoveObjects {
 			return fmt.Errorf("Failed to remove all objects of bucket %s", bucketName)
@@ -212,10 +210,8 @@ func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error {
 
 	haveErrWhenRemoveObjects := false
 	for e := range removeErrCh {
-		if e.Err.Error() != "EOF" {
-			glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
-			haveErrWhenRemoveObjects = true
-		}
+		glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
+		haveErrWhenRemoveObjects = true
 	}
 	if haveErrWhenRemoveObjects {
 		return fmt.Errorf("Failed to remove all objects of path %s", bucketName)