diff --git a/pkg/s3/mounter.go b/pkg/s3/mounter.go index b05ae8f..0d8426a 100644 --- a/pkg/s3/mounter.go +++ b/pkg/s3/mounter.go @@ -11,9 +11,10 @@ import ( // Mounter interface which can be implemented // by the different mounter types type Mounter interface { - Format() error - Mount(targetPath string) error - Unmount(targetPath string) error + Stage(stagePath string) error + Unstage(stagePath string) error + Mount(source string, target string) error + Unmount(target string) error } const ( diff --git a/pkg/s3/mounter_goofys.go b/pkg/s3/mounter_goofys.go index 2573c3a..31a37c4 100644 --- a/pkg/s3/mounter_goofys.go +++ b/pkg/s3/mounter_goofys.go @@ -38,13 +38,17 @@ func newGoofysMounter(bucket string, cfg *Config) (Mounter, error) { }, nil } -func (goofys *goofysMounter) Format() error { +func (goofys *goofysMounter) Stage(stageTarget string) error { return nil } -func (goofys *goofysMounter) Mount(targetPath string) error { +func (goofys *goofysMounter) Unstage(stageTarget string) error { + return nil +} + +func (goofys *goofysMounter) Mount(source string, target string) error { goofysCfg := &goofysApi.Config{ - MountPoint: targetPath, + MountPoint: target, Endpoint: goofys.endpoint, Region: goofys.region, DirMode: 0755, diff --git a/pkg/s3/mounter_s3backer.go b/pkg/s3/mounter_s3backer.go index f5ee1c0..bf6dbd8 100644 --- a/pkg/s3/mounter_s3backer.go +++ b/pkg/s3/mounter_s3backer.go @@ -2,7 +2,6 @@ package s3 import ( "fmt" - "io/ioutil" "os" "os/exec" "path" @@ -19,14 +18,12 @@ type s3backerMounter struct { accessKeyID string secretAccessKey string size int64 - initMountPath string } const ( - s3backerCmd = "s3backer" - s3backerFsType = "xfs" - s3backerMountBase = "/mnt" - s3backerDevice = "file" + s3backerCmd = "s3backer" + s3backerFsType = "xfs" + s3backerDevice = "file" // blockSize to use in k s3backerBlockSize = "128k" ) @@ -38,7 +35,6 @@ func newS3backerMounter(bucket string, cfg *Config) (Mounter, error) { region: cfg.Region, accessKeyID: cfg.AccessKeyID, secretAccessKey: cfg.SecretAccessKey, - initMountPath: path.Join(s3backerMountBase, bucket), size: 1024 * 1024 * 1024 * 10, } @@ -49,37 +45,32 @@ func (s3backer *s3backerMounter) String() string { return s3backer.bucket } -func (s3backer *s3backerMounter) Format() error { - tmpDir, err := ioutil.TempDir("", "s3backer") - if err != nil { - return err - } - defer os.RemoveAll(tmpDir) - - if err := s3backer.mountInit(tmpDir); err != nil { - return err - } - defer fuseUnmount(tmpDir, s3backerCmd) - - return formatFs(s3backerFsType, path.Join(tmpDir, s3backerDevice)) -} - -func (s3backer *s3backerMounter) Mount(targetPath string) error { - if err := os.MkdirAll(s3backer.initMountPath, 0700); err != nil { - return err - } +func (s3backer *s3backerMounter) Stage(stageTarget string) error { // s3backer requires two mounts // first mount will fuse mount the bucket to a single 'file' - err := s3backer.mountInit(s3backer.initMountPath) - if err != nil { + if err := s3backer.mountInit(stageTarget); err != nil { return err } - device := path.Join(s3backer.initMountPath, s3backerDevice) + // ensure 'file' device is formatted + err := formatFs(s3backerFsType, path.Join(stageTarget, s3backerDevice)) + if err != nil { + fuseUnmount(stageTarget, s3backerCmd) + } + return err +} + +func (s3backer *s3backerMounter) Unstage(stageTarget string) error { + // Unmount the s3backer fuse mount + return fuseUnmount(stageTarget, s3backerCmd) +} + +func (s3backer *s3backerMounter) Mount(source string, target string) error { + device := path.Join(source, s3backerDevice) // second mount will mount the 'file' as a filesystem - err = mount.New("").Mount(device, targetPath, s3backerFsType, []string{}) + err := mount.New("").Mount(device, target, s3backerFsType, []string{}) if err != nil { // cleanup fuse mount - fuseUnmount(targetPath, s3backerCmd) + fuseUnmount(target, s3backerCmd) return err } return nil @@ -87,15 +78,7 @@ func (s3backer *s3backerMounter) Mount(targetPath string) error { func (s3backer *s3backerMounter) Unmount(targetPath string) error { // Unmount the filesystem first - if err := mount.New("").Unmount(targetPath); err != nil { - return err - } - // Unmount the s3backer fuse mount - err := fuseUnmount(s3backer.initMountPath, s3backerCmd) - if err != nil { - return err - } - return nil + return mount.New("").Unmount(targetPath) } func (s3backer *s3backerMounter) mountInit(path string) error { diff --git a/pkg/s3/mounter_s3fs.go b/pkg/s3/mounter_s3fs.go index 318d564..2b90aa3 100644 --- a/pkg/s3/mounter_s3fs.go +++ b/pkg/s3/mounter_s3fs.go @@ -26,17 +26,21 @@ func newS3fsMounter(bucket string, cfg *Config) (Mounter, error) { }, nil } -func (s3fs *s3fsMounter) Format() error { +func (s3fs *s3fsMounter) Stage(stageTarget string) error { return nil } -func (s3fs *s3fsMounter) Mount(targetPath string) error { +func (s3fs *s3fsMounter) Unstage(stageTarget string) error { + return nil +} + +func (s3fs *s3fsMounter) Mount(source string, target string) error { if err := writes3fsPass(s3fs.pwFileContent); err != nil { return err } args := []string{ fmt.Sprintf("%s", s3fs.bucket), - fmt.Sprintf("%s", targetPath), + fmt.Sprintf("%s", target), "-o", "sigv2", "-o", "use_path_request_style", "-o", fmt.Sprintf("url=%s", s3fs.url), @@ -44,11 +48,11 @@ func (s3fs *s3fsMounter) Mount(targetPath string) error { "-o", "allow_other", "-o", "mp_umask=000", } - return fuseMount(targetPath, s3fsCmd, args) + return fuseMount(target, s3fsCmd, args) } -func (s3fs *s3fsMounter) Unmount(targetPath string) error { - return fuseUnmount(targetPath, s3fsCmd) +func (s3fs *s3fsMounter) Unmount(target string) error { + return fuseUnmount(target, s3fsCmd) } func writes3fsPass(pwFileContent string) error { diff --git a/pkg/s3/mounter_s3ql.go b/pkg/s3/mounter_s3ql.go index 2d85971..3bd095b 100644 --- a/pkg/s3/mounter_s3ql.go +++ b/pkg/s3/mounter_s3ql.go @@ -59,7 +59,7 @@ func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) { return s3ql, s3ql.writeConfig() } -func (s3ql *s3qlMounter) Format() error { +func (s3ql *s3qlMounter) Stage(stagePath string) error { // force creation to ignore existing data args := []string{ s3ql.bucketURL, @@ -78,17 +78,21 @@ func (s3ql *s3qlMounter) Format() error { return nil } -func (s3ql *s3qlMounter) Mount(targetPath string) error { - args := []string{ - s3ql.bucketURL, - targetPath, - "--allow-other", - } - return fuseMount(targetPath, s3qlCmdMount, append(args, s3ql.options...)) +func (s3ql *s3qlMounter) Unstage(stagePath string) error { + return nil } -func (s3ql *s3qlMounter) Unmount(targetPath string) error { - return fuseUnmount(targetPath, s3qlCmdMount) +func (s3ql *s3qlMounter) Mount(source string, target string) error { + args := []string{ + s3ql.bucketURL, + target, + "--allow-other", + } + return fuseMount(target, s3qlCmdMount, append(args, s3ql.options...)) +} + +func (s3ql *s3qlMounter) Unmount(target string) error { + return fuseUnmount(target, s3qlCmdMount) } func (s3ql *s3qlMounter) writeConfig() error { diff --git a/pkg/s3/nodeserver.go b/pkg/s3/nodeserver.go index 6e5fae8..f86d6d4 100644 --- a/pkg/s3/nodeserver.go +++ b/pkg/s3/nodeserver.go @@ -44,23 +44,19 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis if len(req.GetVolumeId()) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } + if len(req.GetStagingTargetPath()) == 0 { + return nil, status.Error(codes.InvalidArgument, "Staging Target path missing in request") + } if len(req.GetTargetPath()) == 0 { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } targetPath := req.GetTargetPath() - notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath) + stagingPath := req.GetStagingTargetPath() + notMnt, err := checkMount(targetPath) if err != nil { - if os.IsNotExist(err) { - if err = os.MkdirAll(targetPath, 0750); err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - notMnt = true - } else { - return nil, status.Error(codes.Internal, err.Error()) - } + return nil, status.Error(codes.Internal, err.Error()) } - if !notMnt { return &csi.NodePublishVolumeResponse{}, nil } @@ -82,10 +78,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis if err != nil { return nil, err } - if err := mounter.Format(); err != nil { - return nil, err - } - if err := mounter.Mount(targetPath); err != nil { + if err := mounter.Mount(stagingPath, targetPath); err != nil { return nil, err } @@ -116,16 +109,91 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu return &csi.NodeUnpublishVolumeResponse{}, nil } -func (ns *nodeServer) NodeStageVolume( - ctx context.Context, - req *csi.NodeStageVolumeRequest) ( - *csi.NodeStageVolumeResponse, error) { - return nil, status.Error(codes.Unimplemented, "") +func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { + + // Check arguments + if len(req.GetVolumeId()) == 0 { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + + if len(req.GetStagingTargetPath()) == 0 { + return nil, status.Error(codes.InvalidArgument, "Target path missing in request") + } + + if req.VolumeCapability == nil { + return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Volume Capability must be provided") + } + + stagingPath := req.GetStagingTargetPath() + notMnt, err := checkMount(stagingPath) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + if !notMnt { + return &csi.NodeStageVolumeResponse{}, nil + } + + mounter, err := newMounter(req.GetVolumeId(), ns.s3.cfg) + if err != nil { + return nil, err + } + if err := mounter.Stage(stagingPath); err != nil { + return nil, err + } + + return &csi.NodeStageVolumeResponse{}, nil } -func (ns *nodeServer) NodeUnstageVolume( - ctx context.Context, - req *csi.NodeUnstageVolumeRequest) ( - *csi.NodeUnstageVolumeResponse, error) { - return nil, status.Error(codes.Unimplemented, "") +func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { + + // Check arguments + if len(req.GetVolumeId()) == 0 { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + if len(req.GetStagingTargetPath()) == 0 { + return nil, status.Error(codes.InvalidArgument, "Target path missing in request") + } + + mounter, err := newMounter(req.GetVolumeId(), ns.s3.cfg) + if err != nil { + return nil, err + } + if err := mounter.Unstage(req.GetStagingTargetPath()); err != nil { + return nil, err + } + + return &csi.NodeUnstageVolumeResponse{}, nil +} + +// NodeGetCapabilities returns the supported capabilities of the node server +func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { + // currently there is a single NodeServer capability according to the spec + nscap := &csi.NodeServiceCapability{ + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, + }, + }, + } + + return &csi.NodeGetCapabilitiesResponse{ + Capabilities: []*csi.NodeServiceCapability{ + nscap, + }, + }, nil +} + +func checkMount(targetPath string) (bool, error) { + notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath) + if err != nil { + if os.IsNotExist(err) { + if err = os.MkdirAll(targetPath, 0750); err != nil { + return false, err + } + notMnt = true + } else { + return false, err + } + } + return notMnt, nil } diff --git a/pkg/s3/s3-driver_suite_test.go b/pkg/s3/s3-driver_suite_test.go index ad850e7..451132d 100644 --- a/pkg/s3/s3-driver_suite_test.go +++ b/pkg/s3/s3-driver_suite_test.go @@ -15,13 +15,12 @@ import ( const () var _ = Describe("S3Driver", func() { - mntDir, err := ioutil.TempDir("", "mnt") - if err != nil { - Expect(err).NotTo(HaveOccurred()) - } + mntDir, _ := ioutil.TempDir("", "mnt") + stagingDir, _ := ioutil.TempDir("", "staging") AfterSuite(func() { os.RemoveAll(mntDir) + os.RemoveAll(stagingDir) }) Context("goofys", func() { @@ -45,6 +44,7 @@ var _ = Describe("S3Driver", func() { Describe("CSI sanity", func() { sanityCfg := &sanity.Config{ TargetPath: mntDir, + StagingPath: stagingDir, Address: csiEndpoint, TestVolumeSize: 1, } @@ -70,11 +70,10 @@ var _ = Describe("S3Driver", func() { } go driver.Run() - defer os.RemoveAll(mntDir) - Describe("CSI sanity", func() { sanityCfg := &sanity.Config{ TargetPath: mntDir, + StagingPath: stagingDir, Address: csiEndpoint, TestVolumeSize: 1, } @@ -106,6 +105,7 @@ var _ = Describe("S3Driver", func() { Describe("CSI sanity", func() { sanityCfg := &sanity.Config{ TargetPath: mntDir, + StagingPath: stagingDir, Address: csiEndpoint, TestVolumeSize: 1, } @@ -132,11 +132,10 @@ var _ = Describe("S3Driver", func() { } go driver.Run() - defer os.RemoveAll(mntDir) - Describe("CSI sanity", func() { sanityCfg := &sanity.Config{ TargetPath: mntDir, + StagingPath: stagingDir, Address: csiEndpoint, TestVolumeSize: 1, } diff --git a/pkg/s3/util.go b/pkg/s3/util.go index 87f1c96..3022b8e 100644 --- a/pkg/s3/util.go +++ b/pkg/s3/util.go @@ -55,7 +55,6 @@ func findFuseMountProcess(path string, name string) (*os.Process, error) { return os.FindProcess(p.Pid()) } } - fmt.Println(p.Executable()) } return nil, nil }