Refactor all mounters to use the mounter interface
This commit is contained in:
parent
093c5bf500
commit
9d5d84ebfb
8 changed files with 204 additions and 157 deletions
|
@ -55,20 +55,21 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if !exists {
|
if !exists {
|
||||||
if err := cs.s3.client.createBucket(volumeID); err != nil {
|
if err = cs.s3.client.createBucket(volumeID); err != nil {
|
||||||
glog.V(3).Infof("failed to create volume: %v", err)
|
glog.V(3).Infof("failed to create volume: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
mounter := cs.s3.cfg.Mounter
|
mounterType := cs.s3.cfg.Mounter
|
||||||
if mounter == "" {
|
if mounterType == "" {
|
||||||
mounter = req.GetParameters()[mounterKey]
|
mounterType = req.GetParameters()[mounterKey]
|
||||||
}
|
}
|
||||||
switch mounter {
|
mounter, err := newMounter(mounterType, volumeID, cs.s3.cfg)
|
||||||
case s3qlMounter:
|
if err != nil {
|
||||||
if err := s3qlCreate(volumeID, cs.s3.cfg); err != nil {
|
return nil, err
|
||||||
return nil, err
|
}
|
||||||
}
|
if err := mounter.Format(); err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(4).Infof("create volume %s", volumeID)
|
glog.V(4).Infof("create volume %s", volumeID)
|
||||||
|
|
|
@ -1,37 +0,0 @@
|
||||||
package s3
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
"context"
|
|
||||||
|
|
||||||
goofys "github.com/kahing/goofys/api"
|
|
||||||
)
|
|
||||||
|
|
||||||
const defaultRegion = "us-east-1"
|
|
||||||
|
|
||||||
func goofysMount(bucket string, cfg *Config, targetPath string) error {
|
|
||||||
goofysCfg := &goofys.Config{
|
|
||||||
MountPoint: targetPath,
|
|
||||||
Endpoint: cfg.Endpoint,
|
|
||||||
Region: cfg.Region,
|
|
||||||
DirMode: 0755,
|
|
||||||
FileMode: 0644,
|
|
||||||
MountOptions: map[string]string{
|
|
||||||
"allow_other": "",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
if cfg.Endpoint != "" {
|
|
||||||
cfg.Region = defaultRegion
|
|
||||||
}
|
|
||||||
os.Setenv("AWS_ACCESS_KEY_ID", cfg.AccessKeyID)
|
|
||||||
os.Setenv("AWS_SECRET_ACCESS_KEY", cfg.SecretAccessKey)
|
|
||||||
|
|
||||||
_, _, err := goofys.Mount(context.Background(), bucket, goofysCfg)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Error mounting via goofys: %s", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
34
pkg/s3/mounter.go
Normal file
34
pkg/s3/mounter.go
Normal file
|
@ -0,0 +1,34 @@
|
||||||
|
package s3
|
||||||
|
|
||||||
|
import "fmt"
|
||||||
|
|
||||||
|
// Mounter interface which can be implemented
|
||||||
|
// by the different mounter types
|
||||||
|
type Mounter interface {
|
||||||
|
Format() error
|
||||||
|
Mount(targetPath string) error
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
mounterKey = "mounter"
|
||||||
|
s3fsMounterType = "s3fs"
|
||||||
|
goofysMounterType = "goofys"
|
||||||
|
s3qlMounterType = "s3ql"
|
||||||
|
)
|
||||||
|
|
||||||
|
// newMounter returns a new mounter depending on the mounterType parameter
|
||||||
|
func newMounter(mounterType string, bucket string, cfg *Config) (Mounter, error) {
|
||||||
|
switch mounterType {
|
||||||
|
case "":
|
||||||
|
case s3fsMounterType:
|
||||||
|
return newS3fsMounter(bucket, cfg)
|
||||||
|
|
||||||
|
case goofysMounterType:
|
||||||
|
return newGoofysMounter(bucket, cfg)
|
||||||
|
|
||||||
|
case s3qlMounterType:
|
||||||
|
return newS3qlMounter(bucket, cfg)
|
||||||
|
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("Error mounting bucket %s, invalid mounter specified: %s", bucket, mounterType)
|
||||||
|
}
|
63
pkg/s3/mounter_goofys.go
Normal file
63
pkg/s3/mounter_goofys.go
Normal file
|
@ -0,0 +1,63 @@
|
||||||
|
package s3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"context"
|
||||||
|
|
||||||
|
goofysApi "github.com/kahing/goofys/api"
|
||||||
|
)
|
||||||
|
|
||||||
|
const defaultRegion = "us-east-1"
|
||||||
|
|
||||||
|
// Implements Mounter
|
||||||
|
type goofysMounter struct {
|
||||||
|
bucket string
|
||||||
|
endpoint string
|
||||||
|
region string
|
||||||
|
accessKeyID string
|
||||||
|
secretAccessKey string
|
||||||
|
}
|
||||||
|
|
||||||
|
func newGoofysMounter(bucket string, cfg *Config) (Mounter, error) {
|
||||||
|
region := cfg.Region
|
||||||
|
// if endpoint is set we need a default region
|
||||||
|
if region == "" && cfg.Endpoint != "" {
|
||||||
|
region = defaultRegion
|
||||||
|
}
|
||||||
|
return &goofysMounter{
|
||||||
|
bucket: bucket,
|
||||||
|
endpoint: cfg.Endpoint,
|
||||||
|
region: region,
|
||||||
|
accessKeyID: cfg.AccessKeyID,
|
||||||
|
secretAccessKey: cfg.SecretAccessKey,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (goofys *goofysMounter) Format() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (goofys *goofysMounter) Mount(targetPath string) error {
|
||||||
|
goofysCfg := &goofysApi.Config{
|
||||||
|
MountPoint: targetPath,
|
||||||
|
Endpoint: goofys.endpoint,
|
||||||
|
Region: goofys.region,
|
||||||
|
DirMode: 0755,
|
||||||
|
FileMode: 0644,
|
||||||
|
MountOptions: map[string]string{
|
||||||
|
"allow_other": "",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
os.Setenv("AWS_ACCESS_KEY_ID", goofys.accessKeyID)
|
||||||
|
os.Setenv("AWS_SECRET_ACCESS_KEY", goofys.secretAccessKey)
|
||||||
|
|
||||||
|
_, _, err := goofysApi.Mount(context.Background(), goofys.bucket, goofysCfg)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Error mounting via goofys: %s", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
64
pkg/s3/mounter_s3fs.go
Normal file
64
pkg/s3/mounter_s3fs.go
Normal file
|
@ -0,0 +1,64 @@
|
||||||
|
package s3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Implements Mounter
|
||||||
|
type s3fsMounter struct {
|
||||||
|
bucket string
|
||||||
|
url string
|
||||||
|
region string
|
||||||
|
pwFileContent string
|
||||||
|
}
|
||||||
|
|
||||||
|
func newS3fsMounter(bucket string, cfg *Config) (Mounter, error) {
|
||||||
|
return &s3fsMounter{
|
||||||
|
bucket: bucket,
|
||||||
|
url: cfg.Endpoint,
|
||||||
|
region: cfg.Region,
|
||||||
|
pwFileContent: cfg.AccessKeyID + ":" + cfg.SecretAccessKey,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s3fs *s3fsMounter) Format() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s3fs *s3fsMounter) Mount(targetPath string) error {
|
||||||
|
if err := writes3fsPass(s3fs.pwFileContent); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
args := []string{
|
||||||
|
fmt.Sprintf("%s", s3fs.bucket),
|
||||||
|
fmt.Sprintf("%s", targetPath),
|
||||||
|
"-o", "sigv2",
|
||||||
|
"-o", "use_path_request_style",
|
||||||
|
"-o", fmt.Sprintf("url=%s", s3fs.url),
|
||||||
|
"-o", fmt.Sprintf("endpoint=%s", s3fs.region),
|
||||||
|
"-o", "allow_other",
|
||||||
|
"-o", "mp_umask=000",
|
||||||
|
}
|
||||||
|
cmd := exec.Command("s3fs", args...)
|
||||||
|
out, err := cmd.CombinedOutput()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Error mounting using s3fs, output: %s", out)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func writes3fsPass(pwFileContent string) error {
|
||||||
|
pwFileName := fmt.Sprintf("%s/.passwd-s3fs", os.Getenv("HOME"))
|
||||||
|
pwFile, err := os.OpenFile(pwFileName, os.O_RDWR|os.O_CREATE, 0600)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = pwFile.WriteString(pwFileContent)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
pwFile.Close()
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -13,7 +13,8 @@ import (
|
||||||
"gopkg.in/ini.v1"
|
"gopkg.in/ini.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
type s3fsConfig struct {
|
// Implements Mounter
|
||||||
|
type s3qlMounter struct {
|
||||||
url string
|
url string
|
||||||
bucketURL string
|
bucketURL string
|
||||||
login string
|
login string
|
||||||
|
@ -29,7 +30,7 @@ const (
|
||||||
s3qlCmdMount = "mount.s3ql"
|
s3qlCmdMount = "mount.s3ql"
|
||||||
)
|
)
|
||||||
|
|
||||||
func newS3ql(bucket string, targetPath string, cfg *Config) (*s3fsConfig, error) {
|
func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) {
|
||||||
url, err := url.Parse(cfg.Endpoint)
|
url, err := url.Parse(cfg.Endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -38,13 +39,12 @@ func newS3ql(bucket string, targetPath string, cfg *Config) (*s3fsConfig, error)
|
||||||
if strings.Contains(url.Scheme, "http") {
|
if strings.Contains(url.Scheme, "http") {
|
||||||
url.Scheme = "s3c"
|
url.Scheme = "s3c"
|
||||||
}
|
}
|
||||||
s3ql := &s3fsConfig{
|
s3ql := &s3qlMounter{
|
||||||
url: url.String(),
|
url: url.String(),
|
||||||
login: cfg.AccessKeyID,
|
login: cfg.AccessKeyID,
|
||||||
password: cfg.SecretAccessKey,
|
password: cfg.SecretAccessKey,
|
||||||
passphrase: cfg.EncryptionKey,
|
passphrase: cfg.EncryptionKey,
|
||||||
ssl: ssl,
|
ssl: ssl,
|
||||||
targetPath: targetPath,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
url.Path = path.Join(url.Path, bucket)
|
url.Path = path.Join(url.Path, bucket)
|
||||||
|
@ -57,21 +57,25 @@ func newS3ql(bucket string, targetPath string, cfg *Config) (*s3fsConfig, error)
|
||||||
return s3ql, s3ql.writeConfig()
|
return s3ql, s3ql.writeConfig()
|
||||||
}
|
}
|
||||||
|
|
||||||
func s3qlCreate(bucket string, cfg *Config) error {
|
func (s3ql *s3qlMounter) Format() error {
|
||||||
s3ql, err := newS3ql(bucket, "unknown", cfg)
|
// force creation to ignore existing data
|
||||||
if err != nil {
|
args := []string{
|
||||||
return err
|
s3ql.bucketURL,
|
||||||
|
"--force",
|
||||||
}
|
}
|
||||||
return s3ql.create()
|
|
||||||
|
p := fmt.Sprintf("%s\n%s\n", s3ql.passphrase, s3ql.passphrase)
|
||||||
|
reader := bytes.NewReader([]byte(p))
|
||||||
|
return s3qlCmd(s3qlCmdMkfs, append(args, s3ql.options...), reader)
|
||||||
}
|
}
|
||||||
|
|
||||||
func s3qlMount(bucket string, cfg *Config, targetPath string) error {
|
func (s3ql *s3qlMounter) Mount(targetPath string) error {
|
||||||
s3ql, err := newS3ql(bucket, targetPath, cfg)
|
args := []string{
|
||||||
if err != nil {
|
s3ql.bucketURL,
|
||||||
return err
|
targetPath,
|
||||||
|
"--allow-other",
|
||||||
}
|
}
|
||||||
|
return s3qlCmd(s3qlCmdMount, append(args, s3ql.options...), nil)
|
||||||
return s3ql.mount()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func s3qlCmd(s3qlCmd string, args []string, stdin io.Reader) error {
|
func s3qlCmd(s3qlCmd string, args []string, stdin io.Reader) error {
|
||||||
|
@ -87,38 +91,17 @@ func s3qlCmd(s3qlCmd string, args []string, stdin io.Reader) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cfg *s3fsConfig) create() error {
|
func (s3ql *s3qlMounter) writeConfig() error {
|
||||||
// force creation to ignore existing data
|
|
||||||
args := []string{
|
|
||||||
cfg.bucketURL,
|
|
||||||
"--force",
|
|
||||||
}
|
|
||||||
|
|
||||||
p := fmt.Sprintf("%s\n%s\n", cfg.passphrase, cfg.passphrase)
|
|
||||||
reader := bytes.NewReader([]byte(p))
|
|
||||||
return s3qlCmd(s3qlCmdMkfs, append(args, cfg.options...), reader)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cfg *s3fsConfig) mount() error {
|
|
||||||
args := []string{
|
|
||||||
cfg.bucketURL,
|
|
||||||
cfg.targetPath,
|
|
||||||
"--allow-other",
|
|
||||||
}
|
|
||||||
return s3qlCmd(s3qlCmdMount, append(args, cfg.options...), nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cfg *s3fsConfig) writeConfig() error {
|
|
||||||
s3qlIni := ini.Empty()
|
s3qlIni := ini.Empty()
|
||||||
section, err := s3qlIni.NewSection("s3ql")
|
section, err := s3qlIni.NewSection("s3ql")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
section.NewKey("storage-url", cfg.url)
|
section.NewKey("storage-url", s3ql.url)
|
||||||
section.NewKey("backend-login", cfg.login)
|
section.NewKey("backend-login", s3ql.login)
|
||||||
section.NewKey("backend-password", cfg.password)
|
section.NewKey("backend-password", s3ql.password)
|
||||||
section.NewKey("fs-passphrase", cfg.passphrase)
|
section.NewKey("fs-passphrase", s3ql.passphrase)
|
||||||
|
|
||||||
authDir := os.Getenv("HOME") + "/.s3ql"
|
authDir := os.Getenv("HOME") + "/.s3ql"
|
||||||
authFile := authDir + "/authinfo2"
|
authFile := authDir + "/authinfo2"
|
|
@ -17,7 +17,6 @@ limitations under the License.
|
||||||
package s3
|
package s3
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
|
@ -31,13 +30,6 @@ import (
|
||||||
"github.com/kubernetes-csi/drivers/pkg/csi-common"
|
"github.com/kubernetes-csi/drivers/pkg/csi-common"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
mounterKey = "mounter"
|
|
||||||
s3fsMounter = "s3fs"
|
|
||||||
goofysMounter = "goofys"
|
|
||||||
s3qlMounter = "s3ql"
|
|
||||||
)
|
|
||||||
|
|
||||||
type nodeServer struct {
|
type nodeServer struct {
|
||||||
*csicommon.DefaultNodeServer
|
*csicommon.DefaultNodeServer
|
||||||
*s3
|
*s3
|
||||||
|
@ -86,26 +78,16 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
||||||
glog.V(4).Infof("target %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n",
|
glog.V(4).Infof("target %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n",
|
||||||
targetPath, deviceID, readOnly, volumeID, attrib, mountFlags)
|
targetPath, deviceID, readOnly, volumeID, attrib, mountFlags)
|
||||||
|
|
||||||
mounter := ns.s3.cfg.Mounter
|
mounterType := ns.s3.cfg.Mounter
|
||||||
if mounter == "" {
|
if mounterType == "" {
|
||||||
mounter = attrib[mounterKey]
|
mounterType = attrib[mounterKey]
|
||||||
}
|
}
|
||||||
switch mounter {
|
mounter, err := newMounter(mounterType, volumeID, ns.s3.cfg)
|
||||||
case "":
|
if err != nil {
|
||||||
case s3fsMounter:
|
return nil, err
|
||||||
if err := s3fsMount(volumeID, ns.s3.cfg, targetPath); err != nil {
|
}
|
||||||
return nil, err
|
if err := mounter.Mount(targetPath); err != nil {
|
||||||
}
|
return nil, err
|
||||||
case goofysMounter:
|
|
||||||
if err := goofysMount(volumeID, ns.s3.cfg, targetPath); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
case s3qlMounter:
|
|
||||||
if err := s3qlMount(volumeID, ns.s3.cfg, targetPath); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("Error mounting bucket %s, invalid mounter specified: %s", volumeID, ns.s3.cfg.Mounter)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(4).Infof("s3: bucket %s successfuly mounted to %s", volumeID, targetPath)
|
glog.V(4).Infof("s3: bucket %s successfuly mounted to %s", volumeID, targetPath)
|
||||||
|
|
|
@ -1,43 +0,0 @@
|
||||||
package s3
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"os/exec"
|
|
||||||
)
|
|
||||||
|
|
||||||
func s3fsMount(bucket string, cfg *Config, targetPath string) error {
|
|
||||||
if err := writes3fsPass(cfg); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
args := []string{
|
|
||||||
fmt.Sprintf("%s", bucket),
|
|
||||||
fmt.Sprintf("%s", targetPath),
|
|
||||||
"-o", "sigv2",
|
|
||||||
"-o", "use_path_request_style",
|
|
||||||
"-o", fmt.Sprintf("url=%s", cfg.Endpoint),
|
|
||||||
"-o", fmt.Sprintf("endpoint=%s", cfg.Region),
|
|
||||||
"-o", "allow_other",
|
|
||||||
"-o", "mp_umask=000",
|
|
||||||
}
|
|
||||||
cmd := exec.Command("s3fs", args...)
|
|
||||||
out, err := cmd.CombinedOutput()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Error mounting using s3fs, output: %s", out)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func writes3fsPass(cfg *Config) error {
|
|
||||||
pwFileName := fmt.Sprintf("%s/.passwd-s3fs", os.Getenv("HOME"))
|
|
||||||
pwFile, err := os.OpenFile(pwFileName, os.O_RDWR|os.O_CREATE, 0600)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
_, err = pwFile.WriteString(cfg.AccessKeyID + ":" + cfg.SecretAccessKey)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
pwFile.Close()
|
|
||||||
return nil
|
|
||||||
}
|
|
Loading…
Reference in a new issue