fix problem
* rclone use 1.54.1 * add EnsureFSMetaExist func * fix removeObjects have err check * add removeObjectsOneByOne, when removeObjects failed, will try to use it * fix repeat in yaml
This commit is contained in:
parent
8b7849953a
commit
dc98732919
7 changed files with 115 additions and 31 deletions
pkg/s3
106
pkg/s3/client.go
106
pkg/s3/client.go
|
@ -94,17 +94,35 @@ func (client *s3Client) CreatePrefix(bucketName string, prefix string) error {
|
|||
}
|
||||
|
||||
func (client *s3Client) RemovePrefix(bucketName string, prefix string) error {
|
||||
if err := client.removeObjects(bucketName, prefix); err != nil {
|
||||
return err
|
||||
var err error
|
||||
|
||||
if err = client.removeObjects(bucketName, prefix); err == nil {
|
||||
return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
|
||||
}
|
||||
return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
|
||||
|
||||
glog.Warning("removeObjects failed with: ", err, ", will try removeObjectsOneByOne")
|
||||
|
||||
if err = client.removeObjectsOneByOne(bucketName, prefix); err == nil {
|
||||
return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (client *s3Client) RemoveBucket(bucketName string) error {
|
||||
if err := client.removeObjects(bucketName, ""); err != nil {
|
||||
return err
|
||||
var err error
|
||||
|
||||
if err = client.removeObjects(bucketName, ""); err == nil {
|
||||
return client.minio.RemoveBucket(client.ctx, bucketName)
|
||||
}
|
||||
return client.minio.RemoveBucket(client.ctx, bucketName)
|
||||
|
||||
glog.Warning("removeObjects failed with: ", err, ", will try removeObjectsOneByOne")
|
||||
|
||||
if err = client.removeObjectsOneByOne(bucketName, ""); err == nil {
|
||||
return client.minio.RemoveBucket(client.ctx, bucketName)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (client *s3Client) removeObjects(bucketName, prefix string) error {
|
||||
|
@ -114,10 +132,6 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error {
|
|||
go func() {
|
||||
defer close(objectsCh)
|
||||
|
||||
doneCh := make(chan struct{})
|
||||
|
||||
defer close(doneCh)
|
||||
|
||||
for object := range client.minio.ListObjects(
|
||||
client.ctx,
|
||||
bucketName,
|
||||
|
@ -141,10 +155,14 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error {
|
|||
GovernanceBypass: true,
|
||||
}
|
||||
errorCh := client.minio.RemoveObjects(client.ctx, bucketName, objectsCh, opts)
|
||||
haveErrWhenRemoveObjects := false
|
||||
for e := range errorCh {
|
||||
glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
|
||||
if e.Err.Error() != "EOF" {
|
||||
glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
|
||||
haveErrWhenRemoveObjects = true
|
||||
}
|
||||
}
|
||||
if len(errorCh) != 0 {
|
||||
if haveErrWhenRemoveObjects {
|
||||
return fmt.Errorf("Failed to remove all objects of bucket %s", bucketName)
|
||||
}
|
||||
}
|
||||
|
@ -152,6 +170,60 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// will delete files one by one without file lock
|
||||
func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error {
|
||||
objectsCh := make(chan minio.ObjectInfo, 1)
|
||||
removeErrCh := make(chan minio.RemoveObjectError, 1)
|
||||
var listErr error
|
||||
|
||||
go func() {
|
||||
defer close(objectsCh)
|
||||
|
||||
for object := range client.minio.ListObjects(client.ctx, bucketName,
|
||||
minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) {
|
||||
if object.Err != nil {
|
||||
listErr = object.Err
|
||||
return
|
||||
}
|
||||
objectsCh <- object
|
||||
}
|
||||
}()
|
||||
|
||||
if listErr != nil {
|
||||
glog.Error("Error listing objects", listErr)
|
||||
return listErr
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer close(removeErrCh)
|
||||
|
||||
for object := range objectsCh {
|
||||
err := client.minio.RemoveObject(client.ctx, bucketName, object.Key,
|
||||
minio.RemoveObjectOptions{VersionID: object.VersionID})
|
||||
if err != nil {
|
||||
removeErrCh <- minio.RemoveObjectError{
|
||||
ObjectName: object.Key,
|
||||
VersionID: object.VersionID,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
haveErrWhenRemoveObjects := false
|
||||
for e := range removeErrCh {
|
||||
if e.Err.Error() != "EOF" {
|
||||
glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
|
||||
haveErrWhenRemoveObjects = true
|
||||
}
|
||||
}
|
||||
if haveErrWhenRemoveObjects {
|
||||
return fmt.Errorf("Failed to remove all objects of path %s", bucketName)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (client *s3Client) SetFSMeta(meta *FSMeta) error {
|
||||
b := new(bytes.Buffer)
|
||||
json.NewEncoder(b).Encode(meta)
|
||||
|
@ -182,3 +254,13 @@ func (client *s3Client) GetFSMeta(bucketName, prefix string) (*FSMeta, error) {
|
|||
err = json.Unmarshal(b, &meta)
|
||||
return &meta, err
|
||||
}
|
||||
|
||||
func (client *s3Client) EnsureFSMetaExist(meta *FSMeta, bucketName, prefix string) error {
|
||||
if _, err := client.GetFSMeta(bucketName, prefix); err != nil {
|
||||
glog.Warningf("%s/%s get meta failed with: %s, will set meta of it", bucketName, prefix, err)
|
||||
if err := client.SetFSMeta(meta); err != nil {
|
||||
return fmt.Errorf("%s/%s get meta failed with: %s, may will lost control of it", bucketName, prefix, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue