2018-07-14 08:48:22 +00:00
|
|
|
package s3
|
|
|
|
|
|
|
|
import (
|
2018-07-27 10:56:28 +00:00
|
|
|
"bytes"
|
2021-01-15 03:19:59 +00:00
|
|
|
"context"
|
2018-07-27 10:56:28 +00:00
|
|
|
"encoding/json"
|
2018-07-14 08:48:22 +00:00
|
|
|
"fmt"
|
2018-07-27 10:56:28 +00:00
|
|
|
"io"
|
2018-07-14 08:48:22 +00:00
|
|
|
"net/url"
|
2021-04-05 13:07:16 +00:00
|
|
|
"path"
|
2018-07-14 08:48:22 +00:00
|
|
|
|
|
|
|
"github.com/golang/glog"
|
2021-01-15 03:19:59 +00:00
|
|
|
"github.com/minio/minio-go/v7"
|
|
|
|
"github.com/minio/minio-go/v7/pkg/credentials"
|
2018-07-14 08:48:22 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// metadataName is the object name, joined onto a volume's prefix, under which
// the FSMeta document is stored.
const metadataName = ".metadata.json"
|
|
|
|
|
|
|
|
type s3Client struct {
|
2021-04-03 10:40:58 +00:00
|
|
|
Config *Config
|
|
|
|
minio *minio.Client
|
|
|
|
ctx context.Context
|
2018-07-14 08:48:22 +00:00
|
|
|
}
|
|
|
|
|
2021-04-03 10:40:58 +00:00
|
|
|
// Config holds the values needed to configure the driver's S3 client.
type Config struct {
	// AccessKeyID and SecretAccessKey are the static credentials handed to
	// the minio client.
	AccessKeyID, SecretAccessKey string

	// Region is the region used when creating buckets.
	Region string

	// Endpoint is the URL of the S3 service; an "https" scheme enables TLS.
	Endpoint string

	// Mounter is set in the volume preferences, not in the secret.
	Mounter string
}
|
|
|
|
|
2021-04-05 13:07:16 +00:00
|
|
|
// FSMeta is the metadata document of a volume. It is encoded as JSON and
// stored in the metadata object below the volume's prefix.
type FSMeta struct {
	// BucketName is the bucket holding the volume; its JSON key is "Name".
	BucketName string `json:"Name"`

	// Prefix is the key prefix inside the bucket under which the metadata
	// object is stored.
	Prefix string `json:"Prefix"`

	// Mounter is not read in this file; presumably the mounter chosen for the
	// volume (verify against callers).
	Mounter string `json:"Mounter"`

	// FSPath is not read in this file; presumably a path inside the volume
	// (verify against callers).
	FSPath string `json:"FSPath"`

	// CapacityBytes is not read in this file; presumably the volume size in
	// bytes (verify against callers).
	CapacityBytes int64 `json:"CapacityBytes"`
}
|
|
|
|
|
2021-04-03 10:40:58 +00:00
|
|
|
func NewClient(cfg *Config) (*s3Client, error) {
|
2018-07-14 08:48:22 +00:00
|
|
|
var client = &s3Client{}
|
|
|
|
|
2021-04-03 10:40:58 +00:00
|
|
|
client.Config = cfg
|
|
|
|
u, err := url.Parse(client.Config.Endpoint)
|
2018-07-14 08:48:22 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
ssl := u.Scheme == "https"
|
|
|
|
endpoint := u.Hostname()
|
|
|
|
if u.Port() != "" {
|
|
|
|
endpoint = u.Hostname() + ":" + u.Port()
|
|
|
|
}
|
2021-01-15 03:19:59 +00:00
|
|
|
minioClient, err := minio.New(endpoint, &minio.Options{
|
2021-04-03 10:40:58 +00:00
|
|
|
Creds: credentials.NewStaticV4(client.Config.AccessKeyID, client.Config.SecretAccessKey, client.Config.Region),
|
2021-01-15 03:19:59 +00:00
|
|
|
Secure: ssl,
|
|
|
|
})
|
2018-07-14 08:48:22 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
client.minio = minioClient
|
2021-01-15 03:19:59 +00:00
|
|
|
client.ctx = context.Background()
|
2018-07-14 08:48:22 +00:00
|
|
|
return client, nil
|
|
|
|
}
|
|
|
|
|
2021-04-03 10:40:58 +00:00
|
|
|
func NewClientFromSecret(secret map[string]string) (*s3Client, error) {
|
|
|
|
return NewClient(&Config{
|
|
|
|
AccessKeyID: secret["accessKeyID"],
|
|
|
|
SecretAccessKey: secret["secretAccessKey"],
|
|
|
|
Region: secret["region"],
|
|
|
|
Endpoint: secret["endpoint"],
|
2019-03-10 11:19:02 +00:00
|
|
|
// Mounter is set in the volume preferences, not secrets
|
|
|
|
Mounter: "",
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2021-04-03 10:40:58 +00:00
|
|
|
func (client *s3Client) BucketExists(bucketName string) (bool, error) {
|
2021-01-15 03:19:59 +00:00
|
|
|
return client.minio.BucketExists(client.ctx, bucketName)
|
2018-07-14 08:48:22 +00:00
|
|
|
}
|
|
|
|
|
2021-04-03 10:40:58 +00:00
|
|
|
func (client *s3Client) CreateBucket(bucketName string) error {
|
|
|
|
return client.minio.MakeBucket(client.ctx, bucketName, minio.MakeBucketOptions{Region: client.Config.Region})
|
2018-07-14 08:48:22 +00:00
|
|
|
}
|
|
|
|
|
2021-04-03 10:40:58 +00:00
|
|
|
func (client *s3Client) CreatePrefix(bucketName string, prefix string) error {
|
2021-01-15 03:19:59 +00:00
|
|
|
_, err := client.minio.PutObject(client.ctx, bucketName, prefix+"/", bytes.NewReader([]byte("")), 0, minio.PutObjectOptions{})
|
2018-07-27 19:37:32 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-04-03 10:40:58 +00:00
|
|
|
func (client *s3Client) RemovePrefix(bucketName string, prefix string) error {
|
2021-04-05 13:07:16 +00:00
|
|
|
if err := client.removeObjects(bucketName, prefix); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-04-03 10:40:58 +00:00
|
|
|
return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
|
|
|
|
}
|
|
|
|
|
|
|
|
func (client *s3Client) RemoveBucket(bucketName string) error {
|
2021-04-05 13:07:16 +00:00
|
|
|
if err := client.removeObjects(bucketName, ""); err != nil {
|
2018-07-14 08:48:22 +00:00
|
|
|
return err
|
|
|
|
}
|
2021-01-15 03:19:59 +00:00
|
|
|
return client.minio.RemoveBucket(client.ctx, bucketName)
|
2018-07-14 08:48:22 +00:00
|
|
|
}
|
|
|
|
|
2021-04-05 13:07:16 +00:00
|
|
|
func (client *s3Client) removeObjects(bucketName, prefix string) error {
|
2021-01-15 03:19:59 +00:00
|
|
|
objectsCh := make(chan minio.ObjectInfo)
|
2018-07-14 08:48:22 +00:00
|
|
|
var listErr error
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
defer close(objectsCh)
|
|
|
|
|
|
|
|
doneCh := make(chan struct{})
|
|
|
|
|
|
|
|
defer close(doneCh)
|
|
|
|
|
2021-01-15 03:19:59 +00:00
|
|
|
for object := range client.minio.ListObjects(
|
|
|
|
client.ctx,
|
|
|
|
bucketName,
|
2021-04-05 13:07:16 +00:00
|
|
|
minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) {
|
2018-07-14 08:48:22 +00:00
|
|
|
if object.Err != nil {
|
|
|
|
listErr = object.Err
|
|
|
|
return
|
|
|
|
}
|
2021-01-15 03:19:59 +00:00
|
|
|
objectsCh <- object
|
2018-07-14 08:48:22 +00:00
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
if listErr != nil {
|
|
|
|
glog.Error("Error listing objects", listErr)
|
|
|
|
return listErr
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
default:
|
2021-01-15 03:19:59 +00:00
|
|
|
opts := minio.RemoveObjectsOptions{
|
|
|
|
GovernanceBypass: true,
|
|
|
|
}
|
|
|
|
errorCh := client.minio.RemoveObjects(client.ctx, bucketName, objectsCh, opts)
|
2018-07-14 08:48:22 +00:00
|
|
|
for e := range errorCh {
|
|
|
|
glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
|
|
|
|
}
|
|
|
|
if len(errorCh) != 0 {
|
|
|
|
return fmt.Errorf("Failed to remove all objects of bucket %s", bucketName)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-04-03 10:40:58 +00:00
|
|
|
return nil
|
2018-07-14 08:48:22 +00:00
|
|
|
}
|
2018-07-27 10:56:28 +00:00
|
|
|
|
2021-04-05 13:07:16 +00:00
|
|
|
func (client *s3Client) SetFSMeta(meta *FSMeta) error {
|
2018-07-27 10:56:28 +00:00
|
|
|
b := new(bytes.Buffer)
|
2021-04-05 13:07:16 +00:00
|
|
|
json.NewEncoder(b).Encode(meta)
|
2018-07-27 10:56:28 +00:00
|
|
|
opts := minio.PutObjectOptions{ContentType: "application/json"}
|
2021-04-05 13:07:16 +00:00
|
|
|
_, err := client.minio.PutObject(
|
|
|
|
client.ctx, meta.BucketName, path.Join(meta.Prefix, metadataName), b, int64(b.Len()), opts,
|
|
|
|
)
|
2018-07-27 10:56:28 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-04-05 13:07:16 +00:00
|
|
|
func (client *s3Client) GetFSMeta(bucketName, prefix string) (*FSMeta, error) {
|
2018-07-27 10:56:28 +00:00
|
|
|
opts := minio.GetObjectOptions{}
|
2021-04-05 13:07:16 +00:00
|
|
|
obj, err := client.minio.GetObject(client.ctx, bucketName, path.Join(prefix, metadataName), opts)
|
2018-07-27 10:56:28 +00:00
|
|
|
if err != nil {
|
2021-04-05 13:07:16 +00:00
|
|
|
return &FSMeta{}, err
|
2018-07-27 10:56:28 +00:00
|
|
|
}
|
|
|
|
objInfo, err := obj.Stat()
|
|
|
|
if err != nil {
|
2021-04-05 13:07:16 +00:00
|
|
|
return &FSMeta{}, err
|
2018-07-27 10:56:28 +00:00
|
|
|
}
|
|
|
|
b := make([]byte, objInfo.Size)
|
|
|
|
_, err = obj.Read(b)
|
|
|
|
|
|
|
|
if err != nil && err != io.EOF {
|
2021-04-05 13:07:16 +00:00
|
|
|
return &FSMeta{}, err
|
2018-07-27 10:56:28 +00:00
|
|
|
}
|
2021-04-05 13:07:16 +00:00
|
|
|
var meta FSMeta
|
2018-07-27 10:56:28 +00:00
|
|
|
err = json.Unmarshal(b, &meta)
|
|
|
|
return &meta, err
|
|
|
|
}
|