Implement mounting via stage directory

Previously, multiple containers with the same mounted volume resulted in multiple
FUSE processes. This behaviour was breaking parallel modifications from different
containers, consumed extra resources, and after mounting via systemd was introduced,
led to the total inability to mount the same volume into multiple containers on
the same host.

Now only one FUSE process is started per volume, per host.
This commit is contained in:
Vitaliy Filippov 2023-03-07 00:41:41 +03:00
parent 8a7620d2fa
commit 345d7587f3
7 changed files with 100 additions and 57 deletions
pkg/mounter

View file

@ -3,11 +3,11 @@ package mounter
import (
"fmt"
"os"
"strings"
"time"
systemd "github.com/coreos/go-systemd/v22/dbus"
dbus "github.com/godbus/dbus/v5"
"github.com/golang/glog"
"github.com/yandex-cloud/k8s-csi-s3/pkg/s3"
)
@ -35,14 +35,6 @@ func newGeeseFSMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
}, nil
}
func (geesefs *geesefsMounter) Stage(stageTarget string) error {
return nil
}
func (geesefs *geesefsMounter) Unstage(stageTarget string) error {
return nil
}
func (geesefs *geesefsMounter) CopyBinary(from, to string) error {
st, err := os.Stat(from)
if err != nil {
@ -87,7 +79,7 @@ func (geesefs *geesefsMounter) MountDirect(target string, args []string) error {
return fuseMount(target, geesefsCmd, args)
}
func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
func (geesefs *geesefsMounter) Mount(target, volumeID string) error {
fullPath := fmt.Sprintf("%s:%s", geesefs.meta.BucketName, geesefs.meta.Prefix)
var args []string
if geesefs.region != "" {
@ -113,7 +105,7 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
}
conn, err := systemd.New()
if err != nil {
fmt.Printf("Failed to connect to systemd dbus service: %v, starting geesefs directly\n", err)
glog.Errorf("Failed to connect to systemd dbus service: %v, starting geesefs directly", err)
return geesefs.MountDirect(target, args)
}
defer conn.Close()
@ -127,7 +119,7 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
}
args = append([]string{pluginDir+"/geesefs", "-f", "-o", "allow_other", "--endpoint", geesefs.endpoint}, args...)
unitName := "geesefs-"+systemd.PathBusEscape(volumeID)+".service"
props := []systemd.Property{
newProps := []systemd.Property{
systemd.Property{
Name: "Description",
Value: dbus.MakeVariant("GeeseFS mount for Kubernetes volume "+volumeID),
@ -142,13 +134,35 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
Value: dbus.MakeVariant("inactive-or-failed"),
},
}
_, err = conn.StartTransientUnit(unitName, "replace", props, nil)
if err != nil && strings.Contains(err.Error(), "already exists") {
// Stop and garbage collect the unit if automatic collection didn't work for some reason
conn.StopUnit(unitName, "replace", nil)
conn.ResetFailedUnit(unitName)
_, err = conn.StartTransientUnit(unitName, "replace", props, nil)
unitProps, err := conn.GetAllProperties(unitName)
if err == nil {
// Unit already exists
if s, ok := unitProps["ActiveState"].(string); ok && (s == "active" || s == "activating" || s == "reloading") {
// Unit is already active
curPath := ""
prevExec, ok := unitProps["ExecStart"].([][]interface{})
if ok && len(prevExec) > 0 && len(prevExec[0]) >= 2 {
execArgs, ok := prevExec[0][1].([]string)
if ok && len(execArgs) >= 2 {
curPath = execArgs[len(execArgs)-1]
}
}
if curPath != target {
return fmt.Errorf(
"GeeseFS for volume %v is already mounted on host, but"+
" in a different directory. We want %v, but it's in %v",
volumeID, target, curPath,
)
}
// Already mounted at right location
return nil
} else {
// Stop and garbage collect the unit if automatic collection didn't work for some reason
conn.StopUnit(unitName, "replace", nil)
conn.ResetFailedUnit(unitName)
}
}
_, err = conn.StartTransientUnit(unitName, "replace", newProps, nil)
if err != nil {
return fmt.Errorf("Error starting systemd unit %s on host: %v", unitName, err)
}