Add rclone mounter (#15)
This commit is contained in:
parent
577f229ef7
commit
59fd15b628
9 changed files with 145 additions and 7 deletions
|
@ -3,6 +3,7 @@ package s3
|
|||
import (
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
|
@ -22,6 +23,7 @@ const (
|
|||
goofysMounterType = "goofys"
|
||||
s3qlMounterType = "s3ql"
|
||||
s3backerMounterType = "s3backer"
|
||||
rcloneMounterType = "rclone"
|
||||
mounterTypeKey = "mounter"
|
||||
)
|
||||
|
||||
|
@ -45,6 +47,9 @@ func newMounter(bucket *bucket, cfg *Config) (Mounter, error) {
|
|||
case s3backerMounterType:
|
||||
return newS3backerMounter(bucket, cfg)
|
||||
|
||||
case rcloneMounterType:
|
||||
return newRcloneMounter(bucket, cfg)
|
||||
|
||||
default:
|
||||
// default to s3backer
|
||||
return newS3backerMounter(bucket, cfg)
|
||||
|
@ -59,7 +64,7 @@ func fuseMount(path string, command string, args []string) error {
|
|||
return fmt.Errorf("Error fuseMount command: %s\nargs: %s\noutput: %s", command, args, out)
|
||||
}
|
||||
|
||||
return nil
|
||||
return waitForMount(path, 10*time.Second)
|
||||
}
|
||||
|
||||
func fuseUnmount(path string, command string) error {
|
||||
|
|
60
pkg/s3/mounter_rclone.go
Normal file
60
pkg/s3/mounter_rclone.go
Normal file
|
@ -0,0 +1,60 @@
|
|||
package s3
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
)
|
||||
|
||||
// Implements Mounter
|
||||
type rcloneMounter struct {
|
||||
bucket *bucket
|
||||
url string
|
||||
region string
|
||||
accessKeyID string
|
||||
secretAccessKey string
|
||||
}
|
||||
|
||||
const (
|
||||
rcloneCmd = "rclone"
|
||||
)
|
||||
|
||||
func newRcloneMounter(b *bucket, cfg *Config) (Mounter, error) {
|
||||
return &rcloneMounter{
|
||||
bucket: b,
|
||||
url: cfg.Endpoint,
|
||||
region: cfg.Region,
|
||||
accessKeyID: cfg.AccessKeyID,
|
||||
secretAccessKey: cfg.SecretAccessKey,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (rclone *rcloneMounter) Stage(stageTarget string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rclone *rcloneMounter) Unstage(stageTarget string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rclone *rcloneMounter) Mount(source string, target string) error {
|
||||
args := []string{
|
||||
"mount",
|
||||
fmt.Sprintf(":s3:%s/%s", rclone.bucket.Name, rclone.bucket.FSPath),
|
||||
fmt.Sprintf("%s", target),
|
||||
"--daemon",
|
||||
"--s3-provider=AWS",
|
||||
"--s3-env-auth=true",
|
||||
fmt.Sprintf("--s3-region=%s", rclone.region),
|
||||
fmt.Sprintf("--s3-endpoint=%s", rclone.url),
|
||||
"--allow-other",
|
||||
// TODO: make this configurable
|
||||
"--vfs-cache-mode=writes",
|
||||
}
|
||||
os.Setenv("AWS_ACCESS_KEY_ID", rclone.accessKeyID)
|
||||
os.Setenv("AWS_SECRET_ACCESS_KEY", rclone.secretAccessKey)
|
||||
return fuseMount(target, rcloneCmd, args)
|
||||
}
|
||||
|
||||
func (rclone *rcloneMounter) Unmount(target string) error {
|
||||
return fuseUnmount(target, rcloneCmd)
|
||||
}
|
|
@ -139,4 +139,32 @@ var _ = Describe("S3Driver", func() {
|
|||
})
|
||||
})
|
||||
|
||||
Context("rclone", func() {
|
||||
socket := "/tmp/csi-rclone.sock"
|
||||
csiEndpoint := "unix://" + socket
|
||||
|
||||
cfg := &s3.Config{
|
||||
AccessKeyID: "FJDSJ",
|
||||
SecretAccessKey: "DSG643HGDS",
|
||||
Endpoint: "http://127.0.0.1:9000",
|
||||
Mounter: "rclone",
|
||||
}
|
||||
if err := os.Remove(socket); err != nil && !os.IsNotExist(err) {
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
driver, err := s3.NewS3("test-node", csiEndpoint, cfg)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
go driver.Run()
|
||||
|
||||
Describe("CSI sanity", func() {
|
||||
sanityCfg := &sanity.Config{
|
||||
TargetPath: mntDir,
|
||||
StagingPath: stagingDir,
|
||||
Address: csiEndpoint,
|
||||
}
|
||||
sanity.GinkgoTest(sanityCfg)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package s3
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
@ -10,6 +11,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/mitchellh/go-ps"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
@ -39,6 +41,25 @@ func waitForProcess(p *os.Process, backoff int) error {
|
|||
return waitForProcess(p, backoff+1)
|
||||
}
|
||||
|
||||
func waitForMount(path string, timeout time.Duration) error {
|
||||
var elapsed time.Duration
|
||||
var interval = 10 * time.Millisecond
|
||||
for {
|
||||
notMount, err := mount.New("").IsNotMountPoint(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !notMount {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(interval)
|
||||
elapsed = elapsed + interval
|
||||
if elapsed >= timeout {
|
||||
return errors.New("Timeout waiting for mount")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func findFuseMountProcess(path string, name string) (*os.Process, error) {
|
||||
processes, err := ps.Processes()
|
||||
if err != nil {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue