Implement stage/unstage

This helps the reliability of s3backer as the fuse mount
is done on NodeStageVolume and only once per volume per
node.
This commit is contained in:
Cyrill Troxler 2018-07-26 22:43:51 +02:00
parent 1fe218a568
commit 0010066fe3
8 changed files with 157 additions and 95 deletions

View file

@ -11,9 +11,10 @@ import (
// Mounter interface which can be implemented // Mounter interface which can be implemented
// by the different mounter types // by the different mounter types
type Mounter interface { type Mounter interface {
Format() error Stage(stagePath string) error
Mount(targetPath string) error Unstage(stagePath string) error
Unmount(targetPath string) error Mount(source string, target string) error
Unmount(target string) error
} }
const ( const (

View file

@ -38,13 +38,17 @@ func newGoofysMounter(bucket string, cfg *Config) (Mounter, error) {
}, nil }, nil
} }
func (goofys *goofysMounter) Format() error { func (goofys *goofysMounter) Stage(stageTarget string) error {
return nil return nil
} }
func (goofys *goofysMounter) Mount(targetPath string) error { func (goofys *goofysMounter) Unstage(stageTarget string) error {
return nil
}
func (goofys *goofysMounter) Mount(source string, target string) error {
goofysCfg := &goofysApi.Config{ goofysCfg := &goofysApi.Config{
MountPoint: targetPath, MountPoint: target,
Endpoint: goofys.endpoint, Endpoint: goofys.endpoint,
Region: goofys.region, Region: goofys.region,
DirMode: 0755, DirMode: 0755,

View file

@ -2,7 +2,6 @@ package s3
import ( import (
"fmt" "fmt"
"io/ioutil"
"os" "os"
"os/exec" "os/exec"
"path" "path"
@ -19,13 +18,11 @@ type s3backerMounter struct {
accessKeyID string accessKeyID string
secretAccessKey string secretAccessKey string
size int64 size int64
initMountPath string
} }
const ( const (
s3backerCmd = "s3backer" s3backerCmd = "s3backer"
s3backerFsType = "xfs" s3backerFsType = "xfs"
s3backerMountBase = "/mnt"
s3backerDevice = "file" s3backerDevice = "file"
// blockSize to use in k // blockSize to use in k
s3backerBlockSize = "128k" s3backerBlockSize = "128k"
@ -38,7 +35,6 @@ func newS3backerMounter(bucket string, cfg *Config) (Mounter, error) {
region: cfg.Region, region: cfg.Region,
accessKeyID: cfg.AccessKeyID, accessKeyID: cfg.AccessKeyID,
secretAccessKey: cfg.SecretAccessKey, secretAccessKey: cfg.SecretAccessKey,
initMountPath: path.Join(s3backerMountBase, bucket),
size: 1024 * 1024 * 1024 * 10, size: 1024 * 1024 * 1024 * 10,
} }
@ -49,37 +45,32 @@ func (s3backer *s3backerMounter) String() string {
return s3backer.bucket return s3backer.bucket
} }
func (s3backer *s3backerMounter) Format() error { func (s3backer *s3backerMounter) Stage(stageTarget string) error {
tmpDir, err := ioutil.TempDir("", "s3backer")
if err != nil {
return err
}
defer os.RemoveAll(tmpDir)
if err := s3backer.mountInit(tmpDir); err != nil {
return err
}
defer fuseUnmount(tmpDir, s3backerCmd)
return formatFs(s3backerFsType, path.Join(tmpDir, s3backerDevice))
}
func (s3backer *s3backerMounter) Mount(targetPath string) error {
if err := os.MkdirAll(s3backer.initMountPath, 0700); err != nil {
return err
}
// s3backer requires two mounts // s3backer requires two mounts
// first mount will fuse mount the bucket to a single 'file' // first mount will fuse mount the bucket to a single 'file'
err := s3backer.mountInit(s3backer.initMountPath) if err := s3backer.mountInit(stageTarget); err != nil {
if err != nil {
return err return err
} }
device := path.Join(s3backer.initMountPath, s3backerDevice) // ensure 'file' device is formatted
err := formatFs(s3backerFsType, path.Join(stageTarget, s3backerDevice))
if err != nil {
fuseUnmount(stageTarget, s3backerCmd)
}
return err
}
func (s3backer *s3backerMounter) Unstage(stageTarget string) error {
// Unmount the s3backer fuse mount
return fuseUnmount(stageTarget, s3backerCmd)
}
func (s3backer *s3backerMounter) Mount(source string, target string) error {
device := path.Join(source, s3backerDevice)
// second mount will mount the 'file' as a filesystem // second mount will mount the 'file' as a filesystem
err = mount.New("").Mount(device, targetPath, s3backerFsType, []string{}) err := mount.New("").Mount(device, target, s3backerFsType, []string{})
if err != nil { if err != nil {
// cleanup fuse mount // cleanup fuse mount
fuseUnmount(targetPath, s3backerCmd) fuseUnmount(target, s3backerCmd)
return err return err
} }
return nil return nil
@ -87,15 +78,7 @@ func (s3backer *s3backerMounter) Mount(targetPath string) error {
func (s3backer *s3backerMounter) Unmount(targetPath string) error { func (s3backer *s3backerMounter) Unmount(targetPath string) error {
// Unmount the filesystem first // Unmount the filesystem first
if err := mount.New("").Unmount(targetPath); err != nil { return mount.New("").Unmount(targetPath)
return err
}
// Unmount the s3backer fuse mount
err := fuseUnmount(s3backer.initMountPath, s3backerCmd)
if err != nil {
return err
}
return nil
} }
func (s3backer *s3backerMounter) mountInit(path string) error { func (s3backer *s3backerMounter) mountInit(path string) error {

View file

@ -26,17 +26,21 @@ func newS3fsMounter(bucket string, cfg *Config) (Mounter, error) {
}, nil }, nil
} }
func (s3fs *s3fsMounter) Format() error { func (s3fs *s3fsMounter) Stage(stageTarget string) error {
return nil return nil
} }
func (s3fs *s3fsMounter) Mount(targetPath string) error { func (s3fs *s3fsMounter) Unstage(stageTarget string) error {
return nil
}
func (s3fs *s3fsMounter) Mount(source string, target string) error {
if err := writes3fsPass(s3fs.pwFileContent); err != nil { if err := writes3fsPass(s3fs.pwFileContent); err != nil {
return err return err
} }
args := []string{ args := []string{
fmt.Sprintf("%s", s3fs.bucket), fmt.Sprintf("%s", s3fs.bucket),
fmt.Sprintf("%s", targetPath), fmt.Sprintf("%s", target),
"-o", "sigv2", "-o", "sigv2",
"-o", "use_path_request_style", "-o", "use_path_request_style",
"-o", fmt.Sprintf("url=%s", s3fs.url), "-o", fmt.Sprintf("url=%s", s3fs.url),
@ -44,11 +48,11 @@ func (s3fs *s3fsMounter) Mount(targetPath string) error {
"-o", "allow_other", "-o", "allow_other",
"-o", "mp_umask=000", "-o", "mp_umask=000",
} }
return fuseMount(targetPath, s3fsCmd, args) return fuseMount(target, s3fsCmd, args)
} }
func (s3fs *s3fsMounter) Unmount(targetPath string) error { func (s3fs *s3fsMounter) Unmount(target string) error {
return fuseUnmount(targetPath, s3fsCmd) return fuseUnmount(target, s3fsCmd)
} }
func writes3fsPass(pwFileContent string) error { func writes3fsPass(pwFileContent string) error {

View file

@ -59,7 +59,7 @@ func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) {
return s3ql, s3ql.writeConfig() return s3ql, s3ql.writeConfig()
} }
func (s3ql *s3qlMounter) Format() error { func (s3ql *s3qlMounter) Stage(stagePath string) error {
// force creation to ignore existing data // force creation to ignore existing data
args := []string{ args := []string{
s3ql.bucketURL, s3ql.bucketURL,
@ -78,17 +78,21 @@ func (s3ql *s3qlMounter) Format() error {
return nil return nil
} }
func (s3ql *s3qlMounter) Mount(targetPath string) error { func (s3ql *s3qlMounter) Unstage(stagePath string) error {
args := []string{ return nil
s3ql.bucketURL,
targetPath,
"--allow-other",
}
return fuseMount(targetPath, s3qlCmdMount, append(args, s3ql.options...))
} }
func (s3ql *s3qlMounter) Unmount(targetPath string) error { func (s3ql *s3qlMounter) Mount(source string, target string) error {
return fuseUnmount(targetPath, s3qlCmdMount) args := []string{
s3ql.bucketURL,
target,
"--allow-other",
}
return fuseMount(target, s3qlCmdMount, append(args, s3ql.options...))
}
func (s3ql *s3qlMounter) Unmount(target string) error {
return fuseUnmount(target, s3qlCmdMount)
} }
func (s3ql *s3qlMounter) writeConfig() error { func (s3ql *s3qlMounter) writeConfig() error {

View file

@ -44,23 +44,19 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
if len(req.GetVolumeId()) == 0 { if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
} }
if len(req.GetStagingTargetPath()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Staging Target path missing in request")
}
if len(req.GetTargetPath()) == 0 { if len(req.GetTargetPath()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request") return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
} }
targetPath := req.GetTargetPath() targetPath := req.GetTargetPath()
notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath) stagingPath := req.GetStagingTargetPath()
notMnt, err := checkMount(targetPath)
if err != nil { if err != nil {
if os.IsNotExist(err) {
if err = os.MkdirAll(targetPath, 0750); err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
notMnt = true
} else {
return nil, status.Error(codes.Internal, err.Error())
}
}
if !notMnt { if !notMnt {
return &csi.NodePublishVolumeResponse{}, nil return &csi.NodePublishVolumeResponse{}, nil
} }
@ -82,10 +78,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := mounter.Format(); err != nil { if err := mounter.Mount(stagingPath, targetPath); err != nil {
return nil, err
}
if err := mounter.Mount(targetPath); err != nil {
return nil, err return nil, err
} }
@ -116,16 +109,91 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return &csi.NodeUnpublishVolumeResponse{}, nil return &csi.NodeUnpublishVolumeResponse{}, nil
} }
func (ns *nodeServer) NodeStageVolume( func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
ctx context.Context,
req *csi.NodeStageVolumeRequest) ( // Check arguments
*csi.NodeStageVolumeResponse, error) { if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.Unimplemented, "") return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
} }
func (ns *nodeServer) NodeUnstageVolume( if len(req.GetStagingTargetPath()) == 0 {
ctx context.Context, return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
req *csi.NodeUnstageVolumeRequest) ( }
*csi.NodeUnstageVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "") if req.VolumeCapability == nil {
return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Volume Capability must be provided")
}
stagingPath := req.GetStagingTargetPath()
notMnt, err := checkMount(stagingPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if !notMnt {
return &csi.NodeStageVolumeResponse{}, nil
}
mounter, err := newMounter(req.GetVolumeId(), ns.s3.cfg)
if err != nil {
return nil, err
}
if err := mounter.Stage(stagingPath); err != nil {
return nil, err
}
return &csi.NodeStageVolumeResponse{}, nil
}
func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
// Check arguments
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if len(req.GetStagingTargetPath()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
mounter, err := newMounter(req.GetVolumeId(), ns.s3.cfg)
if err != nil {
return nil, err
}
if err := mounter.Unstage(req.GetStagingTargetPath()); err != nil {
return nil, err
}
return &csi.NodeUnstageVolumeResponse{}, nil
}
// NodeGetCapabilities returns the supported capabilities of the node server
func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
// currently there is a single NodeServer capability according to the spec
nscap := &csi.NodeServiceCapability{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
},
},
}
return &csi.NodeGetCapabilitiesResponse{
Capabilities: []*csi.NodeServiceCapability{
nscap,
},
}, nil
}
func checkMount(targetPath string) (bool, error) {
notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath)
if err != nil {
if os.IsNotExist(err) {
if err = os.MkdirAll(targetPath, 0750); err != nil {
return false, err
}
notMnt = true
} else {
return false, err
}
}
return notMnt, nil
} }

View file

@ -15,13 +15,12 @@ import (
const () const ()
var _ = Describe("S3Driver", func() { var _ = Describe("S3Driver", func() {
mntDir, err := ioutil.TempDir("", "mnt") mntDir, _ := ioutil.TempDir("", "mnt")
if err != nil { stagingDir, _ := ioutil.TempDir("", "staging")
Expect(err).NotTo(HaveOccurred())
}
AfterSuite(func() { AfterSuite(func() {
os.RemoveAll(mntDir) os.RemoveAll(mntDir)
os.RemoveAll(stagingDir)
}) })
Context("goofys", func() { Context("goofys", func() {
@ -45,6 +44,7 @@ var _ = Describe("S3Driver", func() {
Describe("CSI sanity", func() { Describe("CSI sanity", func() {
sanityCfg := &sanity.Config{ sanityCfg := &sanity.Config{
TargetPath: mntDir, TargetPath: mntDir,
StagingPath: stagingDir,
Address: csiEndpoint, Address: csiEndpoint,
TestVolumeSize: 1, TestVolumeSize: 1,
} }
@ -70,11 +70,10 @@ var _ = Describe("S3Driver", func() {
} }
go driver.Run() go driver.Run()
defer os.RemoveAll(mntDir)
Describe("CSI sanity", func() { Describe("CSI sanity", func() {
sanityCfg := &sanity.Config{ sanityCfg := &sanity.Config{
TargetPath: mntDir, TargetPath: mntDir,
StagingPath: stagingDir,
Address: csiEndpoint, Address: csiEndpoint,
TestVolumeSize: 1, TestVolumeSize: 1,
} }
@ -106,6 +105,7 @@ var _ = Describe("S3Driver", func() {
Describe("CSI sanity", func() { Describe("CSI sanity", func() {
sanityCfg := &sanity.Config{ sanityCfg := &sanity.Config{
TargetPath: mntDir, TargetPath: mntDir,
StagingPath: stagingDir,
Address: csiEndpoint, Address: csiEndpoint,
TestVolumeSize: 1, TestVolumeSize: 1,
} }
@ -132,11 +132,10 @@ var _ = Describe("S3Driver", func() {
} }
go driver.Run() go driver.Run()
defer os.RemoveAll(mntDir)
Describe("CSI sanity", func() { Describe("CSI sanity", func() {
sanityCfg := &sanity.Config{ sanityCfg := &sanity.Config{
TargetPath: mntDir, TargetPath: mntDir,
StagingPath: stagingDir,
Address: csiEndpoint, Address: csiEndpoint,
TestVolumeSize: 1, TestVolumeSize: 1,
} }

View file

@ -55,7 +55,6 @@ func findFuseMountProcess(path string, name string) (*os.Process, error) {
return os.FindProcess(p.Pid()) return os.FindProcess(p.Pid())
} }
} }
fmt.Println(p.Executable())
} }
return nil, nil return nil, nil
} }