From 345d7587f3257bd9034a3f4b372adf1421e841cb Mon Sep 17 00:00:00 2001
From: Vitaliy Filippov <vitalif@yourcmc.ru>
Date: Tue, 7 Mar 2023 00:41:41 +0300
Subject: [PATCH] Implement mounting via stage directory

Previously, multiple containers with the same mounted volume resulted in multiple
FUSE processes. This behaviour was breaking parallel modifications from different
containers, consumed extra resources, and after mounting via systemd was introduced,
led to the total inability to mount the same volume into multiple containers on
the same host.

Now only one FUSE process is started per volume, per host.
---
 deploy/kubernetes/csi-s3.yaml |  7 +++++
 pkg/driver/driver.go          |  2 +-
 pkg/driver/nodeserver.go      | 38 ++++++++++++++++----------
 pkg/mounter/geesefs.go        | 50 ++++++++++++++++++++++-------------
 pkg/mounter/mounter.go        | 40 +++++++++++++++++++++++-----
 pkg/mounter/rclone.go         | 10 +------
 pkg/mounter/s3fs.go           | 10 +------
 7 files changed, 100 insertions(+), 57 deletions(-)

diff --git a/deploy/kubernetes/csi-s3.yaml b/deploy/kubernetes/csi-s3.yaml
index 41c5879..65456eb 100644
--- a/deploy/kubernetes/csi-s3.yaml
+++ b/deploy/kubernetes/csi-s3.yaml
@@ -103,6 +103,9 @@ spec:
           volumeMounts:
             - name: plugin-dir
               mountPath: /csi
+            - name: stage-dir
+              mountPath: /var/lib/kubelet/plugins/kubernetes.io/csi/ru.yandex.s3.csi
+              mountPropagation: "Bidirectional"
             - name: pods-mount-dir
               mountPath: /var/lib/kubelet/pods
               mountPropagation: "Bidirectional"
@@ -119,6 +122,10 @@ spec:
           hostPath:
             path: /var/lib/kubelet/plugins/ru.yandex.s3.csi
             type: DirectoryOrCreate
+        - name: stage-dir
+          hostPath:
+            path: /var/lib/kubelet/plugins/kubernetes.io/csi/ru.yandex.s3.csi
+            type: DirectoryOrCreate
         - name: pods-mount-dir
           hostPath:
             path: /var/lib/kubelet/pods
diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go
index 3e24bf3..a342a7a 100644
--- a/pkg/driver/driver.go
+++ b/pkg/driver/driver.go
@@ -33,7 +33,7 @@ type driver struct {
 }
 
 var (
-	vendorVersion = "v1.2.0"
+	vendorVersion = "v1.34.6"
 	driverName    = "ru.yandex.s3.csi"
 )
 
diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go
index 5acecdd..7b5587b 100644
--- a/pkg/driver/nodeserver.go
+++ b/pkg/driver/nodeserver.go
@@ -19,6 +19,7 @@ package driver
 import (
 	"fmt"
 	"os"
+	"os/exec"
 	"regexp"
 	"strconv"
 
@@ -68,7 +69,6 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 	volumeID := req.GetVolumeId()
 	targetPath := req.GetTargetPath()
 	stagingTargetPath := req.GetStagingTargetPath()
-	bucketName, prefix := volumeIDToBucketPrefix(volumeID)
 
 	// Check arguments
 	if req.GetVolumeCapability() == nil {
@@ -100,18 +100,12 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 	glog.V(4).Infof("target %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n",
 		targetPath, readOnly, volumeID, attrib, mountFlags)
 
-	s3, err := s3.NewClientFromSecret(req.GetSecrets())
+	cmd := exec.Command("mount", "--bind", stagingTargetPath, targetPath)
+	cmd.Stderr = os.Stderr
+	glog.V(3).Infof("Binding volume %v from %v to %v", volumeID, stagingTargetPath, targetPath)
+	out, err := cmd.Output()
 	if err != nil {
-		return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
-	}
-
-	meta := getMeta(bucketName, prefix, req.VolumeContext)
-	mounter, err := mounter.New(meta, s3.Config)
-	if err != nil {
-		return nil, err
-	}
-	if err := mounter.Mount(stagingTargetPath, targetPath, volumeID); err != nil {
-		return nil, err
+		return nil, fmt.Errorf("Error running mount --bind %v %v: %s", stagingTargetPath, targetPath, out)
 	}
 
 	glog.V(4).Infof("s3: volume %s successfully mounted to %s", volumeID, targetPath)
@@ -131,7 +125,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
 		return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
 	}
 
-	if err := mounter.FuseUnmount(targetPath); err != nil {
+	if err := mounter.Unmount(targetPath); err != nil {
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 	glog.V(4).Infof("s3: volume %s has been unmounted.", volumeID)
@@ -174,7 +168,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
 	if err != nil {
 		return nil, err
 	}
-	if err := mounter.Stage(stagingTargetPath); err != nil {
+	if err := mounter.Mount(stagingTargetPath, volumeID); err != nil {
 		return nil, err
 	}
 
@@ -193,6 +187,22 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
 		return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
 	}
 
+	proc, err := mounter.FindFuseMountProcess(stagingTargetPath)
+	if err != nil {
+		return nil, err
+	}
+	exists := false
+	if proc == nil {
+		exists, err = mounter.SystemdUnmount(volumeID)
+		if exists && err != nil {
+			return nil, err
+		}
+	}
+	if !exists {
+		err = mounter.FuseUnmount(stagingTargetPath)
+	}
+	glog.V(4).Infof("s3: volume %s has been unmounted from stage path %v.", volumeID, stagingTargetPath)
+
 	return &csi.NodeUnstageVolumeResponse{}, nil
 }
 
diff --git a/pkg/mounter/geesefs.go b/pkg/mounter/geesefs.go
index 7f8f025..cd28d82 100644
--- a/pkg/mounter/geesefs.go
+++ b/pkg/mounter/geesefs.go
@@ -3,11 +3,11 @@ package mounter
 import (
 	"fmt"
 	"os"
-	"strings"
 	"time"
 
 	systemd "github.com/coreos/go-systemd/v22/dbus"
 	dbus "github.com/godbus/dbus/v5"
+	"github.com/golang/glog"
 
 	"github.com/yandex-cloud/k8s-csi-s3/pkg/s3"
 )
@@ -35,14 +35,6 @@ func newGeeseFSMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
 	}, nil
 }
 
-func (geesefs *geesefsMounter) Stage(stageTarget string) error {
-	return nil
-}
-
-func (geesefs *geesefsMounter) Unstage(stageTarget string) error {
-	return nil
-}
-
 func (geesefs *geesefsMounter) CopyBinary(from, to string) error {
 	st, err := os.Stat(from)
 	if err != nil {
@@ -87,7 +79,7 @@ func (geesefs *geesefsMounter) MountDirect(target string, args []string) error {
 	return fuseMount(target, geesefsCmd, args)
 }
 
-func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
+func (geesefs *geesefsMounter) Mount(target, volumeID string) error {
 	fullPath := fmt.Sprintf("%s:%s", geesefs.meta.BucketName, geesefs.meta.Prefix)
 	var args []string
 	if geesefs.region != "" {
@@ -113,7 +105,7 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
 	}
 	conn, err := systemd.New()
 	if err != nil {
-		fmt.Printf("Failed to connect to systemd dbus service: %v, starting geesefs directly\n", err)
+		glog.Errorf("Failed to connect to systemd dbus service: %v, starting geesefs directly", err)
 		return geesefs.MountDirect(target, args)
 	}
 	defer conn.Close()
@@ -127,7 +119,7 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
 	}
 	args = append([]string{pluginDir+"/geesefs", "-f", "-o", "allow_other", "--endpoint", geesefs.endpoint}, args...)
 	unitName := "geesefs-"+systemd.PathBusEscape(volumeID)+".service"
-	props := []systemd.Property{
+	newProps := []systemd.Property{
 		systemd.Property{
 			Name: "Description",
 			Value: dbus.MakeVariant("GeeseFS mount for Kubernetes volume "+volumeID),
@@ -142,13 +134,35 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
 			Value: dbus.MakeVariant("inactive-or-failed"),
 		},
 	}
-	_, err = conn.StartTransientUnit(unitName, "replace", props, nil)
-	if err != nil && strings.Contains(err.Error(), "already exists") {
-		// Stop and garbage collect the unit if automatic collection didn't work for some reason
-		conn.StopUnit(unitName, "replace", nil)
-		conn.ResetFailedUnit(unitName)
-		_, err = conn.StartTransientUnit(unitName, "replace", props, nil)
+	unitProps, err := conn.GetAllProperties(unitName)
+	if err == nil {
+		// Unit already exists
+		if s, ok := unitProps["ActiveState"].(string); ok && (s == "active" || s == "activating" || s == "reloading") {
+			// Unit is already active
+			curPath := ""
+			prevExec, ok := unitProps["ExecStart"].([][]interface{})
+			if ok && len(prevExec) > 0 && len(prevExec[0]) >= 2 {
+				execArgs, ok := prevExec[0][1].([]string)
+				if ok && len(execArgs) >= 2 {
+					curPath = execArgs[len(execArgs)-1]
+				}
+			}
+			if curPath != target {
+				return fmt.Errorf(
+					"GeeseFS for volume %v is already mounted on host, but"+
+					" in a different directory. We want %v, but it's in %v",
+					volumeID, target, curPath,
+				)
+			}
+			// Already mounted at right location
+			return nil
+		} else {
+			// Stop and garbage collect the unit if automatic collection didn't work for some reason
+			conn.StopUnit(unitName, "replace", nil)
+			conn.ResetFailedUnit(unitName)
+		}
 	}
+	_, err = conn.StartTransientUnit(unitName, "replace", newProps, nil)
 	if err != nil {
 		return fmt.Errorf("Error starting systemd unit %s on host: %v", unitName, err)
 	}
diff --git a/pkg/mounter/mounter.go b/pkg/mounter/mounter.go
index 1f71afa..7a83b7e 100644
--- a/pkg/mounter/mounter.go
+++ b/pkg/mounter/mounter.go
@@ -11,18 +11,18 @@ import (
 	"syscall"
 	"time"
 
-	"github.com/yandex-cloud/k8s-csi-s3/pkg/s3"
+	systemd "github.com/coreos/go-systemd/v22/dbus"
 	"github.com/golang/glog"
 	"github.com/mitchellh/go-ps"
 	"k8s.io/kubernetes/pkg/util/mount"
+
+	"github.com/yandex-cloud/k8s-csi-s3/pkg/s3"
 )
 
 // Mounter interface which can be implemented
 // by the different mounter types
 type Mounter interface {
-	Stage(stagePath string) error
-	Unstage(stagePath string) error
-	Mount(source, target, volumeID string) error
+	Mount(target, volumeID string) error
 }
 
 const (
@@ -70,12 +70,40 @@ func fuseMount(path string, command string, args []string) error {
 	return waitForMount(path, 10*time.Second)
 }
 
+func Unmount(path string) error {
+	if err := mount.New("").Unmount(path); err != nil {
+		return err
+	}
+	return nil
+}
+
+func SystemdUnmount(volumeID string) (bool, error) {
+	conn, err := systemd.New()
+	if err != nil {
+		glog.Errorf("Failed to connect to systemd dbus service: %v", err)
+		return false, err
+	}
+	defer conn.Close()
+	unitName := "geesefs-"+systemd.PathBusEscape(volumeID)+".service"
+	units, err := conn.ListUnitsByNames([]string{ unitName })
+	glog.Errorf("Got %v", units)
+	if err != nil {
+		glog.Errorf("Failed to list systemd unit by name %v: %v", unitName, err)
+		return false, err
+	}
+	if len(units) == 0 || units[0].ActiveState == "inactive" || units[0].ActiveState == "failed" {
+		return true, nil
+	}
+	_, err = conn.StopUnit(unitName, "replace", nil)
+	return true, err
+}
+
 func FuseUnmount(path string) error {
 	if err := mount.New("").Unmount(path); err != nil {
 		return err
 	}
 	// as fuse quits immediately, we will try to wait until the process is done
-	process, err := findFuseMountProcess(path)
+	process, err := FindFuseMountProcess(path)
 	if err != nil {
 		glog.Errorf("Error getting PID of fuse mount: %s", err)
 		return nil
@@ -107,7 +135,7 @@ func waitForMount(path string, timeout time.Duration) error {
 	}
 }
 
-func findFuseMountProcess(path string) (*os.Process, error) {
+func FindFuseMountProcess(path string) (*os.Process, error) {
 	processes, err := ps.Processes()
 	if err != nil {
 		return nil, err
diff --git a/pkg/mounter/rclone.go b/pkg/mounter/rclone.go
index d918702..7b912c2 100644
--- a/pkg/mounter/rclone.go
+++ b/pkg/mounter/rclone.go
@@ -31,15 +31,7 @@ func newRcloneMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
 	}, nil
 }
 
-func (rclone *rcloneMounter) Stage(stageTarget string) error {
-	return nil
-}
-
-func (rclone *rcloneMounter) Unstage(stageTarget string) error {
-	return nil
-}
-
-func (rclone *rcloneMounter) Mount(source, target, volumeID string) error {
+func (rclone *rcloneMounter) Mount(target, volumeID string) error {
 	args := []string{
 		"mount",
 		fmt.Sprintf(":s3:%s", path.Join(rclone.meta.BucketName, rclone.meta.Prefix)),
diff --git a/pkg/mounter/s3fs.go b/pkg/mounter/s3fs.go
index c4eba11..533522f 100644
--- a/pkg/mounter/s3fs.go
+++ b/pkg/mounter/s3fs.go
@@ -28,15 +28,7 @@ func newS3fsMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
 	}, nil
 }
 
-func (s3fs *s3fsMounter) Stage(stageTarget string) error {
-	return nil
-}
-
-func (s3fs *s3fsMounter) Unstage(stageTarget string) error {
-	return nil
-}
-
-func (s3fs *s3fsMounter) Mount(source, target, volumeID string) error {
+func (s3fs *s3fsMounter) Mount(target, volumeID string) error {
 	if err := writes3fsPass(s3fs.pwFileContent); err != nil {
 		return err
 	}