Delete objects in parallel when deleting auto-created volumes

This commit is contained in:
Vitaliy Filippov 2021-07-27 00:04:17 +03:00
parent 426a9623b6
commit ce0d5e7704

View file

@ -168,11 +168,13 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error {
} }
// will delete files one by one without file lock // will delete files one by one without file lock
// FIXME Delete in parallel (if we want to delete objects at all!)
func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error { func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error {
parallelism := 16
objectsCh := make(chan minio.ObjectInfo, 1) objectsCh := make(chan minio.ObjectInfo, 1)
removeErrCh := make(chan minio.RemoveObjectError, 1) guardCh := make(chan int, parallelism)
var listErr error var listErr error
totalObjects := 0
removeErrors := 0
go func() { go func() {
defer close(objectsCh) defer close(objectsCh)
@ -183,6 +185,7 @@ func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error {
listErr = object.Err listErr = object.Err
return return
} }
totalObjects++
objectsCh <- object objectsCh <- object
} }
}() }()
@ -192,29 +195,27 @@ func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error {
return listErr return listErr
} }
go func() {
defer close(removeErrCh)
for object := range objectsCh { for object := range objectsCh {
guardCh <- 1
go func() {
err := client.minio.RemoveObject(client.ctx, bucketName, object.Key, err := client.minio.RemoveObject(client.ctx, bucketName, object.Key,
minio.RemoveObjectOptions{VersionID: object.VersionID}) minio.RemoveObjectOptions{VersionID: object.VersionID})
if err != nil { if err != nil {
removeErrCh <- minio.RemoveObjectError{ glog.Errorf("Failed to remove object %s, error: %s", object.Key, err)
ObjectName: object.Key, removeErrors++
VersionID: object.VersionID,
Err: err,
}
}
} }
<- guardCh
}() }()
haveErrWhenRemoveObjects := false
for e := range removeErrCh {
glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
haveErrWhenRemoveObjects = true
} }
if haveErrWhenRemoveObjects { for i := 0; i < parallelism; i++ {
return fmt.Errorf("Failed to remove all objects of path %s", bucketName) guardCh <- 1
}
for i := 0; i < parallelism; i++ {
<- guardCh
}
if removeErrors > 0 {
return fmt.Errorf("Failed to remove %v objects out of total %v of path %s", removeErrors, totalObjects, bucketName)
} }
return nil return nil