Implement support for running geesefs OUTSIDE of the container using systemd to not crash mountpoints when csi-s3 is upgraded or restarted
This commit is contained in:
parent
8ba5614465
commit
6b4f2d9ac4
9 changed files with 125 additions and 13 deletions
pkg/mounter
|
@ -3,6 +3,11 @@ package mounter
|
|||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
systemd "github.com/coreos/go-systemd/v22/dbus"
|
||||
dbus "github.com/godbus/dbus/v5"
|
||||
|
||||
"github.com/yandex-cloud/k8s-csi-s3/pkg/s3"
|
||||
)
|
||||
|
@ -38,19 +43,109 @@ func (geesefs *geesefsMounter) Unstage(stageTarget string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (geesefs *geesefsMounter) Mount(source string, target string) error {
|
||||
fullPath := fmt.Sprintf("%s:%s", geesefs.meta.BucketName, geesefs.meta.Prefix)
|
||||
args := []string{
|
||||
func (geesefs *geesefsMounter) CopyBinary(from, to string) error {
|
||||
st, err := os.Stat(from)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to stat %s: %v", from, err)
|
||||
}
|
||||
st2, err := os.Stat(to)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return fmt.Errorf("Failed to stat %s: %v", to, err)
|
||||
}
|
||||
if err != nil || st2.Size() != st.Size() || st2.ModTime() != st.ModTime() {
|
||||
if err == nil {
|
||||
// remove the file first to not hit "text file busy" errors
|
||||
err = os.Remove(to)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error removing %s to update it: %v", to, err)
|
||||
}
|
||||
}
|
||||
bin, err := os.ReadFile(from)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error copying %s to %s: %v", from, to, err)
|
||||
}
|
||||
err = os.WriteFile(to, bin, 0755)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error copying %s to %s: %v", from, to, err)
|
||||
}
|
||||
err = os.Chtimes(to, st.ModTime(), st.ModTime())
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error copying %s to %s: %v", from, to, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (geesefs *geesefsMounter) MountDirect(target string, args []string) error {
|
||||
args = append([]string{
|
||||
"--endpoint", geesefs.endpoint,
|
||||
"-o", "allow_other",
|
||||
"--log-file", "/dev/stderr",
|
||||
}
|
||||
if geesefs.region != "" {
|
||||
args = append(args, "--region", geesefs.region)
|
||||
}
|
||||
args = append(args, geesefs.meta.MountOptions...)
|
||||
args = append(args, fullPath, target)
|
||||
}, args...)
|
||||
os.Setenv("AWS_ACCESS_KEY_ID", geesefs.accessKeyID)
|
||||
os.Setenv("AWS_SECRET_ACCESS_KEY", geesefs.secretAccessKey)
|
||||
return fuseMount(target, geesefsCmd, args)
|
||||
}
|
||||
|
||||
func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
|
||||
fullPath := fmt.Sprintf("%s:%s", geesefs.meta.BucketName, geesefs.meta.Prefix)
|
||||
var args []string
|
||||
if geesefs.region != "" {
|
||||
args = append(args, "--region", geesefs.region)
|
||||
}
|
||||
useSystemd := true
|
||||
for i := 0; i < len(geesefs.meta.MountOptions); i++ {
|
||||
if geesefs.meta.MountOptions[i] == "--no-systemd" {
|
||||
useSystemd = false
|
||||
} else {
|
||||
args = append(args, geesefs.meta.MountOptions[i])
|
||||
}
|
||||
}
|
||||
args = append(args, fullPath, target)
|
||||
// Try to start geesefs using systemd so it doesn't get killed when the container exits
|
||||
if !useSystemd {
|
||||
return geesefs.MountDirect(target, args)
|
||||
}
|
||||
conn, err := systemd.New()
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to connect to systemd dbus service: %v, starting geesefs directly\n", err)
|
||||
return geesefs.MountDirect(target, args)
|
||||
}
|
||||
defer conn.Close()
|
||||
// systemd is present
|
||||
if err = geesefs.CopyBinary("/usr/bin/geesefs", "/csi/geesefs"); err != nil {
|
||||
return err
|
||||
}
|
||||
pluginDir := os.Getenv("PLUGIN_DIR")
|
||||
if pluginDir == "" {
|
||||
pluginDir = "/var/lib/kubelet/plugins/ru.yandex.s3.csi"
|
||||
}
|
||||
args = append([]string{pluginDir+"/geesefs", "-f", "-o", "allow_other", "--endpoint", geesefs.endpoint}, args...)
|
||||
unitName := "geesefs-"+systemd.PathBusEscape(volumeID)+".service"
|
||||
props := []systemd.Property{
|
||||
systemd.Property{
|
||||
Name: "Description",
|
||||
Value: dbus.MakeVariant("GeeseFS mount for Kubernetes volume "+volumeID),
|
||||
},
|
||||
systemd.PropExecStart(args, false),
|
||||
systemd.Property{
|
||||
Name: "Environment",
|
||||
Value: dbus.MakeVariant([]string{ "AWS_ACCESS_KEY_ID="+geesefs.accessKeyID, "AWS_SECRET_ACCESS_KEY="+geesefs.secretAccessKey }),
|
||||
},
|
||||
systemd.Property{
|
||||
Name: "CollectMode",
|
||||
Value: dbus.MakeVariant("inactive-or-failed"),
|
||||
},
|
||||
}
|
||||
_, err = conn.StartTransientUnit(unitName, "replace", props, nil)
|
||||
if err != nil && strings.Contains(err.Error(), "already exists") {
|
||||
// Stop and garbage collect the unit if automatic collection didn't work for some reason
|
||||
conn.StopUnit(unitName, "replace", nil)
|
||||
conn.ResetFailedUnit(unitName)
|
||||
_, err = conn.StartTransientUnit(unitName, "replace", props, nil)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error starting systemd unit %s on host: %v", unitName, err)
|
||||
}
|
||||
return waitForMount(target, 10*time.Second)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue