From 7528b3297cb1f5078b123ea04f8fb8b90138c878 Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Sat, 3 Apr 2021 12:40:58 +0200 Subject: [PATCH] Rename packages Create separate packages for driver, s3 client and mounters. --- cmd/s3driver/main.go | 4 +- pkg/{s3 => driver}/controllerserver.go | 42 ++++--- pkg/{s3/s3-driver.go => driver/driver.go} | 24 ++-- .../driver_suite_test.go} | 15 +-- .../driver_test.go} | 2 +- pkg/{s3 => driver}/identityserver.go | 2 +- pkg/{s3 => driver}/nodeserver.go | 18 +-- .../mounter_goofys.go => mounter/goofys.go} | 7 +- pkg/{s3/util.go => mounter/mounter.go} | 113 ++++++++++++++---- .../mounter_rclone.go => mounter/rclone.go} | 8 +- .../s3backer.go} | 13 +- pkg/{s3/mounter_s3fs.go => mounter/s3fs.go} | 8 +- pkg/s3/{s3-client.go => client.go} | 69 ++++++----- pkg/s3/config.go | 10 -- pkg/s3/mounter.go | 83 ------------- test/test.sh | 2 +- 16 files changed, 213 insertions(+), 207 deletions(-) rename pkg/{s3 => driver}/controllerserver.go (87%) rename pkg/{s3/s3-driver.go => driver/driver.go} (80%) rename pkg/{s3/s3-driver_suite_test.go => driver/driver_suite_test.go} (88%) rename pkg/{s3/s3-driver_test.go => driver/driver_test.go} (97%) rename pkg/{s3 => driver}/identityserver.go (97%) rename pkg/{s3 => driver}/nodeserver.go (93%) rename pkg/{s3/mounter_goofys.go => mounter/goofys.go} (90%) rename pkg/{s3/util.go => mounter/mounter.go} (54%) rename pkg/{s3/mounter_rclone.go => mounter/rclone.go} (88%) rename pkg/{s3/mounter_s3backer.go => mounter/s3backer.go} (94%) rename pkg/{s3/mounter_s3fs.go => mounter/s3fs.go} (89%) rename pkg/s3/{s3-client.go => client.go} (64%) delete mode 100644 pkg/s3/config.go delete mode 100644 pkg/s3/mounter.go diff --git a/cmd/s3driver/main.go b/cmd/s3driver/main.go index 0426990..45334ab 100644 --- a/cmd/s3driver/main.go +++ b/cmd/s3driver/main.go @@ -21,7 +21,7 @@ import ( "log" "os" - "github.com/ctrox/csi-s3/pkg/s3" + "github.com/ctrox/csi-s3/pkg/driver" ) func init() { @@ -36,7 +36,7 @@ var ( func main() { flag.Parse() - driver, err := s3.NewS3(*nodeID, *endpoint) + driver, err := driver.New(*nodeID, *endpoint) if err != nil { log.Fatal(err) } diff --git a/pkg/s3/controllerserver.go b/pkg/driver/controllerserver.go similarity index 87% rename from pkg/s3/controllerserver.go rename to pkg/driver/controllerserver.go index 9351421..b866b41 100644 --- a/pkg/s3/controllerserver.go +++ b/pkg/driver/controllerserver.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package s3 +package driver import ( "crypto/sha1" @@ -23,6 +23,8 @@ import ( "io" "strings" + "github.com/ctrox/csi-s3/pkg/mounter" + "github.com/ctrox/csi-s3/pkg/s3" "github.com/golang/glog" "golang.org/x/net/context" "google.golang.org/grpc/codes" @@ -36,11 +38,15 @@ type controllerServer struct { *csicommon.DefaultControllerServer } +const ( + defaultFsPrefix = "csi-fs" +) + func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { params := req.GetParameters() volumeID := sanitizeVolumeID(req.GetName()) - if bucketName, bucketExists := params[bucketKey]; bucketExists { + if bucketName, bucketExists := params[mounter.BucketKey]; bucketExists { volumeID = sanitizeVolumeID(bucketName) } @@ -59,24 +65,24 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol capacityBytes := int64(req.GetCapacityRange().GetRequiredBytes()) - mounter := params[mounterTypeKey] + mounter := params[mounter.TypeKey] glog.V(4).Infof("Got a request to create volume %s", volumeID) - s3, err := newS3ClientFromSecrets(req.GetSecrets()) + client, err := s3.NewClientFromSecret(req.GetSecrets()) if err != nil { return nil, fmt.Errorf("failed to initialize S3 client: %s", err) } - exists, err := s3.bucketExists(volumeID) + exists, err := client.BucketExists(volumeID) if err != nil { return nil, fmt.Errorf("failed to check if bucket %s exists: %v", volumeID, err) } - var b *bucket + var b *s3.Bucket if exists { - b, err = s3.getBucket(volumeID) + b, err = client.GetBucket(volumeID) if err != nil { glog.Warningf("Bucket %s exists, but failed to get its metadata: %v", volumeID, err) - b = &bucket{ + b = &s3.Bucket{ Name: volumeID, Mounter: mounter, CapacityBytes: capacityBytes, @@ -91,13 +97,13 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol b.Mounter = mounter } } else { - if err = s3.createBucket(volumeID); err != nil { + if err = client.CreateBucket(volumeID); err != nil { return nil, fmt.Errorf("failed to create volume %s: %v", volumeID, err) } - if err = s3.createPrefix(volumeID, defaultFsPrefix); err != nil { + if err = client.CreatePrefix(volumeID, defaultFsPrefix); err != nil { return nil, fmt.Errorf("failed to create prefix %s: %v", defaultFsPrefix, err) } - b = &bucket{ + b = &s3.Bucket{ Name: volumeID, Mounter: mounter, CapacityBytes: capacityBytes, @@ -105,7 +111,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol CreatedByCsi: !exists, } } - if err := s3.setBucket(b); err != nil { + if err := client.SetBucket(b); err != nil { return nil, fmt.Errorf("Error setting bucket metadata: %v", err) } @@ -136,21 +142,21 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } glog.V(4).Infof("Deleting volume %s", volumeID) - s3, err := newS3ClientFromSecrets(req.GetSecrets()) + client, err := s3.NewClientFromSecret(req.GetSecrets()) if err != nil { return nil, fmt.Errorf("failed to initialize S3 client: %s", err) } - exists, err := s3.bucketExists(volumeID) + exists, err := client.BucketExists(volumeID) if err != nil { return nil, err } if exists { - b, err := s3.getBucket(volumeID) + b, err := client.GetBucket(volumeID) if err != nil { return nil, fmt.Errorf("Failed to get metadata of buckect %s", volumeID) } if b.CreatedByCsi { - if err := s3.removeBucket(volumeID); err != nil { + if err := client.RemoveBucket(volumeID); err != nil { glog.V(3).Infof("Failed to remove volume %s: %v", volumeID, err) return nil, err } @@ -175,11 +181,11 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request") } - s3, err := newS3ClientFromSecrets(req.GetSecrets()) + s3, err := s3.NewClientFromSecret(req.GetSecrets()) if err != nil { return nil, fmt.Errorf("failed to initialize S3 client: %s", err) } - exists, err := s3.bucketExists(req.GetVolumeId()) + exists, err := s3.BucketExists(req.GetVolumeId()) if err != nil { return nil, err } diff --git a/pkg/s3/s3-driver.go b/pkg/driver/driver.go similarity index 80% rename from pkg/s3/s3-driver.go rename to pkg/driver/driver.go index 141e771..badc955 100644 --- a/pkg/s3/s3-driver.go +++ b/pkg/driver/driver.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package s3 +package driver import ( "github.com/container-storage-interface/spec/lib/go/csi" @@ -23,7 +23,7 @@ import ( csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common" ) -type s3 struct { +type driver struct { driver *csicommon.CSIDriver endpoint string @@ -44,39 +44,39 @@ var ( driverName = "ch.ctrox.csi.s3-driver" ) -// NewS3 initializes the driver -func NewS3(nodeID string, endpoint string) (*s3, error) { - driver := csicommon.NewCSIDriver(driverName, vendorVersion, nodeID) - if driver == nil { +// New initializes the driver +func New(nodeID string, endpoint string) (*driver, error) { + d := csicommon.NewCSIDriver(driverName, vendorVersion, nodeID) + if d == nil { glog.Fatalln("Failed to initialize CSI Driver.") } - s3Driver := &s3{ + s3Driver := &driver{ endpoint: endpoint, - driver: driver, + driver: d, } return s3Driver, nil } -func (s3 *s3) newIdentityServer(d *csicommon.CSIDriver) *identityServer { +func (s3 *driver) newIdentityServer(d *csicommon.CSIDriver) *identityServer { return &identityServer{ DefaultIdentityServer: csicommon.NewDefaultIdentityServer(d), } } -func (s3 *s3) newControllerServer(d *csicommon.CSIDriver) *controllerServer { +func (s3 *driver) newControllerServer(d *csicommon.CSIDriver) *controllerServer { return &controllerServer{ DefaultControllerServer: csicommon.NewDefaultControllerServer(d), } } -func (s3 *s3) newNodeServer(d *csicommon.CSIDriver) *nodeServer { +func (s3 *driver) newNodeServer(d *csicommon.CSIDriver) *nodeServer { return &nodeServer{ DefaultNodeServer: csicommon.NewDefaultNodeServer(d), } } -func (s3 *s3) Run() { +func (s3 *driver) Run() { glog.Infof("Driver: %v ", driverName) glog.Infof("Version: %v ", vendorVersion) // Initialize default library driver diff --git a/pkg/s3/s3-driver_suite_test.go b/pkg/driver/driver_suite_test.go similarity index 88% rename from pkg/s3/s3-driver_suite_test.go rename to pkg/driver/driver_suite_test.go index c216e9b..156c9d5 100644 --- a/pkg/s3/s3-driver_suite_test.go +++ b/pkg/driver/driver_suite_test.go @@ -1,10 +1,11 @@ -package s3_test +package driver_test import ( "log" "os" - "github.com/ctrox/csi-s3/pkg/s3" + "github.com/ctrox/csi-s3/pkg/driver" + "github.com/ctrox/csi-s3/pkg/mounter" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -19,7 +20,7 @@ var _ = Describe("S3Driver", func() { if err := os.Remove(socket); err != nil && !os.IsNotExist(err) { Expect(err).NotTo(HaveOccurred()) } - driver, err := s3.NewS3("test-node", csiEndpoint) + driver, err := driver.New("test-node", csiEndpoint) if err != nil { log.Fatal(err) } @@ -45,7 +46,7 @@ var _ = Describe("S3Driver", func() { if err := os.Remove(socket); err != nil && !os.IsNotExist(err) { Expect(err).NotTo(HaveOccurred()) } - driver, err := s3.NewS3("test-node", csiEndpoint) + driver, err := driver.New("test-node", csiEndpoint) if err != nil { log.Fatal(err) } @@ -73,8 +74,8 @@ var _ = Describe("S3Driver", func() { Expect(err).NotTo(HaveOccurred()) } // Clear loop device so we cover the creation of it - os.Remove(s3.S3backerLoopDevice) - driver, err := s3.NewS3("test-node", csiEndpoint) + os.Remove(mounter.S3backerLoopDevice) + driver, err := driver.New("test-node", csiEndpoint) if err != nil { log.Fatal(err) } @@ -101,7 +102,7 @@ var _ = Describe("S3Driver", func() { if err := os.Remove(socket); err != nil && !os.IsNotExist(err) { Expect(err).NotTo(HaveOccurred()) } - driver, err := s3.NewS3("test-node", csiEndpoint) + driver, err := driver.New("test-node", csiEndpoint) if err != nil { log.Fatal(err) } diff --git a/pkg/s3/s3-driver_test.go b/pkg/driver/driver_test.go similarity index 97% rename from pkg/s3/s3-driver_test.go rename to pkg/driver/driver_test.go index 5588247..ce92c81 100644 --- a/pkg/s3/s3-driver_test.go +++ b/pkg/driver/driver_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package s3 +package driver_test import ( "testing" diff --git a/pkg/s3/identityserver.go b/pkg/driver/identityserver.go similarity index 97% rename from pkg/s3/identityserver.go rename to pkg/driver/identityserver.go index 6e04fa8..eeed71d 100644 --- a/pkg/s3/identityserver.go +++ b/pkg/driver/identityserver.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package s3 +package driver import ( csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common" diff --git a/pkg/s3/nodeserver.go b/pkg/driver/nodeserver.go similarity index 93% rename from pkg/s3/nodeserver.go rename to pkg/driver/nodeserver.go index 5830e53..b5969e6 100644 --- a/pkg/s3/nodeserver.go +++ b/pkg/driver/nodeserver.go @@ -14,12 +14,14 @@ See the License for the specific language governing permissions and limitations under the License. */ -package s3 +package driver import ( "fmt" "os" + "github.com/ctrox/csi-s3/pkg/mounter" + "github.com/ctrox/csi-s3/pkg/s3" "github.com/golang/glog" "golang.org/x/net/context" @@ -76,16 +78,16 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis glog.V(4).Infof("target %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n", targetPath, deviceID, readOnly, volumeID, attrib, mountFlags) - s3, err := newS3ClientFromSecrets(req.GetSecrets()) + s3, err := s3.NewClientFromSecret(req.GetSecrets()) if err != nil { return nil, fmt.Errorf("failed to initialize S3 client: %s", err) } - b, err := s3.getBucket(volumeID) + b, err := s3.GetBucket(volumeID) if err != nil { return nil, err } - mounter, err := newMounter(b, s3.cfg) + mounter, err := mounter.New(b, s3.Config) if err != nil { return nil, err } @@ -110,7 +112,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } - if err := fuseUnmount(targetPath); err != nil { + if err := mounter.FuseUnmount(targetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) } glog.V(4).Infof("s3: bucket %s has been unmounted.", volumeID) @@ -142,15 +144,15 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol if !notMnt { return &csi.NodeStageVolumeResponse{}, nil } - s3, err := newS3ClientFromSecrets(req.GetSecrets()) + client, err := s3.NewClientFromSecret(req.GetSecrets()) if err != nil { return nil, fmt.Errorf("failed to initialize S3 client: %s", err) } - b, err := s3.getBucket(volumeID) + b, err := client.GetBucket(volumeID) if err != nil { return nil, err } - mounter, err := newMounter(b, s3.cfg) + mounter, err := mounter.New(b, client.Config) if err != nil { return nil, err } diff --git a/pkg/s3/mounter_goofys.go b/pkg/mounter/goofys.go similarity index 90% rename from pkg/s3/mounter_goofys.go rename to pkg/mounter/goofys.go index 98a176b..054c912 100644 --- a/pkg/s3/mounter_goofys.go +++ b/pkg/mounter/goofys.go @@ -1,4 +1,4 @@ -package s3 +package mounter import ( "fmt" @@ -6,6 +6,7 @@ import ( "context" + "github.com/ctrox/csi-s3/pkg/s3" goofysApi "github.com/kahing/goofys/api" ) @@ -16,14 +17,14 @@ const ( // Implements Mounter type goofysMounter struct { - bucket *bucket + bucket *s3.Bucket endpoint string region string accessKeyID string secretAccessKey string } -func newGoofysMounter(b *bucket, cfg *Config) (Mounter, error) { +func newGoofysMounter(b *s3.Bucket, cfg *s3.Config) (Mounter, error) { region := cfg.Region // if endpoint is set we need a default region if region == "" && cfg.Endpoint != "" { diff --git a/pkg/s3/util.go b/pkg/mounter/mounter.go similarity index 54% rename from pkg/s3/util.go rename to pkg/mounter/mounter.go index da95a7c..f12ee6a 100644 --- a/pkg/s3/util.go +++ b/pkg/mounter/mounter.go @@ -1,4 +1,4 @@ -package s3 +package mounter import ( "errors" @@ -10,35 +10,83 @@ import ( "syscall" "time" + "github.com/ctrox/csi-s3/pkg/s3" + "github.com/golang/glog" "github.com/mitchellh/go-ps" "k8s.io/kubernetes/pkg/util/mount" - - "github.com/golang/glog" ) -func waitForProcess(p *os.Process, backoff int) error { - if backoff == 20 { - return fmt.Errorf("Timeout waiting for PID %v to end", p.Pid) +// Mounter interface which can be implemented +// by the different mounter types +type Mounter interface { + Stage(stagePath string) error + Unstage(stagePath string) error + Mount(source string, target string) error +} + +const ( + s3fsMounterType = "s3fs" + goofysMounterType = "goofys" + s3backerMounterType = "s3backer" + rcloneMounterType = "rclone" + TypeKey = "mounter" + BucketKey = "bucket" +) + +// New returns a new mounter depending on the mounterType parameter +func New(bucket *s3.Bucket, cfg *s3.Config) (Mounter, error) { + mounter := bucket.Mounter + // Fall back to mounterType in cfg + if len(bucket.Mounter) == 0 { + mounter = cfg.Mounter } - cmdLine, err := getCmdLine(p.Pid) + switch mounter { + case s3fsMounterType: + return newS3fsMounter(bucket, cfg) + + case goofysMounterType: + return newGoofysMounter(bucket, cfg) + + case s3backerMounterType: + return newS3backerMounter(bucket, cfg) + + case rcloneMounterType: + return newRcloneMounter(bucket, cfg) + + default: + // default to s3backer + return newS3backerMounter(bucket, cfg) + } +} + +func fuseMount(path string, command string, args []string) error { + cmd := exec.Command(command, args...) + glog.V(3).Infof("Mounting fuse with command: %s and args: %s", command, args) + + out, err := cmd.CombinedOutput() if err != nil { - glog.Warningf("Error checking cmdline of PID %v, assuming it is dead: %s", p.Pid, err) + return fmt.Errorf("Error fuseMount command: %s\nargs: %s\noutput: %s", command, args, out) + } + + return waitForMount(path, 10*time.Second) +} + +func FuseUnmount(path string) error { + if err := mount.New("").Unmount(path); err != nil { + return err + } + // as fuse quits immediately, we will try to wait until the process is done + process, err := findFuseMountProcess(path) + if err != nil { + glog.Errorf("Error getting PID of fuse mount: %s", err) return nil } - if cmdLine == "" { - // ignore defunct processes - // TODO: debug why this happens in the first place - // seems to only happen on k8s, not on local docker - glog.Warning("Fuse process seems dead, returning") + if process == nil { + glog.Warningf("Unable to find PID of fuse mount %s, it must have finished already", path) return nil } - if err := p.Signal(syscall.Signal(0)); err != nil { - glog.Warningf("Fuse process does not seem active or we are unprivileged: %s", err) - return nil - } - glog.Infof("Fuse process with PID %v still active, waiting...", p.Pid) - time.Sleep(time.Duration(backoff*100) * time.Millisecond) - return waitForProcess(p, backoff+1) + glog.Infof("Found fuse pid %v of mount %s, checking if it still runs", process.Pid, path) + return waitForProcess(process, 1) } func waitForMount(path string, timeout time.Duration) error { @@ -79,6 +127,31 @@ func findFuseMountProcess(path string) (*os.Process, error) { return nil, nil } +func waitForProcess(p *os.Process, backoff int) error { + if backoff == 20 { + return fmt.Errorf("Timeout waiting for PID %v to end", p.Pid) + } + cmdLine, err := getCmdLine(p.Pid) + if err != nil { + glog.Warningf("Error checking cmdline of PID %v, assuming it is dead: %s", p.Pid, err) + return nil + } + if cmdLine == "" { + // ignore defunct processes + // TODO: debug why this happens in the first place + // seems to only happen on k8s, not on local docker + glog.Warning("Fuse process seems dead, returning") + return nil + } + if err := p.Signal(syscall.Signal(0)); err != nil { + glog.Warningf("Fuse process does not seem active or we are unprivileged: %s", err) + return nil + } + glog.Infof("Fuse process with PID %v still active, waiting...", p.Pid) + time.Sleep(time.Duration(backoff*100) * time.Millisecond) + return waitForProcess(p, backoff+1) +} + func getCmdLine(pid int) (string, error) { cmdLineFile := fmt.Sprintf("/proc/%v/cmdline", pid) cmdLine, err := ioutil.ReadFile(cmdLineFile) diff --git a/pkg/s3/mounter_rclone.go b/pkg/mounter/rclone.go similarity index 88% rename from pkg/s3/mounter_rclone.go rename to pkg/mounter/rclone.go index abc2d32..51ad842 100644 --- a/pkg/s3/mounter_rclone.go +++ b/pkg/mounter/rclone.go @@ -1,13 +1,15 @@ -package s3 +package mounter import ( "fmt" "os" + + "github.com/ctrox/csi-s3/pkg/s3" ) // Implements Mounter type rcloneMounter struct { - bucket *bucket + bucket *s3.Bucket url string region string accessKeyID string @@ -18,7 +20,7 @@ const ( rcloneCmd = "rclone" ) -func newRcloneMounter(b *bucket, cfg *Config) (Mounter, error) { +func newRcloneMounter(b *s3.Bucket, cfg *s3.Config) (Mounter, error) { return &rcloneMounter{ bucket: b, url: cfg.Endpoint, diff --git a/pkg/s3/mounter_s3backer.go b/pkg/mounter/s3backer.go similarity index 94% rename from pkg/s3/mounter_s3backer.go rename to pkg/mounter/s3backer.go index 118263c..04e5172 100644 --- a/pkg/s3/mounter_s3backer.go +++ b/pkg/mounter/s3backer.go @@ -1,4 +1,4 @@ -package s3 +package mounter import ( "fmt" @@ -7,13 +7,14 @@ import ( "os/exec" "path" + "github.com/ctrox/csi-s3/pkg/s3" "github.com/golang/glog" "k8s.io/kubernetes/pkg/util/mount" ) // Implements Mounter type s3backerMounter struct { - bucket *bucket + bucket *s3.Bucket url string region string accessKeyID string @@ -32,7 +33,7 @@ const ( S3backerLoopDevice = "/dev/loop0" ) -func newS3backerMounter(bucket *bucket, cfg *Config) (Mounter, error) { +func newS3backerMounter(bucket *s3.Bucket, cfg *s3.Config) (Mounter, error) { url, err := url.Parse(cfg.Endpoint) if err != nil { return nil, err @@ -71,14 +72,14 @@ func (s3backer *s3backerMounter) Stage(stageTarget string) error { // ensure 'file' device is formatted err := formatFs(s3backerFsType, path.Join(stageTarget, s3backerDevice)) if err != nil { - fuseUnmount(stageTarget) + FuseUnmount(stageTarget) } return err } func (s3backer *s3backerMounter) Unstage(stageTarget string) error { // Unmount the s3backer fuse mount - return fuseUnmount(stageTarget) + return FuseUnmount(stageTarget) } func (s3backer *s3backerMounter) Mount(source string, target string) error { @@ -87,7 +88,7 @@ func (s3backer *s3backerMounter) Mount(source string, target string) error { err := mount.New("").Mount(device, target, s3backerFsType, []string{}) if err != nil { // cleanup fuse mount - fuseUnmount(target) + FuseUnmount(target) return err } return nil diff --git a/pkg/s3/mounter_s3fs.go b/pkg/mounter/s3fs.go similarity index 89% rename from pkg/s3/mounter_s3fs.go rename to pkg/mounter/s3fs.go index 082e9fc..b916e52 100644 --- a/pkg/s3/mounter_s3fs.go +++ b/pkg/mounter/s3fs.go @@ -1,13 +1,15 @@ -package s3 +package mounter import ( "fmt" "os" + + "github.com/ctrox/csi-s3/pkg/s3" ) // Implements Mounter type s3fsMounter struct { - bucket *bucket + bucket *s3.Bucket url string region string pwFileContent string @@ -17,7 +19,7 @@ const ( s3fsCmd = "s3fs" ) -func newS3fsMounter(b *bucket, cfg *Config) (Mounter, error) { +func newS3fsMounter(b *s3.Bucket, cfg *s3.Config) (Mounter, error) { return &s3fsMounter{ bucket: b, url: cfg.Endpoint, diff --git a/pkg/s3/s3-client.go b/pkg/s3/client.go similarity index 64% rename from pkg/s3/s3-client.go rename to pkg/s3/client.go index 7d25895..1411aa1 100644 --- a/pkg/s3/s3-client.go +++ b/pkg/s3/client.go @@ -14,17 +14,25 @@ import ( ) const ( - metadataName = ".metadata.json" - defaultFsPrefix = "csi-fs" + metadataName = ".metadata.json" ) type s3Client struct { - cfg *Config - minio *minio.Client - ctx context.Context + Config *Config + minio *minio.Client + ctx context.Context } -type bucket struct { +// Config holds values to configure the driver +type Config struct { + AccessKeyID string + SecretAccessKey string + Region string + Endpoint string + Mounter string +} + +type Bucket struct { Name string Mounter string FSPath string @@ -32,11 +40,11 @@ type bucket struct { CreatedByCsi bool } -func newS3Client(cfg *Config) (*s3Client, error) { +func NewClient(cfg *Config) (*s3Client, error) { var client = &s3Client{} - client.cfg = cfg - u, err := url.Parse(client.cfg.Endpoint) + client.Config = cfg + u, err := url.Parse(client.Config.Endpoint) if err != nil { return nil, err } @@ -46,7 +54,7 @@ func newS3Client(cfg *Config) (*s3Client, error) { endpoint = u.Hostname() + ":" + u.Port() } minioClient, err := minio.New(endpoint, &minio.Options{ - Creds: credentials.NewStaticV4(client.cfg.AccessKeyID, client.cfg.SecretAccessKey, client.cfg.Region), + Creds: credentials.NewStaticV4(client.Config.AccessKeyID, client.Config.SecretAccessKey, client.Config.Region), Secure: ssl, }) if err != nil { @@ -57,26 +65,26 @@ func newS3Client(cfg *Config) (*s3Client, error) { return client, nil } -func newS3ClientFromSecrets(secrets map[string]string) (*s3Client, error) { - return newS3Client(&Config{ - AccessKeyID: secrets["accessKeyID"], - SecretAccessKey: secrets["secretAccessKey"], - Region: secrets["region"], - Endpoint: secrets["endpoint"], +func NewClientFromSecret(secret map[string]string) (*s3Client, error) { + return NewClient(&Config{ + AccessKeyID: secret["accessKeyID"], + SecretAccessKey: secret["secretAccessKey"], + Region: secret["region"], + Endpoint: secret["endpoint"], // Mounter is set in the volume preferences, not secrets Mounter: "", }) } -func (client *s3Client) bucketExists(bucketName string) (bool, error) { +func (client *s3Client) BucketExists(bucketName string) (bool, error) { return client.minio.BucketExists(client.ctx, bucketName) } -func (client *s3Client) createBucket(bucketName string) error { - return client.minio.MakeBucket(client.ctx, bucketName, minio.MakeBucketOptions{Region: client.cfg.Region}) +func (client *s3Client) CreateBucket(bucketName string) error { + return client.minio.MakeBucket(client.ctx, bucketName, minio.MakeBucketOptions{Region: client.Config.Region}) } -func (client *s3Client) createPrefix(bucketName string, prefix string) error { +func (client *s3Client) CreatePrefix(bucketName string, prefix string) error { _, err := client.minio.PutObject(client.ctx, bucketName, prefix+"/", bytes.NewReader([]byte("")), 0, minio.PutObjectOptions{}) if err != nil { return err @@ -84,7 +92,11 @@ func (client *s3Client) createPrefix(bucketName string, prefix string) error { return nil } -func (client *s3Client) removeBucket(bucketName string) error { +func (client *s3Client) RemovePrefix(bucketName string, prefix string) error { + return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{}) +} + +func (client *s3Client) RemoveBucket(bucketName string) error { if err := client.emptyBucket(bucketName); err != nil { return err } @@ -133,11 +145,10 @@ func (client *s3Client) emptyBucket(bucketName string) error { } } - // ensure our prefix is also removed - return client.minio.RemoveObject(client.ctx, bucketName, defaultFsPrefix, minio.RemoveObjectOptions{}) + return nil } -func (client *s3Client) setBucket(bucket *bucket) error { +func (client *s3Client) SetBucket(bucket *Bucket) error { b := new(bytes.Buffer) json.NewEncoder(b).Encode(bucket) opts := minio.PutObjectOptions{ContentType: "application/json"} @@ -145,23 +156,23 @@ func (client *s3Client) setBucket(bucket *bucket) error { return err } -func (client *s3Client) getBucket(bucketName string) (*bucket, error) { +func (client *s3Client) GetBucket(bucketName string) (*Bucket, error) { opts := minio.GetObjectOptions{} obj, err := client.minio.GetObject(client.ctx, bucketName, metadataName, opts) if err != nil { - return &bucket{}, err + return &Bucket{}, err } objInfo, err := obj.Stat() if err != nil { - return &bucket{}, err + return &Bucket{}, err } b := make([]byte, objInfo.Size) _, err = obj.Read(b) if err != nil && err != io.EOF { - return &bucket{}, err + return &Bucket{}, err } - var meta bucket + var meta Bucket err = json.Unmarshal(b, &meta) return &meta, err } diff --git a/pkg/s3/config.go b/pkg/s3/config.go deleted file mode 100644 index e3d0b9a..0000000 --- a/pkg/s3/config.go +++ /dev/null @@ -1,10 +0,0 @@ -package s3 - -// Config holds values to configure the driver -type Config struct { - AccessKeyID string - SecretAccessKey string - Region string - Endpoint string - Mounter string -} diff --git a/pkg/s3/mounter.go b/pkg/s3/mounter.go deleted file mode 100644 index bda6dbe..0000000 --- a/pkg/s3/mounter.go +++ /dev/null @@ -1,83 +0,0 @@ -package s3 - -import ( - "fmt" - "os/exec" - "time" - - "github.com/golang/glog" - "k8s.io/kubernetes/pkg/util/mount" -) - -// Mounter interface which can be implemented -// by the different mounter types -type Mounter interface { - Stage(stagePath string) error - Unstage(stagePath string) error - Mount(source string, target string) error -} - -const ( - s3fsMounterType = "s3fs" - goofysMounterType = "goofys" - s3backerMounterType = "s3backer" - rcloneMounterType = "rclone" - mounterTypeKey = "mounter" - bucketKey = "bucket" -) - -// newMounter returns a new mounter depending on the mounterType parameter -func newMounter(bucket *bucket, cfg *Config) (Mounter, error) { - mounter := bucket.Mounter - // Fall back to mounterType in cfg - if len(bucket.Mounter) == 0 { - mounter = cfg.Mounter - } - switch mounter { - case s3fsMounterType: - return newS3fsMounter(bucket, cfg) - - case goofysMounterType: - return newGoofysMounter(bucket, cfg) - - case s3backerMounterType: - return newS3backerMounter(bucket, cfg) - - case rcloneMounterType: - return newRcloneMounter(bucket, cfg) - - default: - // default to s3backer - return newS3backerMounter(bucket, cfg) - } -} - -func fuseMount(path string, command string, args []string) error { - cmd := exec.Command(command, args...) - glog.V(3).Infof("Mounting fuse with command: %s and args: %s", command, args) - - out, err := cmd.CombinedOutput() - if err != nil { - return fmt.Errorf("Error fuseMount command: %s\nargs: %s\noutput: %s", command, args, out) - } - - return waitForMount(path, 10*time.Second) -} - -func fuseUnmount(path string) error { - if err := mount.New("").Unmount(path); err != nil { - return err - } - // as fuse quits immediately, we will try to wait until the process is done - process, err := findFuseMountProcess(path) - if err != nil { - glog.Errorf("Error getting PID of fuse mount: %s", err) - return nil - } - if process == nil { - glog.Warningf("Unable to find PID of fuse mount %s, it must have finished already", path) - return nil - } - glog.Infof("Found fuse pid %v of mount %s, checking if it still runs", process.Pid, path) - return waitForProcess(process, 1) -} diff --git a/test/test.sh b/test/test.sh index f17838a..7fe3d93 100755 --- a/test/test.sh +++ b/test/test.sh @@ -5,4 +5,4 @@ export MINIO_SECRET_KEY=DSG643HGDS mkdir -p /tmp/minio minio server /tmp/minio &>/dev/null & sleep 5 -go test github.com/ctrox/csi-s3/pkg/s3 -cover \ No newline at end of file +go test ./... -cover