fix problem
* rclone use 1.54.1 * add EnsureFSMetaExist func * fix removeObjects have err check * add removeObjectsOneByOne, when removeObjects failed, will try to use it * fix repeat in yaml
This commit is contained in:
parent
ebffbeb54c
commit
7b5a3f6f4d
7 changed files with 115 additions and 31 deletions
|
@ -17,7 +17,7 @@ RUN apt-get update && \
|
||||||
rm -rf /var/lib/apt/lists/*
|
rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
# install rclone
|
# install rclone
|
||||||
ARG RCLONE_VERSION=v1.47.0
|
ARG RCLONE_VERSION=v1.54.1
|
||||||
RUN cd /tmp \
|
RUN cd /tmp \
|
||||||
&& curl -O https://downloads.rclone.org/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-linux-amd64.zip \
|
&& curl -O https://downloads.rclone.org/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-linux-amd64.zip \
|
||||||
&& unzip /tmp/rclone-${RCLONE_VERSION}-linux-amd64.zip \
|
&& unzip /tmp/rclone-${RCLONE_VERSION}-linux-amd64.zip \
|
||||||
|
|
|
@ -45,7 +45,7 @@ RUN apt-get update && \
|
||||||
rm -rf /var/lib/apt/lists/*
|
rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
# install rclone
|
# install rclone
|
||||||
ARG RCLONE_VERSION=v1.47.0
|
ARG RCLONE_VERSION=v1.54.1
|
||||||
RUN cd /tmp \
|
RUN cd /tmp \
|
||||||
&& curl -O https://downloads.rclone.org/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-linux-amd64.zip \
|
&& curl -O https://downloads.rclone.org/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-linux-amd64.zip \
|
||||||
&& unzip /tmp/rclone-${RCLONE_VERSION}-linux-amd64.zip \
|
&& unzip /tmp/rclone-${RCLONE_VERSION}-linux-amd64.zip \
|
||||||
|
|
|
@ -59,9 +59,6 @@ metadata:
|
||||||
name: csi-attacher-s3
|
name: csi-attacher-s3
|
||||||
namespace: kube-system
|
namespace: kube-system
|
||||||
spec:
|
spec:
|
||||||
selector:
|
|
||||||
matchLabels:
|
|
||||||
app: "csi-attacher-s3"
|
|
||||||
serviceName: "csi-attacher-s3"
|
serviceName: "csi-attacher-s3"
|
||||||
replicas: 1
|
replicas: 1
|
||||||
selector:
|
selector:
|
||||||
|
|
|
@ -82,7 +82,7 @@ spec:
|
||||||
add: ["SYS_ADMIN"]
|
add: ["SYS_ADMIN"]
|
||||||
allowPrivilegeEscalation: true
|
allowPrivilegeEscalation: true
|
||||||
image: ctrox/csi-s3:v1.2.0-rc.1
|
image: ctrox/csi-s3:v1.2.0-rc.1
|
||||||
imagePullPolicy: Never
|
imagePullPolicy: "Always"
|
||||||
args:
|
args:
|
||||||
- "--endpoint=$(CSI_ENDPOINT)"
|
- "--endpoint=$(CSI_ENDPOINT)"
|
||||||
- "--nodeid=$(NODE_ID)"
|
- "--nodeid=$(NODE_ID)"
|
||||||
|
@ -94,7 +94,6 @@ spec:
|
||||||
valueFrom:
|
valueFrom:
|
||||||
fieldRef:
|
fieldRef:
|
||||||
fieldPath: spec.nodeName
|
fieldPath: spec.nodeName
|
||||||
imagePullPolicy: "Always"
|
|
||||||
volumeMounts:
|
volumeMounts:
|
||||||
- name: plugin-dir
|
- name: plugin-dir
|
||||||
mountPath: /csi
|
mountPath: /csi
|
||||||
|
|
|
@ -58,9 +58,6 @@ metadata:
|
||||||
name: csi-provisioner-s3
|
name: csi-provisioner-s3
|
||||||
namespace: kube-system
|
namespace: kube-system
|
||||||
spec:
|
spec:
|
||||||
selector:
|
|
||||||
matchLabels:
|
|
||||||
app: "csi-provisioner-s3"
|
|
||||||
serviceName: "csi-provisioner-s3"
|
serviceName: "csi-provisioner-s3"
|
||||||
replicas: 1
|
replicas: 1
|
||||||
selector:
|
selector:
|
||||||
|
|
|
@ -129,6 +129,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
||||||
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
|
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
|
||||||
volumeID := req.GetVolumeId()
|
volumeID := req.GetVolumeId()
|
||||||
bucketName, prefix := volumeIDToBucketPrefix(volumeID)
|
bucketName, prefix := volumeIDToBucketPrefix(volumeID)
|
||||||
|
var meta *s3.FSMeta
|
||||||
|
|
||||||
// Check arguments
|
// Check arguments
|
||||||
if len(volumeID) == 0 {
|
if len(volumeID) == 0 {
|
||||||
|
@ -146,25 +147,33 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||||
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
|
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := client.GetFSMeta(bucketName, prefix); err != nil {
|
if meta, err = client.GetFSMeta(bucketName, prefix); err != nil {
|
||||||
glog.V(5).Infof("FSMeta of volume %s does not exist, ignoring delete request", volumeID)
|
glog.V(5).Infof("FSMeta of volume %s does not exist, ignoring delete request", volumeID)
|
||||||
return &csi.DeleteVolumeResponse{}, nil
|
return &csi.DeleteVolumeResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var deleteErr error
|
||||||
if prefix == "" {
|
if prefix == "" {
|
||||||
// prefix is empty, we delete the whole bucket
|
// prefix is empty, we delete the whole bucket
|
||||||
if err := client.RemoveBucket(bucketName); err != nil {
|
if err := client.RemoveBucket(bucketName); err != nil {
|
||||||
glog.V(3).Infof("Failed to remove volume %s: %v", volumeID, err)
|
deleteErr = err
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
glog.V(4).Infof("Bucket %s removed", bucketName)
|
glog.V(4).Infof("Bucket %s removed", bucketName)
|
||||||
} else {
|
} else {
|
||||||
if err := client.RemovePrefix(bucketName, prefix); err != nil {
|
if err := client.RemovePrefix(bucketName, prefix); err != nil {
|
||||||
return nil, fmt.Errorf("unable to remove prefix: %w", err)
|
deleteErr = fmt.Errorf("unable to remove prefix: %w", err)
|
||||||
}
|
}
|
||||||
glog.V(4).Infof("Prefix %s removed", prefix)
|
glog.V(4).Infof("Prefix %s removed", prefix)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if deleteErr != nil {
|
||||||
|
glog.Warning("remove volume failed, will ensure fsmeta exist, or may will lost control")
|
||||||
|
if err := client.EnsureFSMetaExist(meta, bucketName, prefix); err != nil {
|
||||||
|
glog.Error(err)
|
||||||
|
}
|
||||||
|
return nil, deleteErr
|
||||||
|
}
|
||||||
|
|
||||||
return &csi.DeleteVolumeResponse{}, nil
|
return &csi.DeleteVolumeResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -178,11 +187,11 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
|
||||||
}
|
}
|
||||||
bucketName, prefix := volumeIDToBucketPrefix(req.GetVolumeId())
|
bucketName, prefix := volumeIDToBucketPrefix(req.GetVolumeId())
|
||||||
|
|
||||||
s3, err := s3.NewClientFromSecret(req.GetSecrets())
|
client, err := s3.NewClientFromSecret(req.GetSecrets())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
|
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
|
||||||
}
|
}
|
||||||
exists, err := s3.BucketExists(bucketName)
|
exists, err := client.BucketExists(bucketName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -192,7 +201,7 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
|
||||||
return nil, status.Error(codes.NotFound, fmt.Sprintf("bucket of volume with id %s does not exist", req.GetVolumeId()))
|
return nil, status.Error(codes.NotFound, fmt.Sprintf("bucket of volume with id %s does not exist", req.GetVolumeId()))
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := s3.GetFSMeta(bucketName, prefix); err != nil {
|
if _, err := client.GetFSMeta(bucketName, prefix); err != nil {
|
||||||
// return an error if the fsmeta of the requested volume does not exist
|
// return an error if the fsmeta of the requested volume does not exist
|
||||||
return nil, status.Error(codes.NotFound, fmt.Sprintf("fsmeta of volume with id %s does not exist", req.GetVolumeId()))
|
return nil, status.Error(codes.NotFound, fmt.Sprintf("fsmeta of volume with id %s does not exist", req.GetVolumeId()))
|
||||||
}
|
}
|
||||||
|
@ -202,8 +211,8 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
|
||||||
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
|
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, cap := range req.VolumeCapabilities {
|
for _, capability := range req.VolumeCapabilities {
|
||||||
if cap.GetAccessMode().GetMode() != supportedAccessMode.GetMode() {
|
if capability.GetAccessMode().GetMode() != supportedAccessMode.GetMode() {
|
||||||
return &csi.ValidateVolumeCapabilitiesResponse{Message: "Only single node writer is supported"}, nil
|
return &csi.ValidateVolumeCapabilitiesResponse{Message: "Only single node writer is supported"}, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
106
pkg/s3/client.go
106
pkg/s3/client.go
|
@ -94,17 +94,35 @@ func (client *s3Client) CreatePrefix(bucketName string, prefix string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *s3Client) RemovePrefix(bucketName string, prefix string) error {
|
func (client *s3Client) RemovePrefix(bucketName string, prefix string) error {
|
||||||
if err := client.removeObjects(bucketName, prefix); err != nil {
|
var err error
|
||||||
return err
|
|
||||||
|
if err = client.removeObjects(bucketName, prefix); err == nil {
|
||||||
|
return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
|
||||||
}
|
}
|
||||||
return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
|
|
||||||
|
glog.Warning("removeObjects failed with: ", err, ", will try removeObjectsOneByOne")
|
||||||
|
|
||||||
|
if err = client.removeObjectsOneByOne(bucketName, prefix); err == nil {
|
||||||
|
return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *s3Client) RemoveBucket(bucketName string) error {
|
func (client *s3Client) RemoveBucket(bucketName string) error {
|
||||||
if err := client.removeObjects(bucketName, ""); err != nil {
|
var err error
|
||||||
return err
|
|
||||||
|
if err = client.removeObjects(bucketName, ""); err == nil {
|
||||||
|
return client.minio.RemoveBucket(client.ctx, bucketName)
|
||||||
}
|
}
|
||||||
return client.minio.RemoveBucket(client.ctx, bucketName)
|
|
||||||
|
glog.Warning("removeObjects failed with: ", err, ", will try removeObjectsOneByOne")
|
||||||
|
|
||||||
|
if err = client.removeObjectsOneByOne(bucketName, ""); err == nil {
|
||||||
|
return client.minio.RemoveBucket(client.ctx, bucketName)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *s3Client) removeObjects(bucketName, prefix string) error {
|
func (client *s3Client) removeObjects(bucketName, prefix string) error {
|
||||||
|
@ -114,10 +132,6 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error {
|
||||||
go func() {
|
go func() {
|
||||||
defer close(objectsCh)
|
defer close(objectsCh)
|
||||||
|
|
||||||
doneCh := make(chan struct{})
|
|
||||||
|
|
||||||
defer close(doneCh)
|
|
||||||
|
|
||||||
for object := range client.minio.ListObjects(
|
for object := range client.minio.ListObjects(
|
||||||
client.ctx,
|
client.ctx,
|
||||||
bucketName,
|
bucketName,
|
||||||
|
@ -141,10 +155,14 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error {
|
||||||
GovernanceBypass: true,
|
GovernanceBypass: true,
|
||||||
}
|
}
|
||||||
errorCh := client.minio.RemoveObjects(client.ctx, bucketName, objectsCh, opts)
|
errorCh := client.minio.RemoveObjects(client.ctx, bucketName, objectsCh, opts)
|
||||||
|
haveErrWhenRemoveObjects := false
|
||||||
for e := range errorCh {
|
for e := range errorCh {
|
||||||
glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
|
if e.Err.Error() != "EOF" {
|
||||||
|
glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
|
||||||
|
haveErrWhenRemoveObjects = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if len(errorCh) != 0 {
|
if haveErrWhenRemoveObjects {
|
||||||
return fmt.Errorf("Failed to remove all objects of bucket %s", bucketName)
|
return fmt.Errorf("Failed to remove all objects of bucket %s", bucketName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -152,6 +170,60 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// will delete files one by one without file lock
|
||||||
|
func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error {
|
||||||
|
objectsCh := make(chan minio.ObjectInfo, 1)
|
||||||
|
removeErrCh := make(chan minio.RemoveObjectError, 1)
|
||||||
|
var listErr error
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(objectsCh)
|
||||||
|
|
||||||
|
for object := range client.minio.ListObjects(client.ctx, bucketName,
|
||||||
|
minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) {
|
||||||
|
if object.Err != nil {
|
||||||
|
listErr = object.Err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
objectsCh <- object
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if listErr != nil {
|
||||||
|
glog.Error("Error listing objects", listErr)
|
||||||
|
return listErr
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(removeErrCh)
|
||||||
|
|
||||||
|
for object := range objectsCh {
|
||||||
|
err := client.minio.RemoveObject(client.ctx, bucketName, object.Key,
|
||||||
|
minio.RemoveObjectOptions{VersionID: object.VersionID})
|
||||||
|
if err != nil {
|
||||||
|
removeErrCh <- minio.RemoveObjectError{
|
||||||
|
ObjectName: object.Key,
|
||||||
|
VersionID: object.VersionID,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
haveErrWhenRemoveObjects := false
|
||||||
|
for e := range removeErrCh {
|
||||||
|
if e.Err.Error() != "EOF" {
|
||||||
|
glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
|
||||||
|
haveErrWhenRemoveObjects = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if haveErrWhenRemoveObjects {
|
||||||
|
return fmt.Errorf("Failed to remove all objects of path %s", bucketName)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (client *s3Client) SetFSMeta(meta *FSMeta) error {
|
func (client *s3Client) SetFSMeta(meta *FSMeta) error {
|
||||||
b := new(bytes.Buffer)
|
b := new(bytes.Buffer)
|
||||||
json.NewEncoder(b).Encode(meta)
|
json.NewEncoder(b).Encode(meta)
|
||||||
|
@ -182,3 +254,13 @@ func (client *s3Client) GetFSMeta(bucketName, prefix string) (*FSMeta, error) {
|
||||||
err = json.Unmarshal(b, &meta)
|
err = json.Unmarshal(b, &meta)
|
||||||
return &meta, err
|
return &meta, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (client *s3Client) EnsureFSMetaExist(meta *FSMeta, bucketName, prefix string) error {
|
||||||
|
if _, err := client.GetFSMeta(bucketName, prefix); err != nil {
|
||||||
|
glog.Warningf("%s/%s get meta failed with: %s, will set meta of it", bucketName, prefix, err)
|
||||||
|
if err := client.SetFSMeta(meta); err != nil {
|
||||||
|
return fmt.Errorf("%s/%s get meta failed with: %s, may will lost control of it", bucketName, prefix, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue