diff --git a/deploy/kubernetes/csi-s3.yaml b/deploy/kubernetes/csi-s3.yaml index 41c5879..65456eb 100644 --- a/deploy/kubernetes/csi-s3.yaml +++ b/deploy/kubernetes/csi-s3.yaml @@ -103,6 +103,9 @@ spec: volumeMounts: - name: plugin-dir mountPath: /csi + - name: stage-dir + mountPath: /var/lib/kubelet/plugins/kubernetes.io/csi/ru.yandex.s3.csi + mountPropagation: "Bidirectional" - name: pods-mount-dir mountPath: /var/lib/kubelet/pods mountPropagation: "Bidirectional" @@ -119,6 +122,10 @@ spec: hostPath: path: /var/lib/kubelet/plugins/ru.yandex.s3.csi type: DirectoryOrCreate + - name: stage-dir + hostPath: + path: /var/lib/kubelet/plugins/kubernetes.io/csi/ru.yandex.s3.csi + type: DirectoryOrCreate - name: pods-mount-dir hostPath: path: /var/lib/kubelet/pods diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index 3e24bf3..a342a7a 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -33,7 +33,7 @@ type driver struct { } var ( - vendorVersion = "v1.2.0" + vendorVersion = "v1.34.6" driverName = "ru.yandex.s3.csi" ) diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go index 5acecdd..7b5587b 100644 --- a/pkg/driver/nodeserver.go +++ b/pkg/driver/nodeserver.go @@ -19,6 +19,7 @@ package driver import ( "fmt" "os" + "os/exec" "regexp" "strconv" @@ -68,7 +69,6 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis volumeID := req.GetVolumeId() targetPath := req.GetTargetPath() stagingTargetPath := req.GetStagingTargetPath() - bucketName, prefix := volumeIDToBucketPrefix(volumeID) // Check arguments if req.GetVolumeCapability() == nil { @@ -100,18 +100,12 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis glog.V(4).Infof("target %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n", targetPath, readOnly, volumeID, attrib, mountFlags) - s3, err := s3.NewClientFromSecret(req.GetSecrets()) + cmd := exec.Command("mount", "--bind", stagingTargetPath, targetPath) + cmd.Stderr = os.Stderr + glog.V(3).Infof("Binding volume %v from %v to %v", volumeID, stagingTargetPath, targetPath) + out, err := cmd.Output() if err != nil { - return nil, fmt.Errorf("failed to initialize S3 client: %s", err) - } - - meta := getMeta(bucketName, prefix, req.VolumeContext) - mounter, err := mounter.New(meta, s3.Config) - if err != nil { - return nil, err - } - if err := mounter.Mount(stagingTargetPath, targetPath, volumeID); err != nil { - return nil, err + return nil, fmt.Errorf("Error running mount --bind %v %v: %s", stagingTargetPath, targetPath, out) } glog.V(4).Infof("s3: volume %s successfully mounted to %s", volumeID, targetPath) @@ -131,7 +125,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } - if err := mounter.FuseUnmount(targetPath); err != nil { + if err := mounter.Unmount(targetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) } glog.V(4).Infof("s3: volume %s has been unmounted.", volumeID) @@ -174,7 +168,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol if err != nil { return nil, err } - if err := mounter.Stage(stagingTargetPath); err != nil { + if err := mounter.Mount(stagingTargetPath, volumeID); err != nil { return nil, err } @@ -193,6 +187,22 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } + proc, err := mounter.FindFuseMountProcess(stagingTargetPath) + if err != nil { + return nil, err + } + exists := false + if proc == nil { + exists, err = mounter.SystemdUnmount(volumeID) + if exists && err != nil { + return nil, err + } + } + if !exists { + err = mounter.FuseUnmount(stagingTargetPath) + } + glog.V(4).Infof("s3: volume %s has been unmounted from stage path %v.", volumeID, stagingTargetPath) + return &csi.NodeUnstageVolumeResponse{}, nil } diff --git a/pkg/mounter/geesefs.go b/pkg/mounter/geesefs.go index 7f8f025..cd28d82 100644 --- a/pkg/mounter/geesefs.go +++ b/pkg/mounter/geesefs.go @@ -3,11 +3,11 @@ package mounter import ( "fmt" "os" - "strings" "time" systemd "github.com/coreos/go-systemd/v22/dbus" dbus "github.com/godbus/dbus/v5" + "github.com/golang/glog" "github.com/yandex-cloud/k8s-csi-s3/pkg/s3" ) @@ -35,14 +35,6 @@ func newGeeseFSMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) { }, nil } -func (geesefs *geesefsMounter) Stage(stageTarget string) error { - return nil -} - -func (geesefs *geesefsMounter) Unstage(stageTarget string) error { - return nil -} - func (geesefs *geesefsMounter) CopyBinary(from, to string) error { st, err := os.Stat(from) if err != nil { @@ -87,7 +79,7 @@ func (geesefs *geesefsMounter) MountDirect(target string, args []string) error { return fuseMount(target, geesefsCmd, args) } -func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error { +func (geesefs *geesefsMounter) Mount(target, volumeID string) error { fullPath := fmt.Sprintf("%s:%s", geesefs.meta.BucketName, geesefs.meta.Prefix) var args []string if geesefs.region != "" { @@ -113,7 +105,7 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error { } conn, err := systemd.New() if err != nil { - fmt.Printf("Failed to connect to systemd dbus service: %v, starting geesefs directly\n", err) + glog.Errorf("Failed to connect to systemd dbus service: %v, starting geesefs directly", err) return geesefs.MountDirect(target, args) } defer conn.Close() @@ -127,7 +119,7 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error { } args = append([]string{pluginDir+"/geesefs", "-f", "-o", "allow_other", "--endpoint", geesefs.endpoint}, args...) unitName := "geesefs-"+systemd.PathBusEscape(volumeID)+".service" - props := []systemd.Property{ + newProps := []systemd.Property{ systemd.Property{ Name: "Description", Value: dbus.MakeVariant("GeeseFS mount for Kubernetes volume "+volumeID), @@ -142,13 +134,35 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error { Value: dbus.MakeVariant("inactive-or-failed"), }, } - _, err = conn.StartTransientUnit(unitName, "replace", props, nil) - if err != nil && strings.Contains(err.Error(), "already exists") { - // Stop and garbage collect the unit if automatic collection didn't work for some reason - conn.StopUnit(unitName, "replace", nil) - conn.ResetFailedUnit(unitName) - _, err = conn.StartTransientUnit(unitName, "replace", props, nil) + unitProps, err := conn.GetAllProperties(unitName) + if err == nil { + // Unit already exists + if s, ok := unitProps["ActiveState"].(string); ok && (s == "active" || s == "activating" || s == "reloading") { + // Unit is already active + curPath := "" + prevExec, ok := unitProps["ExecStart"].([][]interface{}) + if ok && len(prevExec) > 0 && len(prevExec[0]) >= 2 { + execArgs, ok := prevExec[0][1].([]string) + if ok && len(execArgs) >= 2 { + curPath = execArgs[len(execArgs)-1] + } + } + if curPath != target { + return fmt.Errorf( + "GeeseFS for volume %v is already mounted on host, but"+ + " in a different directory. We want %v, but it's in %v", + volumeID, target, curPath, + ) + } + // Already mounted at right location + return nil + } else { + // Stop and garbage collect the unit if automatic collection didn't work for some reason + conn.StopUnit(unitName, "replace", nil) + conn.ResetFailedUnit(unitName) + } } + _, err = conn.StartTransientUnit(unitName, "replace", newProps, nil) if err != nil { return fmt.Errorf("Error starting systemd unit %s on host: %v", unitName, err) } diff --git a/pkg/mounter/mounter.go b/pkg/mounter/mounter.go index 1f71afa..7a83b7e 100644 --- a/pkg/mounter/mounter.go +++ b/pkg/mounter/mounter.go @@ -11,18 +11,18 @@ import ( "syscall" "time" - "github.com/yandex-cloud/k8s-csi-s3/pkg/s3" + systemd "github.com/coreos/go-systemd/v22/dbus" "github.com/golang/glog" "github.com/mitchellh/go-ps" "k8s.io/kubernetes/pkg/util/mount" + + "github.com/yandex-cloud/k8s-csi-s3/pkg/s3" ) // Mounter interface which can be implemented // by the different mounter types type Mounter interface { - Stage(stagePath string) error - Unstage(stagePath string) error - Mount(source, target, volumeID string) error + Mount(target, volumeID string) error } const ( @@ -70,12 +70,40 @@ func fuseMount(path string, command string, args []string) error { return waitForMount(path, 10*time.Second) } +func Unmount(path string) error { + if err := mount.New("").Unmount(path); err != nil { + return err + } + return nil +} + +func SystemdUnmount(volumeID string) (bool, error) { + conn, err := systemd.New() + if err != nil { + glog.Errorf("Failed to connect to systemd dbus service: %v", err) + return false, err + } + defer conn.Close() + unitName := "geesefs-"+systemd.PathBusEscape(volumeID)+".service" + units, err := conn.ListUnitsByNames([]string{ unitName }) + glog.Errorf("Got %v", units) + if err != nil { + glog.Errorf("Failed to list systemd unit by name %v: %v", unitName, err) + return false, err + } + if len(units) == 0 || units[0].ActiveState == "inactive" || units[0].ActiveState == "failed" { + return true, nil + } + _, err = conn.StopUnit(unitName, "replace", nil) + return true, err +} + func FuseUnmount(path string) error { if err := mount.New("").Unmount(path); err != nil { return err } // as fuse quits immediately, we will try to wait until the process is done - process, err := findFuseMountProcess(path) + process, err := FindFuseMountProcess(path) if err != nil { glog.Errorf("Error getting PID of fuse mount: %s", err) return nil @@ -107,7 +135,7 @@ func waitForMount(path string, timeout time.Duration) error { } } -func findFuseMountProcess(path string) (*os.Process, error) { +func FindFuseMountProcess(path string) (*os.Process, error) { processes, err := ps.Processes() if err != nil { return nil, err diff --git a/pkg/mounter/rclone.go b/pkg/mounter/rclone.go index d918702..7b912c2 100644 --- a/pkg/mounter/rclone.go +++ b/pkg/mounter/rclone.go @@ -31,15 +31,7 @@ func newRcloneMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) { }, nil } -func (rclone *rcloneMounter) Stage(stageTarget string) error { - return nil -} - -func (rclone *rcloneMounter) Unstage(stageTarget string) error { - return nil -} - -func (rclone *rcloneMounter) Mount(source, target, volumeID string) error { +func (rclone *rcloneMounter) Mount(target, volumeID string) error { args := []string{ "mount", fmt.Sprintf(":s3:%s", path.Join(rclone.meta.BucketName, rclone.meta.Prefix)), diff --git a/pkg/mounter/s3fs.go b/pkg/mounter/s3fs.go index c4eba11..533522f 100644 --- a/pkg/mounter/s3fs.go +++ b/pkg/mounter/s3fs.go @@ -28,15 +28,7 @@ func newS3fsMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) { }, nil } -func (s3fs *s3fsMounter) Stage(stageTarget string) error { - return nil -} - -func (s3fs *s3fsMounter) Unstage(stageTarget string) error { - return nil -} - -func (s3fs *s3fsMounter) Mount(source, target, volumeID string) error { +func (s3fs *s3fsMounter) Mount(target, volumeID string) error { if err := writes3fsPass(s3fs.pwFileContent); err != nil { return err }