From 82ab4b09835d44e6abce76fdde8fadfe57d5d5b7 Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Sun, 22 Jul 2018 22:08:48 +0200 Subject: [PATCH] Add experimental s3backer mounter This also adds some generic handling of stale umounts. Fuse returns immediately and does not indicate that the mounter has finished writing to the backend. The process finding is sort of hacky as I could not find a better way to get to the PID from a fuse mount. --- Gopkg.lock | 8 +- Gopkg.toml | 4 + Makefile | 2 +- pkg/s3/mounter.go | 47 +++++++++- pkg/s3/mounter_goofys.go | 8 +- pkg/s3/mounter_s3backer.go | 153 +++++++++++++++++++++++++++++++++ pkg/s3/mounter_s3fs.go | 16 ++-- pkg/s3/mounter_s3ql.go | 29 +++---- pkg/s3/s3-driver_suite_test.go | 31 +++++++ pkg/s3/util.go | 64 ++++++++++++++ test/Dockerfile.s3backer | 28 ++++++ 11 files changed, 354 insertions(+), 36 deletions(-) create mode 100644 pkg/s3/mounter_s3backer.go create mode 100644 pkg/s3/util.go create mode 100644 test/Dockerfile.s3backer diff --git a/Gopkg.lock b/Gopkg.lock index d56885f..d11e6d0 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -158,6 +158,12 @@ packages = ["."] revision = "3864e76763d94a6df2f9960b16a20a33da9f9a66" +[[projects]] + branch = "master" + name = "github.com/mitchellh/go-ps" + packages = ["."] + revision = "4fdf99ab29366514c69ccccddab5dc58b8d84062" + [[projects]] name = "github.com/onsi/ginkgo" packages = [ @@ -372,6 +378,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "659f47734b56af7fba146039231841e82cc4c3c95dff2814ec3688e967790a50" + inputs-digest = "a655e5231fe7b5c962579c8f032338c53177a95a3160bef4628f0761f714f730" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 98f9a1b..eef4e5a 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -41,3 +41,7 @@ [[constraint]] name = "gopkg.in/ini.v1" version = "1.38.1" + +[[constraint]] + branch = "master" + name = "github.com/mitchellh/go-ps" diff --git a/Makefile b/Makefile index 1b55be7..24d68e4 100644 --- a/Makefile +++ b/Makefile @@ -24,7 +24,7 @@ build: if [ ! -d ./vendor ]; then dep ensure -vendor-only; fi CGO_ENABLED=0 GOOS=linux go build -a -ldflags '-extldflags "-static"' -o _output/s3driver ./cmd/s3driver test: - docker build -t $(TEST_IMAGE_TAG) -f test/Dockerfile . + docker build -t $(TEST_IMAGE_TAG) -f test/Dockerfile.s3backer . docker run --rm --privileged -v $(PWD):$(PROJECT_DIR):ro -v /dev:/dev $(TEST_IMAGE_TAG) container: build docker build -t $(IMAGE_TAG) -f cmd/s3driver/Dockerfile.s3ql . diff --git a/pkg/s3/mounter.go b/pkg/s3/mounter.go index aaaefd9..b05ae8f 100644 --- a/pkg/s3/mounter.go +++ b/pkg/s3/mounter.go @@ -1,6 +1,12 @@ package s3 -import "fmt" +import ( + "fmt" + "os/exec" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/util/mount" +) // Mounter interface which can be implemented // by the different mounter types @@ -11,9 +17,10 @@ type Mounter interface { } const ( - s3fsMounterType = "s3fs" - goofysMounterType = "goofys" - s3qlMounterType = "s3ql" + s3fsMounterType = "s3fs" + goofysMounterType = "goofys" + s3qlMounterType = "s3ql" + s3backerMounterType = "s3backer" ) // newMounter returns a new mounter depending on the mounterType parameter @@ -28,6 +35,38 @@ func newMounter(bucket string, cfg *Config) (Mounter, error) { case s3qlMounterType: return newS3qlMounter(bucket, cfg) + case s3backerMounterType: + return newS3backerMounter(bucket, cfg) + } return nil, fmt.Errorf("Error mounting bucket %s, invalid mounter specified: %s", bucket, cfg.Mounter) } + +func fuseMount(path string, command string, args []string) error { + cmd := exec.Command(command, args...) + + out, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("Error fuseMount command: %s\nargs: %s\noutput: %s", command, args, out) + } + + return nil +} + +func fuseUnmount(path string, command string) error { + if err := mount.New("").Unmount(path); err != nil { + return err + } + // as fuse quits immediately, we will try to wait until the process is done + process, err := findFuseMountProcess(path, command) + if err != nil { + glog.Errorf("Error getting PID of fuse mount: %s", err) + return nil + } + if process == nil { + glog.Warningf("Unable to find PID of fuse mount %s, it must have finished already", path) + return nil + } + glog.Infof("Found fuse pid %v of mount %s, checking if it still runs", process.Pid, path) + return waitForProcess(process, 1) +} diff --git a/pkg/s3/mounter_goofys.go b/pkg/s3/mounter_goofys.go index c3a0066..2573c3a 100644 --- a/pkg/s3/mounter_goofys.go +++ b/pkg/s3/mounter_goofys.go @@ -7,10 +7,12 @@ import ( "context" goofysApi "github.com/kahing/goofys/api" - "k8s.io/kubernetes/pkg/util/mount" ) -const defaultRegion = "us-east-1" +const ( + goofysCmd = "goofys" + defaultRegion = "us-east-1" +) // Implements Mounter type goofysMounter struct { @@ -64,5 +66,5 @@ func (goofys *goofysMounter) Mount(targetPath string) error { } func (goofys *goofysMounter) Unmount(targetPath string) error { - return mount.New("").Unmount(targetPath) + return fuseUnmount(targetPath, goofysCmd) } diff --git a/pkg/s3/mounter_s3backer.go b/pkg/s3/mounter_s3backer.go new file mode 100644 index 0000000..cc9318b --- /dev/null +++ b/pkg/s3/mounter_s3backer.go @@ -0,0 +1,153 @@ +package s3 + +import ( + "fmt" + "io/ioutil" + "os" + "os/exec" + "path" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/util/mount" +) + +// Implements Mounter +type s3backerMounter struct { + bucket string + url string + region string + accessKeyID string + secretAccessKey string + size int64 + initMountPath string +} + +const ( + s3backerCmd = "s3backer" + s3backerFsType = "xfs" + s3backerMountBase = "/mnt" + s3backerDevice = "file" + // blockSize to use in k + s3backerBlockSize = "128k" +) + +func newS3backerMounter(bucket string, cfg *Config) (Mounter, error) { + s3backer := &s3backerMounter{ + bucket: bucket, + url: cfg.Endpoint, + region: cfg.Region, + accessKeyID: cfg.AccessKeyID, + secretAccessKey: cfg.SecretAccessKey, + initMountPath: path.Join(s3backerMountBase, bucket), + size: 1024 * 1024 * 1024 * 10, + } + + return s3backer, s3backer.writePasswd() +} + +func (s3backer *s3backerMounter) String() string { + return s3backer.bucket +} + +func (s3backer *s3backerMounter) Format() error { + tmpDir, err := ioutil.TempDir("", "s3backer") + if err != nil { + return err + } + defer os.RemoveAll(tmpDir) + + if err := s3backer.mountInit(tmpDir); err != nil { + return err + } + defer fuseUnmount(tmpDir, s3backerCmd) + + return formatFs(s3backerFsType, path.Join(tmpDir, s3backerDevice)) +} + +func (s3backer *s3backerMounter) Mount(targetPath string) error { + if err := os.MkdirAll(s3backer.initMountPath, 0700); err != nil { + return err + } + // s3backer requires two mounts + // first mount will fuse mount the bucket to a single 'file' + err := s3backer.mountInit(s3backer.initMountPath) + if err != nil { + return err + } + device := path.Join(s3backer.initMountPath, s3backerDevice) + // second mount will mount the 'file' as a filesystem + err = mount.New("").Mount(device, targetPath, s3backerFsType, []string{}) + if err != nil { + // cleanup fuse mount + fuseUnmount(targetPath, s3backerCmd) + return err + } + return nil +} + +func (s3backer *s3backerMounter) Unmount(targetPath string) error { + // Unmount the filesystem first + if err := mount.New("").Unmount(targetPath); err != nil { + return err + } + // Unmount the s3backer fuse mount + err := fuseUnmount(s3backer.initMountPath, s3backerCmd) + if err != nil { + return err + } + return nil +} + +func (s3backer *s3backerMounter) mountInit(path string) error { + args := []string{ + // baseURL must end with / + fmt.Sprintf("--baseURL=%s/", s3backer.url), + fmt.Sprintf("--blockSize=%v", s3backerBlockSize), + fmt.Sprintf("--size=%v", s3backer.size), + "--listBlocks", + s3backer.bucket, + path, + } + if s3backer.region != "" { + args = append(args, fmt.Sprintf("--region=%s", s3backer.region)) + } + + return fuseMount(path, s3backerCmd, args) +} + +func (s3backer *s3backerMounter) writePasswd() error { + pwFileName := fmt.Sprintf("%s/.s3backer_passwd", os.Getenv("HOME")) + pwFile, err := os.OpenFile(pwFileName, os.O_RDWR|os.O_CREATE, 0600) + if err != nil { + return err + } + _, err = pwFile.WriteString(s3backer.accessKeyID + ":" + s3backer.secretAccessKey) + if err != nil { + return err + } + pwFile.Close() + return nil +} + +func formatFs(fsType string, device string) error { + diskMounter := &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: mount.NewOsExec()} + format, err := diskMounter.GetDiskFormat(device) + if err != nil { + return err + } + if format != "" { + glog.Infof("Disk %s is already formatted with format %s", device, format) + return nil + } + args := []string{ + device, + } + cmd := exec.Command("mkfs."+fsType, args...) + + out, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("Error formatting disk: %s", out) + } + glog.Info("Formatting fs with type %s, out: %s", fsType, out) + return nil +} diff --git a/pkg/s3/mounter_s3fs.go b/pkg/s3/mounter_s3fs.go index 325e91d..318d564 100644 --- a/pkg/s3/mounter_s3fs.go +++ b/pkg/s3/mounter_s3fs.go @@ -3,9 +3,6 @@ package s3 import ( "fmt" "os" - "os/exec" - - "k8s.io/kubernetes/pkg/util/mount" ) // Implements Mounter @@ -16,6 +13,10 @@ type s3fsMounter struct { pwFileContent string } +const ( + s3fsCmd = "s3fs" +) + func newS3fsMounter(bucket string, cfg *Config) (Mounter, error) { return &s3fsMounter{ bucket: bucket, @@ -43,16 +44,11 @@ func (s3fs *s3fsMounter) Mount(targetPath string) error { "-o", "allow_other", "-o", "mp_umask=000", } - cmd := exec.Command("s3fs", args...) - out, err := cmd.CombinedOutput() - if err != nil { - return fmt.Errorf("Error mounting using s3fs, output: %s", out) - } - return nil + return fuseMount(targetPath, s3fsCmd, args) } func (s3fs *s3fsMounter) Unmount(targetPath string) error { - return mount.New("").Unmount(targetPath) + return fuseUnmount(targetPath, s3fsCmd) } func writes3fsPass(pwFileContent string) error { diff --git a/pkg/s3/mounter_s3ql.go b/pkg/s3/mounter_s3ql.go index df4ad91..2d85971 100644 --- a/pkg/s3/mounter_s3ql.go +++ b/pkg/s3/mounter_s3ql.go @@ -3,7 +3,6 @@ package s3 import ( "bytes" "fmt" - "io" "net/url" "os" "os/exec" @@ -15,6 +14,7 @@ import ( // Implements Mounter type s3qlMounter struct { + bucket string url string bucketURL string login string @@ -41,6 +41,7 @@ func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) { url.Scheme = "s3c" } s3ql := &s3qlMounter{ + bucket: bucket, url: url.String(), login: cfg.AccessKeyID, password: cfg.SecretAccessKey, @@ -67,7 +68,14 @@ func (s3ql *s3qlMounter) Format() error { p := fmt.Sprintf("%s\n%s\n", s3ql.passphrase, s3ql.passphrase) reader := bytes.NewReader([]byte(p)) - return s3qlCmd(s3qlCmdMkfs, append(args, s3ql.options...), reader) + cmd := exec.Command(s3qlCmdMkfs, append(args, s3ql.options...)...) + cmd.Stdin = reader + + out, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("Error running s3ql command: %s", out) + } + return nil } func (s3ql *s3qlMounter) Mount(targetPath string) error { @@ -76,24 +84,11 @@ func (s3ql *s3qlMounter) Mount(targetPath string) error { targetPath, "--allow-other", } - return s3qlCmd(s3qlCmdMount, append(args, s3ql.options...), nil) + return fuseMount(targetPath, s3qlCmdMount, append(args, s3ql.options...)) } func (s3ql *s3qlMounter) Unmount(targetPath string) error { - return s3qlCmd(s3qlCmdUnmount, []string{targetPath}, nil) -} - -func s3qlCmd(s3qlCmd string, args []string, stdin io.Reader) error { - cmd := exec.Command(s3qlCmd, args...) - if stdin != nil { - cmd.Stdin = stdin - } - - out, err := cmd.CombinedOutput() - if err != nil { - return fmt.Errorf("Error running s3ql command: %s", out) - } - return nil + return fuseUnmount(targetPath, s3qlCmdMount) } func (s3ql *s3qlMounter) writeConfig() error { diff --git a/pkg/s3/s3-driver_suite_test.go b/pkg/s3/s3-driver_suite_test.go index 47b3ce6..ad850e7 100644 --- a/pkg/s3/s3-driver_suite_test.go +++ b/pkg/s3/s3-driver_suite_test.go @@ -113,4 +113,35 @@ var _ = Describe("S3Driver", func() { }) }) + Context("s3backer", func() { + socket := "/tmp/csi-s3backer.sock" + csiEndpoint := "unix://" + socket + + cfg := &s3.Config{ + AccessKeyID: "FJDSJ", + SecretAccessKey: "DSG643HGDS", + Endpoint: "http://127.0.0.1:9000", + Mounter: "s3backer", + } + if err := os.Remove(socket); err != nil && !os.IsNotExist(err) { + Expect(err).NotTo(HaveOccurred()) + } + driver, err := s3.NewS3("test-node", csiEndpoint, cfg) + if err != nil { + log.Fatal(err) + } + go driver.Run() + + defer os.RemoveAll(mntDir) + + Describe("CSI sanity", func() { + sanityCfg := &sanity.Config{ + TargetPath: mntDir, + Address: csiEndpoint, + TestVolumeSize: 1, + } + sanity.GinkgoTest(sanityCfg) + }) + }) + }) diff --git a/pkg/s3/util.go b/pkg/s3/util.go new file mode 100644 index 0000000..c1b37fe --- /dev/null +++ b/pkg/s3/util.go @@ -0,0 +1,64 @@ +package s3 + +import ( + "fmt" + "io/ioutil" + "os" + "strings" + "syscall" + "time" + + "github.com/mitchellh/go-ps" + + "github.com/golang/glog" +) + +func waitForProcess(p *os.Process, backoff int) error { + if backoff == 20 { + return fmt.Errorf("Timeout waiting for PID %v to end", p.Pid) + } + if err := p.Signal(syscall.Signal(0)); err != nil { + glog.Warningf("Fuse process does not seem active or we are unprivileged: %s", err) + return nil + } + glog.Infof("Fuse process with PID %v still active, waiting...", p.Pid) + time.Sleep(time.Duration(backoff*100) * time.Millisecond) + return waitForProcess(p, backoff+1) +} + +func findFuseMountProcess(path string, name string) (*os.Process, error) { + processes, err := ps.Processes() + if err != nil { + return nil, err + } + for _, p := range processes { + if strings.Contains(p.Executable(), name) { + cmdLine, err := getCmdLine(p.Pid()) + if err != nil { + glog.Errorf("Unable to get cmdline of PID %v: %s", p.Pid(), err) + continue + } + if cmdLine == "" { + // ignore defunct processes + // TODO: debug why this happens in the first place + // seems to only happen on k8s, not on local docker + continue + } + if strings.Contains(cmdLine, path) { + glog.Infof("Found matching pid %v on path %s", p.Pid(), path) + return os.FindProcess(p.Pid()) + } + } + fmt.Println(p.Executable()) + } + return nil, nil +} + +func getCmdLine(pid int) (string, error) { + cmdLineFile := fmt.Sprintf("/proc/%v/cmdline", pid) + cmdLine, err := ioutil.ReadFile(cmdLineFile) + if err != nil { + return "", err + } + return string(cmdLine), nil +} diff --git a/test/Dockerfile.s3backer b/test/Dockerfile.s3backer new file mode 100644 index 0000000..dcbf292 --- /dev/null +++ b/test/Dockerfile.s3backer @@ -0,0 +1,28 @@ +FROM golang:stretch +LABEL maintainers="Cyrill Troxler " +LABEL description="s3 fuse csi plugin" + +RUN apt-get update && apt-get install -y \ + build-essential \ + autoconf \ + libcurl4-openssl-dev \ + libfuse-dev \ + libexpat1-dev \ + libssl-dev \ + zlib1g-dev \ + xfsprogs \ + psmisc \ + git + +RUN git clone https://github.com/archiecobbs/s3backer.git ./s3backer + +WORKDIR "./s3backer" + +RUN ["./autogen.sh"] +RUN ["./configure"] +RUN ["make"] +RUN ["make", "install"] + +RUN go get -u github.com/minio/minio && go install github.com/minio/minio/cmd + +CMD ["/go/src/github.com/ctrox/csi-s3-driver/test/test.sh"]