diff --git a/Gopkg.lock b/Gopkg.lock index d56885f..d11e6d0 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -158,6 +158,12 @@ packages = ["."] revision = "3864e76763d94a6df2f9960b16a20a33da9f9a66" +[[projects]] + branch = "master" + name = "github.com/mitchellh/go-ps" + packages = ["."] + revision = "4fdf99ab29366514c69ccccddab5dc58b8d84062" + [[projects]] name = "github.com/onsi/ginkgo" packages = [ @@ -372,6 +378,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "659f47734b56af7fba146039231841e82cc4c3c95dff2814ec3688e967790a50" + inputs-digest = "a655e5231fe7b5c962579c8f032338c53177a95a3160bef4628f0761f714f730" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 98f9a1b..eef4e5a 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -41,3 +41,7 @@ [[constraint]] name = "gopkg.in/ini.v1" version = "1.38.1" + +[[constraint]] + branch = "master" + name = "github.com/mitchellh/go-ps" diff --git a/Makefile b/Makefile index 1b55be7..24d68e4 100644 --- a/Makefile +++ b/Makefile @@ -24,7 +24,7 @@ build: if [ ! -d ./vendor ]; then dep ensure -vendor-only; fi CGO_ENABLED=0 GOOS=linux go build -a -ldflags '-extldflags "-static"' -o _output/s3driver ./cmd/s3driver test: - docker build -t $(TEST_IMAGE_TAG) -f test/Dockerfile . + docker build -t $(TEST_IMAGE_TAG) -f test/Dockerfile.s3backer . docker run --rm --privileged -v $(PWD):$(PROJECT_DIR):ro -v /dev:/dev $(TEST_IMAGE_TAG) container: build docker build -t $(IMAGE_TAG) -f cmd/s3driver/Dockerfile.s3ql . diff --git a/pkg/s3/mounter.go b/pkg/s3/mounter.go index aaaefd9..b05ae8f 100644 --- a/pkg/s3/mounter.go +++ b/pkg/s3/mounter.go @@ -1,6 +1,12 @@ package s3 -import "fmt" +import ( + "fmt" + "os/exec" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/util/mount" +) // Mounter interface which can be implemented // by the different mounter types @@ -11,9 +17,10 @@ type Mounter interface { } const ( - s3fsMounterType = "s3fs" - goofysMounterType = "goofys" - s3qlMounterType = "s3ql" + s3fsMounterType = "s3fs" + goofysMounterType = "goofys" + s3qlMounterType = "s3ql" + s3backerMounterType = "s3backer" ) // newMounter returns a new mounter depending on the mounterType parameter @@ -28,6 +35,38 @@ func newMounter(bucket string, cfg *Config) (Mounter, error) { case s3qlMounterType: return newS3qlMounter(bucket, cfg) + case s3backerMounterType: + return newS3backerMounter(bucket, cfg) + } return nil, fmt.Errorf("Error mounting bucket %s, invalid mounter specified: %s", bucket, cfg.Mounter) } + +func fuseMount(path string, command string, args []string) error { + cmd := exec.Command(command, args...) + + out, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("Error fuseMount command: %s\nargs: %s\noutput: %s", command, args, out) + } + + return nil +} + +func fuseUnmount(path string, command string) error { + if err := mount.New("").Unmount(path); err != nil { + return err + } + // as fuse quits immediately, we will try to wait until the process is done + process, err := findFuseMountProcess(path, command) + if err != nil { + glog.Errorf("Error getting PID of fuse mount: %s", err) + return nil + } + if process == nil { + glog.Warningf("Unable to find PID of fuse mount %s, it must have finished already", path) + return nil + } + glog.Infof("Found fuse pid %v of mount %s, checking if it still runs", process.Pid, path) + return waitForProcess(process, 1) +} diff --git a/pkg/s3/mounter_goofys.go b/pkg/s3/mounter_goofys.go index c3a0066..2573c3a 100644 --- a/pkg/s3/mounter_goofys.go +++ b/pkg/s3/mounter_goofys.go @@ -7,10 +7,12 @@ import ( "context" goofysApi "github.com/kahing/goofys/api" - "k8s.io/kubernetes/pkg/util/mount" ) -const defaultRegion = "us-east-1" +const ( + goofysCmd = "goofys" + defaultRegion = "us-east-1" +) // Implements Mounter type goofysMounter struct { @@ -64,5 +66,5 @@ func (goofys *goofysMounter) Mount(targetPath string) error { } func (goofys *goofysMounter) Unmount(targetPath string) error { - return mount.New("").Unmount(targetPath) + return fuseUnmount(targetPath, goofysCmd) } diff --git a/pkg/s3/mounter_s3backer.go b/pkg/s3/mounter_s3backer.go new file mode 100644 index 0000000..cc9318b --- /dev/null +++ b/pkg/s3/mounter_s3backer.go @@ -0,0 +1,153 @@ +package s3 + +import ( + "fmt" + "io/ioutil" + "os" + "os/exec" + "path" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/util/mount" +) + +// Implements Mounter +type s3backerMounter struct { + bucket string + url string + region string + accessKeyID string + secretAccessKey string + size int64 + initMountPath string +} + +const ( + s3backerCmd = "s3backer" + s3backerFsType = "xfs" + s3backerMountBase = "/mnt" + s3backerDevice = "file" + // blockSize to use in k + s3backerBlockSize = "128k" +) + +func newS3backerMounter(bucket string, cfg *Config) (Mounter, error) { + s3backer := &s3backerMounter{ + bucket: bucket, + url: cfg.Endpoint, + region: cfg.Region, + accessKeyID: cfg.AccessKeyID, + secretAccessKey: cfg.SecretAccessKey, + initMountPath: path.Join(s3backerMountBase, bucket), + size: 1024 * 1024 * 1024 * 10, + } + + return s3backer, s3backer.writePasswd() +} + +func (s3backer *s3backerMounter) String() string { + return s3backer.bucket +} + +func (s3backer *s3backerMounter) Format() error { + tmpDir, err := ioutil.TempDir("", "s3backer") + if err != nil { + return err + } + defer os.RemoveAll(tmpDir) + + if err := s3backer.mountInit(tmpDir); err != nil { + return err + } + defer fuseUnmount(tmpDir, s3backerCmd) + + return formatFs(s3backerFsType, path.Join(tmpDir, s3backerDevice)) +} + +func (s3backer *s3backerMounter) Mount(targetPath string) error { + if err := os.MkdirAll(s3backer.initMountPath, 0700); err != nil { + return err + } + // s3backer requires two mounts + // first mount will fuse mount the bucket to a single 'file' + err := s3backer.mountInit(s3backer.initMountPath) + if err != nil { + return err + } + device := path.Join(s3backer.initMountPath, s3backerDevice) + // second mount will mount the 'file' as a filesystem + err = mount.New("").Mount(device, targetPath, s3backerFsType, []string{}) + if err != nil { + // cleanup fuse mount + fuseUnmount(targetPath, s3backerCmd) + return err + } + return nil +} + +func (s3backer *s3backerMounter) Unmount(targetPath string) error { + // Unmount the filesystem first + if err := mount.New("").Unmount(targetPath); err != nil { + return err + } + // Unmount the s3backer fuse mount + err := fuseUnmount(s3backer.initMountPath, s3backerCmd) + if err != nil { + return err + } + return nil +} + +func (s3backer *s3backerMounter) mountInit(path string) error { + args := []string{ + // baseURL must end with / + fmt.Sprintf("--baseURL=%s/", s3backer.url), + fmt.Sprintf("--blockSize=%v", s3backerBlockSize), + fmt.Sprintf("--size=%v", s3backer.size), + "--listBlocks", + s3backer.bucket, + path, + } + if s3backer.region != "" { + args = append(args, fmt.Sprintf("--region=%s", s3backer.region)) + } + + return fuseMount(path, s3backerCmd, args) +} + +func (s3backer *s3backerMounter) writePasswd() error { + pwFileName := fmt.Sprintf("%s/.s3backer_passwd", os.Getenv("HOME")) + pwFile, err := os.OpenFile(pwFileName, os.O_RDWR|os.O_CREATE, 0600) + if err != nil { + return err + } + _, err = pwFile.WriteString(s3backer.accessKeyID + ":" + s3backer.secretAccessKey) + if err != nil { + return err + } + pwFile.Close() + return nil +} + +func formatFs(fsType string, device string) error { + diskMounter := &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: mount.NewOsExec()} + format, err := diskMounter.GetDiskFormat(device) + if err != nil { + return err + } + if format != "" { + glog.Infof("Disk %s is already formatted with format %s", device, format) + return nil + } + args := []string{ + device, + } + cmd := exec.Command("mkfs."+fsType, args...) + + out, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("Error formatting disk: %s", out) + } + glog.Info("Formatting fs with type %s, out: %s", fsType, out) + return nil +} diff --git a/pkg/s3/mounter_s3fs.go b/pkg/s3/mounter_s3fs.go index 325e91d..318d564 100644 --- a/pkg/s3/mounter_s3fs.go +++ b/pkg/s3/mounter_s3fs.go @@ -3,9 +3,6 @@ package s3 import ( "fmt" "os" - "os/exec" - - "k8s.io/kubernetes/pkg/util/mount" ) // Implements Mounter @@ -16,6 +13,10 @@ type s3fsMounter struct { pwFileContent string } +const ( + s3fsCmd = "s3fs" +) + func newS3fsMounter(bucket string, cfg *Config) (Mounter, error) { return &s3fsMounter{ bucket: bucket, @@ -43,16 +44,11 @@ func (s3fs *s3fsMounter) Mount(targetPath string) error { "-o", "allow_other", "-o", "mp_umask=000", } - cmd := exec.Command("s3fs", args...) - out, err := cmd.CombinedOutput() - if err != nil { - return fmt.Errorf("Error mounting using s3fs, output: %s", out) - } - return nil + return fuseMount(targetPath, s3fsCmd, args) } func (s3fs *s3fsMounter) Unmount(targetPath string) error { - return mount.New("").Unmount(targetPath) + return fuseUnmount(targetPath, s3fsCmd) } func writes3fsPass(pwFileContent string) error { diff --git a/pkg/s3/mounter_s3ql.go b/pkg/s3/mounter_s3ql.go index df4ad91..2d85971 100644 --- a/pkg/s3/mounter_s3ql.go +++ b/pkg/s3/mounter_s3ql.go @@ -3,7 +3,6 @@ package s3 import ( "bytes" "fmt" - "io" "net/url" "os" "os/exec" @@ -15,6 +14,7 @@ import ( // Implements Mounter type s3qlMounter struct { + bucket string url string bucketURL string login string @@ -41,6 +41,7 @@ func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) { url.Scheme = "s3c" } s3ql := &s3qlMounter{ + bucket: bucket, url: url.String(), login: cfg.AccessKeyID, password: cfg.SecretAccessKey, @@ -67,7 +68,14 @@ func (s3ql *s3qlMounter) Format() error { p := fmt.Sprintf("%s\n%s\n", s3ql.passphrase, s3ql.passphrase) reader := bytes.NewReader([]byte(p)) - return s3qlCmd(s3qlCmdMkfs, append(args, s3ql.options...), reader) + cmd := exec.Command(s3qlCmdMkfs, append(args, s3ql.options...)...) + cmd.Stdin = reader + + out, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("Error running s3ql command: %s", out) + } + return nil } func (s3ql *s3qlMounter) Mount(targetPath string) error { @@ -76,24 +84,11 @@ func (s3ql *s3qlMounter) Mount(targetPath string) error { targetPath, "--allow-other", } - return s3qlCmd(s3qlCmdMount, append(args, s3ql.options...), nil) + return fuseMount(targetPath, s3qlCmdMount, append(args, s3ql.options...)) } func (s3ql *s3qlMounter) Unmount(targetPath string) error { - return s3qlCmd(s3qlCmdUnmount, []string{targetPath}, nil) -} - -func s3qlCmd(s3qlCmd string, args []string, stdin io.Reader) error { - cmd := exec.Command(s3qlCmd, args...) - if stdin != nil { - cmd.Stdin = stdin - } - - out, err := cmd.CombinedOutput() - if err != nil { - return fmt.Errorf("Error running s3ql command: %s", out) - } - return nil + return fuseUnmount(targetPath, s3qlCmdMount) } func (s3ql *s3qlMounter) writeConfig() error { diff --git a/pkg/s3/s3-driver_suite_test.go b/pkg/s3/s3-driver_suite_test.go index 47b3ce6..ad850e7 100644 --- a/pkg/s3/s3-driver_suite_test.go +++ b/pkg/s3/s3-driver_suite_test.go @@ -113,4 +113,35 @@ var _ = Describe("S3Driver", func() { }) }) + Context("s3backer", func() { + socket := "/tmp/csi-s3backer.sock" + csiEndpoint := "unix://" + socket + + cfg := &s3.Config{ + AccessKeyID: "FJDSJ", + SecretAccessKey: "DSG643HGDS", + Endpoint: "http://127.0.0.1:9000", + Mounter: "s3backer", + } + if err := os.Remove(socket); err != nil && !os.IsNotExist(err) { + Expect(err).NotTo(HaveOccurred()) + } + driver, err := s3.NewS3("test-node", csiEndpoint, cfg) + if err != nil { + log.Fatal(err) + } + go driver.Run() + + defer os.RemoveAll(mntDir) + + Describe("CSI sanity", func() { + sanityCfg := &sanity.Config{ + TargetPath: mntDir, + Address: csiEndpoint, + TestVolumeSize: 1, + } + sanity.GinkgoTest(sanityCfg) + }) + }) + }) diff --git a/pkg/s3/util.go b/pkg/s3/util.go new file mode 100644 index 0000000..c1b37fe --- /dev/null +++ b/pkg/s3/util.go @@ -0,0 +1,64 @@ +package s3 + +import ( + "fmt" + "io/ioutil" + "os" + "strings" + "syscall" + "time" + + "github.com/mitchellh/go-ps" + + "github.com/golang/glog" +) + +func waitForProcess(p *os.Process, backoff int) error { + if backoff == 20 { + return fmt.Errorf("Timeout waiting for PID %v to end", p.Pid) + } + if err := p.Signal(syscall.Signal(0)); err != nil { + glog.Warningf("Fuse process does not seem active or we are unprivileged: %s", err) + return nil + } + glog.Infof("Fuse process with PID %v still active, waiting...", p.Pid) + time.Sleep(time.Duration(backoff*100) * time.Millisecond) + return waitForProcess(p, backoff+1) +} + +func findFuseMountProcess(path string, name string) (*os.Process, error) { + processes, err := ps.Processes() + if err != nil { + return nil, err + } + for _, p := range processes { + if strings.Contains(p.Executable(), name) { + cmdLine, err := getCmdLine(p.Pid()) + if err != nil { + glog.Errorf("Unable to get cmdline of PID %v: %s", p.Pid(), err) + continue + } + if cmdLine == "" { + // ignore defunct processes + // TODO: debug why this happens in the first place + // seems to only happen on k8s, not on local docker + continue + } + if strings.Contains(cmdLine, path) { + glog.Infof("Found matching pid %v on path %s", p.Pid(), path) + return os.FindProcess(p.Pid()) + } + } + fmt.Println(p.Executable()) + } + return nil, nil +} + +func getCmdLine(pid int) (string, error) { + cmdLineFile := fmt.Sprintf("/proc/%v/cmdline", pid) + cmdLine, err := ioutil.ReadFile(cmdLineFile) + if err != nil { + return "", err + } + return string(cmdLine), nil +} diff --git a/test/Dockerfile.s3backer b/test/Dockerfile.s3backer new file mode 100644 index 0000000..dcbf292 --- /dev/null +++ b/test/Dockerfile.s3backer @@ -0,0 +1,28 @@ +FROM golang:stretch +LABEL maintainers="Cyrill Troxler " +LABEL description="s3 fuse csi plugin" + +RUN apt-get update && apt-get install -y \ + build-essential \ + autoconf \ + libcurl4-openssl-dev \ + libfuse-dev \ + libexpat1-dev \ + libssl-dev \ + zlib1g-dev \ + xfsprogs \ + psmisc \ + git + +RUN git clone https://github.com/archiecobbs/s3backer.git ./s3backer + +WORKDIR "./s3backer" + +RUN ["./autogen.sh"] +RUN ["./configure"] +RUN ["make"] +RUN ["make", "install"] + +RUN go get -u github.com/minio/minio && go install github.com/minio/minio/cmd + +CMD ["/go/src/github.com/ctrox/csi-s3-driver/test/test.sh"]