Initial commit
This commit is contained in:
commit
419e3e6312
27 changed files with 1913 additions and 0 deletions
pkg/s3
97
pkg/s3/s3-client.go
Normal file
97
pkg/s3/s3-client.go
Normal file
|
@ -0,0 +1,97 @@
|
|||
package s3
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/minio/minio-go"
|
||||
)
|
||||
|
||||
const (
|
||||
metadataName = ".meta"
|
||||
)
|
||||
|
||||
type s3Client struct {
|
||||
cr *Credentials
|
||||
minio *minio.Client
|
||||
}
|
||||
|
||||
type bucketMetadata struct {
|
||||
CapacityBytes int64
|
||||
}
|
||||
|
||||
func newS3Client(cr *Credentials) (*s3Client, error) {
|
||||
var client = &s3Client{}
|
||||
|
||||
client.cr = cr
|
||||
u, err := url.Parse(client.cr.Endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ssl := u.Scheme == "https"
|
||||
endpoint := u.Hostname()
|
||||
if u.Port() != "" {
|
||||
endpoint = u.Hostname() + ":" + u.Port()
|
||||
}
|
||||
minioClient, err := minio.New(endpoint, client.cr.AccessKeyID, client.cr.SecretAccessKey, ssl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client.minio = minioClient
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func (client *s3Client) bucketExists(bucketName string) (bool, error) {
|
||||
return client.minio.BucketExists(bucketName)
|
||||
}
|
||||
|
||||
func (client *s3Client) createBucket(bucketName string) error {
|
||||
return client.minio.MakeBucket(bucketName, client.cr.Region)
|
||||
}
|
||||
|
||||
func (client *s3Client) removeBucket(bucketName string) error {
|
||||
if err := client.emptyBucket(bucketName); err != nil {
|
||||
return err
|
||||
}
|
||||
return client.minio.RemoveBucket(bucketName)
|
||||
}
|
||||
|
||||
func (client *s3Client) emptyBucket(bucketName string) error {
|
||||
objectsCh := make(chan string)
|
||||
var listErr error
|
||||
|
||||
go func() {
|
||||
defer close(objectsCh)
|
||||
|
||||
doneCh := make(chan struct{})
|
||||
|
||||
defer close(doneCh)
|
||||
|
||||
for object := range client.minio.ListObjects(bucketName, "", true, doneCh) {
|
||||
if object.Err != nil {
|
||||
listErr = object.Err
|
||||
return
|
||||
}
|
||||
objectsCh <- object.Key
|
||||
}
|
||||
}()
|
||||
|
||||
if listErr != nil {
|
||||
glog.Error("Error listing objects", listErr)
|
||||
return listErr
|
||||
}
|
||||
|
||||
select {
|
||||
default:
|
||||
errorCh := client.minio.RemoveObjects(bucketName, objectsCh)
|
||||
for e := range errorCh {
|
||||
glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
|
||||
}
|
||||
if len(errorCh) != 0 {
|
||||
return fmt.Errorf("Failed to remove all objects of bucket %s", bucketName)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue