From 82ab4b09835d44e6abce76fdde8fadfe57d5d5b7 Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Sun, 22 Jul 2018 22:08:48 +0200 Subject: [PATCH 01/10] Add experimental s3backer mounter This also adds some generic handling of stale umounts. Fuse returns immediately and does not indicate that the mounter has finished writing to the backend. The process finding is sort of hacky as I could not find a better way to get to the PID from a fuse mount. --- Gopkg.lock | 8 +- Gopkg.toml | 4 + Makefile | 2 +- pkg/s3/mounter.go | 47 +++++++++- pkg/s3/mounter_goofys.go | 8 +- pkg/s3/mounter_s3backer.go | 153 +++++++++++++++++++++++++++++++++ pkg/s3/mounter_s3fs.go | 16 ++-- pkg/s3/mounter_s3ql.go | 29 +++---- pkg/s3/s3-driver_suite_test.go | 31 +++++++ pkg/s3/util.go | 64 ++++++++++++++ test/Dockerfile.s3backer | 28 ++++++ 11 files changed, 354 insertions(+), 36 deletions(-) create mode 100644 pkg/s3/mounter_s3backer.go create mode 100644 pkg/s3/util.go create mode 100644 test/Dockerfile.s3backer diff --git a/Gopkg.lock b/Gopkg.lock index d56885f..d11e6d0 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -158,6 +158,12 @@ packages = ["."] revision = "3864e76763d94a6df2f9960b16a20a33da9f9a66" +[[projects]] + branch = "master" + name = "github.com/mitchellh/go-ps" + packages = ["."] + revision = "4fdf99ab29366514c69ccccddab5dc58b8d84062" + [[projects]] name = "github.com/onsi/ginkgo" packages = [ @@ -372,6 +378,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "659f47734b56af7fba146039231841e82cc4c3c95dff2814ec3688e967790a50" + inputs-digest = "a655e5231fe7b5c962579c8f032338c53177a95a3160bef4628f0761f714f730" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 98f9a1b..eef4e5a 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -41,3 +41,7 @@ [[constraint]] name = "gopkg.in/ini.v1" version = "1.38.1" + +[[constraint]] + branch = "master" + name = "github.com/mitchellh/go-ps" diff --git a/Makefile b/Makefile index 1b55be7..24d68e4 100644 --- a/Makefile +++ b/Makefile @@ -24,7 +24,7 @@ build: if [ ! -d ./vendor ]; then dep ensure -vendor-only; fi CGO_ENABLED=0 GOOS=linux go build -a -ldflags '-extldflags "-static"' -o _output/s3driver ./cmd/s3driver test: - docker build -t $(TEST_IMAGE_TAG) -f test/Dockerfile . + docker build -t $(TEST_IMAGE_TAG) -f test/Dockerfile.s3backer . docker run --rm --privileged -v $(PWD):$(PROJECT_DIR):ro -v /dev:/dev $(TEST_IMAGE_TAG) container: build docker build -t $(IMAGE_TAG) -f cmd/s3driver/Dockerfile.s3ql . diff --git a/pkg/s3/mounter.go b/pkg/s3/mounter.go index aaaefd9..b05ae8f 100644 --- a/pkg/s3/mounter.go +++ b/pkg/s3/mounter.go @@ -1,6 +1,12 @@ package s3 -import "fmt" +import ( + "fmt" + "os/exec" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/util/mount" +) // Mounter interface which can be implemented // by the different mounter types @@ -11,9 +17,10 @@ type Mounter interface { } const ( - s3fsMounterType = "s3fs" - goofysMounterType = "goofys" - s3qlMounterType = "s3ql" + s3fsMounterType = "s3fs" + goofysMounterType = "goofys" + s3qlMounterType = "s3ql" + s3backerMounterType = "s3backer" ) // newMounter returns a new mounter depending on the mounterType parameter @@ -28,6 +35,38 @@ func newMounter(bucket string, cfg *Config) (Mounter, error) { case s3qlMounterType: return newS3qlMounter(bucket, cfg) + case s3backerMounterType: + return newS3backerMounter(bucket, cfg) + } return nil, fmt.Errorf("Error mounting bucket %s, invalid mounter specified: %s", bucket, cfg.Mounter) } + +func fuseMount(path string, command string, args []string) error { + cmd := exec.Command(command, args...) + + out, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("Error fuseMount command: %s\nargs: %s\noutput: %s", command, args, out) + } + + return nil +} + +func fuseUnmount(path string, command string) error { + if err := mount.New("").Unmount(path); err != nil { + return err + } + // as fuse quits immediately, we will try to wait until the process is done + process, err := findFuseMountProcess(path, command) + if err != nil { + glog.Errorf("Error getting PID of fuse mount: %s", err) + return nil + } + if process == nil { + glog.Warningf("Unable to find PID of fuse mount %s, it must have finished already", path) + return nil + } + glog.Infof("Found fuse pid %v of mount %s, checking if it still runs", process.Pid, path) + return waitForProcess(process, 1) +} diff --git a/pkg/s3/mounter_goofys.go b/pkg/s3/mounter_goofys.go index c3a0066..2573c3a 100644 --- a/pkg/s3/mounter_goofys.go +++ b/pkg/s3/mounter_goofys.go @@ -7,10 +7,12 @@ import ( "context" goofysApi "github.com/kahing/goofys/api" - "k8s.io/kubernetes/pkg/util/mount" ) -const defaultRegion = "us-east-1" +const ( + goofysCmd = "goofys" + defaultRegion = "us-east-1" +) // Implements Mounter type goofysMounter struct { @@ -64,5 +66,5 @@ func (goofys *goofysMounter) Mount(targetPath string) error { } func (goofys *goofysMounter) Unmount(targetPath string) error { - return mount.New("").Unmount(targetPath) + return fuseUnmount(targetPath, goofysCmd) } diff --git a/pkg/s3/mounter_s3backer.go b/pkg/s3/mounter_s3backer.go new file mode 100644 index 0000000..cc9318b --- /dev/null +++ b/pkg/s3/mounter_s3backer.go @@ -0,0 +1,153 @@ +package s3 + +import ( + "fmt" + "io/ioutil" + "os" + "os/exec" + "path" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/util/mount" +) + +// Implements Mounter +type s3backerMounter struct { + bucket string + url string + region string + accessKeyID string + secretAccessKey string + size int64 + initMountPath string +} + +const ( + s3backerCmd = "s3backer" + s3backerFsType = "xfs" + s3backerMountBase = "/mnt" + s3backerDevice = "file" + // blockSize to use in k + s3backerBlockSize = "128k" +) + +func newS3backerMounter(bucket string, cfg *Config) (Mounter, error) { + s3backer := &s3backerMounter{ + bucket: bucket, + url: cfg.Endpoint, + region: cfg.Region, + accessKeyID: cfg.AccessKeyID, + secretAccessKey: cfg.SecretAccessKey, + initMountPath: path.Join(s3backerMountBase, bucket), + size: 1024 * 1024 * 1024 * 10, + } + + return s3backer, s3backer.writePasswd() +} + +func (s3backer *s3backerMounter) String() string { + return s3backer.bucket +} + +func (s3backer *s3backerMounter) Format() error { + tmpDir, err := ioutil.TempDir("", "s3backer") + if err != nil { + return err + } + defer os.RemoveAll(tmpDir) + + if err := s3backer.mountInit(tmpDir); err != nil { + return err + } + defer fuseUnmount(tmpDir, s3backerCmd) + + return formatFs(s3backerFsType, path.Join(tmpDir, s3backerDevice)) +} + +func (s3backer *s3backerMounter) Mount(targetPath string) error { + if err := os.MkdirAll(s3backer.initMountPath, 0700); err != nil { + return err + } + // s3backer requires two mounts + // first mount will fuse mount the bucket to a single 'file' + err := s3backer.mountInit(s3backer.initMountPath) + if err != nil { + return err + } + device := path.Join(s3backer.initMountPath, s3backerDevice) + // second mount will mount the 'file' as a filesystem + err = mount.New("").Mount(device, targetPath, s3backerFsType, []string{}) + if err != nil { + // cleanup fuse mount + fuseUnmount(targetPath, s3backerCmd) + return err + } + return nil +} + +func (s3backer *s3backerMounter) Unmount(targetPath string) error { + // Unmount the filesystem first + if err := mount.New("").Unmount(targetPath); err != nil { + return err + } + // Unmount the s3backer fuse mount + err := fuseUnmount(s3backer.initMountPath, s3backerCmd) + if err != nil { + return err + } + return nil +} + +func (s3backer *s3backerMounter) mountInit(path string) error { + args := []string{ + // baseURL must end with / + fmt.Sprintf("--baseURL=%s/", s3backer.url), + fmt.Sprintf("--blockSize=%v", s3backerBlockSize), + fmt.Sprintf("--size=%v", s3backer.size), + "--listBlocks", + s3backer.bucket, + path, + } + if s3backer.region != "" { + args = append(args, fmt.Sprintf("--region=%s", s3backer.region)) + } + + return fuseMount(path, s3backerCmd, args) +} + +func (s3backer *s3backerMounter) writePasswd() error { + pwFileName := fmt.Sprintf("%s/.s3backer_passwd", os.Getenv("HOME")) + pwFile, err := os.OpenFile(pwFileName, os.O_RDWR|os.O_CREATE, 0600) + if err != nil { + return err + } + _, err = pwFile.WriteString(s3backer.accessKeyID + ":" + s3backer.secretAccessKey) + if err != nil { + return err + } + pwFile.Close() + return nil +} + +func formatFs(fsType string, device string) error { + diskMounter := &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: mount.NewOsExec()} + format, err := diskMounter.GetDiskFormat(device) + if err != nil { + return err + } + if format != "" { + glog.Infof("Disk %s is already formatted with format %s", device, format) + return nil + } + args := []string{ + device, + } + cmd := exec.Command("mkfs."+fsType, args...) + + out, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("Error formatting disk: %s", out) + } + glog.Info("Formatting fs with type %s, out: %s", fsType, out) + return nil +} diff --git a/pkg/s3/mounter_s3fs.go b/pkg/s3/mounter_s3fs.go index 325e91d..318d564 100644 --- a/pkg/s3/mounter_s3fs.go +++ b/pkg/s3/mounter_s3fs.go @@ -3,9 +3,6 @@ package s3 import ( "fmt" "os" - "os/exec" - - "k8s.io/kubernetes/pkg/util/mount" ) // Implements Mounter @@ -16,6 +13,10 @@ type s3fsMounter struct { pwFileContent string } +const ( + s3fsCmd = "s3fs" +) + func newS3fsMounter(bucket string, cfg *Config) (Mounter, error) { return &s3fsMounter{ bucket: bucket, @@ -43,16 +44,11 @@ func (s3fs *s3fsMounter) Mount(targetPath string) error { "-o", "allow_other", "-o", "mp_umask=000", } - cmd := exec.Command("s3fs", args...) - out, err := cmd.CombinedOutput() - if err != nil { - return fmt.Errorf("Error mounting using s3fs, output: %s", out) - } - return nil + return fuseMount(targetPath, s3fsCmd, args) } func (s3fs *s3fsMounter) Unmount(targetPath string) error { - return mount.New("").Unmount(targetPath) + return fuseUnmount(targetPath, s3fsCmd) } func writes3fsPass(pwFileContent string) error { diff --git a/pkg/s3/mounter_s3ql.go b/pkg/s3/mounter_s3ql.go index df4ad91..2d85971 100644 --- a/pkg/s3/mounter_s3ql.go +++ b/pkg/s3/mounter_s3ql.go @@ -3,7 +3,6 @@ package s3 import ( "bytes" "fmt" - "io" "net/url" "os" "os/exec" @@ -15,6 +14,7 @@ import ( // Implements Mounter type s3qlMounter struct { + bucket string url string bucketURL string login string @@ -41,6 +41,7 @@ func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) { url.Scheme = "s3c" } s3ql := &s3qlMounter{ + bucket: bucket, url: url.String(), login: cfg.AccessKeyID, password: cfg.SecretAccessKey, @@ -67,7 +68,14 @@ func (s3ql *s3qlMounter) Format() error { p := fmt.Sprintf("%s\n%s\n", s3ql.passphrase, s3ql.passphrase) reader := bytes.NewReader([]byte(p)) - return s3qlCmd(s3qlCmdMkfs, append(args, s3ql.options...), reader) + cmd := exec.Command(s3qlCmdMkfs, append(args, s3ql.options...)...) + cmd.Stdin = reader + + out, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("Error running s3ql command: %s", out) + } + return nil } func (s3ql *s3qlMounter) Mount(targetPath string) error { @@ -76,24 +84,11 @@ func (s3ql *s3qlMounter) Mount(targetPath string) error { targetPath, "--allow-other", } - return s3qlCmd(s3qlCmdMount, append(args, s3ql.options...), nil) + return fuseMount(targetPath, s3qlCmdMount, append(args, s3ql.options...)) } func (s3ql *s3qlMounter) Unmount(targetPath string) error { - return s3qlCmd(s3qlCmdUnmount, []string{targetPath}, nil) -} - -func s3qlCmd(s3qlCmd string, args []string, stdin io.Reader) error { - cmd := exec.Command(s3qlCmd, args...) - if stdin != nil { - cmd.Stdin = stdin - } - - out, err := cmd.CombinedOutput() - if err != nil { - return fmt.Errorf("Error running s3ql command: %s", out) - } - return nil + return fuseUnmount(targetPath, s3qlCmdMount) } func (s3ql *s3qlMounter) writeConfig() error { diff --git a/pkg/s3/s3-driver_suite_test.go b/pkg/s3/s3-driver_suite_test.go index 47b3ce6..ad850e7 100644 --- a/pkg/s3/s3-driver_suite_test.go +++ b/pkg/s3/s3-driver_suite_test.go @@ -113,4 +113,35 @@ var _ = Describe("S3Driver", func() { }) }) + Context("s3backer", func() { + socket := "/tmp/csi-s3backer.sock" + csiEndpoint := "unix://" + socket + + cfg := &s3.Config{ + AccessKeyID: "FJDSJ", + SecretAccessKey: "DSG643HGDS", + Endpoint: "http://127.0.0.1:9000", + Mounter: "s3backer", + } + if err := os.Remove(socket); err != nil && !os.IsNotExist(err) { + Expect(err).NotTo(HaveOccurred()) + } + driver, err := s3.NewS3("test-node", csiEndpoint, cfg) + if err != nil { + log.Fatal(err) + } + go driver.Run() + + defer os.RemoveAll(mntDir) + + Describe("CSI sanity", func() { + sanityCfg := &sanity.Config{ + TargetPath: mntDir, + Address: csiEndpoint, + TestVolumeSize: 1, + } + sanity.GinkgoTest(sanityCfg) + }) + }) + }) diff --git a/pkg/s3/util.go b/pkg/s3/util.go new file mode 100644 index 0000000..c1b37fe --- /dev/null +++ b/pkg/s3/util.go @@ -0,0 +1,64 @@ +package s3 + +import ( + "fmt" + "io/ioutil" + "os" + "strings" + "syscall" + "time" + + "github.com/mitchellh/go-ps" + + "github.com/golang/glog" +) + +func waitForProcess(p *os.Process, backoff int) error { + if backoff == 20 { + return fmt.Errorf("Timeout waiting for PID %v to end", p.Pid) + } + if err := p.Signal(syscall.Signal(0)); err != nil { + glog.Warningf("Fuse process does not seem active or we are unprivileged: %s", err) + return nil + } + glog.Infof("Fuse process with PID %v still active, waiting...", p.Pid) + time.Sleep(time.Duration(backoff*100) * time.Millisecond) + return waitForProcess(p, backoff+1) +} + +func findFuseMountProcess(path string, name string) (*os.Process, error) { + processes, err := ps.Processes() + if err != nil { + return nil, err + } + for _, p := range processes { + if strings.Contains(p.Executable(), name) { + cmdLine, err := getCmdLine(p.Pid()) + if err != nil { + glog.Errorf("Unable to get cmdline of PID %v: %s", p.Pid(), err) + continue + } + if cmdLine == "" { + // ignore defunct processes + // TODO: debug why this happens in the first place + // seems to only happen on k8s, not on local docker + continue + } + if strings.Contains(cmdLine, path) { + glog.Infof("Found matching pid %v on path %s", p.Pid(), path) + return os.FindProcess(p.Pid()) + } + } + fmt.Println(p.Executable()) + } + return nil, nil +} + +func getCmdLine(pid int) (string, error) { + cmdLineFile := fmt.Sprintf("/proc/%v/cmdline", pid) + cmdLine, err := ioutil.ReadFile(cmdLineFile) + if err != nil { + return "", err + } + return string(cmdLine), nil +} diff --git a/test/Dockerfile.s3backer b/test/Dockerfile.s3backer new file mode 100644 index 0000000..dcbf292 --- /dev/null +++ b/test/Dockerfile.s3backer @@ -0,0 +1,28 @@ +FROM golang:stretch +LABEL maintainers="Cyrill Troxler " +LABEL description="s3 fuse csi plugin" + +RUN apt-get update && apt-get install -y \ + build-essential \ + autoconf \ + libcurl4-openssl-dev \ + libfuse-dev \ + libexpat1-dev \ + libssl-dev \ + zlib1g-dev \ + xfsprogs \ + psmisc \ + git + +RUN git clone https://github.com/archiecobbs/s3backer.git ./s3backer + +WORKDIR "./s3backer" + +RUN ["./autogen.sh"] +RUN ["./configure"] +RUN ["make"] +RUN ["make", "install"] + +RUN go get -u github.com/minio/minio && go install github.com/minio/minio/cmd + +CMD ["/go/src/github.com/ctrox/csi-s3-driver/test/test.sh"] From a468d955bca1f28f241555d2c631661982e699b5 Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Mon, 23 Jul 2018 20:26:42 +0200 Subject: [PATCH 02/10] Check for defunct process before waiting for it --- pkg/s3/util.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/pkg/s3/util.go b/pkg/s3/util.go index c1b37fe..87f1c96 100644 --- a/pkg/s3/util.go +++ b/pkg/s3/util.go @@ -17,6 +17,18 @@ func waitForProcess(p *os.Process, backoff int) error { if backoff == 20 { return fmt.Errorf("Timeout waiting for PID %v to end", p.Pid) } + cmdLine, err := getCmdLine(p.Pid) + if err != nil { + glog.Warningf("Error checking cmdline of PID %v, assuming it is dead: %s", p.Pid, err) + return nil + } + if cmdLine == "" { + // ignore defunct processes + // TODO: debug why this happens in the first place + // seems to only happen on k8s, not on local docker + glog.Warning("Fuse process seems dead, returning") + return nil + } if err := p.Signal(syscall.Signal(0)); err != nil { glog.Warningf("Fuse process does not seem active or we are unprivileged: %s", err) return nil @@ -38,12 +50,6 @@ func findFuseMountProcess(path string, name string) (*os.Process, error) { glog.Errorf("Unable to get cmdline of PID %v: %s", p.Pid(), err) continue } - if cmdLine == "" { - // ignore defunct processes - // TODO: debug why this happens in the first place - // seems to only happen on k8s, not on local docker - continue - } if strings.Contains(cmdLine, path) { glog.Infof("Found matching pid %v on path %s", p.Pid(), path) return os.FindProcess(p.Pid()) From f8bd74afb9d32fa42f5a2bf6e09384fc2ab48b29 Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Mon, 23 Jul 2018 20:55:53 +0200 Subject: [PATCH 03/10] Fix log output --- pkg/s3/mounter_s3backer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/s3/mounter_s3backer.go b/pkg/s3/mounter_s3backer.go index cc9318b..f5ee1c0 100644 --- a/pkg/s3/mounter_s3backer.go +++ b/pkg/s3/mounter_s3backer.go @@ -148,6 +148,6 @@ func formatFs(fsType string, device string) error { if err != nil { return fmt.Errorf("Error formatting disk: %s", out) } - glog.Info("Formatting fs with type %s, out: %s", fsType, out) + glog.Infof("Formatting fs with type %s", fsType) return nil } From 1fe218a568aac1b1be317a796a4d3fa93217570d Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Thu, 26 Jul 2018 20:13:40 +0200 Subject: [PATCH 04/10] Format using nodeserver, not controller --- pkg/s3/controllerserver.go | 8 -------- pkg/s3/nodeserver.go | 3 +++ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/pkg/s3/controllerserver.go b/pkg/s3/controllerserver.go index f186bf1..98da7ca 100644 --- a/pkg/s3/controllerserver.go +++ b/pkg/s3/controllerserver.go @@ -61,14 +61,6 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol } } - mounter, err := newMounter(volumeID, cs.s3.cfg) - if err != nil { - return nil, err - } - if err := mounter.Format(); err != nil { - return nil, err - } - glog.V(4).Infof("create volume %s", volumeID) s3Vol := s3Volume{} s3Vol.VolName = req.GetName() diff --git a/pkg/s3/nodeserver.go b/pkg/s3/nodeserver.go index c6dad2e..6e5fae8 100644 --- a/pkg/s3/nodeserver.go +++ b/pkg/s3/nodeserver.go @@ -82,6 +82,9 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis if err != nil { return nil, err } + if err := mounter.Format(); err != nil { + return nil, err + } if err := mounter.Mount(targetPath); err != nil { return nil, err } From 0010066fe398d644c125af53c4657163c141f98a Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Thu, 26 Jul 2018 22:43:51 +0200 Subject: [PATCH 05/10] Implement stage/unstage This helps the reliability of s3backer as the fuse mount is done on NodeStageVolume and only once per volume per node. --- pkg/s3/mounter.go | 7 +- pkg/s3/mounter_goofys.go | 10 ++- pkg/s3/mounter_s3backer.go | 63 +++++++----------- pkg/s3/mounter_s3fs.go | 16 +++-- pkg/s3/mounter_s3ql.go | 24 ++++--- pkg/s3/nodeserver.go | 116 ++++++++++++++++++++++++++------- pkg/s3/s3-driver_suite_test.go | 15 ++--- pkg/s3/util.go | 1 - 8 files changed, 157 insertions(+), 95 deletions(-) diff --git a/pkg/s3/mounter.go b/pkg/s3/mounter.go index b05ae8f..0d8426a 100644 --- a/pkg/s3/mounter.go +++ b/pkg/s3/mounter.go @@ -11,9 +11,10 @@ import ( // Mounter interface which can be implemented // by the different mounter types type Mounter interface { - Format() error - Mount(targetPath string) error - Unmount(targetPath string) error + Stage(stagePath string) error + Unstage(stagePath string) error + Mount(source string, target string) error + Unmount(target string) error } const ( diff --git a/pkg/s3/mounter_goofys.go b/pkg/s3/mounter_goofys.go index 2573c3a..31a37c4 100644 --- a/pkg/s3/mounter_goofys.go +++ b/pkg/s3/mounter_goofys.go @@ -38,13 +38,17 @@ func newGoofysMounter(bucket string, cfg *Config) (Mounter, error) { }, nil } -func (goofys *goofysMounter) Format() error { +func (goofys *goofysMounter) Stage(stageTarget string) error { return nil } -func (goofys *goofysMounter) Mount(targetPath string) error { +func (goofys *goofysMounter) Unstage(stageTarget string) error { + return nil +} + +func (goofys *goofysMounter) Mount(source string, target string) error { goofysCfg := &goofysApi.Config{ - MountPoint: targetPath, + MountPoint: target, Endpoint: goofys.endpoint, Region: goofys.region, DirMode: 0755, diff --git a/pkg/s3/mounter_s3backer.go b/pkg/s3/mounter_s3backer.go index f5ee1c0..bf6dbd8 100644 --- a/pkg/s3/mounter_s3backer.go +++ b/pkg/s3/mounter_s3backer.go @@ -2,7 +2,6 @@ package s3 import ( "fmt" - "io/ioutil" "os" "os/exec" "path" @@ -19,14 +18,12 @@ type s3backerMounter struct { accessKeyID string secretAccessKey string size int64 - initMountPath string } const ( - s3backerCmd = "s3backer" - s3backerFsType = "xfs" - s3backerMountBase = "/mnt" - s3backerDevice = "file" + s3backerCmd = "s3backer" + s3backerFsType = "xfs" + s3backerDevice = "file" // blockSize to use in k s3backerBlockSize = "128k" ) @@ -38,7 +35,6 @@ func newS3backerMounter(bucket string, cfg *Config) (Mounter, error) { region: cfg.Region, accessKeyID: cfg.AccessKeyID, secretAccessKey: cfg.SecretAccessKey, - initMountPath: path.Join(s3backerMountBase, bucket), size: 1024 * 1024 * 1024 * 10, } @@ -49,37 +45,32 @@ func (s3backer *s3backerMounter) String() string { return s3backer.bucket } -func (s3backer *s3backerMounter) Format() error { - tmpDir, err := ioutil.TempDir("", "s3backer") - if err != nil { - return err - } - defer os.RemoveAll(tmpDir) - - if err := s3backer.mountInit(tmpDir); err != nil { - return err - } - defer fuseUnmount(tmpDir, s3backerCmd) - - return formatFs(s3backerFsType, path.Join(tmpDir, s3backerDevice)) -} - -func (s3backer *s3backerMounter) Mount(targetPath string) error { - if err := os.MkdirAll(s3backer.initMountPath, 0700); err != nil { - return err - } +func (s3backer *s3backerMounter) Stage(stageTarget string) error { // s3backer requires two mounts // first mount will fuse mount the bucket to a single 'file' - err := s3backer.mountInit(s3backer.initMountPath) - if err != nil { + if err := s3backer.mountInit(stageTarget); err != nil { return err } - device := path.Join(s3backer.initMountPath, s3backerDevice) + // ensure 'file' device is formatted + err := formatFs(s3backerFsType, path.Join(stageTarget, s3backerDevice)) + if err != nil { + fuseUnmount(stageTarget, s3backerCmd) + } + return err +} + +func (s3backer *s3backerMounter) Unstage(stageTarget string) error { + // Unmount the s3backer fuse mount + return fuseUnmount(stageTarget, s3backerCmd) +} + +func (s3backer *s3backerMounter) Mount(source string, target string) error { + device := path.Join(source, s3backerDevice) // second mount will mount the 'file' as a filesystem - err = mount.New("").Mount(device, targetPath, s3backerFsType, []string{}) + err := mount.New("").Mount(device, target, s3backerFsType, []string{}) if err != nil { // cleanup fuse mount - fuseUnmount(targetPath, s3backerCmd) + fuseUnmount(target, s3backerCmd) return err } return nil @@ -87,15 +78,7 @@ func (s3backer *s3backerMounter) Mount(targetPath string) error { func (s3backer *s3backerMounter) Unmount(targetPath string) error { // Unmount the filesystem first - if err := mount.New("").Unmount(targetPath); err != nil { - return err - } - // Unmount the s3backer fuse mount - err := fuseUnmount(s3backer.initMountPath, s3backerCmd) - if err != nil { - return err - } - return nil + return mount.New("").Unmount(targetPath) } func (s3backer *s3backerMounter) mountInit(path string) error { diff --git a/pkg/s3/mounter_s3fs.go b/pkg/s3/mounter_s3fs.go index 318d564..2b90aa3 100644 --- a/pkg/s3/mounter_s3fs.go +++ b/pkg/s3/mounter_s3fs.go @@ -26,17 +26,21 @@ func newS3fsMounter(bucket string, cfg *Config) (Mounter, error) { }, nil } -func (s3fs *s3fsMounter) Format() error { +func (s3fs *s3fsMounter) Stage(stageTarget string) error { return nil } -func (s3fs *s3fsMounter) Mount(targetPath string) error { +func (s3fs *s3fsMounter) Unstage(stageTarget string) error { + return nil +} + +func (s3fs *s3fsMounter) Mount(source string, target string) error { if err := writes3fsPass(s3fs.pwFileContent); err != nil { return err } args := []string{ fmt.Sprintf("%s", s3fs.bucket), - fmt.Sprintf("%s", targetPath), + fmt.Sprintf("%s", target), "-o", "sigv2", "-o", "use_path_request_style", "-o", fmt.Sprintf("url=%s", s3fs.url), @@ -44,11 +48,11 @@ func (s3fs *s3fsMounter) Mount(targetPath string) error { "-o", "allow_other", "-o", "mp_umask=000", } - return fuseMount(targetPath, s3fsCmd, args) + return fuseMount(target, s3fsCmd, args) } -func (s3fs *s3fsMounter) Unmount(targetPath string) error { - return fuseUnmount(targetPath, s3fsCmd) +func (s3fs *s3fsMounter) Unmount(target string) error { + return fuseUnmount(target, s3fsCmd) } func writes3fsPass(pwFileContent string) error { diff --git a/pkg/s3/mounter_s3ql.go b/pkg/s3/mounter_s3ql.go index 2d85971..3bd095b 100644 --- a/pkg/s3/mounter_s3ql.go +++ b/pkg/s3/mounter_s3ql.go @@ -59,7 +59,7 @@ func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) { return s3ql, s3ql.writeConfig() } -func (s3ql *s3qlMounter) Format() error { +func (s3ql *s3qlMounter) Stage(stagePath string) error { // force creation to ignore existing data args := []string{ s3ql.bucketURL, @@ -78,17 +78,21 @@ func (s3ql *s3qlMounter) Format() error { return nil } -func (s3ql *s3qlMounter) Mount(targetPath string) error { - args := []string{ - s3ql.bucketURL, - targetPath, - "--allow-other", - } - return fuseMount(targetPath, s3qlCmdMount, append(args, s3ql.options...)) +func (s3ql *s3qlMounter) Unstage(stagePath string) error { + return nil } -func (s3ql *s3qlMounter) Unmount(targetPath string) error { - return fuseUnmount(targetPath, s3qlCmdMount) +func (s3ql *s3qlMounter) Mount(source string, target string) error { + args := []string{ + s3ql.bucketURL, + target, + "--allow-other", + } + return fuseMount(target, s3qlCmdMount, append(args, s3ql.options...)) +} + +func (s3ql *s3qlMounter) Unmount(target string) error { + return fuseUnmount(target, s3qlCmdMount) } func (s3ql *s3qlMounter) writeConfig() error { diff --git a/pkg/s3/nodeserver.go b/pkg/s3/nodeserver.go index 6e5fae8..f86d6d4 100644 --- a/pkg/s3/nodeserver.go +++ b/pkg/s3/nodeserver.go @@ -44,23 +44,19 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis if len(req.GetVolumeId()) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } + if len(req.GetStagingTargetPath()) == 0 { + return nil, status.Error(codes.InvalidArgument, "Staging Target path missing in request") + } if len(req.GetTargetPath()) == 0 { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } targetPath := req.GetTargetPath() - notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath) + stagingPath := req.GetStagingTargetPath() + notMnt, err := checkMount(targetPath) if err != nil { - if os.IsNotExist(err) { - if err = os.MkdirAll(targetPath, 0750); err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - notMnt = true - } else { - return nil, status.Error(codes.Internal, err.Error()) - } + return nil, status.Error(codes.Internal, err.Error()) } - if !notMnt { return &csi.NodePublishVolumeResponse{}, nil } @@ -82,10 +78,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis if err != nil { return nil, err } - if err := mounter.Format(); err != nil { - return nil, err - } - if err := mounter.Mount(targetPath); err != nil { + if err := mounter.Mount(stagingPath, targetPath); err != nil { return nil, err } @@ -116,16 +109,91 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu return &csi.NodeUnpublishVolumeResponse{}, nil } -func (ns *nodeServer) NodeStageVolume( - ctx context.Context, - req *csi.NodeStageVolumeRequest) ( - *csi.NodeStageVolumeResponse, error) { - return nil, status.Error(codes.Unimplemented, "") +func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { + + // Check arguments + if len(req.GetVolumeId()) == 0 { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + + if len(req.GetStagingTargetPath()) == 0 { + return nil, status.Error(codes.InvalidArgument, "Target path missing in request") + } + + if req.VolumeCapability == nil { + return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Volume Capability must be provided") + } + + stagingPath := req.GetStagingTargetPath() + notMnt, err := checkMount(stagingPath) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + if !notMnt { + return &csi.NodeStageVolumeResponse{}, nil + } + + mounter, err := newMounter(req.GetVolumeId(), ns.s3.cfg) + if err != nil { + return nil, err + } + if err := mounter.Stage(stagingPath); err != nil { + return nil, err + } + + return &csi.NodeStageVolumeResponse{}, nil } -func (ns *nodeServer) NodeUnstageVolume( - ctx context.Context, - req *csi.NodeUnstageVolumeRequest) ( - *csi.NodeUnstageVolumeResponse, error) { - return nil, status.Error(codes.Unimplemented, "") +func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { + + // Check arguments + if len(req.GetVolumeId()) == 0 { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + if len(req.GetStagingTargetPath()) == 0 { + return nil, status.Error(codes.InvalidArgument, "Target path missing in request") + } + + mounter, err := newMounter(req.GetVolumeId(), ns.s3.cfg) + if err != nil { + return nil, err + } + if err := mounter.Unstage(req.GetStagingTargetPath()); err != nil { + return nil, err + } + + return &csi.NodeUnstageVolumeResponse{}, nil +} + +// NodeGetCapabilities returns the supported capabilities of the node server +func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { + // currently there is a single NodeServer capability according to the spec + nscap := &csi.NodeServiceCapability{ + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, + }, + }, + } + + return &csi.NodeGetCapabilitiesResponse{ + Capabilities: []*csi.NodeServiceCapability{ + nscap, + }, + }, nil +} + +func checkMount(targetPath string) (bool, error) { + notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath) + if err != nil { + if os.IsNotExist(err) { + if err = os.MkdirAll(targetPath, 0750); err != nil { + return false, err + } + notMnt = true + } else { + return false, err + } + } + return notMnt, nil } diff --git a/pkg/s3/s3-driver_suite_test.go b/pkg/s3/s3-driver_suite_test.go index ad850e7..451132d 100644 --- a/pkg/s3/s3-driver_suite_test.go +++ b/pkg/s3/s3-driver_suite_test.go @@ -15,13 +15,12 @@ import ( const () var _ = Describe("S3Driver", func() { - mntDir, err := ioutil.TempDir("", "mnt") - if err != nil { - Expect(err).NotTo(HaveOccurred()) - } + mntDir, _ := ioutil.TempDir("", "mnt") + stagingDir, _ := ioutil.TempDir("", "staging") AfterSuite(func() { os.RemoveAll(mntDir) + os.RemoveAll(stagingDir) }) Context("goofys", func() { @@ -45,6 +44,7 @@ var _ = Describe("S3Driver", func() { Describe("CSI sanity", func() { sanityCfg := &sanity.Config{ TargetPath: mntDir, + StagingPath: stagingDir, Address: csiEndpoint, TestVolumeSize: 1, } @@ -70,11 +70,10 @@ var _ = Describe("S3Driver", func() { } go driver.Run() - defer os.RemoveAll(mntDir) - Describe("CSI sanity", func() { sanityCfg := &sanity.Config{ TargetPath: mntDir, + StagingPath: stagingDir, Address: csiEndpoint, TestVolumeSize: 1, } @@ -106,6 +105,7 @@ var _ = Describe("S3Driver", func() { Describe("CSI sanity", func() { sanityCfg := &sanity.Config{ TargetPath: mntDir, + StagingPath: stagingDir, Address: csiEndpoint, TestVolumeSize: 1, } @@ -132,11 +132,10 @@ var _ = Describe("S3Driver", func() { } go driver.Run() - defer os.RemoveAll(mntDir) - Describe("CSI sanity", func() { sanityCfg := &sanity.Config{ TargetPath: mntDir, + StagingPath: stagingDir, Address: csiEndpoint, TestVolumeSize: 1, } diff --git a/pkg/s3/util.go b/pkg/s3/util.go index 87f1c96..3022b8e 100644 --- a/pkg/s3/util.go +++ b/pkg/s3/util.go @@ -55,7 +55,6 @@ func findFuseMountProcess(path string, name string) (*os.Process, error) { return os.FindProcess(p.Pid()) } } - fmt.Println(p.Executable()) } return nil, nil } From 3f83f7fe36ff6d8a5e1a6bf432eb6762ba839473 Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Thu, 26 Jul 2018 22:46:30 +0200 Subject: [PATCH 06/10] Add s3fs to Dockerfile.s3backer --- test/Dockerfile.s3backer | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/Dockerfile.s3backer b/test/Dockerfile.s3backer index dcbf292..b4991e8 100644 --- a/test/Dockerfile.s3backer +++ b/test/Dockerfile.s3backer @@ -3,6 +3,7 @@ LABEL maintainers="Cyrill Troxler " LABEL description="s3 fuse csi plugin" RUN apt-get update && apt-get install -y \ + s3fs \ build-essential \ autoconf \ libcurl4-openssl-dev \ @@ -12,7 +13,8 @@ RUN apt-get update && apt-get install -y \ zlib1g-dev \ xfsprogs \ psmisc \ - git + git && \ + rm -rf /var/lib/apt/lists/* RUN git clone https://github.com/archiecobbs/s3backer.git ./s3backer From db0fbf77ddcd595fd1b5e2a5954e9b653fb3c9ad Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Thu, 26 Jul 2018 22:47:36 +0200 Subject: [PATCH 07/10] Update kubernetes deployment --- deploy/kubernetes/csi-s3-driver.yaml | 8 ++++---- deploy/kubernetes/provisioner.yaml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/deploy/kubernetes/csi-s3-driver.yaml b/deploy/kubernetes/csi-s3-driver.yaml index 3dba016..fcba4d4 100644 --- a/deploy/kubernetes/csi-s3-driver.yaml +++ b/deploy/kubernetes/csi-s3-driver.yaml @@ -130,8 +130,8 @@ spec: - name: pods-mount-dir mountPath: /var/lib/kubelet/pods mountPropagation: "Bidirectional" - - name: device-dir - mountPath: /dev + - name: fuse-device + mountPath: /dev/fuse volumes: - name: plugin-dir hostPath: @@ -141,6 +141,6 @@ spec: hostPath: path: /var/lib/kubelet/pods type: Directory - - name: device-dir + - name: fuse-device hostPath: - path: /dev + path: /dev/fuse diff --git a/deploy/kubernetes/provisioner.yaml b/deploy/kubernetes/provisioner.yaml index 99cfebc..04933d1 100644 --- a/deploy/kubernetes/provisioner.yaml +++ b/deploy/kubernetes/provisioner.yaml @@ -78,7 +78,7 @@ spec: volumeMounts: - name: socket-dir mountPath: /var/lib/kubelet/plugins/ch.ctrox.csi.s3-driver - - name: s3-csi-driver + - name: csi-s3-driver image: ctrox/csi-s3-driver:0.2.0 args: - "--endpoint=$(CSI_ENDPOINT)" From 1caf4699666fdb7109ffeb313c3d3a0a7ce9adb5 Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Fri, 27 Jul 2018 12:56:28 +0200 Subject: [PATCH 08/10] Implement a metadata file and correct sizing As the controller does not mount/create the fs we have to store the capacity somewhere so the node knows about it. --- pkg/s3/controllerserver.go | 34 ++++++++++++---- pkg/s3/mounter.go | 4 +- pkg/s3/mounter_goofys.go | 8 ++-- pkg/s3/mounter_s3backer.go | 19 +++++---- pkg/s3/mounter_s3fs.go | 8 ++-- pkg/s3/mounter_s3ql.go | 8 ++-- pkg/s3/nodeserver.go | 73 ++++++++++++++++++++++------------ pkg/s3/s3-client.go | 39 +++++++++++++++++- pkg/s3/s3-driver_suite_test.go | 30 ++++++-------- 9 files changed, 150 insertions(+), 73 deletions(-) diff --git a/pkg/s3/controllerserver.go b/pkg/s3/controllerserver.go index 98da7ca..cb5bf08 100644 --- a/pkg/s3/controllerserver.go +++ b/pkg/s3/controllerserver.go @@ -34,50 +34,71 @@ type controllerServer struct { } func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { + volumeID := req.GetName() + if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil { glog.V(3).Infof("invalid create volume req: %v", req) return nil, err } // Check arguments - if len(req.GetName()) == 0 { + if len(volumeID) == 0 { return nil, status.Error(codes.InvalidArgument, "Name missing in request") } if req.GetVolumeCapabilities() == nil { return nil, status.Error(codes.InvalidArgument, "Volume Capabilities missing in request") } - volumeID := req.GetName() + capacityBytes := int64(req.GetCapacityRange().GetRequiredBytes()) + glog.V(5).Infof("Got a request to create bucket %s", volumeID) exists, err := cs.s3.client.bucketExists(volumeID) if err != nil { return nil, err } - if !exists { + if exists { + var b *bucket + b, err = cs.s3.client.getBucket(volumeID) + if err != nil { + return nil, err + } + // Check if volume capacity requested is bigger than the already existing capacity + if capacityBytes > b.CapacityBytes { + return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", volumeID)) + } + } else { if err = cs.s3.client.createBucket(volumeID); err != nil { glog.V(3).Infof("failed to create volume: %v", err) return nil, err } } + b := &bucket{ + Name: volumeID, + CapacityBytes: capacityBytes, + } + if err := cs.s3.client.setBucket(b); err != nil { + return nil, err + } glog.V(4).Infof("create volume %s", volumeID) s3Vol := s3Volume{} - s3Vol.VolName = req.GetName() + s3Vol.VolName = volumeID s3Vol.VolID = volumeID return &csi.CreateVolumeResponse{ Volume: &csi.Volume{ Id: volumeID, - CapacityBytes: 1, + CapacityBytes: capacityBytes, Attributes: req.GetParameters(), }, }, nil } func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { + volumeID := req.GetVolumeId() // Check arguments - if len(req.GetVolumeId()) == 0 { + if len(volumeID) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } @@ -85,7 +106,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol glog.V(3).Infof("Invalid delete volume req: %v", req) return nil, err } - volumeID := req.VolumeId glog.V(4).Infof("Deleting volume %s", volumeID) exists, err := cs.s3.client.bucketExists(volumeID) diff --git a/pkg/s3/mounter.go b/pkg/s3/mounter.go index 0d8426a..e7e1325 100644 --- a/pkg/s3/mounter.go +++ b/pkg/s3/mounter.go @@ -25,7 +25,7 @@ const ( ) // newMounter returns a new mounter depending on the mounterType parameter -func newMounter(bucket string, cfg *Config) (Mounter, error) { +func newMounter(bucket *bucket, cfg *Config) (Mounter, error) { switch cfg.Mounter { case s3fsMounterType: return newS3fsMounter(bucket, cfg) @@ -40,7 +40,7 @@ func newMounter(bucket string, cfg *Config) (Mounter, error) { return newS3backerMounter(bucket, cfg) } - return nil, fmt.Errorf("Error mounting bucket %s, invalid mounter specified: %s", bucket, cfg.Mounter) + return nil, fmt.Errorf("Error mounting bucket %s, invalid mounter specified: %s", bucket.Name, cfg.Mounter) } func fuseMount(path string, command string, args []string) error { diff --git a/pkg/s3/mounter_goofys.go b/pkg/s3/mounter_goofys.go index 31a37c4..326596e 100644 --- a/pkg/s3/mounter_goofys.go +++ b/pkg/s3/mounter_goofys.go @@ -16,21 +16,21 @@ const ( // Implements Mounter type goofysMounter struct { - bucket string + bucket *bucket endpoint string region string accessKeyID string secretAccessKey string } -func newGoofysMounter(bucket string, cfg *Config) (Mounter, error) { +func newGoofysMounter(b *bucket, cfg *Config) (Mounter, error) { region := cfg.Region // if endpoint is set we need a default region if region == "" && cfg.Endpoint != "" { region = defaultRegion } return &goofysMounter{ - bucket: bucket, + bucket: b, endpoint: cfg.Endpoint, region: region, accessKeyID: cfg.AccessKeyID, @@ -61,7 +61,7 @@ func (goofys *goofysMounter) Mount(source string, target string) error { os.Setenv("AWS_ACCESS_KEY_ID", goofys.accessKeyID) os.Setenv("AWS_SECRET_ACCESS_KEY", goofys.secretAccessKey) - _, _, err := goofysApi.Mount(context.Background(), goofys.bucket, goofysCfg) + _, _, err := goofysApi.Mount(context.Background(), goofys.bucket.Name, goofysCfg) if err != nil { return fmt.Errorf("Error mounting via goofys: %s", err) diff --git a/pkg/s3/mounter_s3backer.go b/pkg/s3/mounter_s3backer.go index bf6dbd8..3d43b4e 100644 --- a/pkg/s3/mounter_s3backer.go +++ b/pkg/s3/mounter_s3backer.go @@ -12,12 +12,11 @@ import ( // Implements Mounter type s3backerMounter struct { - bucket string + bucket *bucket url string region string accessKeyID string secretAccessKey string - size int64 } const ( @@ -25,24 +24,28 @@ const ( s3backerFsType = "xfs" s3backerDevice = "file" // blockSize to use in k - s3backerBlockSize = "128k" + s3backerBlockSize = "128k" + s3backerDefaultSize = 1024 * 1024 * 1024 // 1GiB ) -func newS3backerMounter(bucket string, cfg *Config) (Mounter, error) { +func newS3backerMounter(bucket *bucket, cfg *Config) (Mounter, error) { + // s3backer cannot work with 0 size volumes + if bucket.CapacityBytes == 0 { + bucket.CapacityBytes = s3backerDefaultSize + } s3backer := &s3backerMounter{ bucket: bucket, url: cfg.Endpoint, region: cfg.Region, accessKeyID: cfg.AccessKeyID, secretAccessKey: cfg.SecretAccessKey, - size: 1024 * 1024 * 1024 * 10, } return s3backer, s3backer.writePasswd() } func (s3backer *s3backerMounter) String() string { - return s3backer.bucket + return s3backer.bucket.Name } func (s3backer *s3backerMounter) Stage(stageTarget string) error { @@ -86,9 +89,9 @@ func (s3backer *s3backerMounter) mountInit(path string) error { // baseURL must end with / fmt.Sprintf("--baseURL=%s/", s3backer.url), fmt.Sprintf("--blockSize=%v", s3backerBlockSize), - fmt.Sprintf("--size=%v", s3backer.size), + fmt.Sprintf("--size=%v", s3backer.bucket.CapacityBytes), "--listBlocks", - s3backer.bucket, + s3backer.bucket.Name, path, } if s3backer.region != "" { diff --git a/pkg/s3/mounter_s3fs.go b/pkg/s3/mounter_s3fs.go index 2b90aa3..e6d6f42 100644 --- a/pkg/s3/mounter_s3fs.go +++ b/pkg/s3/mounter_s3fs.go @@ -7,7 +7,7 @@ import ( // Implements Mounter type s3fsMounter struct { - bucket string + bucket *bucket url string region string pwFileContent string @@ -17,9 +17,9 @@ const ( s3fsCmd = "s3fs" ) -func newS3fsMounter(bucket string, cfg *Config) (Mounter, error) { +func newS3fsMounter(b *bucket, cfg *Config) (Mounter, error) { return &s3fsMounter{ - bucket: bucket, + bucket: b, url: cfg.Endpoint, region: cfg.Region, pwFileContent: cfg.AccessKeyID + ":" + cfg.SecretAccessKey, @@ -39,7 +39,7 @@ func (s3fs *s3fsMounter) Mount(source string, target string) error { return err } args := []string{ - fmt.Sprintf("%s", s3fs.bucket), + fmt.Sprintf("%s", s3fs.bucket.Name), fmt.Sprintf("%s", target), "-o", "sigv2", "-o", "use_path_request_style", diff --git a/pkg/s3/mounter_s3ql.go b/pkg/s3/mounter_s3ql.go index 3bd095b..80facaa 100644 --- a/pkg/s3/mounter_s3ql.go +++ b/pkg/s3/mounter_s3ql.go @@ -14,7 +14,7 @@ import ( // Implements Mounter type s3qlMounter struct { - bucket string + bucket *bucket url string bucketURL string login string @@ -31,7 +31,7 @@ const ( s3qlCmdUnmount = "umount.s3ql" ) -func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) { +func newS3qlMounter(b *bucket, cfg *Config) (Mounter, error) { url, err := url.Parse(cfg.Endpoint) if err != nil { return nil, err @@ -41,7 +41,7 @@ func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) { url.Scheme = "s3c" } s3ql := &s3qlMounter{ - bucket: bucket, + bucket: b, url: url.String(), login: cfg.AccessKeyID, password: cfg.SecretAccessKey, @@ -49,7 +49,7 @@ func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) { ssl: ssl, } - url.Path = path.Join(url.Path, bucket) + url.Path = path.Join(url.Path, b.Name) s3ql.bucketURL = url.String() if !ssl { diff --git a/pkg/s3/nodeserver.go b/pkg/s3/nodeserver.go index f86d6d4..27313e3 100644 --- a/pkg/s3/nodeserver.go +++ b/pkg/s3/nodeserver.go @@ -36,23 +36,24 @@ type nodeServer struct { } func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { + volumeID := req.GetVolumeId() + targetPath := req.GetTargetPath() + stagingTargetPath := req.GetStagingTargetPath() // Check arguments if req.GetVolumeCapability() == nil { return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request") } - if len(req.GetVolumeId()) == 0 { + if len(volumeID) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } - if len(req.GetStagingTargetPath()) == 0 { + if len(stagingTargetPath) == 0 { return nil, status.Error(codes.InvalidArgument, "Staging Target path missing in request") } - if len(req.GetTargetPath()) == 0 { + if len(targetPath) == 0 { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } - targetPath := req.GetTargetPath() - stagingPath := req.GetStagingTargetPath() notMnt, err := checkMount(targetPath) if err != nil { return nil, status.Error(codes.Internal, err.Error()) @@ -66,57 +67,70 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis deviceID = req.GetPublishInfo()[deviceID] } + // TODO: Implement readOnly & mountFlags readOnly := req.GetReadonly() - volumeID := req.GetVolumeId() attrib := req.GetVolumeAttributes() mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags() glog.V(4).Infof("target %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n", targetPath, deviceID, readOnly, volumeID, attrib, mountFlags) - mounter, err := newMounter(volumeID, ns.s3.cfg) + b, err := ns.s3.client.getBucket(volumeID) if err != nil { return nil, err } - if err := mounter.Mount(stagingPath, targetPath); err != nil { + + mounter, err := newMounter(b, ns.s3.cfg) + if err != nil { + return nil, err + } + if err := mounter.Mount(stagingTargetPath, targetPath); err != nil { return nil, err } - glog.V(4).Infof("s3: bucket %s successfuly mounted to %s", volumeID, targetPath) + glog.V(4).Infof("s3: bucket %s successfuly mounted to %s", b.Name, targetPath) return &csi.NodePublishVolumeResponse{}, nil } func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { + volumeID := req.GetVolumeId() + targetPath := req.GetTargetPath() // Check arguments - if len(req.GetVolumeId()) == 0 { + if len(volumeID) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } - if len(req.GetTargetPath()) == 0 { + if len(targetPath) == 0 { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } - mounter, err := newMounter(req.GetVolumeId(), ns.s3.cfg) + b, err := ns.s3.client.getBucket(volumeID) if err != nil { return nil, err } - if err := mounter.Unmount(req.GetTargetPath()); err != nil { + mounter, err := newMounter(b, ns.s3.cfg) + if err != nil { + return nil, err + } + if err := mounter.Unmount(targetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) } - glog.V(4).Infof("s3: bucket %s has been unmounted.", req.GetVolumeId()) + glog.V(4).Infof("s3: bucket %s has been unmounted.", volumeID) return &csi.NodeUnpublishVolumeResponse{}, nil } func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { + volumeID := req.GetVolumeId() + stagingTargetPath := req.GetStagingTargetPath() // Check arguments - if len(req.GetVolumeId()) == 0 { + if len(volumeID) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } - if len(req.GetStagingTargetPath()) == 0 { + if len(stagingTargetPath) == 0 { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } @@ -124,20 +138,23 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Volume Capability must be provided") } - stagingPath := req.GetStagingTargetPath() - notMnt, err := checkMount(stagingPath) + notMnt, err := checkMount(stagingTargetPath) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } if !notMnt { return &csi.NodeStageVolumeResponse{}, nil } - - mounter, err := newMounter(req.GetVolumeId(), ns.s3.cfg) + b, err := ns.s3.client.getBucket(volumeID) if err != nil { return nil, err } - if err := mounter.Stage(stagingPath); err != nil { + + mounter, err := newMounter(b, ns.s3.cfg) + if err != nil { + return nil, err + } + if err := mounter.Stage(stagingTargetPath); err != nil { return nil, err } @@ -145,20 +162,26 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol } func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { + volumeID := req.GetVolumeId() + stagingTargetPath := req.GetStagingTargetPath() // Check arguments - if len(req.GetVolumeId()) == 0 { + if len(volumeID) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } - if len(req.GetStagingTargetPath()) == 0 { + if len(stagingTargetPath) == 0 { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } - mounter, err := newMounter(req.GetVolumeId(), ns.s3.cfg) + b, err := ns.s3.client.getBucket(volumeID) if err != nil { return nil, err } - if err := mounter.Unstage(req.GetStagingTargetPath()); err != nil { + mounter, err := newMounter(b, ns.s3.cfg) + if err != nil { + return nil, err + } + if err := mounter.Unstage(stagingTargetPath); err != nil { return nil, err } diff --git a/pkg/s3/s3-client.go b/pkg/s3/s3-client.go index 76713a3..69dc676 100644 --- a/pkg/s3/s3-client.go +++ b/pkg/s3/s3-client.go @@ -1,7 +1,10 @@ package s3 import ( + "bytes" + "encoding/json" "fmt" + "io" "net/url" "github.com/golang/glog" @@ -9,7 +12,7 @@ import ( ) const ( - metadataName = ".meta" + metadataName = ".metadata.json" ) type s3Client struct { @@ -17,6 +20,11 @@ type s3Client struct { minio *minio.Client } +type bucket struct { + Name string + CapacityBytes int64 +} + func newS3Client(cfg *Config) (*s3Client, error) { var client = &s3Client{} @@ -91,3 +99,32 @@ func (client *s3Client) emptyBucket(bucketName string) error { return nil } + +func (client *s3Client) setBucket(bucket *bucket) error { + b := new(bytes.Buffer) + json.NewEncoder(b).Encode(bucket) + opts := minio.PutObjectOptions{ContentType: "application/json"} + _, err := client.minio.PutObject(bucket.Name, metadataName, b, int64(b.Len()), opts) + return err +} + +func (client *s3Client) getBucket(bucketName string) (*bucket, error) { + opts := minio.GetObjectOptions{} + obj, err := client.minio.GetObject(bucketName, metadataName, opts) + if err != nil { + return &bucket{}, err + } + objInfo, err := obj.Stat() + if err != nil { + return &bucket{}, err + } + b := make([]byte, objInfo.Size) + _, err = obj.Read(b) + + if err != nil && err != io.EOF { + return &bucket{}, err + } + var meta bucket + err = json.Unmarshal(b, &meta) + return &meta, err +} diff --git a/pkg/s3/s3-driver_suite_test.go b/pkg/s3/s3-driver_suite_test.go index 451132d..d19fd58 100644 --- a/pkg/s3/s3-driver_suite_test.go +++ b/pkg/s3/s3-driver_suite_test.go @@ -12,8 +12,6 @@ import ( "github.com/kubernetes-csi/csi-test/pkg/sanity" ) -const () - var _ = Describe("S3Driver", func() { mntDir, _ := ioutil.TempDir("", "mnt") stagingDir, _ := ioutil.TempDir("", "staging") @@ -43,10 +41,9 @@ var _ = Describe("S3Driver", func() { Describe("CSI sanity", func() { sanityCfg := &sanity.Config{ - TargetPath: mntDir, - StagingPath: stagingDir, - Address: csiEndpoint, - TestVolumeSize: 1, + TargetPath: mntDir, + StagingPath: stagingDir, + Address: csiEndpoint, } sanity.GinkgoTest(sanityCfg) }) @@ -72,10 +69,9 @@ var _ = Describe("S3Driver", func() { Describe("CSI sanity", func() { sanityCfg := &sanity.Config{ - TargetPath: mntDir, - StagingPath: stagingDir, - Address: csiEndpoint, - TestVolumeSize: 1, + TargetPath: mntDir, + StagingPath: stagingDir, + Address: csiEndpoint, } sanity.GinkgoTest(sanityCfg) }) @@ -104,10 +100,9 @@ var _ = Describe("S3Driver", func() { Describe("CSI sanity", func() { sanityCfg := &sanity.Config{ - TargetPath: mntDir, - StagingPath: stagingDir, - Address: csiEndpoint, - TestVolumeSize: 1, + TargetPath: mntDir, + StagingPath: stagingDir, + Address: csiEndpoint, } sanity.GinkgoTest(sanityCfg) }) @@ -134,10 +129,9 @@ var _ = Describe("S3Driver", func() { Describe("CSI sanity", func() { sanityCfg := &sanity.Config{ - TargetPath: mntDir, - StagingPath: stagingDir, - Address: csiEndpoint, - TestVolumeSize: 1, + TargetPath: mntDir, + StagingPath: stagingDir, + Address: csiEndpoint, } sanity.GinkgoTest(sanityCfg) }) From 59481b756a87ff11269026b110252269f66b8de0 Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Fri, 27 Jul 2018 13:03:12 +0200 Subject: [PATCH 09/10] Set s3backer Dockerfile as default --- Makefile | 4 ++-- cmd/s3driver/Dockerfile | 26 ++++++++++++++++++++++++-- test/Dockerfile | 34 ++++++++++++++++++++-------------- test/Dockerfile.s3backer | 30 ------------------------------ test/Dockerfile.s3ql | 24 ++++++++++++++++++++++++ 5 files changed, 70 insertions(+), 48 deletions(-) delete mode 100644 test/Dockerfile.s3backer create mode 100644 test/Dockerfile.s3ql diff --git a/Makefile b/Makefile index 24d68e4..520f1e5 100644 --- a/Makefile +++ b/Makefile @@ -24,10 +24,10 @@ build: if [ ! -d ./vendor ]; then dep ensure -vendor-only; fi CGO_ENABLED=0 GOOS=linux go build -a -ldflags '-extldflags "-static"' -o _output/s3driver ./cmd/s3driver test: - docker build -t $(TEST_IMAGE_TAG) -f test/Dockerfile.s3backer . + docker build -t $(TEST_IMAGE_TAG) -f test/Dockerfile . docker run --rm --privileged -v $(PWD):$(PROJECT_DIR):ro -v /dev:/dev $(TEST_IMAGE_TAG) container: build - docker build -t $(IMAGE_TAG) -f cmd/s3driver/Dockerfile.s3ql . + docker build -t $(IMAGE_TAG) -f cmd/s3driver/Dockerfile . push: container docker push $(IMAGE_TAG) clean: diff --git a/cmd/s3driver/Dockerfile b/cmd/s3driver/Dockerfile index 8007e75..6fe3f26 100644 --- a/cmd/s3driver/Dockerfile +++ b/cmd/s3driver/Dockerfile @@ -1,8 +1,30 @@ -FROM debian:stretch +FROM golang:stretch LABEL maintainers="Cyrill Troxler " LABEL description="s3 fuse csi plugin" -RUN apt-get update && apt-get install -y s3fs && rm -rf /var/lib/apt/lists/* +RUN apt-get update && apt-get install -y \ + s3fs \ + build-essential \ + autoconf \ + libcurl4-openssl-dev \ + libfuse-dev \ + libexpat1-dev \ + libssl-dev \ + zlib1g-dev \ + xfsprogs \ + psmisc \ + git && \ + rm -rf /var/lib/apt/lists/* + +# Compile & install s3backer +RUN git clone https://github.com/archiecobbs/s3backer.git ./s3backer + +WORKDIR "./s3backer" + +RUN ./autogen.sh && \ + ./configure && \ + make && \ + make install COPY ./_output/s3driver /s3driver ENTRYPOINT ["/s3driver"] diff --git a/test/Dockerfile b/test/Dockerfile index c61c431..b4991e8 100644 --- a/test/Dockerfile +++ b/test/Dockerfile @@ -1,23 +1,29 @@ FROM golang:stretch LABEL maintainers="Cyrill Troxler " LABEL description="s3 fuse csi plugin" -ARG S3QL_VERSION=release-2.28 -RUN apt-get update && \ - apt-get install -y \ - s3fs wget psmisc procps python3 python3-setuptools \ - python3-dev python3-pip python3-llfuse pkg-config \ - sqlite3 libsqlite3-dev python3-apsw cython && \ - rm -rf /var/lib/apt/lists/* +RUN apt-get update && apt-get install -y \ + s3fs \ + build-essential \ + autoconf \ + libcurl4-openssl-dev \ + libfuse-dev \ + libexpat1-dev \ + libssl-dev \ + zlib1g-dev \ + xfsprogs \ + psmisc \ + git && \ + rm -rf /var/lib/apt/lists/* -RUN pip3 install defusedxml dugong requests pycrypto +RUN git clone https://github.com/archiecobbs/s3backer.git ./s3backer -WORKDIR /usr/src -RUN wget -q https://github.com/s3ql/s3ql/archive/${S3QL_VERSION}.tar.gz -RUN tar -xzf ${S3QL_VERSION}.tar.gz -WORKDIR /usr/src/s3ql-${S3QL_VERSION} -RUN python3 setup.py build_cython build_ext --inplace -RUN python3 setup.py install +WORKDIR "./s3backer" + +RUN ["./autogen.sh"] +RUN ["./configure"] +RUN ["make"] +RUN ["make", "install"] RUN go get -u github.com/minio/minio && go install github.com/minio/minio/cmd diff --git a/test/Dockerfile.s3backer b/test/Dockerfile.s3backer deleted file mode 100644 index b4991e8..0000000 --- a/test/Dockerfile.s3backer +++ /dev/null @@ -1,30 +0,0 @@ -FROM golang:stretch -LABEL maintainers="Cyrill Troxler " -LABEL description="s3 fuse csi plugin" - -RUN apt-get update && apt-get install -y \ - s3fs \ - build-essential \ - autoconf \ - libcurl4-openssl-dev \ - libfuse-dev \ - libexpat1-dev \ - libssl-dev \ - zlib1g-dev \ - xfsprogs \ - psmisc \ - git && \ - rm -rf /var/lib/apt/lists/* - -RUN git clone https://github.com/archiecobbs/s3backer.git ./s3backer - -WORKDIR "./s3backer" - -RUN ["./autogen.sh"] -RUN ["./configure"] -RUN ["make"] -RUN ["make", "install"] - -RUN go get -u github.com/minio/minio && go install github.com/minio/minio/cmd - -CMD ["/go/src/github.com/ctrox/csi-s3-driver/test/test.sh"] diff --git a/test/Dockerfile.s3ql b/test/Dockerfile.s3ql new file mode 100644 index 0000000..c61c431 --- /dev/null +++ b/test/Dockerfile.s3ql @@ -0,0 +1,24 @@ +FROM golang:stretch +LABEL maintainers="Cyrill Troxler " +LABEL description="s3 fuse csi plugin" +ARG S3QL_VERSION=release-2.28 + +RUN apt-get update && \ + apt-get install -y \ + s3fs wget psmisc procps python3 python3-setuptools \ + python3-dev python3-pip python3-llfuse pkg-config \ + sqlite3 libsqlite3-dev python3-apsw cython && \ + rm -rf /var/lib/apt/lists/* + +RUN pip3 install defusedxml dugong requests pycrypto + +WORKDIR /usr/src +RUN wget -q https://github.com/s3ql/s3ql/archive/${S3QL_VERSION}.tar.gz +RUN tar -xzf ${S3QL_VERSION}.tar.gz +WORKDIR /usr/src/s3ql-${S3QL_VERSION} +RUN python3 setup.py build_cython build_ext --inplace +RUN python3 setup.py install + +RUN go get -u github.com/minio/minio && go install github.com/minio/minio/cmd + +CMD ["/go/src/github.com/ctrox/csi-s3-driver/test/test.sh"] From 87da8c6d21a22aaa4f1ae532f429122bc66e67fd Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Fri, 27 Jul 2018 13:34:24 +0200 Subject: [PATCH 10/10] Add section about s3backer to README --- README.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/README.md b/README.md index 18e5618..0a2850c 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,7 @@ As seen in the deployment example above, the driver can be configured to use one * [s3fs](https://github.com/s3fs-fuse/s3fs-fuse) * [goofys](https://github.com/kahing/goofys) * [s3ql](https://github.com/s3ql/s3ql) +* [s3backer](https://github.com/archiecobbs/s3backer) All mounters have different strengths and weaknesses depending on your use case. Here are some characteristics which should help you choose a mounter: @@ -98,6 +99,14 @@ All mounters have different strengths and weaknesses depending on your use case. * Supports compression before upload * Supports encryption before upload +### s3backer +* Represents a block device stored on S3 +* Allows to use a real filesystem +* Files are not readable with other S3 clients +* Support appends +* Supports compression before upload (Not yet implemented in this driver) +* Supports encryption before upload (Not yet implemented in this driver) + # Limitations As S3 is not a real file system there are some limitations to consider here. Depending on what mounter you are using, you will have different levels of POSIX compability. Also depending on what S3 storage backend you are using there are not always consistency guarantees. The detailed limitations can be found on the documentation of [s3fs](https://github.com/s3fs-fuse/s3fs-fuse#limitations) and [goofys](https://github.com/kahing/goofys#current-status).