Add experimental s3ql mounter

This commit is contained in:
Cyrill Troxler 2018-07-16 22:27:45 +02:00
parent a165937559
commit 13eba47da6
17 changed files with 247 additions and 43 deletions

View file

@ -6,4 +6,6 @@ type Config struct {
SecretAccessKey string
Region string
Endpoint string
Mounter string
EncryptionKey string
}

View file

@ -60,6 +60,16 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, err
}
}
mounter := cs.s3.cfg.Mounter
if mounter == "" {
mounter = req.GetParameters()[mounterKey]
}
switch mounter {
case s3qlMounter:
if err := s3qlCreate(volumeID, cs.s3.cfg); err != nil {
return nil, err
}
}
glog.V(4).Infof("create volume %s", volumeID)
s3Vol := s3Volume{}
@ -123,6 +133,10 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
return nil, status.Error(codes.NotFound, fmt.Sprintf("Volume with id %s does not exist", req.GetVolumeId()))
}
// We currently support all capabilities
return &csi.ValidateVolumeCapabilitiesResponse{Supported: true}, nil
for _, cap := range req.VolumeCapabilities {
if cap.GetAccessMode().GetMode() != csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER {
return &csi.ValidateVolumeCapabilitiesResponse{Supported: false, Message: ""}, nil
}
}
return &csi.ValidateVolumeCapabilitiesResponse{Supported: true, Message: ""}, nil
}

View file

@ -35,6 +35,7 @@ const (
mounterKey = "mounter"
s3fsMounter = "s3fs"
goofysMounter = "goofys"
s3qlMounter = "s3ql"
)
type nodeServer struct {
@ -85,17 +86,26 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
glog.V(4).Infof("target %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n",
targetPath, deviceID, readOnly, volumeID, attrib, mountFlags)
mounter, exists := attrib[mounterKey]
if !exists || mounter == s3fsMounter {
mounter := ns.s3.cfg.Mounter
if mounter == "" {
mounter = attrib[mounterKey]
}
switch mounter {
case "":
case s3fsMounter:
if err := s3fsMount(volumeID, ns.s3.cfg, targetPath); err != nil {
return nil, err
}
} else if mounter == goofysMounter {
case goofysMounter:
if err := goofysMount(volumeID, ns.s3.cfg, targetPath); err != nil {
return nil, err
}
} else {
return nil, fmt.Errorf("Error mounting bucket %s, invalid mounter specified: %s", volumeID, mounter)
case s3qlMounter:
if err := s3qlMount(volumeID, ns.s3.cfg, targetPath); err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("Error mounting bucket %s, invalid mounter specified: %s", volumeID, ns.s3.cfg.Mounter)
}
glog.V(4).Infof("s3: bucket %s successfuly mounted to %s", volumeID, targetPath)
@ -121,29 +131,16 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return &csi.NodeUnpublishVolumeResponse{}, nil
}
func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
// Check arguments
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if len(req.GetStagingTargetPath()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
return &csi.NodeStageVolumeResponse{}, nil
func (ns *nodeServer) NodeStageVolume(
ctx context.Context,
req *csi.NodeStageVolumeRequest) (
*csi.NodeStageVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
// Check arguments
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if len(req.GetStagingTargetPath()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
return &csi.NodeUnstageVolumeResponse{}, nil
func (ns *nodeServer) NodeUnstageVolume(
ctx context.Context,
req *csi.NodeUnstageVolumeRequest) (
*csi.NodeUnstageVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

View file

@ -17,10 +17,6 @@ type s3Client struct {
minio *minio.Client
}
type bucketMetadata struct {
CapacityBytes int64
}
func newS3Client(cfg *Config) (*s3Client, error) {
var client = &s3Client{}

View file

@ -36,6 +36,7 @@ func TestDriver(t *testing.T) {
AccessKeyID: "FJDSJ",
SecretAccessKey: "DSG643HGDS",
Endpoint: "http://127.0.0.1:9000",
EncryptionKey: "IskEwCuEg6drywi",
}
driver, err := NewS3("test-node", endpoint, cfg)
if err != nil {

129
pkg/s3/s3ql.go Normal file
View file

@ -0,0 +1,129 @@
package s3
import (
"bytes"
"fmt"
"io"
"net/url"
"os"
"os/exec"
"path"
"strings"
"gopkg.in/ini.v1"
)
type s3fsConfig struct {
url string
bucketURL string
login string
password string
passphrase string
options []string
ssl bool
targetPath string
}
const (
s3qlCmdMkfs = "mkfs.s3ql"
s3qlCmdMount = "mount.s3ql"
)
func newS3ql(bucket string, targetPath string, cfg *Config) (*s3fsConfig, error) {
url, err := url.Parse(cfg.Endpoint)
if err != nil {
return nil, err
}
ssl := url.Scheme != "http"
if strings.Contains(url.Scheme, "http") {
url.Scheme = "s3c"
}
s3ql := &s3fsConfig{
url: url.String(),
login: cfg.AccessKeyID,
password: cfg.SecretAccessKey,
passphrase: cfg.EncryptionKey,
ssl: ssl,
targetPath: targetPath,
}
url.Path = path.Join(url.Path, bucket)
s3ql.bucketURL = url.String()
if !ssl {
s3ql.options = []string{"--backend-options", "no-ssl"}
}
return s3ql, s3ql.writeConfig()
}
func s3qlCreate(bucket string, cfg *Config) error {
s3ql, err := newS3ql(bucket, "unknown", cfg)
if err != nil {
return err
}
return s3ql.create()
}
func s3qlMount(bucket string, cfg *Config, targetPath string) error {
s3ql, err := newS3ql(bucket, targetPath, cfg)
if err != nil {
return err
}
return s3ql.mount()
}
func s3qlCmd(s3qlCmd string, args []string, stdin io.Reader) error {
cmd := exec.Command(s3qlCmd, args...)
if stdin != nil {
cmd.Stdin = stdin
}
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("Error running s3ql command: %s", out)
}
return nil
}
func (cfg *s3fsConfig) create() error {
// force creation to ignore existing data
args := []string{
cfg.bucketURL,
"--force",
}
p := fmt.Sprintf("%s\n%s\n", cfg.passphrase, cfg.passphrase)
reader := bytes.NewReader([]byte(p))
return s3qlCmd(s3qlCmdMkfs, append(args, cfg.options...), reader)
}
func (cfg *s3fsConfig) mount() error {
args := []string{
cfg.bucketURL,
cfg.targetPath,
"--allow-other",
}
return s3qlCmd(s3qlCmdMount, append(args, cfg.options...), nil)
}
func (cfg *s3fsConfig) writeConfig() error {
s3qlIni := ini.Empty()
section, err := s3qlIni.NewSection("s3ql")
if err != nil {
return err
}
section.NewKey("storage-url", cfg.url)
section.NewKey("backend-login", cfg.login)
section.NewKey("backend-password", cfg.password)
section.NewKey("fs-passphrase", cfg.passphrase)
authDir := os.Getenv("HOME") + "/.s3ql"
authFile := authDir + "/authinfo2"
os.Mkdir(authDir, 0700)
s3qlIni.SaveTo(authFile)
os.Chmod(authFile, 0600)
return nil
}