Rename packages

Create separate packages for driver, s3 client and mounters.
This commit is contained in:
Cyrill Troxler 2021-04-03 12:40:58 +02:00
parent d0a14c8644
commit 7528b3297c
16 changed files with 213 additions and 207 deletions

View file

@ -21,7 +21,7 @@ import (
"log" "log"
"os" "os"
"github.com/ctrox/csi-s3/pkg/s3" "github.com/ctrox/csi-s3/pkg/driver"
) )
func init() { func init() {
@ -36,7 +36,7 @@ var (
func main() { func main() {
flag.Parse() flag.Parse()
driver, err := s3.NewS3(*nodeID, *endpoint) driver, err := driver.New(*nodeID, *endpoint)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package s3 package driver
import ( import (
"crypto/sha1" "crypto/sha1"
@ -23,6 +23,8 @@ import (
"io" "io"
"strings" "strings"
"github.com/ctrox/csi-s3/pkg/mounter"
"github.com/ctrox/csi-s3/pkg/s3"
"github.com/golang/glog" "github.com/golang/glog"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
@ -36,11 +38,15 @@ type controllerServer struct {
*csicommon.DefaultControllerServer *csicommon.DefaultControllerServer
} }
const (
defaultFsPrefix = "csi-fs"
)
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
params := req.GetParameters() params := req.GetParameters()
volumeID := sanitizeVolumeID(req.GetName()) volumeID := sanitizeVolumeID(req.GetName())
if bucketName, bucketExists := params[bucketKey]; bucketExists { if bucketName, bucketExists := params[mounter.BucketKey]; bucketExists {
volumeID = sanitizeVolumeID(bucketName) volumeID = sanitizeVolumeID(bucketName)
} }
@ -59,24 +65,24 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
capacityBytes := int64(req.GetCapacityRange().GetRequiredBytes()) capacityBytes := int64(req.GetCapacityRange().GetRequiredBytes())
mounter := params[mounterTypeKey] mounter := params[mounter.TypeKey]
glog.V(4).Infof("Got a request to create volume %s", volumeID) glog.V(4).Infof("Got a request to create volume %s", volumeID)
s3, err := newS3ClientFromSecrets(req.GetSecrets()) client, err := s3.NewClientFromSecret(req.GetSecrets())
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err) return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
} }
exists, err := s3.bucketExists(volumeID) exists, err := client.BucketExists(volumeID)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to check if bucket %s exists: %v", volumeID, err) return nil, fmt.Errorf("failed to check if bucket %s exists: %v", volumeID, err)
} }
var b *bucket var b *s3.Bucket
if exists { if exists {
b, err = s3.getBucket(volumeID) b, err = client.GetBucket(volumeID)
if err != nil { if err != nil {
glog.Warningf("Bucket %s exists, but failed to get its metadata: %v", volumeID, err) glog.Warningf("Bucket %s exists, but failed to get its metadata: %v", volumeID, err)
b = &bucket{ b = &s3.Bucket{
Name: volumeID, Name: volumeID,
Mounter: mounter, Mounter: mounter,
CapacityBytes: capacityBytes, CapacityBytes: capacityBytes,
@ -91,13 +97,13 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
b.Mounter = mounter b.Mounter = mounter
} }
} else { } else {
if err = s3.createBucket(volumeID); err != nil { if err = client.CreateBucket(volumeID); err != nil {
return nil, fmt.Errorf("failed to create volume %s: %v", volumeID, err) return nil, fmt.Errorf("failed to create volume %s: %v", volumeID, err)
} }
if err = s3.createPrefix(volumeID, defaultFsPrefix); err != nil { if err = client.CreatePrefix(volumeID, defaultFsPrefix); err != nil {
return nil, fmt.Errorf("failed to create prefix %s: %v", defaultFsPrefix, err) return nil, fmt.Errorf("failed to create prefix %s: %v", defaultFsPrefix, err)
} }
b = &bucket{ b = &s3.Bucket{
Name: volumeID, Name: volumeID,
Mounter: mounter, Mounter: mounter,
CapacityBytes: capacityBytes, CapacityBytes: capacityBytes,
@ -105,7 +111,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
CreatedByCsi: !exists, CreatedByCsi: !exists,
} }
} }
if err := s3.setBucket(b); err != nil { if err := client.SetBucket(b); err != nil {
return nil, fmt.Errorf("Error setting bucket metadata: %v", err) return nil, fmt.Errorf("Error setting bucket metadata: %v", err)
} }
@ -136,21 +142,21 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
} }
glog.V(4).Infof("Deleting volume %s", volumeID) glog.V(4).Infof("Deleting volume %s", volumeID)
s3, err := newS3ClientFromSecrets(req.GetSecrets()) client, err := s3.NewClientFromSecret(req.GetSecrets())
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err) return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
} }
exists, err := s3.bucketExists(volumeID) exists, err := client.BucketExists(volumeID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if exists { if exists {
b, err := s3.getBucket(volumeID) b, err := client.GetBucket(volumeID)
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to get metadata of buckect %s", volumeID) return nil, fmt.Errorf("Failed to get metadata of buckect %s", volumeID)
} }
if b.CreatedByCsi { if b.CreatedByCsi {
if err := s3.removeBucket(volumeID); err != nil { if err := client.RemoveBucket(volumeID); err != nil {
glog.V(3).Infof("Failed to remove volume %s: %v", volumeID, err) glog.V(3).Infof("Failed to remove volume %s: %v", volumeID, err)
return nil, err return nil, err
} }
@ -175,11 +181,11 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request") return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request")
} }
s3, err := newS3ClientFromSecrets(req.GetSecrets()) s3, err := s3.NewClientFromSecret(req.GetSecrets())
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err) return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
} }
exists, err := s3.bucketExists(req.GetVolumeId()) exists, err := s3.BucketExists(req.GetVolumeId())
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package s3 package driver
import ( import (
"github.com/container-storage-interface/spec/lib/go/csi" "github.com/container-storage-interface/spec/lib/go/csi"
@ -23,7 +23,7 @@ import (
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common" csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
) )
type s3 struct { type driver struct {
driver *csicommon.CSIDriver driver *csicommon.CSIDriver
endpoint string endpoint string
@ -44,39 +44,39 @@ var (
driverName = "ch.ctrox.csi.s3-driver" driverName = "ch.ctrox.csi.s3-driver"
) )
// NewS3 initializes the driver // New initializes the driver
func NewS3(nodeID string, endpoint string) (*s3, error) { func New(nodeID string, endpoint string) (*driver, error) {
driver := csicommon.NewCSIDriver(driverName, vendorVersion, nodeID) d := csicommon.NewCSIDriver(driverName, vendorVersion, nodeID)
if driver == nil { if d == nil {
glog.Fatalln("Failed to initialize CSI Driver.") glog.Fatalln("Failed to initialize CSI Driver.")
} }
s3Driver := &s3{ s3Driver := &driver{
endpoint: endpoint, endpoint: endpoint,
driver: driver, driver: d,
} }
return s3Driver, nil return s3Driver, nil
} }
func (s3 *s3) newIdentityServer(d *csicommon.CSIDriver) *identityServer { func (s3 *driver) newIdentityServer(d *csicommon.CSIDriver) *identityServer {
return &identityServer{ return &identityServer{
DefaultIdentityServer: csicommon.NewDefaultIdentityServer(d), DefaultIdentityServer: csicommon.NewDefaultIdentityServer(d),
} }
} }
func (s3 *s3) newControllerServer(d *csicommon.CSIDriver) *controllerServer { func (s3 *driver) newControllerServer(d *csicommon.CSIDriver) *controllerServer {
return &controllerServer{ return &controllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(d), DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
} }
} }
func (s3 *s3) newNodeServer(d *csicommon.CSIDriver) *nodeServer { func (s3 *driver) newNodeServer(d *csicommon.CSIDriver) *nodeServer {
return &nodeServer{ return &nodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d), DefaultNodeServer: csicommon.NewDefaultNodeServer(d),
} }
} }
func (s3 *s3) Run() { func (s3 *driver) Run() {
glog.Infof("Driver: %v ", driverName) glog.Infof("Driver: %v ", driverName)
glog.Infof("Version: %v ", vendorVersion) glog.Infof("Version: %v ", vendorVersion)
// Initialize default library driver // Initialize default library driver

View file

@ -1,10 +1,11 @@
package s3_test package driver_test
import ( import (
"log" "log"
"os" "os"
"github.com/ctrox/csi-s3/pkg/s3" "github.com/ctrox/csi-s3/pkg/driver"
"github.com/ctrox/csi-s3/pkg/mounter"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
@ -19,7 +20,7 @@ var _ = Describe("S3Driver", func() {
if err := os.Remove(socket); err != nil && !os.IsNotExist(err) { if err := os.Remove(socket); err != nil && !os.IsNotExist(err) {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
} }
driver, err := s3.NewS3("test-node", csiEndpoint) driver, err := driver.New("test-node", csiEndpoint)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -45,7 +46,7 @@ var _ = Describe("S3Driver", func() {
if err := os.Remove(socket); err != nil && !os.IsNotExist(err) { if err := os.Remove(socket); err != nil && !os.IsNotExist(err) {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
} }
driver, err := s3.NewS3("test-node", csiEndpoint) driver, err := driver.New("test-node", csiEndpoint)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -73,8 +74,8 @@ var _ = Describe("S3Driver", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
} }
// Clear loop device so we cover the creation of it // Clear loop device so we cover the creation of it
os.Remove(s3.S3backerLoopDevice) os.Remove(mounter.S3backerLoopDevice)
driver, err := s3.NewS3("test-node", csiEndpoint) driver, err := driver.New("test-node", csiEndpoint)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -101,7 +102,7 @@ var _ = Describe("S3Driver", func() {
if err := os.Remove(socket); err != nil && !os.IsNotExist(err) { if err := os.Remove(socket); err != nil && !os.IsNotExist(err) {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
} }
driver, err := s3.NewS3("test-node", csiEndpoint) driver, err := driver.New("test-node", csiEndpoint)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package s3 package driver_test
import ( import (
"testing" "testing"

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package s3 package driver
import ( import (
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common" csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"

View file

@ -14,12 +14,14 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package s3 package driver
import ( import (
"fmt" "fmt"
"os" "os"
"github.com/ctrox/csi-s3/pkg/mounter"
"github.com/ctrox/csi-s3/pkg/s3"
"github.com/golang/glog" "github.com/golang/glog"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -76,16 +78,16 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
glog.V(4).Infof("target %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n", glog.V(4).Infof("target %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n",
targetPath, deviceID, readOnly, volumeID, attrib, mountFlags) targetPath, deviceID, readOnly, volumeID, attrib, mountFlags)
s3, err := newS3ClientFromSecrets(req.GetSecrets()) s3, err := s3.NewClientFromSecret(req.GetSecrets())
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err) return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
} }
b, err := s3.getBucket(volumeID) b, err := s3.GetBucket(volumeID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
mounter, err := newMounter(b, s3.cfg) mounter, err := mounter.New(b, s3.Config)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -110,7 +112,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return nil, status.Error(codes.InvalidArgument, "Target path missing in request") return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
} }
if err := fuseUnmount(targetPath); err != nil { if err := mounter.FuseUnmount(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
glog.V(4).Infof("s3: bucket %s has been unmounted.", volumeID) glog.V(4).Infof("s3: bucket %s has been unmounted.", volumeID)
@ -142,15 +144,15 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
if !notMnt { if !notMnt {
return &csi.NodeStageVolumeResponse{}, nil return &csi.NodeStageVolumeResponse{}, nil
} }
s3, err := newS3ClientFromSecrets(req.GetSecrets()) client, err := s3.NewClientFromSecret(req.GetSecrets())
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err) return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
} }
b, err := s3.getBucket(volumeID) b, err := client.GetBucket(volumeID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
mounter, err := newMounter(b, s3.cfg) mounter, err := mounter.New(b, client.Config)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -1,4 +1,4 @@
package s3 package mounter
import ( import (
"fmt" "fmt"
@ -6,6 +6,7 @@ import (
"context" "context"
"github.com/ctrox/csi-s3/pkg/s3"
goofysApi "github.com/kahing/goofys/api" goofysApi "github.com/kahing/goofys/api"
) )
@ -16,14 +17,14 @@ const (
// Implements Mounter // Implements Mounter
type goofysMounter struct { type goofysMounter struct {
bucket *bucket bucket *s3.Bucket
endpoint string endpoint string
region string region string
accessKeyID string accessKeyID string
secretAccessKey string secretAccessKey string
} }
func newGoofysMounter(b *bucket, cfg *Config) (Mounter, error) { func newGoofysMounter(b *s3.Bucket, cfg *s3.Config) (Mounter, error) {
region := cfg.Region region := cfg.Region
// if endpoint is set we need a default region // if endpoint is set we need a default region
if region == "" && cfg.Endpoint != "" { if region == "" && cfg.Endpoint != "" {

View file

@ -1,4 +1,4 @@
package s3 package mounter
import ( import (
"errors" "errors"
@ -10,35 +10,83 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/ctrox/csi-s3/pkg/s3"
"github.com/golang/glog"
"github.com/mitchellh/go-ps" "github.com/mitchellh/go-ps"
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
"github.com/golang/glog"
) )
func waitForProcess(p *os.Process, backoff int) error { // Mounter interface which can be implemented
if backoff == 20 { // by the different mounter types
return fmt.Errorf("Timeout waiting for PID %v to end", p.Pid) type Mounter interface {
Stage(stagePath string) error
Unstage(stagePath string) error
Mount(source string, target string) error
}
const (
s3fsMounterType = "s3fs"
goofysMounterType = "goofys"
s3backerMounterType = "s3backer"
rcloneMounterType = "rclone"
TypeKey = "mounter"
BucketKey = "bucket"
)
// New returns a new mounter depending on the mounterType parameter
func New(bucket *s3.Bucket, cfg *s3.Config) (Mounter, error) {
mounter := bucket.Mounter
// Fall back to mounterType in cfg
if len(bucket.Mounter) == 0 {
mounter = cfg.Mounter
} }
cmdLine, err := getCmdLine(p.Pid) switch mounter {
case s3fsMounterType:
return newS3fsMounter(bucket, cfg)
case goofysMounterType:
return newGoofysMounter(bucket, cfg)
case s3backerMounterType:
return newS3backerMounter(bucket, cfg)
case rcloneMounterType:
return newRcloneMounter(bucket, cfg)
default:
// default to s3backer
return newS3backerMounter(bucket, cfg)
}
}
func fuseMount(path string, command string, args []string) error {
cmd := exec.Command(command, args...)
glog.V(3).Infof("Mounting fuse with command: %s and args: %s", command, args)
out, err := cmd.CombinedOutput()
if err != nil { if err != nil {
glog.Warningf("Error checking cmdline of PID %v, assuming it is dead: %s", p.Pid, err) return fmt.Errorf("Error fuseMount command: %s\nargs: %s\noutput: %s", command, args, out)
}
return waitForMount(path, 10*time.Second)
}
func FuseUnmount(path string) error {
if err := mount.New("").Unmount(path); err != nil {
return err
}
// as fuse quits immediately, we will try to wait until the process is done
process, err := findFuseMountProcess(path)
if err != nil {
glog.Errorf("Error getting PID of fuse mount: %s", err)
return nil return nil
} }
if cmdLine == "" { if process == nil {
// ignore defunct processes glog.Warningf("Unable to find PID of fuse mount %s, it must have finished already", path)
// TODO: debug why this happens in the first place
// seems to only happen on k8s, not on local docker
glog.Warning("Fuse process seems dead, returning")
return nil return nil
} }
if err := p.Signal(syscall.Signal(0)); err != nil { glog.Infof("Found fuse pid %v of mount %s, checking if it still runs", process.Pid, path)
glog.Warningf("Fuse process does not seem active or we are unprivileged: %s", err) return waitForProcess(process, 1)
return nil
}
glog.Infof("Fuse process with PID %v still active, waiting...", p.Pid)
time.Sleep(time.Duration(backoff*100) * time.Millisecond)
return waitForProcess(p, backoff+1)
} }
func waitForMount(path string, timeout time.Duration) error { func waitForMount(path string, timeout time.Duration) error {
@ -79,6 +127,31 @@ func findFuseMountProcess(path string) (*os.Process, error) {
return nil, nil return nil, nil
} }
func waitForProcess(p *os.Process, backoff int) error {
if backoff == 20 {
return fmt.Errorf("Timeout waiting for PID %v to end", p.Pid)
}
cmdLine, err := getCmdLine(p.Pid)
if err != nil {
glog.Warningf("Error checking cmdline of PID %v, assuming it is dead: %s", p.Pid, err)
return nil
}
if cmdLine == "" {
// ignore defunct processes
// TODO: debug why this happens in the first place
// seems to only happen on k8s, not on local docker
glog.Warning("Fuse process seems dead, returning")
return nil
}
if err := p.Signal(syscall.Signal(0)); err != nil {
glog.Warningf("Fuse process does not seem active or we are unprivileged: %s", err)
return nil
}
glog.Infof("Fuse process with PID %v still active, waiting...", p.Pid)
time.Sleep(time.Duration(backoff*100) * time.Millisecond)
return waitForProcess(p, backoff+1)
}
func getCmdLine(pid int) (string, error) { func getCmdLine(pid int) (string, error) {
cmdLineFile := fmt.Sprintf("/proc/%v/cmdline", pid) cmdLineFile := fmt.Sprintf("/proc/%v/cmdline", pid)
cmdLine, err := ioutil.ReadFile(cmdLineFile) cmdLine, err := ioutil.ReadFile(cmdLineFile)

View file

@ -1,13 +1,15 @@
package s3 package mounter
import ( import (
"fmt" "fmt"
"os" "os"
"github.com/ctrox/csi-s3/pkg/s3"
) )
// Implements Mounter // Implements Mounter
type rcloneMounter struct { type rcloneMounter struct {
bucket *bucket bucket *s3.Bucket
url string url string
region string region string
accessKeyID string accessKeyID string
@ -18,7 +20,7 @@ const (
rcloneCmd = "rclone" rcloneCmd = "rclone"
) )
func newRcloneMounter(b *bucket, cfg *Config) (Mounter, error) { func newRcloneMounter(b *s3.Bucket, cfg *s3.Config) (Mounter, error) {
return &rcloneMounter{ return &rcloneMounter{
bucket: b, bucket: b,
url: cfg.Endpoint, url: cfg.Endpoint,

View file

@ -1,4 +1,4 @@
package s3 package mounter
import ( import (
"fmt" "fmt"
@ -7,13 +7,14 @@ import (
"os/exec" "os/exec"
"path" "path"
"github.com/ctrox/csi-s3/pkg/s3"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
) )
// Implements Mounter // Implements Mounter
type s3backerMounter struct { type s3backerMounter struct {
bucket *bucket bucket *s3.Bucket
url string url string
region string region string
accessKeyID string accessKeyID string
@ -32,7 +33,7 @@ const (
S3backerLoopDevice = "/dev/loop0" S3backerLoopDevice = "/dev/loop0"
) )
func newS3backerMounter(bucket *bucket, cfg *Config) (Mounter, error) { func newS3backerMounter(bucket *s3.Bucket, cfg *s3.Config) (Mounter, error) {
url, err := url.Parse(cfg.Endpoint) url, err := url.Parse(cfg.Endpoint)
if err != nil { if err != nil {
return nil, err return nil, err
@ -71,14 +72,14 @@ func (s3backer *s3backerMounter) Stage(stageTarget string) error {
// ensure 'file' device is formatted // ensure 'file' device is formatted
err := formatFs(s3backerFsType, path.Join(stageTarget, s3backerDevice)) err := formatFs(s3backerFsType, path.Join(stageTarget, s3backerDevice))
if err != nil { if err != nil {
fuseUnmount(stageTarget) FuseUnmount(stageTarget)
} }
return err return err
} }
func (s3backer *s3backerMounter) Unstage(stageTarget string) error { func (s3backer *s3backerMounter) Unstage(stageTarget string) error {
// Unmount the s3backer fuse mount // Unmount the s3backer fuse mount
return fuseUnmount(stageTarget) return FuseUnmount(stageTarget)
} }
func (s3backer *s3backerMounter) Mount(source string, target string) error { func (s3backer *s3backerMounter) Mount(source string, target string) error {
@ -87,7 +88,7 @@ func (s3backer *s3backerMounter) Mount(source string, target string) error {
err := mount.New("").Mount(device, target, s3backerFsType, []string{}) err := mount.New("").Mount(device, target, s3backerFsType, []string{})
if err != nil { if err != nil {
// cleanup fuse mount // cleanup fuse mount
fuseUnmount(target) FuseUnmount(target)
return err return err
} }
return nil return nil

View file

@ -1,13 +1,15 @@
package s3 package mounter
import ( import (
"fmt" "fmt"
"os" "os"
"github.com/ctrox/csi-s3/pkg/s3"
) )
// Implements Mounter // Implements Mounter
type s3fsMounter struct { type s3fsMounter struct {
bucket *bucket bucket *s3.Bucket
url string url string
region string region string
pwFileContent string pwFileContent string
@ -17,7 +19,7 @@ const (
s3fsCmd = "s3fs" s3fsCmd = "s3fs"
) )
func newS3fsMounter(b *bucket, cfg *Config) (Mounter, error) { func newS3fsMounter(b *s3.Bucket, cfg *s3.Config) (Mounter, error) {
return &s3fsMounter{ return &s3fsMounter{
bucket: b, bucket: b,
url: cfg.Endpoint, url: cfg.Endpoint,

View file

@ -14,17 +14,25 @@ import (
) )
const ( const (
metadataName = ".metadata.json" metadataName = ".metadata.json"
defaultFsPrefix = "csi-fs"
) )
type s3Client struct { type s3Client struct {
cfg *Config Config *Config
minio *minio.Client minio *minio.Client
ctx context.Context ctx context.Context
} }
type bucket struct { // Config holds values to configure the driver
type Config struct {
AccessKeyID string
SecretAccessKey string
Region string
Endpoint string
Mounter string
}
type Bucket struct {
Name string Name string
Mounter string Mounter string
FSPath string FSPath string
@ -32,11 +40,11 @@ type bucket struct {
CreatedByCsi bool CreatedByCsi bool
} }
func newS3Client(cfg *Config) (*s3Client, error) { func NewClient(cfg *Config) (*s3Client, error) {
var client = &s3Client{} var client = &s3Client{}
client.cfg = cfg client.Config = cfg
u, err := url.Parse(client.cfg.Endpoint) u, err := url.Parse(client.Config.Endpoint)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -46,7 +54,7 @@ func newS3Client(cfg *Config) (*s3Client, error) {
endpoint = u.Hostname() + ":" + u.Port() endpoint = u.Hostname() + ":" + u.Port()
} }
minioClient, err := minio.New(endpoint, &minio.Options{ minioClient, err := minio.New(endpoint, &minio.Options{
Creds: credentials.NewStaticV4(client.cfg.AccessKeyID, client.cfg.SecretAccessKey, client.cfg.Region), Creds: credentials.NewStaticV4(client.Config.AccessKeyID, client.Config.SecretAccessKey, client.Config.Region),
Secure: ssl, Secure: ssl,
}) })
if err != nil { if err != nil {
@ -57,26 +65,26 @@ func newS3Client(cfg *Config) (*s3Client, error) {
return client, nil return client, nil
} }
func newS3ClientFromSecrets(secrets map[string]string) (*s3Client, error) { func NewClientFromSecret(secret map[string]string) (*s3Client, error) {
return newS3Client(&Config{ return NewClient(&Config{
AccessKeyID: secrets["accessKeyID"], AccessKeyID: secret["accessKeyID"],
SecretAccessKey: secrets["secretAccessKey"], SecretAccessKey: secret["secretAccessKey"],
Region: secrets["region"], Region: secret["region"],
Endpoint: secrets["endpoint"], Endpoint: secret["endpoint"],
// Mounter is set in the volume preferences, not secrets // Mounter is set in the volume preferences, not secrets
Mounter: "", Mounter: "",
}) })
} }
func (client *s3Client) bucketExists(bucketName string) (bool, error) { func (client *s3Client) BucketExists(bucketName string) (bool, error) {
return client.minio.BucketExists(client.ctx, bucketName) return client.minio.BucketExists(client.ctx, bucketName)
} }
func (client *s3Client) createBucket(bucketName string) error { func (client *s3Client) CreateBucket(bucketName string) error {
return client.minio.MakeBucket(client.ctx, bucketName, minio.MakeBucketOptions{Region: client.cfg.Region}) return client.minio.MakeBucket(client.ctx, bucketName, minio.MakeBucketOptions{Region: client.Config.Region})
} }
func (client *s3Client) createPrefix(bucketName string, prefix string) error { func (client *s3Client) CreatePrefix(bucketName string, prefix string) error {
_, err := client.minio.PutObject(client.ctx, bucketName, prefix+"/", bytes.NewReader([]byte("")), 0, minio.PutObjectOptions{}) _, err := client.minio.PutObject(client.ctx, bucketName, prefix+"/", bytes.NewReader([]byte("")), 0, minio.PutObjectOptions{})
if err != nil { if err != nil {
return err return err
@ -84,7 +92,11 @@ func (client *s3Client) createPrefix(bucketName string, prefix string) error {
return nil return nil
} }
func (client *s3Client) removeBucket(bucketName string) error { func (client *s3Client) RemovePrefix(bucketName string, prefix string) error {
return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
}
func (client *s3Client) RemoveBucket(bucketName string) error {
if err := client.emptyBucket(bucketName); err != nil { if err := client.emptyBucket(bucketName); err != nil {
return err return err
} }
@ -133,11 +145,10 @@ func (client *s3Client) emptyBucket(bucketName string) error {
} }
} }
// ensure our prefix is also removed return nil
return client.minio.RemoveObject(client.ctx, bucketName, defaultFsPrefix, minio.RemoveObjectOptions{})
} }
func (client *s3Client) setBucket(bucket *bucket) error { func (client *s3Client) SetBucket(bucket *Bucket) error {
b := new(bytes.Buffer) b := new(bytes.Buffer)
json.NewEncoder(b).Encode(bucket) json.NewEncoder(b).Encode(bucket)
opts := minio.PutObjectOptions{ContentType: "application/json"} opts := minio.PutObjectOptions{ContentType: "application/json"}
@ -145,23 +156,23 @@ func (client *s3Client) setBucket(bucket *bucket) error {
return err return err
} }
func (client *s3Client) getBucket(bucketName string) (*bucket, error) { func (client *s3Client) GetBucket(bucketName string) (*Bucket, error) {
opts := minio.GetObjectOptions{} opts := minio.GetObjectOptions{}
obj, err := client.minio.GetObject(client.ctx, bucketName, metadataName, opts) obj, err := client.minio.GetObject(client.ctx, bucketName, metadataName, opts)
if err != nil { if err != nil {
return &bucket{}, err return &Bucket{}, err
} }
objInfo, err := obj.Stat() objInfo, err := obj.Stat()
if err != nil { if err != nil {
return &bucket{}, err return &Bucket{}, err
} }
b := make([]byte, objInfo.Size) b := make([]byte, objInfo.Size)
_, err = obj.Read(b) _, err = obj.Read(b)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
return &bucket{}, err return &Bucket{}, err
} }
var meta bucket var meta Bucket
err = json.Unmarshal(b, &meta) err = json.Unmarshal(b, &meta)
return &meta, err return &meta, err
} }

View file

@ -1,10 +0,0 @@
package s3
// Config holds values to configure the driver
type Config struct {
AccessKeyID string
SecretAccessKey string
Region string
Endpoint string
Mounter string
}

View file

@ -1,83 +0,0 @@
package s3
import (
"fmt"
"os/exec"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util/mount"
)
// Mounter interface which can be implemented
// by the different mounter types
type Mounter interface {
Stage(stagePath string) error
Unstage(stagePath string) error
Mount(source string, target string) error
}
const (
s3fsMounterType = "s3fs"
goofysMounterType = "goofys"
s3backerMounterType = "s3backer"
rcloneMounterType = "rclone"
mounterTypeKey = "mounter"
bucketKey = "bucket"
)
// newMounter returns a new mounter depending on the mounterType parameter
func newMounter(bucket *bucket, cfg *Config) (Mounter, error) {
mounter := bucket.Mounter
// Fall back to mounterType in cfg
if len(bucket.Mounter) == 0 {
mounter = cfg.Mounter
}
switch mounter {
case s3fsMounterType:
return newS3fsMounter(bucket, cfg)
case goofysMounterType:
return newGoofysMounter(bucket, cfg)
case s3backerMounterType:
return newS3backerMounter(bucket, cfg)
case rcloneMounterType:
return newRcloneMounter(bucket, cfg)
default:
// default to s3backer
return newS3backerMounter(bucket, cfg)
}
}
func fuseMount(path string, command string, args []string) error {
cmd := exec.Command(command, args...)
glog.V(3).Infof("Mounting fuse with command: %s and args: %s", command, args)
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("Error fuseMount command: %s\nargs: %s\noutput: %s", command, args, out)
}
return waitForMount(path, 10*time.Second)
}
func fuseUnmount(path string) error {
if err := mount.New("").Unmount(path); err != nil {
return err
}
// as fuse quits immediately, we will try to wait until the process is done
process, err := findFuseMountProcess(path)
if err != nil {
glog.Errorf("Error getting PID of fuse mount: %s", err)
return nil
}
if process == nil {
glog.Warningf("Unable to find PID of fuse mount %s, it must have finished already", path)
return nil
}
glog.Infof("Found fuse pid %v of mount %s, checking if it still runs", process.Pid, path)
return waitForProcess(process, 1)
}

View file

@ -5,4 +5,4 @@ export MINIO_SECRET_KEY=DSG643HGDS
mkdir -p /tmp/minio mkdir -p /tmp/minio
minio server /tmp/minio &>/dev/null & minio server /tmp/minio &>/dev/null &
sleep 5 sleep 5
go test github.com/ctrox/csi-s3/pkg/s3 -cover go test ./... -cover