// Source file: k8s-csi-s3/pkg/s3/client.go

package s3
import (
	"bytes"
	"context"
	"fmt"
	"net/url"
	"sync/atomic"

	"github.com/golang/glog"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)
const (
	// metadataName is the file name ".metadata.json".
	// NOTE(review): presumably the object key under which an FSMeta document
	// is stored; it is not referenced in this part of the file — confirm
	// against callers.
	metadataName = ".metadata.json"
)
type s3Client struct {
Config *Config
minio *minio.Client
ctx context.Context
2018-07-14 08:48:22 +00:00
}
// Config carries the settings needed to configure the driver's S3 client.
type Config struct {
	// AccessKeyID and SecretAccessKey are used by NewClient as static V4
	// credentials.
	AccessKeyID     string
	SecretAccessKey string

	// Region is passed to MakeBucket when a bucket is created.
	Region string

	// Endpoint is the URL of the S3 service. NewClient parses it with
	// url.Parse and enables TLS only when the scheme is "https".
	Endpoint string

	// Mounter names the mounter to use. NewClientFromSecret leaves it empty
	// because it is set in the volume preferences, not in secrets.
	Mounter string
}
// FSMeta describes a provisioned filesystem. Every field is serialized to
// JSON under the name given in its struct tag.
type FSMeta struct {
	// BucketName is the bucket name; note that its JSON key is "Name".
	BucketName string `json:"Name"`
	// Prefix is the key prefix inside the bucket.
	Prefix string `json:"Prefix"`
	// Mounter is the name of the mounter.
	Mounter string `json:"Mounter"`
	// MountOptions holds additional mount options.
	MountOptions []string `json:"MountOptions"`
	// CapacityBytes is the capacity in bytes.
	CapacityBytes int64 `json:"CapacityBytes"`
}
func NewClient(cfg *Config) (*s3Client, error) {
2018-07-14 08:48:22 +00:00
var client = &s3Client{}
client.Config = cfg
u, err := url.Parse(client.Config.Endpoint)
2018-07-14 08:48:22 +00:00
if err != nil {
return nil, err
}
ssl := u.Scheme == "https"
endpoint := u.Hostname()
if u.Port() != "" {
endpoint = u.Hostname() + ":" + u.Port()
}
2021-01-15 03:19:59 +00:00
minioClient, err := minio.New(endpoint, &minio.Options{
Creds: credentials.NewStaticV4(client.Config.AccessKeyID, client.Config.SecretAccessKey, ""),
2021-01-15 03:19:59 +00:00
Secure: ssl,
})
2018-07-14 08:48:22 +00:00
if err != nil {
return nil, err
}
client.minio = minioClient
2021-01-15 03:19:59 +00:00
client.ctx = context.Background()
2018-07-14 08:48:22 +00:00
return client, nil
}
// NewClientFromSecret creates a client from a secret map. It reads the keys
// "accessKeyID", "secretAccessKey", "region" and "endpoint"; a missing key
// yields an empty string for the corresponding Config field. The resulting
// Config is handed to NewClient.
func NewClientFromSecret(secret map[string]string) (*s3Client, error) {
	cfg := &Config{
		AccessKeyID:     secret["accessKeyID"],
		SecretAccessKey: secret["secretAccessKey"],
		Region:          secret["region"],
		Endpoint:        secret["endpoint"],
	}
	// Mounter is set in the volume preferences, not secrets, so it stays
	// empty here.
	cfg.Mounter = ""
	return NewClient(cfg)
}
func (client *s3Client) BucketExists(bucketName string) (bool, error) {
2021-01-15 03:19:59 +00:00
return client.minio.BucketExists(client.ctx, bucketName)
2018-07-14 08:48:22 +00:00
}
func (client *s3Client) CreateBucket(bucketName string) error {
return client.minio.MakeBucket(client.ctx, bucketName, minio.MakeBucketOptions{Region: client.Config.Region})
2018-07-14 08:48:22 +00:00
}
func (client *s3Client) CreatePrefix(bucketName string, prefix string) error {
2021-07-16 13:33:13 +00:00
if prefix != "" {
_, err := client.minio.PutObject(client.ctx, bucketName, prefix+"/", bytes.NewReader([]byte("")), 0, minio.PutObjectOptions{})
if err != nil {
return err
}
}
return nil
}
func (client *s3Client) RemovePrefix(bucketName string, prefix string) error {
var err error
if err = client.removeObjects(bucketName, prefix); err == nil {
return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
}
2021-04-08 14:35:31 +00:00
glog.Warningf("removeObjects failed with: %s, will try removeObjectsOneByOne", err)
if err = client.removeObjectsOneByOne(bucketName, prefix); err == nil {
return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
}
return err
}
func (client *s3Client) RemoveBucket(bucketName string) error {
var err error
if err = client.removeObjects(bucketName, ""); err == nil {
return client.minio.RemoveBucket(client.ctx, bucketName)
2018-07-14 08:48:22 +00:00
}
2021-04-08 14:35:31 +00:00
glog.Warningf("removeObjects failed with: %s, will try removeObjectsOneByOne", err)
if err = client.removeObjectsOneByOne(bucketName, ""); err == nil {
return client.minio.RemoveBucket(client.ctx, bucketName)
}
return err
2018-07-14 08:48:22 +00:00
}
func (client *s3Client) removeObjects(bucketName, prefix string) error {
2021-01-15 03:19:59 +00:00
objectsCh := make(chan minio.ObjectInfo)
2018-07-14 08:48:22 +00:00
var listErr error
go func() {
defer close(objectsCh)
2021-01-15 03:19:59 +00:00
for object := range client.minio.ListObjects(
client.ctx,
bucketName,
minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) {
2018-07-14 08:48:22 +00:00
if object.Err != nil {
listErr = object.Err
return
}
2021-01-15 03:19:59 +00:00
objectsCh <- object
2018-07-14 08:48:22 +00:00
}
}()
if listErr != nil {
glog.Error("Error listing objects", listErr)
return listErr
}
select {
default:
2021-01-15 03:19:59 +00:00
opts := minio.RemoveObjectsOptions{
GovernanceBypass: true,
}
errorCh := client.minio.RemoveObjects(client.ctx, bucketName, objectsCh, opts)
haveErrWhenRemoveObjects := false
2018-07-14 08:48:22 +00:00
for e := range errorCh {
glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
haveErrWhenRemoveObjects = true
2018-07-14 08:48:22 +00:00
}
if haveErrWhenRemoveObjects {
2018-07-14 08:48:22 +00:00
return fmt.Errorf("Failed to remove all objects of bucket %s", bucketName)
}
}
return nil
2018-07-14 08:48:22 +00:00
}
// removeObjectsOneByOne deletes every object listed under prefix in
// bucketName with individual RemoveObject calls (including the listed
// VersionID), running at most 16 deletions concurrently. It does not take
// any file lock.
//
// Every removal failure is logged. It returns the listing error if listing
// failed, otherwise an error stating how many removals failed, and nil on
// success.
func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error {
	const parallelism = 16

	objectsCh := make(chan minio.ObjectInfo, 1)
	// guardCh is a counting semaphore: one slot per in-flight deletion.
	guardCh := make(chan int, parallelism)

	var listErr error // written by the lister only; read after objectsCh is closed
	var totalObjects, removeErrors int64

	go func() {
		defer close(objectsCh)
		listOpts := minio.ListObjectsOptions{Prefix: prefix, Recursive: true}
		for object := range client.minio.ListObjects(client.ctx, bucketName, listOpts) {
			if object.Err != nil {
				listErr = object.Err
				return
			}
			atomic.AddInt64(&totalObjects, 1)
			objectsCh <- object
		}
	}()

	for object := range objectsCh {
		guardCh <- 1
		go func(obj minio.ObjectInfo) {
			defer func() { <-guardCh }()
			err := client.minio.RemoveObject(client.ctx, bucketName, obj.Key,
				minio.RemoveObjectOptions{VersionID: obj.VersionID})
			if err != nil {
				glog.Errorf("Failed to remove object %s, error: %s", obj.Key, err)
				atomic.AddInt64(&removeErrors, 1)
			}
		}(object)
	}

	// Acquiring every slot blocks until all in-flight deletions have
	// released theirs, i.e. until all workers are done.
	for i := 0; i < parallelism; i++ {
		guardCh <- 1
	}

	// objectsCh has been closed, so the lister has finished and listErr is
	// safe to read. Checking it right after starting the goroutine would race
	// with the write and would practically never observe a failure.
	if listErr != nil {
		glog.Errorf("Error listing objects: %v", listErr)
		return listErr
	}

	if failed := atomic.LoadInt64(&removeErrors); failed > 0 {
		return fmt.Errorf("Failed to remove %v objects out of total %v of path %s",
			failed, atomic.LoadInt64(&totalObjects), bucketName)
	}
	return nil
}