Add experimental s3backer mounter

This also adds some generic handling of stale umounts.
Fuse returns immediately and does not indicate that
the mounter has finished writing to the backend.
The process finding is sort of hacky as I could not
find a better way to get to the PID from a fuse mount.
This commit is contained in:
Cyrill Troxler 2018-07-22 22:08:48 +02:00
parent 108364fb88
commit 82ab4b0983
11 changed files with 354 additions and 36 deletions

8
Gopkg.lock generated
View file

@ -158,6 +158,12 @@
packages = ["."] packages = ["."]
revision = "3864e76763d94a6df2f9960b16a20a33da9f9a66" revision = "3864e76763d94a6df2f9960b16a20a33da9f9a66"
[[projects]]
branch = "master"
name = "github.com/mitchellh/go-ps"
packages = ["."]
revision = "4fdf99ab29366514c69ccccddab5dc58b8d84062"
[[projects]] [[projects]]
name = "github.com/onsi/ginkgo" name = "github.com/onsi/ginkgo"
packages = [ packages = [
@ -372,6 +378,6 @@
[solve-meta] [solve-meta]
analyzer-name = "dep" analyzer-name = "dep"
analyzer-version = 1 analyzer-version = 1
inputs-digest = "659f47734b56af7fba146039231841e82cc4c3c95dff2814ec3688e967790a50" inputs-digest = "a655e5231fe7b5c962579c8f032338c53177a95a3160bef4628f0761f714f730"
solver-name = "gps-cdcl" solver-name = "gps-cdcl"
solver-version = 1 solver-version = 1

View file

@ -41,3 +41,7 @@
[[constraint]] [[constraint]]
name = "gopkg.in/ini.v1" name = "gopkg.in/ini.v1"
version = "1.38.1" version = "1.38.1"
[[constraint]]
branch = "master"
name = "github.com/mitchellh/go-ps"

View file

@ -24,7 +24,7 @@ build:
if [ ! -d ./vendor ]; then dep ensure -vendor-only; fi if [ ! -d ./vendor ]; then dep ensure -vendor-only; fi
CGO_ENABLED=0 GOOS=linux go build -a -ldflags '-extldflags "-static"' -o _output/s3driver ./cmd/s3driver CGO_ENABLED=0 GOOS=linux go build -a -ldflags '-extldflags "-static"' -o _output/s3driver ./cmd/s3driver
test: test:
docker build -t $(TEST_IMAGE_TAG) -f test/Dockerfile . docker build -t $(TEST_IMAGE_TAG) -f test/Dockerfile.s3backer .
docker run --rm --privileged -v $(PWD):$(PROJECT_DIR):ro -v /dev:/dev $(TEST_IMAGE_TAG) docker run --rm --privileged -v $(PWD):$(PROJECT_DIR):ro -v /dev:/dev $(TEST_IMAGE_TAG)
container: build container: build
docker build -t $(IMAGE_TAG) -f cmd/s3driver/Dockerfile.s3ql . docker build -t $(IMAGE_TAG) -f cmd/s3driver/Dockerfile.s3ql .

View file

@ -1,6 +1,12 @@
package s3 package s3
import "fmt" import (
"fmt"
"os/exec"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util/mount"
)
// Mounter interface which can be implemented // Mounter interface which can be implemented
// by the different mounter types // by the different mounter types
@ -14,6 +20,7 @@ const (
s3fsMounterType = "s3fs" s3fsMounterType = "s3fs"
goofysMounterType = "goofys" goofysMounterType = "goofys"
s3qlMounterType = "s3ql" s3qlMounterType = "s3ql"
s3backerMounterType = "s3backer"
) )
// newMounter returns a new mounter depending on the mounterType parameter // newMounter returns a new mounter depending on the mounterType parameter
@ -28,6 +35,38 @@ func newMounter(bucket string, cfg *Config) (Mounter, error) {
case s3qlMounterType: case s3qlMounterType:
return newS3qlMounter(bucket, cfg) return newS3qlMounter(bucket, cfg)
case s3backerMounterType:
return newS3backerMounter(bucket, cfg)
} }
return nil, fmt.Errorf("Error mounting bucket %s, invalid mounter specified: %s", bucket, cfg.Mounter) return nil, fmt.Errorf("Error mounting bucket %s, invalid mounter specified: %s", bucket, cfg.Mounter)
} }
func fuseMount(path string, command string, args []string) error {
cmd := exec.Command(command, args...)
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("Error fuseMount command: %s\nargs: %s\noutput: %s", command, args, out)
}
return nil
}
func fuseUnmount(path string, command string) error {
if err := mount.New("").Unmount(path); err != nil {
return err
}
// as fuse quits immediately, we will try to wait until the process is done
process, err := findFuseMountProcess(path, command)
if err != nil {
glog.Errorf("Error getting PID of fuse mount: %s", err)
return nil
}
if process == nil {
glog.Warningf("Unable to find PID of fuse mount %s, it must have finished already", path)
return nil
}
glog.Infof("Found fuse pid %v of mount %s, checking if it still runs", process.Pid, path)
return waitForProcess(process, 1)
}

View file

@ -7,10 +7,12 @@ import (
"context" "context"
goofysApi "github.com/kahing/goofys/api" goofysApi "github.com/kahing/goofys/api"
"k8s.io/kubernetes/pkg/util/mount"
) )
const defaultRegion = "us-east-1" const (
goofysCmd = "goofys"
defaultRegion = "us-east-1"
)
// Implements Mounter // Implements Mounter
type goofysMounter struct { type goofysMounter struct {
@ -64,5 +66,5 @@ func (goofys *goofysMounter) Mount(targetPath string) error {
} }
func (goofys *goofysMounter) Unmount(targetPath string) error { func (goofys *goofysMounter) Unmount(targetPath string) error {
return mount.New("").Unmount(targetPath) return fuseUnmount(targetPath, goofysCmd)
} }

153
pkg/s3/mounter_s3backer.go Normal file
View file

@ -0,0 +1,153 @@
package s3
import (
"fmt"
"io/ioutil"
"os"
"os/exec"
"path"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util/mount"
)
// Implements Mounter
type s3backerMounter struct {
bucket string
url string
region string
accessKeyID string
secretAccessKey string
size int64
initMountPath string
}
const (
s3backerCmd = "s3backer"
s3backerFsType = "xfs"
s3backerMountBase = "/mnt"
s3backerDevice = "file"
// blockSize to use in k
s3backerBlockSize = "128k"
)
func newS3backerMounter(bucket string, cfg *Config) (Mounter, error) {
s3backer := &s3backerMounter{
bucket: bucket,
url: cfg.Endpoint,
region: cfg.Region,
accessKeyID: cfg.AccessKeyID,
secretAccessKey: cfg.SecretAccessKey,
initMountPath: path.Join(s3backerMountBase, bucket),
size: 1024 * 1024 * 1024 * 10,
}
return s3backer, s3backer.writePasswd()
}
func (s3backer *s3backerMounter) String() string {
return s3backer.bucket
}
func (s3backer *s3backerMounter) Format() error {
tmpDir, err := ioutil.TempDir("", "s3backer")
if err != nil {
return err
}
defer os.RemoveAll(tmpDir)
if err := s3backer.mountInit(tmpDir); err != nil {
return err
}
defer fuseUnmount(tmpDir, s3backerCmd)
return formatFs(s3backerFsType, path.Join(tmpDir, s3backerDevice))
}
func (s3backer *s3backerMounter) Mount(targetPath string) error {
if err := os.MkdirAll(s3backer.initMountPath, 0700); err != nil {
return err
}
// s3backer requires two mounts
// first mount will fuse mount the bucket to a single 'file'
err := s3backer.mountInit(s3backer.initMountPath)
if err != nil {
return err
}
device := path.Join(s3backer.initMountPath, s3backerDevice)
// second mount will mount the 'file' as a filesystem
err = mount.New("").Mount(device, targetPath, s3backerFsType, []string{})
if err != nil {
// cleanup fuse mount
fuseUnmount(targetPath, s3backerCmd)
return err
}
return nil
}
func (s3backer *s3backerMounter) Unmount(targetPath string) error {
// Unmount the filesystem first
if err := mount.New("").Unmount(targetPath); err != nil {
return err
}
// Unmount the s3backer fuse mount
err := fuseUnmount(s3backer.initMountPath, s3backerCmd)
if err != nil {
return err
}
return nil
}
func (s3backer *s3backerMounter) mountInit(path string) error {
args := []string{
// baseURL must end with /
fmt.Sprintf("--baseURL=%s/", s3backer.url),
fmt.Sprintf("--blockSize=%v", s3backerBlockSize),
fmt.Sprintf("--size=%v", s3backer.size),
"--listBlocks",
s3backer.bucket,
path,
}
if s3backer.region != "" {
args = append(args, fmt.Sprintf("--region=%s", s3backer.region))
}
return fuseMount(path, s3backerCmd, args)
}
func (s3backer *s3backerMounter) writePasswd() error {
pwFileName := fmt.Sprintf("%s/.s3backer_passwd", os.Getenv("HOME"))
pwFile, err := os.OpenFile(pwFileName, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return err
}
_, err = pwFile.WriteString(s3backer.accessKeyID + ":" + s3backer.secretAccessKey)
if err != nil {
return err
}
pwFile.Close()
return nil
}
func formatFs(fsType string, device string) error {
diskMounter := &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: mount.NewOsExec()}
format, err := diskMounter.GetDiskFormat(device)
if err != nil {
return err
}
if format != "" {
glog.Infof("Disk %s is already formatted with format %s", device, format)
return nil
}
args := []string{
device,
}
cmd := exec.Command("mkfs."+fsType, args...)
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("Error formatting disk: %s", out)
}
glog.Info("Formatting fs with type %s, out: %s", fsType, out)
return nil
}

View file

@ -3,9 +3,6 @@ package s3
import ( import (
"fmt" "fmt"
"os" "os"
"os/exec"
"k8s.io/kubernetes/pkg/util/mount"
) )
// Implements Mounter // Implements Mounter
@ -16,6 +13,10 @@ type s3fsMounter struct {
pwFileContent string pwFileContent string
} }
const (
s3fsCmd = "s3fs"
)
func newS3fsMounter(bucket string, cfg *Config) (Mounter, error) { func newS3fsMounter(bucket string, cfg *Config) (Mounter, error) {
return &s3fsMounter{ return &s3fsMounter{
bucket: bucket, bucket: bucket,
@ -43,16 +44,11 @@ func (s3fs *s3fsMounter) Mount(targetPath string) error {
"-o", "allow_other", "-o", "allow_other",
"-o", "mp_umask=000", "-o", "mp_umask=000",
} }
cmd := exec.Command("s3fs", args...) return fuseMount(targetPath, s3fsCmd, args)
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("Error mounting using s3fs, output: %s", out)
}
return nil
} }
func (s3fs *s3fsMounter) Unmount(targetPath string) error { func (s3fs *s3fsMounter) Unmount(targetPath string) error {
return mount.New("").Unmount(targetPath) return fuseUnmount(targetPath, s3fsCmd)
} }
func writes3fsPass(pwFileContent string) error { func writes3fsPass(pwFileContent string) error {

View file

@ -3,7 +3,6 @@ package s3
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"io"
"net/url" "net/url"
"os" "os"
"os/exec" "os/exec"
@ -15,6 +14,7 @@ import (
// Implements Mounter // Implements Mounter
type s3qlMounter struct { type s3qlMounter struct {
bucket string
url string url string
bucketURL string bucketURL string
login string login string
@ -41,6 +41,7 @@ func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) {
url.Scheme = "s3c" url.Scheme = "s3c"
} }
s3ql := &s3qlMounter{ s3ql := &s3qlMounter{
bucket: bucket,
url: url.String(), url: url.String(),
login: cfg.AccessKeyID, login: cfg.AccessKeyID,
password: cfg.SecretAccessKey, password: cfg.SecretAccessKey,
@ -67,7 +68,14 @@ func (s3ql *s3qlMounter) Format() error {
p := fmt.Sprintf("%s\n%s\n", s3ql.passphrase, s3ql.passphrase) p := fmt.Sprintf("%s\n%s\n", s3ql.passphrase, s3ql.passphrase)
reader := bytes.NewReader([]byte(p)) reader := bytes.NewReader([]byte(p))
return s3qlCmd(s3qlCmdMkfs, append(args, s3ql.options...), reader) cmd := exec.Command(s3qlCmdMkfs, append(args, s3ql.options...)...)
cmd.Stdin = reader
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("Error running s3ql command: %s", out)
}
return nil
} }
func (s3ql *s3qlMounter) Mount(targetPath string) error { func (s3ql *s3qlMounter) Mount(targetPath string) error {
@ -76,24 +84,11 @@ func (s3ql *s3qlMounter) Mount(targetPath string) error {
targetPath, targetPath,
"--allow-other", "--allow-other",
} }
return s3qlCmd(s3qlCmdMount, append(args, s3ql.options...), nil) return fuseMount(targetPath, s3qlCmdMount, append(args, s3ql.options...))
} }
func (s3ql *s3qlMounter) Unmount(targetPath string) error { func (s3ql *s3qlMounter) Unmount(targetPath string) error {
return s3qlCmd(s3qlCmdUnmount, []string{targetPath}, nil) return fuseUnmount(targetPath, s3qlCmdMount)
}
func s3qlCmd(s3qlCmd string, args []string, stdin io.Reader) error {
cmd := exec.Command(s3qlCmd, args...)
if stdin != nil {
cmd.Stdin = stdin
}
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("Error running s3ql command: %s", out)
}
return nil
} }
func (s3ql *s3qlMounter) writeConfig() error { func (s3ql *s3qlMounter) writeConfig() error {

View file

@ -113,4 +113,35 @@ var _ = Describe("S3Driver", func() {
}) })
}) })
Context("s3backer", func() {
socket := "/tmp/csi-s3backer.sock"
csiEndpoint := "unix://" + socket
cfg := &s3.Config{
AccessKeyID: "FJDSJ",
SecretAccessKey: "DSG643HGDS",
Endpoint: "http://127.0.0.1:9000",
Mounter: "s3backer",
}
if err := os.Remove(socket); err != nil && !os.IsNotExist(err) {
Expect(err).NotTo(HaveOccurred())
}
driver, err := s3.NewS3("test-node", csiEndpoint, cfg)
if err != nil {
log.Fatal(err)
}
go driver.Run()
defer os.RemoveAll(mntDir)
Describe("CSI sanity", func() {
sanityCfg := &sanity.Config{
TargetPath: mntDir,
Address: csiEndpoint,
TestVolumeSize: 1,
}
sanity.GinkgoTest(sanityCfg)
})
})
}) })

64
pkg/s3/util.go Normal file
View file

@ -0,0 +1,64 @@
package s3
import (
"fmt"
"io/ioutil"
"os"
"strings"
"syscall"
"time"
"github.com/mitchellh/go-ps"
"github.com/golang/glog"
)
func waitForProcess(p *os.Process, backoff int) error {
if backoff == 20 {
return fmt.Errorf("Timeout waiting for PID %v to end", p.Pid)
}
if err := p.Signal(syscall.Signal(0)); err != nil {
glog.Warningf("Fuse process does not seem active or we are unprivileged: %s", err)
return nil
}
glog.Infof("Fuse process with PID %v still active, waiting...", p.Pid)
time.Sleep(time.Duration(backoff*100) * time.Millisecond)
return waitForProcess(p, backoff+1)
}
func findFuseMountProcess(path string, name string) (*os.Process, error) {
processes, err := ps.Processes()
if err != nil {
return nil, err
}
for _, p := range processes {
if strings.Contains(p.Executable(), name) {
cmdLine, err := getCmdLine(p.Pid())
if err != nil {
glog.Errorf("Unable to get cmdline of PID %v: %s", p.Pid(), err)
continue
}
if cmdLine == "" {
// ignore defunct processes
// TODO: debug why this happens in the first place
// seems to only happen on k8s, not on local docker
continue
}
if strings.Contains(cmdLine, path) {
glog.Infof("Found matching pid %v on path %s", p.Pid(), path)
return os.FindProcess(p.Pid())
}
}
fmt.Println(p.Executable())
}
return nil, nil
}
func getCmdLine(pid int) (string, error) {
cmdLineFile := fmt.Sprintf("/proc/%v/cmdline", pid)
cmdLine, err := ioutil.ReadFile(cmdLineFile)
if err != nil {
return "", err
}
return string(cmdLine), nil
}

28
test/Dockerfile.s3backer Normal file
View file

@ -0,0 +1,28 @@
FROM golang:stretch
LABEL maintainers="Cyrill Troxler <cyrilltroxler@gmail.com>"
LABEL description="s3 fuse csi plugin"
RUN apt-get update && apt-get install -y \
build-essential \
autoconf \
libcurl4-openssl-dev \
libfuse-dev \
libexpat1-dev \
libssl-dev \
zlib1g-dev \
xfsprogs \
psmisc \
git
RUN git clone https://github.com/archiecobbs/s3backer.git ./s3backer
WORKDIR "./s3backer"
RUN ["./autogen.sh"]
RUN ["./configure"]
RUN ["make"]
RUN ["make", "install"]
RUN go get -u github.com/minio/minio && go install github.com/minio/minio/cmd
CMD ["/go/src/github.com/ctrox/csi-s3-driver/test/test.sh"]