Implement mounting via stage directory
Previously, mounting the same volume into multiple containers started a separate FUSE process for each container. This behaviour broke parallel modifications from different containers, consumed extra resources, and, after mounting via systemd was introduced, made it impossible to mount the same volume into multiple containers on the same host at all. Now only one FUSE process is started per volume, per host.
This commit is contained in:
parent
1305b20bae
commit
ecf1031dfc
7 changed files with 100 additions and 57 deletions
|
@ -103,6 +103,9 @@ spec:
|
|||
volumeMounts:
|
||||
- name: plugin-dir
|
||||
mountPath: /csi
|
||||
- name: stage-dir
|
||||
mountPath: /var/lib/kubelet/plugins/kubernetes.io/csi/ru.yandex.s3.csi
|
||||
mountPropagation: "Bidirectional"
|
||||
- name: pods-mount-dir
|
||||
mountPath: /var/lib/kubelet/pods
|
||||
mountPropagation: "Bidirectional"
|
||||
|
@ -119,6 +122,10 @@ spec:
|
|||
hostPath:
|
||||
path: /var/lib/kubelet/plugins/ru.yandex.s3.csi
|
||||
type: DirectoryOrCreate
|
||||
- name: stage-dir
|
||||
hostPath:
|
||||
path: /var/lib/kubelet/plugins/kubernetes.io/csi/ru.yandex.s3.csi
|
||||
type: DirectoryOrCreate
|
||||
- name: pods-mount-dir
|
||||
hostPath:
|
||||
path: /var/lib/kubelet/pods
|
||||
|
|
|
@ -33,7 +33,7 @@ type driver struct {
|
|||
}
|
||||
|
||||
var (
|
||||
vendorVersion = "v1.2.0"
|
||||
vendorVersion = "v1.34.6"
|
||||
driverName = "ru.yandex.s3.csi"
|
||||
)
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ package driver
|
|||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"regexp"
|
||||
"strconv"
|
||||
|
||||
|
@ -68,7 +69,6 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||
volumeID := req.GetVolumeId()
|
||||
targetPath := req.GetTargetPath()
|
||||
stagingTargetPath := req.GetStagingTargetPath()
|
||||
bucketName, prefix := volumeIDToBucketPrefix(volumeID)
|
||||
|
||||
// Check arguments
|
||||
if req.GetVolumeCapability() == nil {
|
||||
|
@ -100,18 +100,12 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||
glog.V(4).Infof("target %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n",
|
||||
targetPath, readOnly, volumeID, attrib, mountFlags)
|
||||
|
||||
s3, err := s3.NewClientFromSecret(req.GetSecrets())
|
||||
cmd := exec.Command("mount", "--bind", stagingTargetPath, targetPath)
|
||||
cmd.Stderr = os.Stderr
|
||||
glog.V(3).Infof("Binding volume %v from %v to %v", volumeID, stagingTargetPath, targetPath)
|
||||
out, err := cmd.Output()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
|
||||
}
|
||||
|
||||
meta := getMeta(bucketName, prefix, req.VolumeContext)
|
||||
mounter, err := mounter.New(meta, s3.Config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := mounter.Mount(stagingTargetPath, targetPath, volumeID); err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("Error running mount --bind %v %v: %s", stagingTargetPath, targetPath, out)
|
||||
}
|
||||
|
||||
glog.V(4).Infof("s3: volume %s successfully mounted to %s", volumeID, targetPath)
|
||||
|
@ -131,7 +125,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
|
|||
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
|
||||
}
|
||||
|
||||
if err := mounter.FuseUnmount(targetPath); err != nil {
|
||||
if err := mounter.Unmount(targetPath); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
glog.V(4).Infof("s3: volume %s has been unmounted.", volumeID)
|
||||
|
@ -174,7 +168,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := mounter.Stage(stagingTargetPath); err != nil {
|
||||
if err := mounter.Mount(stagingTargetPath, volumeID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -193,6 +187,22 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
|
|||
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
|
||||
}
|
||||
|
||||
proc, err := mounter.FindFuseMountProcess(stagingTargetPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
exists := false
|
||||
if proc == nil {
|
||||
exists, err = mounter.SystemdUnmount(volumeID)
|
||||
if exists && err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if !exists {
|
||||
err = mounter.FuseUnmount(stagingTargetPath)
|
||||
}
|
||||
glog.V(4).Infof("s3: volume %s has been unmounted from stage path %v.", volumeID, stagingTargetPath)
|
||||
|
||||
return &csi.NodeUnstageVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -3,11 +3,11 @@ package mounter
|
|||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
systemd "github.com/coreos/go-systemd/v22/dbus"
|
||||
dbus "github.com/godbus/dbus/v5"
|
||||
"github.com/golang/glog"
|
||||
|
||||
"github.com/yandex-cloud/k8s-csi-s3/pkg/s3"
|
||||
)
|
||||
|
@ -35,14 +35,6 @@ func newGeeseFSMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (geesefs *geesefsMounter) Stage(stageTarget string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (geesefs *geesefsMounter) Unstage(stageTarget string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (geesefs *geesefsMounter) CopyBinary(from, to string) error {
|
||||
st, err := os.Stat(from)
|
||||
if err != nil {
|
||||
|
@ -87,7 +79,7 @@ func (geesefs *geesefsMounter) MountDirect(target string, args []string) error {
|
|||
return fuseMount(target, geesefsCmd, args)
|
||||
}
|
||||
|
||||
func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
|
||||
func (geesefs *geesefsMounter) Mount(target, volumeID string) error {
|
||||
fullPath := fmt.Sprintf("%s:%s", geesefs.meta.BucketName, geesefs.meta.Prefix)
|
||||
var args []string
|
||||
if geesefs.region != "" {
|
||||
|
@ -113,7 +105,7 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
|
|||
}
|
||||
conn, err := systemd.New()
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to connect to systemd dbus service: %v, starting geesefs directly\n", err)
|
||||
glog.Errorf("Failed to connect to systemd dbus service: %v, starting geesefs directly", err)
|
||||
return geesefs.MountDirect(target, args)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
@ -127,7 +119,7 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
|
|||
}
|
||||
args = append([]string{pluginDir+"/geesefs", "-f", "-o", "allow_other", "--endpoint", geesefs.endpoint}, args...)
|
||||
unitName := "geesefs-"+systemd.PathBusEscape(volumeID)+".service"
|
||||
props := []systemd.Property{
|
||||
newProps := []systemd.Property{
|
||||
systemd.Property{
|
||||
Name: "Description",
|
||||
Value: dbus.MakeVariant("GeeseFS mount for Kubernetes volume "+volumeID),
|
||||
|
@ -142,13 +134,35 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
|
|||
Value: dbus.MakeVariant("inactive-or-failed"),
|
||||
},
|
||||
}
|
||||
_, err = conn.StartTransientUnit(unitName, "replace", props, nil)
|
||||
if err != nil && strings.Contains(err.Error(), "already exists") {
|
||||
// Stop and garbage collect the unit if automatic collection didn't work for some reason
|
||||
conn.StopUnit(unitName, "replace", nil)
|
||||
conn.ResetFailedUnit(unitName)
|
||||
_, err = conn.StartTransientUnit(unitName, "replace", props, nil)
|
||||
unitProps, err := conn.GetAllProperties(unitName)
|
||||
if err == nil {
|
||||
// Unit already exists
|
||||
if s, ok := unitProps["ActiveState"].(string); ok && (s == "active" || s == "activating" || s == "reloading") {
|
||||
// Unit is already active
|
||||
curPath := ""
|
||||
prevExec, ok := unitProps["ExecStart"].([][]interface{})
|
||||
if ok && len(prevExec) > 0 && len(prevExec[0]) >= 2 {
|
||||
execArgs, ok := prevExec[0][1].([]string)
|
||||
if ok && len(execArgs) >= 2 {
|
||||
curPath = execArgs[len(execArgs)-1]
|
||||
}
|
||||
}
|
||||
if curPath != target {
|
||||
return fmt.Errorf(
|
||||
"GeeseFS for volume %v is already mounted on host, but"+
|
||||
" in a different directory. We want %v, but it's in %v",
|
||||
volumeID, target, curPath,
|
||||
)
|
||||
}
|
||||
// Already mounted at right location
|
||||
return nil
|
||||
} else {
|
||||
// Stop and garbage collect the unit if automatic collection didn't work for some reason
|
||||
conn.StopUnit(unitName, "replace", nil)
|
||||
conn.ResetFailedUnit(unitName)
|
||||
}
|
||||
}
|
||||
_, err = conn.StartTransientUnit(unitName, "replace", newProps, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error starting systemd unit %s on host: %v", unitName, err)
|
||||
}
|
||||
|
|
|
@ -11,18 +11,18 @@ import (
|
|||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/yandex-cloud/k8s-csi-s3/pkg/s3"
|
||||
systemd "github.com/coreos/go-systemd/v22/dbus"
|
||||
"github.com/golang/glog"
|
||||
"github.com/mitchellh/go-ps"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
|
||||
"github.com/yandex-cloud/k8s-csi-s3/pkg/s3"
|
||||
)
|
||||
|
||||
// Mounter interface which can be implemented
|
||||
// by the different mounter types
|
||||
type Mounter interface {
|
||||
Stage(stagePath string) error
|
||||
Unstage(stagePath string) error
|
||||
Mount(source, target, volumeID string) error
|
||||
Mount(target, volumeID string) error
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -70,12 +70,40 @@ func fuseMount(path string, command string, args []string) error {
|
|||
return waitForMount(path, 10*time.Second)
|
||||
}
|
||||
|
||||
func Unmount(path string) error {
|
||||
if err := mount.New("").Unmount(path); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func SystemdUnmount(volumeID string) (bool, error) {
|
||||
conn, err := systemd.New()
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to connect to systemd dbus service: %v", err)
|
||||
return false, err
|
||||
}
|
||||
defer conn.Close()
|
||||
unitName := "geesefs-"+systemd.PathBusEscape(volumeID)+".service"
|
||||
units, err := conn.ListUnitsByNames([]string{ unitName })
|
||||
glog.Errorf("Got %v", units)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to list systemd unit by name %v: %v", unitName, err)
|
||||
return false, err
|
||||
}
|
||||
if len(units) == 0 || units[0].ActiveState == "inactive" || units[0].ActiveState == "failed" {
|
||||
return true, nil
|
||||
}
|
||||
_, err = conn.StopUnit(unitName, "replace", nil)
|
||||
return true, err
|
||||
}
|
||||
|
||||
func FuseUnmount(path string) error {
|
||||
if err := mount.New("").Unmount(path); err != nil {
|
||||
return err
|
||||
}
|
||||
// as fuse quits immediately, we will try to wait until the process is done
|
||||
process, err := findFuseMountProcess(path)
|
||||
process, err := FindFuseMountProcess(path)
|
||||
if err != nil {
|
||||
glog.Errorf("Error getting PID of fuse mount: %s", err)
|
||||
return nil
|
||||
|
@ -107,7 +135,7 @@ func waitForMount(path string, timeout time.Duration) error {
|
|||
}
|
||||
}
|
||||
|
||||
func findFuseMountProcess(path string) (*os.Process, error) {
|
||||
func FindFuseMountProcess(path string) (*os.Process, error) {
|
||||
processes, err := ps.Processes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -31,15 +31,7 @@ func newRcloneMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (rclone *rcloneMounter) Stage(stageTarget string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rclone *rcloneMounter) Unstage(stageTarget string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rclone *rcloneMounter) Mount(source, target, volumeID string) error {
|
||||
func (rclone *rcloneMounter) Mount(target, volumeID string) error {
|
||||
args := []string{
|
||||
"mount",
|
||||
fmt.Sprintf(":s3:%s", path.Join(rclone.meta.BucketName, rclone.meta.Prefix)),
|
||||
|
|
|
@ -28,15 +28,7 @@ func newS3fsMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (s3fs *s3fsMounter) Stage(stageTarget string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s3fs *s3fsMounter) Unstage(stageTarget string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s3fs *s3fsMounter) Mount(source, target, volumeID string) error {
|
||||
func (s3fs *s3fsMounter) Mount(target, volumeID string) error {
|
||||
if err := writes3fsPass(s3fs.pwFileContent); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue