commit
66390f76b7
19 changed files with 638 additions and 147 deletions
8
Gopkg.lock
generated
8
Gopkg.lock
generated
|
@ -158,6 +158,12 @@
|
||||||
packages = ["."]
|
packages = ["."]
|
||||||
revision = "3864e76763d94a6df2f9960b16a20a33da9f9a66"
|
revision = "3864e76763d94a6df2f9960b16a20a33da9f9a66"
|
||||||
|
|
||||||
|
[[projects]]
|
||||||
|
branch = "master"
|
||||||
|
name = "github.com/mitchellh/go-ps"
|
||||||
|
packages = ["."]
|
||||||
|
revision = "4fdf99ab29366514c69ccccddab5dc58b8d84062"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
name = "github.com/onsi/ginkgo"
|
name = "github.com/onsi/ginkgo"
|
||||||
packages = [
|
packages = [
|
||||||
|
@ -372,6 +378,6 @@
|
||||||
[solve-meta]
|
[solve-meta]
|
||||||
analyzer-name = "dep"
|
analyzer-name = "dep"
|
||||||
analyzer-version = 1
|
analyzer-version = 1
|
||||||
inputs-digest = "659f47734b56af7fba146039231841e82cc4c3c95dff2814ec3688e967790a50"
|
inputs-digest = "a655e5231fe7b5c962579c8f032338c53177a95a3160bef4628f0761f714f730"
|
||||||
solver-name = "gps-cdcl"
|
solver-name = "gps-cdcl"
|
||||||
solver-version = 1
|
solver-version = 1
|
||||||
|
|
|
@ -41,3 +41,7 @@
|
||||||
[[constraint]]
|
[[constraint]]
|
||||||
name = "gopkg.in/ini.v1"
|
name = "gopkg.in/ini.v1"
|
||||||
version = "1.38.1"
|
version = "1.38.1"
|
||||||
|
|
||||||
|
[[constraint]]
|
||||||
|
branch = "master"
|
||||||
|
name = "github.com/mitchellh/go-ps"
|
||||||
|
|
2
Makefile
2
Makefile
|
@ -27,7 +27,7 @@ test:
|
||||||
docker build -t $(TEST_IMAGE_TAG) -f test/Dockerfile .
|
docker build -t $(TEST_IMAGE_TAG) -f test/Dockerfile .
|
||||||
docker run --rm --privileged -v $(PWD):$(PROJECT_DIR):ro -v /dev:/dev $(TEST_IMAGE_TAG)
|
docker run --rm --privileged -v $(PWD):$(PROJECT_DIR):ro -v /dev:/dev $(TEST_IMAGE_TAG)
|
||||||
container: build
|
container: build
|
||||||
docker build -t $(IMAGE_TAG) -f cmd/s3driver/Dockerfile.s3ql .
|
docker build -t $(IMAGE_TAG) -f cmd/s3driver/Dockerfile .
|
||||||
push: container
|
push: container
|
||||||
docker push $(IMAGE_TAG)
|
docker push $(IMAGE_TAG)
|
||||||
clean:
|
clean:
|
||||||
|
|
|
@ -76,6 +76,7 @@ As seen in the deployment example above, the driver can be configured to use one
|
||||||
* [s3fs](https://github.com/s3fs-fuse/s3fs-fuse)
|
* [s3fs](https://github.com/s3fs-fuse/s3fs-fuse)
|
||||||
* [goofys](https://github.com/kahing/goofys)
|
* [goofys](https://github.com/kahing/goofys)
|
||||||
* [s3ql](https://github.com/s3ql/s3ql)
|
* [s3ql](https://github.com/s3ql/s3ql)
|
||||||
|
* [s3backer](https://github.com/archiecobbs/s3backer)
|
||||||
|
|
||||||
All mounters have different strengths and weaknesses depending on your use case. Here are some characteristics which should help you choose a mounter:
|
All mounters have different strengths and weaknesses depending on your use case. Here are some characteristics which should help you choose a mounter:
|
||||||
|
|
||||||
|
@ -98,6 +99,14 @@ All mounters have different strengths and weaknesses depending on your use case.
|
||||||
* Supports compression before upload
|
* Supports compression before upload
|
||||||
* Supports encryption before upload
|
* Supports encryption before upload
|
||||||
|
|
||||||
|
### s3backer
|
||||||
|
* Represents a block device stored on S3
|
||||||
|
* Allows to use a real filesystem
|
||||||
|
* Files are not readable with other S3 clients
|
||||||
|
* Support appends
|
||||||
|
* Supports compression before upload (Not yet implemented in this driver)
|
||||||
|
* Supports encryption before upload (Not yet implemented in this driver)
|
||||||
|
|
||||||
# Limitations
|
# Limitations
|
||||||
As S3 is not a real file system there are some limitations to consider here. Depending on what mounter you are using, you will have different levels of POSIX compability. Also depending on what S3 storage backend you are using there are not always consistency guarantees. The detailed limitations can be found on the documentation of [s3fs](https://github.com/s3fs-fuse/s3fs-fuse#limitations) and [goofys](https://github.com/kahing/goofys#current-status).
|
As S3 is not a real file system there are some limitations to consider here. Depending on what mounter you are using, you will have different levels of POSIX compability. Also depending on what S3 storage backend you are using there are not always consistency guarantees. The detailed limitations can be found on the documentation of [s3fs](https://github.com/s3fs-fuse/s3fs-fuse#limitations) and [goofys](https://github.com/kahing/goofys#current-status).
|
||||||
|
|
||||||
|
|
|
@ -1,8 +1,30 @@
|
||||||
FROM debian:stretch
|
FROM golang:stretch
|
||||||
LABEL maintainers="Cyrill Troxler <cyrilltroxler@gmail.com>"
|
LABEL maintainers="Cyrill Troxler <cyrilltroxler@gmail.com>"
|
||||||
LABEL description="s3 fuse csi plugin"
|
LABEL description="s3 fuse csi plugin"
|
||||||
|
|
||||||
RUN apt-get update && apt-get install -y s3fs && rm -rf /var/lib/apt/lists/*
|
RUN apt-get update && apt-get install -y \
|
||||||
|
s3fs \
|
||||||
|
build-essential \
|
||||||
|
autoconf \
|
||||||
|
libcurl4-openssl-dev \
|
||||||
|
libfuse-dev \
|
||||||
|
libexpat1-dev \
|
||||||
|
libssl-dev \
|
||||||
|
zlib1g-dev \
|
||||||
|
xfsprogs \
|
||||||
|
psmisc \
|
||||||
|
git && \
|
||||||
|
rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
# Compile & install s3backer
|
||||||
|
RUN git clone https://github.com/archiecobbs/s3backer.git ./s3backer
|
||||||
|
|
||||||
|
WORKDIR "./s3backer"
|
||||||
|
|
||||||
|
RUN ./autogen.sh && \
|
||||||
|
./configure && \
|
||||||
|
make && \
|
||||||
|
make install
|
||||||
|
|
||||||
COPY ./_output/s3driver /s3driver
|
COPY ./_output/s3driver /s3driver
|
||||||
ENTRYPOINT ["/s3driver"]
|
ENTRYPOINT ["/s3driver"]
|
||||||
|
|
|
@ -130,8 +130,8 @@ spec:
|
||||||
- name: pods-mount-dir
|
- name: pods-mount-dir
|
||||||
mountPath: /var/lib/kubelet/pods
|
mountPath: /var/lib/kubelet/pods
|
||||||
mountPropagation: "Bidirectional"
|
mountPropagation: "Bidirectional"
|
||||||
- name: device-dir
|
- name: fuse-device
|
||||||
mountPath: /dev
|
mountPath: /dev/fuse
|
||||||
volumes:
|
volumes:
|
||||||
- name: plugin-dir
|
- name: plugin-dir
|
||||||
hostPath:
|
hostPath:
|
||||||
|
@ -141,6 +141,6 @@ spec:
|
||||||
hostPath:
|
hostPath:
|
||||||
path: /var/lib/kubelet/pods
|
path: /var/lib/kubelet/pods
|
||||||
type: Directory
|
type: Directory
|
||||||
- name: device-dir
|
- name: fuse-device
|
||||||
hostPath:
|
hostPath:
|
||||||
path: /dev
|
path: /dev/fuse
|
||||||
|
|
|
@ -78,7 +78,7 @@ spec:
|
||||||
volumeMounts:
|
volumeMounts:
|
||||||
- name: socket-dir
|
- name: socket-dir
|
||||||
mountPath: /var/lib/kubelet/plugins/ch.ctrox.csi.s3-driver
|
mountPath: /var/lib/kubelet/plugins/ch.ctrox.csi.s3-driver
|
||||||
- name: s3-csi-driver
|
- name: csi-s3-driver
|
||||||
image: ctrox/csi-s3-driver:0.2.0
|
image: ctrox/csi-s3-driver:0.2.0
|
||||||
args:
|
args:
|
||||||
- "--endpoint=$(CSI_ENDPOINT)"
|
- "--endpoint=$(CSI_ENDPOINT)"
|
||||||
|
|
|
@ -34,58 +34,71 @@ type controllerServer struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
|
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
|
||||||
|
volumeID := req.GetName()
|
||||||
|
|
||||||
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
|
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
|
||||||
glog.V(3).Infof("invalid create volume req: %v", req)
|
glog.V(3).Infof("invalid create volume req: %v", req)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check arguments
|
// Check arguments
|
||||||
if len(req.GetName()) == 0 {
|
if len(volumeID) == 0 {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Name missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Name missing in request")
|
||||||
}
|
}
|
||||||
if req.GetVolumeCapabilities() == nil {
|
if req.GetVolumeCapabilities() == nil {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities missing in request")
|
||||||
}
|
}
|
||||||
|
|
||||||
volumeID := req.GetName()
|
capacityBytes := int64(req.GetCapacityRange().GetRequiredBytes())
|
||||||
|
|
||||||
glog.V(5).Infof("Got a request to create bucket %s", volumeID)
|
glog.V(5).Infof("Got a request to create bucket %s", volumeID)
|
||||||
|
|
||||||
exists, err := cs.s3.client.bucketExists(volumeID)
|
exists, err := cs.s3.client.bucketExists(volumeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if !exists {
|
if exists {
|
||||||
|
var b *bucket
|
||||||
|
b, err = cs.s3.client.getBucket(volumeID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Check if volume capacity requested is bigger than the already existing capacity
|
||||||
|
if capacityBytes > b.CapacityBytes {
|
||||||
|
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", volumeID))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
if err = cs.s3.client.createBucket(volumeID); err != nil {
|
if err = cs.s3.client.createBucket(volumeID); err != nil {
|
||||||
glog.V(3).Infof("failed to create volume: %v", err)
|
glog.V(3).Infof("failed to create volume: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
b := &bucket{
|
||||||
mounter, err := newMounter(volumeID, cs.s3.cfg)
|
Name: volumeID,
|
||||||
if err != nil {
|
CapacityBytes: capacityBytes,
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
if err := mounter.Format(); err != nil {
|
if err := cs.s3.client.setBucket(b); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(4).Infof("create volume %s", volumeID)
|
glog.V(4).Infof("create volume %s", volumeID)
|
||||||
s3Vol := s3Volume{}
|
s3Vol := s3Volume{}
|
||||||
s3Vol.VolName = req.GetName()
|
s3Vol.VolName = volumeID
|
||||||
s3Vol.VolID = volumeID
|
s3Vol.VolID = volumeID
|
||||||
return &csi.CreateVolumeResponse{
|
return &csi.CreateVolumeResponse{
|
||||||
Volume: &csi.Volume{
|
Volume: &csi.Volume{
|
||||||
Id: volumeID,
|
Id: volumeID,
|
||||||
CapacityBytes: 1,
|
CapacityBytes: capacityBytes,
|
||||||
Attributes: req.GetParameters(),
|
Attributes: req.GetParameters(),
|
||||||
},
|
},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
|
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
|
||||||
|
volumeID := req.GetVolumeId()
|
||||||
|
|
||||||
// Check arguments
|
// Check arguments
|
||||||
if len(req.GetVolumeId()) == 0 {
|
if len(volumeID) == 0 {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,7 +106,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||||
glog.V(3).Infof("Invalid delete volume req: %v", req)
|
glog.V(3).Infof("Invalid delete volume req: %v", req)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
volumeID := req.VolumeId
|
|
||||||
glog.V(4).Infof("Deleting volume %s", volumeID)
|
glog.V(4).Infof("Deleting volume %s", volumeID)
|
||||||
|
|
||||||
exists, err := cs.s3.client.bucketExists(volumeID)
|
exists, err := cs.s3.client.bucketExists(volumeID)
|
||||||
|
|
|
@ -1,23 +1,31 @@
|
||||||
package s3
|
package s3
|
||||||
|
|
||||||
import "fmt"
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os/exec"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
"k8s.io/kubernetes/pkg/util/mount"
|
||||||
|
)
|
||||||
|
|
||||||
// Mounter interface which can be implemented
|
// Mounter interface which can be implemented
|
||||||
// by the different mounter types
|
// by the different mounter types
|
||||||
type Mounter interface {
|
type Mounter interface {
|
||||||
Format() error
|
Stage(stagePath string) error
|
||||||
Mount(targetPath string) error
|
Unstage(stagePath string) error
|
||||||
Unmount(targetPath string) error
|
Mount(source string, target string) error
|
||||||
|
Unmount(target string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
s3fsMounterType = "s3fs"
|
s3fsMounterType = "s3fs"
|
||||||
goofysMounterType = "goofys"
|
goofysMounterType = "goofys"
|
||||||
s3qlMounterType = "s3ql"
|
s3qlMounterType = "s3ql"
|
||||||
|
s3backerMounterType = "s3backer"
|
||||||
)
|
)
|
||||||
|
|
||||||
// newMounter returns a new mounter depending on the mounterType parameter
|
// newMounter returns a new mounter depending on the mounterType parameter
|
||||||
func newMounter(bucket string, cfg *Config) (Mounter, error) {
|
func newMounter(bucket *bucket, cfg *Config) (Mounter, error) {
|
||||||
switch cfg.Mounter {
|
switch cfg.Mounter {
|
||||||
case s3fsMounterType:
|
case s3fsMounterType:
|
||||||
return newS3fsMounter(bucket, cfg)
|
return newS3fsMounter(bucket, cfg)
|
||||||
|
@ -28,6 +36,38 @@ func newMounter(bucket string, cfg *Config) (Mounter, error) {
|
||||||
case s3qlMounterType:
|
case s3qlMounterType:
|
||||||
return newS3qlMounter(bucket, cfg)
|
return newS3qlMounter(bucket, cfg)
|
||||||
|
|
||||||
|
case s3backerMounterType:
|
||||||
|
return newS3backerMounter(bucket, cfg)
|
||||||
|
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("Error mounting bucket %s, invalid mounter specified: %s", bucket, cfg.Mounter)
|
return nil, fmt.Errorf("Error mounting bucket %s, invalid mounter specified: %s", bucket.Name, cfg.Mounter)
|
||||||
|
}
|
||||||
|
|
||||||
|
func fuseMount(path string, command string, args []string) error {
|
||||||
|
cmd := exec.Command(command, args...)
|
||||||
|
|
||||||
|
out, err := cmd.CombinedOutput()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Error fuseMount command: %s\nargs: %s\noutput: %s", command, args, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func fuseUnmount(path string, command string) error {
|
||||||
|
if err := mount.New("").Unmount(path); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// as fuse quits immediately, we will try to wait until the process is done
|
||||||
|
process, err := findFuseMountProcess(path, command)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Error getting PID of fuse mount: %s", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if process == nil {
|
||||||
|
glog.Warningf("Unable to find PID of fuse mount %s, it must have finished already", path)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
glog.Infof("Found fuse pid %v of mount %s, checking if it still runs", process.Pid, path)
|
||||||
|
return waitForProcess(process, 1)
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,28 +7,30 @@ import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
goofysApi "github.com/kahing/goofys/api"
|
goofysApi "github.com/kahing/goofys/api"
|
||||||
"k8s.io/kubernetes/pkg/util/mount"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const defaultRegion = "us-east-1"
|
const (
|
||||||
|
goofysCmd = "goofys"
|
||||||
|
defaultRegion = "us-east-1"
|
||||||
|
)
|
||||||
|
|
||||||
// Implements Mounter
|
// Implements Mounter
|
||||||
type goofysMounter struct {
|
type goofysMounter struct {
|
||||||
bucket string
|
bucket *bucket
|
||||||
endpoint string
|
endpoint string
|
||||||
region string
|
region string
|
||||||
accessKeyID string
|
accessKeyID string
|
||||||
secretAccessKey string
|
secretAccessKey string
|
||||||
}
|
}
|
||||||
|
|
||||||
func newGoofysMounter(bucket string, cfg *Config) (Mounter, error) {
|
func newGoofysMounter(b *bucket, cfg *Config) (Mounter, error) {
|
||||||
region := cfg.Region
|
region := cfg.Region
|
||||||
// if endpoint is set we need a default region
|
// if endpoint is set we need a default region
|
||||||
if region == "" && cfg.Endpoint != "" {
|
if region == "" && cfg.Endpoint != "" {
|
||||||
region = defaultRegion
|
region = defaultRegion
|
||||||
}
|
}
|
||||||
return &goofysMounter{
|
return &goofysMounter{
|
||||||
bucket: bucket,
|
bucket: b,
|
||||||
endpoint: cfg.Endpoint,
|
endpoint: cfg.Endpoint,
|
||||||
region: region,
|
region: region,
|
||||||
accessKeyID: cfg.AccessKeyID,
|
accessKeyID: cfg.AccessKeyID,
|
||||||
|
@ -36,13 +38,17 @@ func newGoofysMounter(bucket string, cfg *Config) (Mounter, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (goofys *goofysMounter) Format() error {
|
func (goofys *goofysMounter) Stage(stageTarget string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (goofys *goofysMounter) Mount(targetPath string) error {
|
func (goofys *goofysMounter) Unstage(stageTarget string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (goofys *goofysMounter) Mount(source string, target string) error {
|
||||||
goofysCfg := &goofysApi.Config{
|
goofysCfg := &goofysApi.Config{
|
||||||
MountPoint: targetPath,
|
MountPoint: target,
|
||||||
Endpoint: goofys.endpoint,
|
Endpoint: goofys.endpoint,
|
||||||
Region: goofys.region,
|
Region: goofys.region,
|
||||||
DirMode: 0755,
|
DirMode: 0755,
|
||||||
|
@ -55,7 +61,7 @@ func (goofys *goofysMounter) Mount(targetPath string) error {
|
||||||
os.Setenv("AWS_ACCESS_KEY_ID", goofys.accessKeyID)
|
os.Setenv("AWS_ACCESS_KEY_ID", goofys.accessKeyID)
|
||||||
os.Setenv("AWS_SECRET_ACCESS_KEY", goofys.secretAccessKey)
|
os.Setenv("AWS_SECRET_ACCESS_KEY", goofys.secretAccessKey)
|
||||||
|
|
||||||
_, _, err := goofysApi.Mount(context.Background(), goofys.bucket, goofysCfg)
|
_, _, err := goofysApi.Mount(context.Background(), goofys.bucket.Name, goofysCfg)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Error mounting via goofys: %s", err)
|
return fmt.Errorf("Error mounting via goofys: %s", err)
|
||||||
|
@ -64,5 +70,5 @@ func (goofys *goofysMounter) Mount(targetPath string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (goofys *goofysMounter) Unmount(targetPath string) error {
|
func (goofys *goofysMounter) Unmount(targetPath string) error {
|
||||||
return mount.New("").Unmount(targetPath)
|
return fuseUnmount(targetPath, goofysCmd)
|
||||||
}
|
}
|
||||||
|
|
139
pkg/s3/mounter_s3backer.go
Normal file
139
pkg/s3/mounter_s3backer.go
Normal file
|
@ -0,0 +1,139 @@
|
||||||
|
package s3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
"k8s.io/kubernetes/pkg/util/mount"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Implements Mounter
|
||||||
|
type s3backerMounter struct {
|
||||||
|
bucket *bucket
|
||||||
|
url string
|
||||||
|
region string
|
||||||
|
accessKeyID string
|
||||||
|
secretAccessKey string
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
s3backerCmd = "s3backer"
|
||||||
|
s3backerFsType = "xfs"
|
||||||
|
s3backerDevice = "file"
|
||||||
|
// blockSize to use in k
|
||||||
|
s3backerBlockSize = "128k"
|
||||||
|
s3backerDefaultSize = 1024 * 1024 * 1024 // 1GiB
|
||||||
|
)
|
||||||
|
|
||||||
|
func newS3backerMounter(bucket *bucket, cfg *Config) (Mounter, error) {
|
||||||
|
// s3backer cannot work with 0 size volumes
|
||||||
|
if bucket.CapacityBytes == 0 {
|
||||||
|
bucket.CapacityBytes = s3backerDefaultSize
|
||||||
|
}
|
||||||
|
s3backer := &s3backerMounter{
|
||||||
|
bucket: bucket,
|
||||||
|
url: cfg.Endpoint,
|
||||||
|
region: cfg.Region,
|
||||||
|
accessKeyID: cfg.AccessKeyID,
|
||||||
|
secretAccessKey: cfg.SecretAccessKey,
|
||||||
|
}
|
||||||
|
|
||||||
|
return s3backer, s3backer.writePasswd()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s3backer *s3backerMounter) String() string {
|
||||||
|
return s3backer.bucket.Name
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s3backer *s3backerMounter) Stage(stageTarget string) error {
|
||||||
|
// s3backer requires two mounts
|
||||||
|
// first mount will fuse mount the bucket to a single 'file'
|
||||||
|
if err := s3backer.mountInit(stageTarget); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// ensure 'file' device is formatted
|
||||||
|
err := formatFs(s3backerFsType, path.Join(stageTarget, s3backerDevice))
|
||||||
|
if err != nil {
|
||||||
|
fuseUnmount(stageTarget, s3backerCmd)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s3backer *s3backerMounter) Unstage(stageTarget string) error {
|
||||||
|
// Unmount the s3backer fuse mount
|
||||||
|
return fuseUnmount(stageTarget, s3backerCmd)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s3backer *s3backerMounter) Mount(source string, target string) error {
|
||||||
|
device := path.Join(source, s3backerDevice)
|
||||||
|
// second mount will mount the 'file' as a filesystem
|
||||||
|
err := mount.New("").Mount(device, target, s3backerFsType, []string{})
|
||||||
|
if err != nil {
|
||||||
|
// cleanup fuse mount
|
||||||
|
fuseUnmount(target, s3backerCmd)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s3backer *s3backerMounter) Unmount(targetPath string) error {
|
||||||
|
// Unmount the filesystem first
|
||||||
|
return mount.New("").Unmount(targetPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s3backer *s3backerMounter) mountInit(path string) error {
|
||||||
|
args := []string{
|
||||||
|
// baseURL must end with /
|
||||||
|
fmt.Sprintf("--baseURL=%s/", s3backer.url),
|
||||||
|
fmt.Sprintf("--blockSize=%v", s3backerBlockSize),
|
||||||
|
fmt.Sprintf("--size=%v", s3backer.bucket.CapacityBytes),
|
||||||
|
"--listBlocks",
|
||||||
|
s3backer.bucket.Name,
|
||||||
|
path,
|
||||||
|
}
|
||||||
|
if s3backer.region != "" {
|
||||||
|
args = append(args, fmt.Sprintf("--region=%s", s3backer.region))
|
||||||
|
}
|
||||||
|
|
||||||
|
return fuseMount(path, s3backerCmd, args)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s3backer *s3backerMounter) writePasswd() error {
|
||||||
|
pwFileName := fmt.Sprintf("%s/.s3backer_passwd", os.Getenv("HOME"))
|
||||||
|
pwFile, err := os.OpenFile(pwFileName, os.O_RDWR|os.O_CREATE, 0600)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = pwFile.WriteString(s3backer.accessKeyID + ":" + s3backer.secretAccessKey)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
pwFile.Close()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func formatFs(fsType string, device string) error {
|
||||||
|
diskMounter := &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: mount.NewOsExec()}
|
||||||
|
format, err := diskMounter.GetDiskFormat(device)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if format != "" {
|
||||||
|
glog.Infof("Disk %s is already formatted with format %s", device, format)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
args := []string{
|
||||||
|
device,
|
||||||
|
}
|
||||||
|
cmd := exec.Command("mkfs."+fsType, args...)
|
||||||
|
|
||||||
|
out, err := cmd.CombinedOutput()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Error formatting disk: %s", out)
|
||||||
|
}
|
||||||
|
glog.Infof("Formatting fs with type %s", fsType)
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -3,39 +3,44 @@ package s3
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/util/mount"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Implements Mounter
|
// Implements Mounter
|
||||||
type s3fsMounter struct {
|
type s3fsMounter struct {
|
||||||
bucket string
|
bucket *bucket
|
||||||
url string
|
url string
|
||||||
region string
|
region string
|
||||||
pwFileContent string
|
pwFileContent string
|
||||||
}
|
}
|
||||||
|
|
||||||
func newS3fsMounter(bucket string, cfg *Config) (Mounter, error) {
|
const (
|
||||||
|
s3fsCmd = "s3fs"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newS3fsMounter(b *bucket, cfg *Config) (Mounter, error) {
|
||||||
return &s3fsMounter{
|
return &s3fsMounter{
|
||||||
bucket: bucket,
|
bucket: b,
|
||||||
url: cfg.Endpoint,
|
url: cfg.Endpoint,
|
||||||
region: cfg.Region,
|
region: cfg.Region,
|
||||||
pwFileContent: cfg.AccessKeyID + ":" + cfg.SecretAccessKey,
|
pwFileContent: cfg.AccessKeyID + ":" + cfg.SecretAccessKey,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s3fs *s3fsMounter) Format() error {
|
func (s3fs *s3fsMounter) Stage(stageTarget string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s3fs *s3fsMounter) Mount(targetPath string) error {
|
func (s3fs *s3fsMounter) Unstage(stageTarget string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s3fs *s3fsMounter) Mount(source string, target string) error {
|
||||||
if err := writes3fsPass(s3fs.pwFileContent); err != nil {
|
if err := writes3fsPass(s3fs.pwFileContent); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
args := []string{
|
args := []string{
|
||||||
fmt.Sprintf("%s", s3fs.bucket),
|
fmt.Sprintf("%s", s3fs.bucket.Name),
|
||||||
fmt.Sprintf("%s", targetPath),
|
fmt.Sprintf("%s", target),
|
||||||
"-o", "sigv2",
|
"-o", "sigv2",
|
||||||
"-o", "use_path_request_style",
|
"-o", "use_path_request_style",
|
||||||
"-o", fmt.Sprintf("url=%s", s3fs.url),
|
"-o", fmt.Sprintf("url=%s", s3fs.url),
|
||||||
|
@ -43,16 +48,11 @@ func (s3fs *s3fsMounter) Mount(targetPath string) error {
|
||||||
"-o", "allow_other",
|
"-o", "allow_other",
|
||||||
"-o", "mp_umask=000",
|
"-o", "mp_umask=000",
|
||||||
}
|
}
|
||||||
cmd := exec.Command("s3fs", args...)
|
return fuseMount(target, s3fsCmd, args)
|
||||||
out, err := cmd.CombinedOutput()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Error mounting using s3fs, output: %s", out)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s3fs *s3fsMounter) Unmount(targetPath string) error {
|
func (s3fs *s3fsMounter) Unmount(target string) error {
|
||||||
return mount.New("").Unmount(targetPath)
|
return fuseUnmount(target, s3fsCmd)
|
||||||
}
|
}
|
||||||
|
|
||||||
func writes3fsPass(pwFileContent string) error {
|
func writes3fsPass(pwFileContent string) error {
|
||||||
|
|
|
@ -3,7 +3,6 @@ package s3
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
|
@ -15,6 +14,7 @@ import (
|
||||||
|
|
||||||
// Implements Mounter
|
// Implements Mounter
|
||||||
type s3qlMounter struct {
|
type s3qlMounter struct {
|
||||||
|
bucket *bucket
|
||||||
url string
|
url string
|
||||||
bucketURL string
|
bucketURL string
|
||||||
login string
|
login string
|
||||||
|
@ -31,7 +31,7 @@ const (
|
||||||
s3qlCmdUnmount = "umount.s3ql"
|
s3qlCmdUnmount = "umount.s3ql"
|
||||||
)
|
)
|
||||||
|
|
||||||
func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) {
|
func newS3qlMounter(b *bucket, cfg *Config) (Mounter, error) {
|
||||||
url, err := url.Parse(cfg.Endpoint)
|
url, err := url.Parse(cfg.Endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -41,6 +41,7 @@ func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) {
|
||||||
url.Scheme = "s3c"
|
url.Scheme = "s3c"
|
||||||
}
|
}
|
||||||
s3ql := &s3qlMounter{
|
s3ql := &s3qlMounter{
|
||||||
|
bucket: b,
|
||||||
url: url.String(),
|
url: url.String(),
|
||||||
login: cfg.AccessKeyID,
|
login: cfg.AccessKeyID,
|
||||||
password: cfg.SecretAccessKey,
|
password: cfg.SecretAccessKey,
|
||||||
|
@ -48,7 +49,7 @@ func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) {
|
||||||
ssl: ssl,
|
ssl: ssl,
|
||||||
}
|
}
|
||||||
|
|
||||||
url.Path = path.Join(url.Path, bucket)
|
url.Path = path.Join(url.Path, b.Name)
|
||||||
s3ql.bucketURL = url.String()
|
s3ql.bucketURL = url.String()
|
||||||
|
|
||||||
if !ssl {
|
if !ssl {
|
||||||
|
@ -58,7 +59,7 @@ func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) {
|
||||||
return s3ql, s3ql.writeConfig()
|
return s3ql, s3ql.writeConfig()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s3ql *s3qlMounter) Format() error {
|
func (s3ql *s3qlMounter) Stage(stagePath string) error {
|
||||||
// force creation to ignore existing data
|
// force creation to ignore existing data
|
||||||
args := []string{
|
args := []string{
|
||||||
s3ql.bucketURL,
|
s3ql.bucketURL,
|
||||||
|
@ -67,27 +68,8 @@ func (s3ql *s3qlMounter) Format() error {
|
||||||
|
|
||||||
p := fmt.Sprintf("%s\n%s\n", s3ql.passphrase, s3ql.passphrase)
|
p := fmt.Sprintf("%s\n%s\n", s3ql.passphrase, s3ql.passphrase)
|
||||||
reader := bytes.NewReader([]byte(p))
|
reader := bytes.NewReader([]byte(p))
|
||||||
return s3qlCmd(s3qlCmdMkfs, append(args, s3ql.options...), reader)
|
cmd := exec.Command(s3qlCmdMkfs, append(args, s3ql.options...)...)
|
||||||
}
|
cmd.Stdin = reader
|
||||||
|
|
||||||
func (s3ql *s3qlMounter) Mount(targetPath string) error {
|
|
||||||
args := []string{
|
|
||||||
s3ql.bucketURL,
|
|
||||||
targetPath,
|
|
||||||
"--allow-other",
|
|
||||||
}
|
|
||||||
return s3qlCmd(s3qlCmdMount, append(args, s3ql.options...), nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s3ql *s3qlMounter) Unmount(targetPath string) error {
|
|
||||||
return s3qlCmd(s3qlCmdUnmount, []string{targetPath}, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func s3qlCmd(s3qlCmd string, args []string, stdin io.Reader) error {
|
|
||||||
cmd := exec.Command(s3qlCmd, args...)
|
|
||||||
if stdin != nil {
|
|
||||||
cmd.Stdin = stdin
|
|
||||||
}
|
|
||||||
|
|
||||||
out, err := cmd.CombinedOutput()
|
out, err := cmd.CombinedOutput()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -96,6 +78,23 @@ func s3qlCmd(s3qlCmd string, args []string, stdin io.Reader) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s3ql *s3qlMounter) Unstage(stagePath string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s3ql *s3qlMounter) Mount(source string, target string) error {
|
||||||
|
args := []string{
|
||||||
|
s3ql.bucketURL,
|
||||||
|
target,
|
||||||
|
"--allow-other",
|
||||||
|
}
|
||||||
|
return fuseMount(target, s3qlCmdMount, append(args, s3ql.options...))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s3ql *s3qlMounter) Unmount(target string) error {
|
||||||
|
return fuseUnmount(target, s3qlCmdMount)
|
||||||
|
}
|
||||||
|
|
||||||
func (s3ql *s3qlMounter) writeConfig() error {
|
func (s3ql *s3qlMounter) writeConfig() error {
|
||||||
s3qlIni := ini.Empty()
|
s3qlIni := ini.Empty()
|
||||||
section, err := s3qlIni.NewSection("s3ql")
|
section, err := s3qlIni.NewSection("s3ql")
|
||||||
|
|
|
@ -36,31 +36,28 @@ type nodeServer struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
|
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
|
||||||
|
volumeID := req.GetVolumeId()
|
||||||
|
targetPath := req.GetTargetPath()
|
||||||
|
stagingTargetPath := req.GetStagingTargetPath()
|
||||||
|
|
||||||
// Check arguments
|
// Check arguments
|
||||||
if req.GetVolumeCapability() == nil {
|
if req.GetVolumeCapability() == nil {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
|
||||||
}
|
}
|
||||||
if len(req.GetVolumeId()) == 0 {
|
if len(volumeID) == 0 {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
||||||
}
|
}
|
||||||
if len(req.GetTargetPath()) == 0 {
|
if len(stagingTargetPath) == 0 {
|
||||||
|
return nil, status.Error(codes.InvalidArgument, "Staging Target path missing in request")
|
||||||
|
}
|
||||||
|
if len(targetPath) == 0 {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
|
||||||
}
|
}
|
||||||
|
|
||||||
targetPath := req.GetTargetPath()
|
notMnt, err := checkMount(targetPath)
|
||||||
notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
|
||||||
if err = os.MkdirAll(targetPath, 0750); err != nil {
|
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
notMnt = true
|
|
||||||
} else {
|
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !notMnt {
|
if !notMnt {
|
||||||
return &csi.NodePublishVolumeResponse{}, nil
|
return &csi.NodePublishVolumeResponse{}, nil
|
||||||
}
|
}
|
||||||
|
@ -70,59 +67,156 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
||||||
deviceID = req.GetPublishInfo()[deviceID]
|
deviceID = req.GetPublishInfo()[deviceID]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Implement readOnly & mountFlags
|
||||||
readOnly := req.GetReadonly()
|
readOnly := req.GetReadonly()
|
||||||
volumeID := req.GetVolumeId()
|
|
||||||
attrib := req.GetVolumeAttributes()
|
attrib := req.GetVolumeAttributes()
|
||||||
mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()
|
mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()
|
||||||
|
|
||||||
glog.V(4).Infof("target %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n",
|
glog.V(4).Infof("target %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n",
|
||||||
targetPath, deviceID, readOnly, volumeID, attrib, mountFlags)
|
targetPath, deviceID, readOnly, volumeID, attrib, mountFlags)
|
||||||
|
|
||||||
mounter, err := newMounter(volumeID, ns.s3.cfg)
|
b, err := ns.s3.client.getBucket(volumeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := mounter.Mount(targetPath); err != nil {
|
|
||||||
|
mounter, err := newMounter(b, ns.s3.cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := mounter.Mount(stagingTargetPath, targetPath); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(4).Infof("s3: bucket %s successfuly mounted to %s", volumeID, targetPath)
|
glog.V(4).Infof("s3: bucket %s successfuly mounted to %s", b.Name, targetPath)
|
||||||
|
|
||||||
return &csi.NodePublishVolumeResponse{}, nil
|
return &csi.NodePublishVolumeResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
|
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
|
||||||
|
volumeID := req.GetVolumeId()
|
||||||
|
targetPath := req.GetTargetPath()
|
||||||
|
|
||||||
// Check arguments
|
// Check arguments
|
||||||
if len(req.GetVolumeId()) == 0 {
|
if len(volumeID) == 0 {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
||||||
}
|
}
|
||||||
if len(req.GetTargetPath()) == 0 {
|
if len(targetPath) == 0 {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
|
||||||
}
|
}
|
||||||
|
|
||||||
mounter, err := newMounter(req.GetVolumeId(), ns.s3.cfg)
|
b, err := ns.s3.client.getBucket(volumeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := mounter.Unmount(req.GetTargetPath()); err != nil {
|
mounter, err := newMounter(b, ns.s3.cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := mounter.Unmount(targetPath); err != nil {
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
glog.V(4).Infof("s3: bucket %s has been unmounted.", req.GetVolumeId())
|
glog.V(4).Infof("s3: bucket %s has been unmounted.", volumeID)
|
||||||
|
|
||||||
return &csi.NodeUnpublishVolumeResponse{}, nil
|
return &csi.NodeUnpublishVolumeResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ns *nodeServer) NodeStageVolume(
|
func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
|
||||||
ctx context.Context,
|
volumeID := req.GetVolumeId()
|
||||||
req *csi.NodeStageVolumeRequest) (
|
stagingTargetPath := req.GetStagingTargetPath()
|
||||||
*csi.NodeStageVolumeResponse, error) {
|
|
||||||
return nil, status.Error(codes.Unimplemented, "")
|
// Check arguments
|
||||||
|
if len(volumeID) == 0 {
|
||||||
|
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ns *nodeServer) NodeUnstageVolume(
|
if len(stagingTargetPath) == 0 {
|
||||||
ctx context.Context,
|
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
|
||||||
req *csi.NodeUnstageVolumeRequest) (
|
}
|
||||||
*csi.NodeUnstageVolumeResponse, error) {
|
|
||||||
return nil, status.Error(codes.Unimplemented, "")
|
if req.VolumeCapability == nil {
|
||||||
|
return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Volume Capability must be provided")
|
||||||
|
}
|
||||||
|
|
||||||
|
notMnt, err := checkMount(stagingTargetPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
|
}
|
||||||
|
if !notMnt {
|
||||||
|
return &csi.NodeStageVolumeResponse{}, nil
|
||||||
|
}
|
||||||
|
b, err := ns.s3.client.getBucket(volumeID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
mounter, err := newMounter(b, ns.s3.cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := mounter.Stage(stagingTargetPath); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &csi.NodeStageVolumeResponse{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
|
||||||
|
volumeID := req.GetVolumeId()
|
||||||
|
stagingTargetPath := req.GetStagingTargetPath()
|
||||||
|
|
||||||
|
// Check arguments
|
||||||
|
if len(volumeID) == 0 {
|
||||||
|
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
||||||
|
}
|
||||||
|
if len(stagingTargetPath) == 0 {
|
||||||
|
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := ns.s3.client.getBucket(volumeID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
mounter, err := newMounter(b, ns.s3.cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := mounter.Unstage(stagingTargetPath); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &csi.NodeUnstageVolumeResponse{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NodeGetCapabilities returns the supported capabilities of the node server
|
||||||
|
func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
|
||||||
|
// currently there is a single NodeServer capability according to the spec
|
||||||
|
nscap := &csi.NodeServiceCapability{
|
||||||
|
Type: &csi.NodeServiceCapability_Rpc{
|
||||||
|
Rpc: &csi.NodeServiceCapability_RPC{
|
||||||
|
Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
return &csi.NodeGetCapabilitiesResponse{
|
||||||
|
Capabilities: []*csi.NodeServiceCapability{
|
||||||
|
nscap,
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkMount(targetPath string) (bool, error) {
|
||||||
|
notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
if err = os.MkdirAll(targetPath, 0750); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
notMnt = true
|
||||||
|
} else {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return notMnt, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
package s3
|
package s3
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
|
@ -9,7 +12,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
metadataName = ".meta"
|
metadataName = ".metadata.json"
|
||||||
)
|
)
|
||||||
|
|
||||||
type s3Client struct {
|
type s3Client struct {
|
||||||
|
@ -17,6 +20,11 @@ type s3Client struct {
|
||||||
minio *minio.Client
|
minio *minio.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type bucket struct {
|
||||||
|
Name string
|
||||||
|
CapacityBytes int64
|
||||||
|
}
|
||||||
|
|
||||||
func newS3Client(cfg *Config) (*s3Client, error) {
|
func newS3Client(cfg *Config) (*s3Client, error) {
|
||||||
var client = &s3Client{}
|
var client = &s3Client{}
|
||||||
|
|
||||||
|
@ -91,3 +99,32 @@ func (client *s3Client) emptyBucket(bucketName string) error {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (client *s3Client) setBucket(bucket *bucket) error {
|
||||||
|
b := new(bytes.Buffer)
|
||||||
|
json.NewEncoder(b).Encode(bucket)
|
||||||
|
opts := minio.PutObjectOptions{ContentType: "application/json"}
|
||||||
|
_, err := client.minio.PutObject(bucket.Name, metadataName, b, int64(b.Len()), opts)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (client *s3Client) getBucket(bucketName string) (*bucket, error) {
|
||||||
|
opts := minio.GetObjectOptions{}
|
||||||
|
obj, err := client.minio.GetObject(bucketName, metadataName, opts)
|
||||||
|
if err != nil {
|
||||||
|
return &bucket{}, err
|
||||||
|
}
|
||||||
|
objInfo, err := obj.Stat()
|
||||||
|
if err != nil {
|
||||||
|
return &bucket{}, err
|
||||||
|
}
|
||||||
|
b := make([]byte, objInfo.Size)
|
||||||
|
_, err = obj.Read(b)
|
||||||
|
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
return &bucket{}, err
|
||||||
|
}
|
||||||
|
var meta bucket
|
||||||
|
err = json.Unmarshal(b, &meta)
|
||||||
|
return &meta, err
|
||||||
|
}
|
||||||
|
|
|
@ -12,16 +12,13 @@ import (
|
||||||
"github.com/kubernetes-csi/csi-test/pkg/sanity"
|
"github.com/kubernetes-csi/csi-test/pkg/sanity"
|
||||||
)
|
)
|
||||||
|
|
||||||
const ()
|
|
||||||
|
|
||||||
var _ = Describe("S3Driver", func() {
|
var _ = Describe("S3Driver", func() {
|
||||||
mntDir, err := ioutil.TempDir("", "mnt")
|
mntDir, _ := ioutil.TempDir("", "mnt")
|
||||||
if err != nil {
|
stagingDir, _ := ioutil.TempDir("", "staging")
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
}
|
|
||||||
|
|
||||||
AfterSuite(func() {
|
AfterSuite(func() {
|
||||||
os.RemoveAll(mntDir)
|
os.RemoveAll(mntDir)
|
||||||
|
os.RemoveAll(stagingDir)
|
||||||
})
|
})
|
||||||
|
|
||||||
Context("goofys", func() {
|
Context("goofys", func() {
|
||||||
|
@ -45,8 +42,8 @@ var _ = Describe("S3Driver", func() {
|
||||||
Describe("CSI sanity", func() {
|
Describe("CSI sanity", func() {
|
||||||
sanityCfg := &sanity.Config{
|
sanityCfg := &sanity.Config{
|
||||||
TargetPath: mntDir,
|
TargetPath: mntDir,
|
||||||
|
StagingPath: stagingDir,
|
||||||
Address: csiEndpoint,
|
Address: csiEndpoint,
|
||||||
TestVolumeSize: 1,
|
|
||||||
}
|
}
|
||||||
sanity.GinkgoTest(sanityCfg)
|
sanity.GinkgoTest(sanityCfg)
|
||||||
})
|
})
|
||||||
|
@ -70,13 +67,11 @@ var _ = Describe("S3Driver", func() {
|
||||||
}
|
}
|
||||||
go driver.Run()
|
go driver.Run()
|
||||||
|
|
||||||
defer os.RemoveAll(mntDir)
|
|
||||||
|
|
||||||
Describe("CSI sanity", func() {
|
Describe("CSI sanity", func() {
|
||||||
sanityCfg := &sanity.Config{
|
sanityCfg := &sanity.Config{
|
||||||
TargetPath: mntDir,
|
TargetPath: mntDir,
|
||||||
|
StagingPath: stagingDir,
|
||||||
Address: csiEndpoint,
|
Address: csiEndpoint,
|
||||||
TestVolumeSize: 1,
|
|
||||||
}
|
}
|
||||||
sanity.GinkgoTest(sanityCfg)
|
sanity.GinkgoTest(sanityCfg)
|
||||||
})
|
})
|
||||||
|
@ -106,8 +101,37 @@ var _ = Describe("S3Driver", func() {
|
||||||
Describe("CSI sanity", func() {
|
Describe("CSI sanity", func() {
|
||||||
sanityCfg := &sanity.Config{
|
sanityCfg := &sanity.Config{
|
||||||
TargetPath: mntDir,
|
TargetPath: mntDir,
|
||||||
|
StagingPath: stagingDir,
|
||||||
|
Address: csiEndpoint,
|
||||||
|
}
|
||||||
|
sanity.GinkgoTest(sanityCfg)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
Context("s3backer", func() {
|
||||||
|
socket := "/tmp/csi-s3backer.sock"
|
||||||
|
csiEndpoint := "unix://" + socket
|
||||||
|
|
||||||
|
cfg := &s3.Config{
|
||||||
|
AccessKeyID: "FJDSJ",
|
||||||
|
SecretAccessKey: "DSG643HGDS",
|
||||||
|
Endpoint: "http://127.0.0.1:9000",
|
||||||
|
Mounter: "s3backer",
|
||||||
|
}
|
||||||
|
if err := os.Remove(socket); err != nil && !os.IsNotExist(err) {
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
}
|
||||||
|
driver, err := s3.NewS3("test-node", csiEndpoint, cfg)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
go driver.Run()
|
||||||
|
|
||||||
|
Describe("CSI sanity", func() {
|
||||||
|
sanityCfg := &sanity.Config{
|
||||||
|
TargetPath: mntDir,
|
||||||
|
StagingPath: stagingDir,
|
||||||
Address: csiEndpoint,
|
Address: csiEndpoint,
|
||||||
TestVolumeSize: 1,
|
|
||||||
}
|
}
|
||||||
sanity.GinkgoTest(sanityCfg)
|
sanity.GinkgoTest(sanityCfg)
|
||||||
})
|
})
|
||||||
|
|
69
pkg/s3/util.go
Normal file
69
pkg/s3/util.go
Normal file
|
@ -0,0 +1,69 @@
|
||||||
|
package s3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/mitchellh/go-ps"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
)
|
||||||
|
|
||||||
|
func waitForProcess(p *os.Process, backoff int) error {
|
||||||
|
if backoff == 20 {
|
||||||
|
return fmt.Errorf("Timeout waiting for PID %v to end", p.Pid)
|
||||||
|
}
|
||||||
|
cmdLine, err := getCmdLine(p.Pid)
|
||||||
|
if err != nil {
|
||||||
|
glog.Warningf("Error checking cmdline of PID %v, assuming it is dead: %s", p.Pid, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if cmdLine == "" {
|
||||||
|
// ignore defunct processes
|
||||||
|
// TODO: debug why this happens in the first place
|
||||||
|
// seems to only happen on k8s, not on local docker
|
||||||
|
glog.Warning("Fuse process seems dead, returning")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := p.Signal(syscall.Signal(0)); err != nil {
|
||||||
|
glog.Warningf("Fuse process does not seem active or we are unprivileged: %s", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
glog.Infof("Fuse process with PID %v still active, waiting...", p.Pid)
|
||||||
|
time.Sleep(time.Duration(backoff*100) * time.Millisecond)
|
||||||
|
return waitForProcess(p, backoff+1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func findFuseMountProcess(path string, name string) (*os.Process, error) {
|
||||||
|
processes, err := ps.Processes()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, p := range processes {
|
||||||
|
if strings.Contains(p.Executable(), name) {
|
||||||
|
cmdLine, err := getCmdLine(p.Pid())
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Unable to get cmdline of PID %v: %s", p.Pid(), err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if strings.Contains(cmdLine, path) {
|
||||||
|
glog.Infof("Found matching pid %v on path %s", p.Pid(), path)
|
||||||
|
return os.FindProcess(p.Pid())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getCmdLine(pid int) (string, error) {
|
||||||
|
cmdLineFile := fmt.Sprintf("/proc/%v/cmdline", pid)
|
||||||
|
cmdLine, err := ioutil.ReadFile(cmdLineFile)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return string(cmdLine), nil
|
||||||
|
}
|
|
@ -1,23 +1,29 @@
|
||||||
FROM golang:stretch
|
FROM golang:stretch
|
||||||
LABEL maintainers="Cyrill Troxler <cyrilltroxler@gmail.com>"
|
LABEL maintainers="Cyrill Troxler <cyrilltroxler@gmail.com>"
|
||||||
LABEL description="s3 fuse csi plugin"
|
LABEL description="s3 fuse csi plugin"
|
||||||
ARG S3QL_VERSION=release-2.28
|
|
||||||
|
|
||||||
RUN apt-get update && \
|
RUN apt-get update && apt-get install -y \
|
||||||
apt-get install -y \
|
s3fs \
|
||||||
s3fs wget psmisc procps python3 python3-setuptools \
|
build-essential \
|
||||||
python3-dev python3-pip python3-llfuse pkg-config \
|
autoconf \
|
||||||
sqlite3 libsqlite3-dev python3-apsw cython && \
|
libcurl4-openssl-dev \
|
||||||
|
libfuse-dev \
|
||||||
|
libexpat1-dev \
|
||||||
|
libssl-dev \
|
||||||
|
zlib1g-dev \
|
||||||
|
xfsprogs \
|
||||||
|
psmisc \
|
||||||
|
git && \
|
||||||
rm -rf /var/lib/apt/lists/*
|
rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
RUN pip3 install defusedxml dugong requests pycrypto
|
RUN git clone https://github.com/archiecobbs/s3backer.git ./s3backer
|
||||||
|
|
||||||
WORKDIR /usr/src
|
WORKDIR "./s3backer"
|
||||||
RUN wget -q https://github.com/s3ql/s3ql/archive/${S3QL_VERSION}.tar.gz
|
|
||||||
RUN tar -xzf ${S3QL_VERSION}.tar.gz
|
RUN ["./autogen.sh"]
|
||||||
WORKDIR /usr/src/s3ql-${S3QL_VERSION}
|
RUN ["./configure"]
|
||||||
RUN python3 setup.py build_cython build_ext --inplace
|
RUN ["make"]
|
||||||
RUN python3 setup.py install
|
RUN ["make", "install"]
|
||||||
|
|
||||||
RUN go get -u github.com/minio/minio && go install github.com/minio/minio/cmd
|
RUN go get -u github.com/minio/minio && go install github.com/minio/minio/cmd
|
||||||
|
|
||||||
|
|
24
test/Dockerfile.s3ql
Normal file
24
test/Dockerfile.s3ql
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
FROM golang:stretch
|
||||||
|
LABEL maintainers="Cyrill Troxler <cyrilltroxler@gmail.com>"
|
||||||
|
LABEL description="s3 fuse csi plugin"
|
||||||
|
ARG S3QL_VERSION=release-2.28
|
||||||
|
|
||||||
|
RUN apt-get update && \
|
||||||
|
apt-get install -y \
|
||||||
|
s3fs wget psmisc procps python3 python3-setuptools \
|
||||||
|
python3-dev python3-pip python3-llfuse pkg-config \
|
||||||
|
sqlite3 libsqlite3-dev python3-apsw cython && \
|
||||||
|
rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
RUN pip3 install defusedxml dugong requests pycrypto
|
||||||
|
|
||||||
|
WORKDIR /usr/src
|
||||||
|
RUN wget -q https://github.com/s3ql/s3ql/archive/${S3QL_VERSION}.tar.gz
|
||||||
|
RUN tar -xzf ${S3QL_VERSION}.tar.gz
|
||||||
|
WORKDIR /usr/src/s3ql-${S3QL_VERSION}
|
||||||
|
RUN python3 setup.py build_cython build_ext --inplace
|
||||||
|
RUN python3 setup.py install
|
||||||
|
|
||||||
|
RUN go get -u github.com/minio/minio && go install github.com/minio/minio/cmd
|
||||||
|
|
||||||
|
CMD ["/go/src/github.com/ctrox/csi-s3-driver/test/test.sh"]
|
Loading…
Reference in a new issue