diff --git a/pkg/s3/client.go b/pkg/s3/client.go index 91c22f8..ebb75bf 100644 --- a/pkg/s3/client.go +++ b/pkg/s3/client.go @@ -168,11 +168,13 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error { } // will delete files one by one without file lock -// FIXME Delete in parallel (if we want to delete objects at all!) func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error { + parallelism := 16 objectsCh := make(chan minio.ObjectInfo, 1) - removeErrCh := make(chan minio.RemoveObjectError, 1) + guardCh := make(chan int, parallelism) var listErr error + totalObjects := 0 + removeErrors := 0 go func() { defer close(objectsCh) @@ -183,6 +185,7 @@ func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error { listErr = object.Err return } + totalObjects++ objectsCh <- object } }() @@ -192,29 +195,27 @@ func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error { return listErr } - go func() { - defer close(removeErrCh) - - for object := range objectsCh { + for object := range objectsCh { + guardCh <- 1 + go func() { err := client.minio.RemoveObject(client.ctx, bucketName, object.Key, minio.RemoveObjectOptions{VersionID: object.VersionID}) if err != nil { - removeErrCh <- minio.RemoveObjectError{ - ObjectName: object.Key, - VersionID: object.VersionID, - Err: err, - } + glog.Errorf("Failed to remove object %s, error: %s", object.Key, err) + removeErrors++ } - } - }() - - haveErrWhenRemoveObjects := false - for e := range removeErrCh { - glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err) - haveErrWhenRemoveObjects = true + <- guardCh + }() } - if haveErrWhenRemoveObjects { - return fmt.Errorf("Failed to remove all objects of path %s", bucketName) + for i := 0; i < parallelism; i++ { + guardCh <- 1 + } + for i := 0; i < parallelism; i++ { + <- guardCh + } + + if removeErrors > 0 { + return fmt.Errorf("Failed to remove %v objects out of total %v of path %s", removeErrors, totalObjects, bucketName) } return nil