package mounter import ( "fmt" "os" "strings" "time" systemd "github.com/coreos/go-systemd/v22/dbus" dbus "github.com/godbus/dbus/v5" "github.com/yandex-cloud/k8s-csi-s3/pkg/s3" ) const ( geesefsCmd = "geesefs" ) // Implements Mounter type geesefsMounter struct { meta *s3.FSMeta endpoint string region string accessKeyID string secretAccessKey string } func newGeeseFSMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) { return &geesefsMounter{ meta: meta, endpoint: cfg.Endpoint, region: cfg.Region, accessKeyID: cfg.AccessKeyID, secretAccessKey: cfg.SecretAccessKey, }, nil } func (geesefs *geesefsMounter) Stage(stageTarget string) error { return nil } func (geesefs *geesefsMounter) Unstage(stageTarget string) error { return nil } func (geesefs *geesefsMounter) CopyBinary(from, to string) error { st, err := os.Stat(from) if err != nil { return fmt.Errorf("Failed to stat %s: %v", from, err) } st2, err := os.Stat(to) if err != nil && !os.IsNotExist(err) { return fmt.Errorf("Failed to stat %s: %v", to, err) } if err != nil || st2.Size() != st.Size() || st2.ModTime() != st.ModTime() { if err == nil { // remove the file first to not hit "text file busy" errors err = os.Remove(to) if err != nil { return fmt.Errorf("Error removing %s to update it: %v", to, err) } } bin, err := os.ReadFile(from) if err != nil { return fmt.Errorf("Error copying %s to %s: %v", from, to, err) } err = os.WriteFile(to, bin, 0755) if err != nil { return fmt.Errorf("Error copying %s to %s: %v", from, to, err) } err = os.Chtimes(to, st.ModTime(), st.ModTime()) if err != nil { return fmt.Errorf("Error copying %s to %s: %v", from, to, err) } } return nil } func (geesefs *geesefsMounter) MountDirect(target string, args []string) error { args = append([]string{ "--endpoint", geesefs.endpoint, "-o", "allow_other", "--log-file", "/dev/stderr", }, args...) os.Setenv("AWS_ACCESS_KEY_ID", geesefs.accessKeyID) os.Setenv("AWS_SECRET_ACCESS_KEY", geesefs.secretAccessKey) return fuseMount(target, geesefsCmd, args) } func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error { fullPath := fmt.Sprintf("%s:%s", geesefs.meta.BucketName, geesefs.meta.Prefix) var args []string if geesefs.region != "" { args = append(args, "--region", geesefs.region) } useSystemd := true for i := 0; i < len(geesefs.meta.MountOptions); i++ { if geesefs.meta.MountOptions[i] == "--no-systemd" { useSystemd = false } else { args = append(args, geesefs.meta.MountOptions[i]) } } args = append(args, fullPath, target) // Try to start geesefs using systemd so it doesn't get killed when the container exits if !useSystemd { return geesefs.MountDirect(target, args) } conn, err := systemd.New() if err != nil { fmt.Printf("Failed to connect to systemd dbus service: %v, starting geesefs directly\n", err) return geesefs.MountDirect(target, args) } defer conn.Close() // systemd is present if err = geesefs.CopyBinary("/usr/bin/geesefs", "/csi/geesefs"); err != nil { return err } pluginDir := os.Getenv("PLUGIN_DIR") if pluginDir == "" { pluginDir = "/var/lib/kubelet/plugins/ru.yandex.s3.csi" } args = append([]string{pluginDir+"/geesefs", "-f", "-o", "allow_other", "--endpoint", geesefs.endpoint}, args...) unitName := "geesefs-"+systemd.PathBusEscape(volumeID)+".service" props := []systemd.Property{ systemd.Property{ Name: "Description", Value: dbus.MakeVariant("GeeseFS mount for Kubernetes volume "+volumeID), }, systemd.PropExecStart(args, false), systemd.Property{ Name: "Environment", Value: dbus.MakeVariant([]string{ "AWS_ACCESS_KEY_ID="+geesefs.accessKeyID, "AWS_SECRET_ACCESS_KEY="+geesefs.secretAccessKey }), }, systemd.Property{ Name: "CollectMode", Value: dbus.MakeVariant("inactive-or-failed"), }, } _, err = conn.StartTransientUnit(unitName, "replace", props, nil) if err != nil && strings.Contains(err.Error(), "already exists") { // Stop and garbage collect the unit if automatic collection didn't work for some reason conn.StopUnit(unitName, "replace", nil) conn.ResetFailedUnit(unitName) _, err = conn.StartTransientUnit(unitName, "replace", props, nil) } if err != nil { return fmt.Errorf("Error starting systemd unit %s on host: %v", unitName, err) } return waitForMount(target, 10*time.Second) }