Rename packages
Create separate packages for driver, s3 client and mounters.
This commit is contained in:
parent
2b81290a2f
commit
39ebd7e830
16 changed files with 213 additions and 207 deletions
pkg/s3
|
@ -1,167 +0,0 @@
|
|||
package s3
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
)
|
||||
|
||||
const (
|
||||
metadataName = ".metadata.json"
|
||||
defaultFsPrefix = "csi-fs"
|
||||
)
|
||||
|
||||
type s3Client struct {
|
||||
cfg *Config
|
||||
minio *minio.Client
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
type bucket struct {
|
||||
Name string
|
||||
Mounter string
|
||||
FSPath string
|
||||
CapacityBytes int64
|
||||
CreatedByCsi bool
|
||||
}
|
||||
|
||||
func newS3Client(cfg *Config) (*s3Client, error) {
|
||||
var client = &s3Client{}
|
||||
|
||||
client.cfg = cfg
|
||||
u, err := url.Parse(client.cfg.Endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ssl := u.Scheme == "https"
|
||||
endpoint := u.Hostname()
|
||||
if u.Port() != "" {
|
||||
endpoint = u.Hostname() + ":" + u.Port()
|
||||
}
|
||||
minioClient, err := minio.New(endpoint, &minio.Options{
|
||||
Creds: credentials.NewStaticV4(client.cfg.AccessKeyID, client.cfg.SecretAccessKey, client.cfg.Region),
|
||||
Secure: ssl,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client.minio = minioClient
|
||||
client.ctx = context.Background()
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func newS3ClientFromSecrets(secrets map[string]string) (*s3Client, error) {
|
||||
return newS3Client(&Config{
|
||||
AccessKeyID: secrets["accessKeyID"],
|
||||
SecretAccessKey: secrets["secretAccessKey"],
|
||||
Region: secrets["region"],
|
||||
Endpoint: secrets["endpoint"],
|
||||
// Mounter is set in the volume preferences, not secrets
|
||||
Mounter: "",
|
||||
})
|
||||
}
|
||||
|
||||
func (client *s3Client) bucketExists(bucketName string) (bool, error) {
|
||||
return client.minio.BucketExists(client.ctx, bucketName)
|
||||
}
|
||||
|
||||
func (client *s3Client) createBucket(bucketName string) error {
|
||||
return client.minio.MakeBucket(client.ctx, bucketName, minio.MakeBucketOptions{Region: client.cfg.Region})
|
||||
}
|
||||
|
||||
func (client *s3Client) createPrefix(bucketName string, prefix string) error {
|
||||
_, err := client.minio.PutObject(client.ctx, bucketName, prefix+"/", bytes.NewReader([]byte("")), 0, minio.PutObjectOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (client *s3Client) removeBucket(bucketName string) error {
|
||||
if err := client.emptyBucket(bucketName); err != nil {
|
||||
return err
|
||||
}
|
||||
return client.minio.RemoveBucket(client.ctx, bucketName)
|
||||
}
|
||||
|
||||
func (client *s3Client) emptyBucket(bucketName string) error {
|
||||
objectsCh := make(chan minio.ObjectInfo)
|
||||
var listErr error
|
||||
|
||||
go func() {
|
||||
defer close(objectsCh)
|
||||
|
||||
doneCh := make(chan struct{})
|
||||
|
||||
defer close(doneCh)
|
||||
|
||||
for object := range client.minio.ListObjects(
|
||||
client.ctx,
|
||||
bucketName,
|
||||
minio.ListObjectsOptions{Prefix: "", Recursive: true}) {
|
||||
if object.Err != nil {
|
||||
listErr = object.Err
|
||||
return
|
||||
}
|
||||
objectsCh <- object
|
||||
}
|
||||
}()
|
||||
|
||||
if listErr != nil {
|
||||
glog.Error("Error listing objects", listErr)
|
||||
return listErr
|
||||
}
|
||||
|
||||
select {
|
||||
default:
|
||||
opts := minio.RemoveObjectsOptions{
|
||||
GovernanceBypass: true,
|
||||
}
|
||||
errorCh := client.minio.RemoveObjects(client.ctx, bucketName, objectsCh, opts)
|
||||
for e := range errorCh {
|
||||
glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
|
||||
}
|
||||
if len(errorCh) != 0 {
|
||||
return fmt.Errorf("Failed to remove all objects of bucket %s", bucketName)
|
||||
}
|
||||
}
|
||||
|
||||
// ensure our prefix is also removed
|
||||
return client.minio.RemoveObject(client.ctx, bucketName, defaultFsPrefix, minio.RemoveObjectOptions{})
|
||||
}
|
||||
|
||||
func (client *s3Client) setBucket(bucket *bucket) error {
|
||||
b := new(bytes.Buffer)
|
||||
json.NewEncoder(b).Encode(bucket)
|
||||
opts := minio.PutObjectOptions{ContentType: "application/json"}
|
||||
_, err := client.minio.PutObject(client.ctx, bucket.Name, metadataName, b, int64(b.Len()), opts)
|
||||
return err
|
||||
}
|
||||
|
||||
func (client *s3Client) getBucket(bucketName string) (*bucket, error) {
|
||||
opts := minio.GetObjectOptions{}
|
||||
obj, err := client.minio.GetObject(client.ctx, bucketName, metadataName, opts)
|
||||
if err != nil {
|
||||
return &bucket{}, err
|
||||
}
|
||||
objInfo, err := obj.Stat()
|
||||
if err != nil {
|
||||
return &bucket{}, err
|
||||
}
|
||||
b := make([]byte, objInfo.Size)
|
||||
_, err = obj.Read(b)
|
||||
|
||||
if err != nil && err != io.EOF {
|
||||
return &bucket{}, err
|
||||
}
|
||||
var meta bucket
|
||||
err = json.Unmarshal(b, &meta)
|
||||
return &meta, err
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue