diff --git a/README.md b/README.md index 27bf62d..438c4b5 100644 --- a/README.md +++ b/README.md @@ -94,6 +94,7 @@ metadata: provisioner: ru.yandex.s3.csi parameters: mounter: geesefs + options: "--memory-limit 1000 --dir-mode 0777 --file-mode 0666" bucket: some-existing-bucket-name ``` @@ -124,6 +125,10 @@ You can check POSIX compatibility matrix here: https://github.com/yandex-cloud/g * Almost full POSIX compatibility * Good performance for both small and big files * Does not store file permissions and custom modification times +* By default runs **outside** of the csi-s3 container using systemd, to not crash + mountpoints with "Transport endpoint is not connected" when csi-s3 is upgraded + or restarted. Add `--no-systemd` to `parameters.options` of the `StorageClass` + to disable this behaviour. #### s3fs diff --git a/deploy/kubernetes/csi-s3.yaml b/deploy/kubernetes/csi-s3.yaml index d2eac6d..867b7ea 100644 --- a/deploy/kubernetes/csi-s3.yaml +++ b/deploy/kubernetes/csi-s3.yaml @@ -108,6 +108,8 @@ spec: mountPropagation: "Bidirectional" - name: fuse-device mountPath: /dev/fuse + - name: systemd-control + mountPath: /run/systemd volumes: - name: registration-dir hostPath: @@ -124,3 +126,7 @@ spec: - name: fuse-device hostPath: path: /dev/fuse + - name: systemd-control + hostPath: + path: /run/systemd + type: DirectoryOrCreate diff --git a/go.mod b/go.mod index d10aab1..089a40b 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,8 @@ go 1.15 require ( github.com/container-storage-interface/spec v1.1.0 + github.com/coreos/go-systemd/v22 v22.5.0 // indirect + github.com/godbus/dbus/v5 v5.0.4 // indirect github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b github.com/golang/protobuf v1.1.0 // indirect github.com/kubernetes-csi/csi-lib-utils v0.6.1 // indirect diff --git a/go.sum b/go.sum index 66003e9..23c5826 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,13 @@ github.com/container-storage-interface/spec v1.1.0 h1:qPsTqtR1VUPvMPeK0UnCZMtXaKGyyLPG8gj/wG6VqMs= github.com/container-storage-interface/spec v1.1.0/go.mod h1:6URME8mwIBbpVyZV93Ce5St17xBiQJQY67NDsuohiy4= +github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/protobuf v1.1.0 h1:0iH4Ffd/meGoXqF2lSAhZHt8X+cPgkfn/cb6Cce5Vpc= diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go index 2b8e281..5acecdd 100644 --- a/pkg/driver/nodeserver.go +++ b/pkg/driver/nodeserver.go @@ -110,7 +110,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis if err != nil { return nil, err } - if err := mounter.Mount(stagingTargetPath, targetPath); err != nil { + if err := mounter.Mount(stagingTargetPath, targetPath, volumeID); err != nil { return nil, err } diff --git a/pkg/mounter/geesefs.go b/pkg/mounter/geesefs.go index 624c6ba..9be6b1c 100644 --- a/pkg/mounter/geesefs.go +++ b/pkg/mounter/geesefs.go @@ -3,6 +3,11 @@ package mounter import ( "fmt" "os" + "strings" + "time" + + systemd "github.com/coreos/go-systemd/v22/dbus" + dbus "github.com/godbus/dbus/v5" "github.com/yandex-cloud/k8s-csi-s3/pkg/s3" ) @@ -38,19 +43,109 @@ func (geesefs *geesefsMounter) Unstage(stageTarget string) error { return nil } -func (geesefs *geesefsMounter) Mount(source string, target string) error { - fullPath := fmt.Sprintf("%s:%s", geesefs.meta.BucketName, geesefs.meta.Prefix) - args := []string{ +func (geesefs *geesefsMounter) CopyBinary(from, to string) error { + st, err := os.Stat(from) + if err != nil { + return fmt.Errorf("Failed to stat %s: %v", from, err) + } + st2, err := os.Stat(to) + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("Failed to stat %s: %v", to, err) + } + if err != nil || st2.Size() != st.Size() || st2.ModTime() != st.ModTime() { + if err == nil { + // remove the file first to not hit "text file busy" errors + err = os.Remove(to) + if err != nil { + return fmt.Errorf("Error removing %s to update it: %v", to, err) + } + } + bin, err := os.ReadFile(from) + if err != nil { + return fmt.Errorf("Error copying %s to %s: %v", from, to, err) + } + err = os.WriteFile(to, bin, 0755) + if err != nil { + return fmt.Errorf("Error copying %s to %s: %v", from, to, err) + } + err = os.Chtimes(to, st.ModTime(), st.ModTime()) + if err != nil { + return fmt.Errorf("Error copying %s to %s: %v", from, to, err) + } + } + return nil +} + +func (geesefs *geesefsMounter) MountDirect(target string, args []string) error { + args = append([]string{ "--endpoint", geesefs.endpoint, "-o", "allow_other", "--log-file", "/dev/stderr", - } - if geesefs.region != "" { - args = append(args, "--region", geesefs.region) - } - args = append(args, geesefs.meta.MountOptions...) - args = append(args, fullPath, target) + }, args...) os.Setenv("AWS_ACCESS_KEY_ID", geesefs.accessKeyID) os.Setenv("AWS_SECRET_ACCESS_KEY", geesefs.secretAccessKey) return fuseMount(target, geesefsCmd, args) } + +func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error { + fullPath := fmt.Sprintf("%s:%s", geesefs.meta.BucketName, geesefs.meta.Prefix) + var args []string + if geesefs.region != "" { + args = append(args, "--region", geesefs.region) + } + useSystemd := true + for i := 0; i < len(geesefs.meta.MountOptions); i++ { + if geesefs.meta.MountOptions[i] == "--no-systemd" { + useSystemd = false + } else { + args = append(args, geesefs.meta.MountOptions[i]) + } + } + args = append(args, fullPath, target) + // Try to start geesefs using systemd so it doesn't get killed when the container exits + if !useSystemd { + return geesefs.MountDirect(target, args) + } + conn, err := systemd.New() + if err != nil { + fmt.Printf("Failed to connect to systemd dbus service: %v, starting geesefs directly\n", err) + return geesefs.MountDirect(target, args) + } + defer conn.Close() + // systemd is present + if err = geesefs.CopyBinary("/usr/bin/geesefs", "/csi/geesefs"); err != nil { + return err + } + pluginDir := os.Getenv("PLUGIN_DIR") + if pluginDir == "" { + pluginDir = "/var/lib/kubelet/plugins/ru.yandex.s3.csi" + } + args = append([]string{pluginDir+"/geesefs", "-f", "-o", "allow_other", "--endpoint", geesefs.endpoint}, args...) + unitName := "geesefs-"+systemd.PathBusEscape(volumeID)+".service" + props := []systemd.Property{ + systemd.Property{ + Name: "Description", + Value: dbus.MakeVariant("GeeseFS mount for Kubernetes volume "+volumeID), + }, + systemd.PropExecStart(args, false), + systemd.Property{ + Name: "Environment", + Value: dbus.MakeVariant([]string{ "AWS_ACCESS_KEY_ID="+geesefs.accessKeyID, "AWS_SECRET_ACCESS_KEY="+geesefs.secretAccessKey }), + }, + systemd.Property{ + Name: "CollectMode", + Value: dbus.MakeVariant("inactive-or-failed"), + }, + } + _, err = conn.StartTransientUnit(unitName, "replace", props, nil) + if err != nil && strings.Contains(err.Error(), "already exists") { + // Stop and garbage collect the unit if automatic collection didn't work for some reason + conn.StopUnit(unitName, "replace", nil) + conn.ResetFailedUnit(unitName) + _, err = conn.StartTransientUnit(unitName, "replace", props, nil) + } + if err != nil { + return fmt.Errorf("Error starting systemd unit %s on host: %v", unitName, err) + } + return waitForMount(target, 10*time.Second) +} diff --git a/pkg/mounter/mounter.go b/pkg/mounter/mounter.go index 1e79421..1f71afa 100644 --- a/pkg/mounter/mounter.go +++ b/pkg/mounter/mounter.go @@ -22,7 +22,7 @@ import ( type Mounter interface { Stage(stagePath string) error Unstage(stagePath string) error - Mount(source string, target string) error + Mount(source, target, volumeID string) error } const ( diff --git a/pkg/mounter/rclone.go b/pkg/mounter/rclone.go index eebc239..d918702 100644 --- a/pkg/mounter/rclone.go +++ b/pkg/mounter/rclone.go @@ -39,7 +39,7 @@ func (rclone *rcloneMounter) Unstage(stageTarget string) error { return nil } -func (rclone *rcloneMounter) Mount(source string, target string) error { +func (rclone *rcloneMounter) Mount(source, target, volumeID string) error { args := []string{ "mount", fmt.Sprintf(":s3:%s", path.Join(rclone.meta.BucketName, rclone.meta.Prefix)), diff --git a/pkg/mounter/s3fs.go b/pkg/mounter/s3fs.go index 644aed6..c4eba11 100644 --- a/pkg/mounter/s3fs.go +++ b/pkg/mounter/s3fs.go @@ -36,7 +36,7 @@ func (s3fs *s3fsMounter) Unstage(stageTarget string) error { return nil } -func (s3fs *s3fsMounter) Mount(source string, target string) error { +func (s3fs *s3fsMounter) Mount(source, target, volumeID string) error { if err := writes3fsPass(s3fs.pwFileContent); err != nil { return err }