From 1caf4699666fdb7109ffeb313c3d3a0a7ce9adb5 Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Fri, 27 Jul 2018 12:56:28 +0200 Subject: [PATCH] Implement a metadata file and correct sizing As the controller does not mount/create the fs we have to store the capacity somewhere so the node knows about it. --- pkg/s3/controllerserver.go | 34 ++++++++++++---- pkg/s3/mounter.go | 4 +- pkg/s3/mounter_goofys.go | 8 ++-- pkg/s3/mounter_s3backer.go | 19 +++++---- pkg/s3/mounter_s3fs.go | 8 ++-- pkg/s3/mounter_s3ql.go | 8 ++-- pkg/s3/nodeserver.go | 73 ++++++++++++++++++++++------------ pkg/s3/s3-client.go | 39 +++++++++++++++++- pkg/s3/s3-driver_suite_test.go | 30 ++++++-------- 9 files changed, 150 insertions(+), 73 deletions(-) diff --git a/pkg/s3/controllerserver.go b/pkg/s3/controllerserver.go index 98da7ca..cb5bf08 100644 --- a/pkg/s3/controllerserver.go +++ b/pkg/s3/controllerserver.go @@ -34,50 +34,71 @@ type controllerServer struct { } func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { + volumeID := req.GetName() + if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil { glog.V(3).Infof("invalid create volume req: %v", req) return nil, err } // Check arguments - if len(req.GetName()) == 0 { + if len(volumeID) == 0 { return nil, status.Error(codes.InvalidArgument, "Name missing in request") } if req.GetVolumeCapabilities() == nil { return nil, status.Error(codes.InvalidArgument, "Volume Capabilities missing in request") } - volumeID := req.GetName() + capacityBytes := int64(req.GetCapacityRange().GetRequiredBytes()) + glog.V(5).Infof("Got a request to create bucket %s", volumeID) exists, err := cs.s3.client.bucketExists(volumeID) if err != nil { return nil, err } - if !exists { + if exists { + var b *bucket + b, err = cs.s3.client.getBucket(volumeID) + if err != nil { + return nil, err + } + // Check if volume capacity requested is bigger than the already existing capacity + if capacityBytes > b.CapacityBytes { + return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", volumeID)) + } + } else { if err = cs.s3.client.createBucket(volumeID); err != nil { glog.V(3).Infof("failed to create volume: %v", err) return nil, err } } + b := &bucket{ + Name: volumeID, + CapacityBytes: capacityBytes, + } + if err := cs.s3.client.setBucket(b); err != nil { + return nil, err + } glog.V(4).Infof("create volume %s", volumeID) s3Vol := s3Volume{} - s3Vol.VolName = req.GetName() + s3Vol.VolName = volumeID s3Vol.VolID = volumeID return &csi.CreateVolumeResponse{ Volume: &csi.Volume{ Id: volumeID, - CapacityBytes: 1, + CapacityBytes: capacityBytes, Attributes: req.GetParameters(), }, }, nil } func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { + volumeID := req.GetVolumeId() // Check arguments - if len(req.GetVolumeId()) == 0 { + if len(volumeID) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } @@ -85,7 +106,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol glog.V(3).Infof("Invalid delete volume req: %v", req) return nil, err } - volumeID := req.VolumeId glog.V(4).Infof("Deleting volume %s", volumeID) exists, err := cs.s3.client.bucketExists(volumeID) diff --git a/pkg/s3/mounter.go b/pkg/s3/mounter.go index 0d8426a..e7e1325 100644 --- a/pkg/s3/mounter.go +++ b/pkg/s3/mounter.go @@ -25,7 +25,7 @@ const ( ) // newMounter returns a new mounter depending on the mounterType parameter -func newMounter(bucket string, cfg *Config) (Mounter, error) { +func newMounter(bucket *bucket, cfg *Config) (Mounter, error) { switch cfg.Mounter { case s3fsMounterType: return newS3fsMounter(bucket, cfg) @@ -40,7 +40,7 @@ func newMounter(bucket string, cfg *Config) (Mounter, error) { return newS3backerMounter(bucket, cfg) } - return nil, fmt.Errorf("Error mounting bucket %s, invalid mounter specified: %s", bucket, cfg.Mounter) + return nil, fmt.Errorf("Error mounting bucket %s, invalid mounter specified: %s", bucket.Name, cfg.Mounter) } func fuseMount(path string, command string, args []string) error { diff --git a/pkg/s3/mounter_goofys.go b/pkg/s3/mounter_goofys.go index 31a37c4..326596e 100644 --- a/pkg/s3/mounter_goofys.go +++ b/pkg/s3/mounter_goofys.go @@ -16,21 +16,21 @@ const ( // Implements Mounter type goofysMounter struct { - bucket string + bucket *bucket endpoint string region string accessKeyID string secretAccessKey string } -func newGoofysMounter(bucket string, cfg *Config) (Mounter, error) { +func newGoofysMounter(b *bucket, cfg *Config) (Mounter, error) { region := cfg.Region // if endpoint is set we need a default region if region == "" && cfg.Endpoint != "" { region = defaultRegion } return &goofysMounter{ - bucket: bucket, + bucket: b, endpoint: cfg.Endpoint, region: region, accessKeyID: cfg.AccessKeyID, @@ -61,7 +61,7 @@ func (goofys *goofysMounter) Mount(source string, target string) error { os.Setenv("AWS_ACCESS_KEY_ID", goofys.accessKeyID) os.Setenv("AWS_SECRET_ACCESS_KEY", goofys.secretAccessKey) - _, _, err := goofysApi.Mount(context.Background(), goofys.bucket, goofysCfg) + _, _, err := goofysApi.Mount(context.Background(), goofys.bucket.Name, goofysCfg) if err != nil { return fmt.Errorf("Error mounting via goofys: %s", err) diff --git a/pkg/s3/mounter_s3backer.go b/pkg/s3/mounter_s3backer.go index bf6dbd8..3d43b4e 100644 --- a/pkg/s3/mounter_s3backer.go +++ b/pkg/s3/mounter_s3backer.go @@ -12,12 +12,11 @@ import ( // Implements Mounter type s3backerMounter struct { - bucket string + bucket *bucket url string region string accessKeyID string secretAccessKey string - size int64 } const ( @@ -25,24 +24,28 @@ const ( s3backerFsType = "xfs" s3backerDevice = "file" // blockSize to use in k - s3backerBlockSize = "128k" + s3backerBlockSize = "128k" + s3backerDefaultSize = 1024 * 1024 * 1024 // 1GiB ) -func newS3backerMounter(bucket string, cfg *Config) (Mounter, error) { +func newS3backerMounter(bucket *bucket, cfg *Config) (Mounter, error) { + // s3backer cannot work with 0 size volumes + if bucket.CapacityBytes == 0 { + bucket.CapacityBytes = s3backerDefaultSize + } s3backer := &s3backerMounter{ bucket: bucket, url: cfg.Endpoint, region: cfg.Region, accessKeyID: cfg.AccessKeyID, secretAccessKey: cfg.SecretAccessKey, - size: 1024 * 1024 * 1024 * 10, } return s3backer, s3backer.writePasswd() } func (s3backer *s3backerMounter) String() string { - return s3backer.bucket + return s3backer.bucket.Name } func (s3backer *s3backerMounter) Stage(stageTarget string) error { @@ -86,9 +89,9 @@ func (s3backer *s3backerMounter) mountInit(path string) error { // baseURL must end with / fmt.Sprintf("--baseURL=%s/", s3backer.url), fmt.Sprintf("--blockSize=%v", s3backerBlockSize), - fmt.Sprintf("--size=%v", s3backer.size), + fmt.Sprintf("--size=%v", s3backer.bucket.CapacityBytes), "--listBlocks", - s3backer.bucket, + s3backer.bucket.Name, path, } if s3backer.region != "" { diff --git a/pkg/s3/mounter_s3fs.go b/pkg/s3/mounter_s3fs.go index 2b90aa3..e6d6f42 100644 --- a/pkg/s3/mounter_s3fs.go +++ b/pkg/s3/mounter_s3fs.go @@ -7,7 +7,7 @@ import ( // Implements Mounter type s3fsMounter struct { - bucket string + bucket *bucket url string region string pwFileContent string @@ -17,9 +17,9 @@ const ( s3fsCmd = "s3fs" ) -func newS3fsMounter(bucket string, cfg *Config) (Mounter, error) { +func newS3fsMounter(b *bucket, cfg *Config) (Mounter, error) { return &s3fsMounter{ - bucket: bucket, + bucket: b, url: cfg.Endpoint, region: cfg.Region, pwFileContent: cfg.AccessKeyID + ":" + cfg.SecretAccessKey, @@ -39,7 +39,7 @@ func (s3fs *s3fsMounter) Mount(source string, target string) error { return err } args := []string{ - fmt.Sprintf("%s", s3fs.bucket), + fmt.Sprintf("%s", s3fs.bucket.Name), fmt.Sprintf("%s", target), "-o", "sigv2", "-o", "use_path_request_style", diff --git a/pkg/s3/mounter_s3ql.go b/pkg/s3/mounter_s3ql.go index 3bd095b..80facaa 100644 --- a/pkg/s3/mounter_s3ql.go +++ b/pkg/s3/mounter_s3ql.go @@ -14,7 +14,7 @@ import ( // Implements Mounter type s3qlMounter struct { - bucket string + bucket *bucket url string bucketURL string login string @@ -31,7 +31,7 @@ const ( s3qlCmdUnmount = "umount.s3ql" ) -func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) { +func newS3qlMounter(b *bucket, cfg *Config) (Mounter, error) { url, err := url.Parse(cfg.Endpoint) if err != nil { return nil, err @@ -41,7 +41,7 @@ func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) { url.Scheme = "s3c" } s3ql := &s3qlMounter{ - bucket: bucket, + bucket: b, url: url.String(), login: cfg.AccessKeyID, password: cfg.SecretAccessKey, @@ -49,7 +49,7 @@ func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) { ssl: ssl, } - url.Path = path.Join(url.Path, bucket) + url.Path = path.Join(url.Path, b.Name) s3ql.bucketURL = url.String() if !ssl { diff --git a/pkg/s3/nodeserver.go b/pkg/s3/nodeserver.go index f86d6d4..27313e3 100644 --- a/pkg/s3/nodeserver.go +++ b/pkg/s3/nodeserver.go @@ -36,23 +36,24 @@ type nodeServer struct { } func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { + volumeID := req.GetVolumeId() + targetPath := req.GetTargetPath() + stagingTargetPath := req.GetStagingTargetPath() // Check arguments if req.GetVolumeCapability() == nil { return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request") } - if len(req.GetVolumeId()) == 0 { + if len(volumeID) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } - if len(req.GetStagingTargetPath()) == 0 { + if len(stagingTargetPath) == 0 { return nil, status.Error(codes.InvalidArgument, "Staging Target path missing in request") } - if len(req.GetTargetPath()) == 0 { + if len(targetPath) == 0 { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } - targetPath := req.GetTargetPath() - stagingPath := req.GetStagingTargetPath() notMnt, err := checkMount(targetPath) if err != nil { return nil, status.Error(codes.Internal, err.Error()) @@ -66,57 +67,70 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis deviceID = req.GetPublishInfo()[deviceID] } + // TODO: Implement readOnly & mountFlags readOnly := req.GetReadonly() - volumeID := req.GetVolumeId() attrib := req.GetVolumeAttributes() mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags() glog.V(4).Infof("target %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n", targetPath, deviceID, readOnly, volumeID, attrib, mountFlags) - mounter, err := newMounter(volumeID, ns.s3.cfg) + b, err := ns.s3.client.getBucket(volumeID) if err != nil { return nil, err } - if err := mounter.Mount(stagingPath, targetPath); err != nil { + + mounter, err := newMounter(b, ns.s3.cfg) + if err != nil { + return nil, err + } + if err := mounter.Mount(stagingTargetPath, targetPath); err != nil { return nil, err } - glog.V(4).Infof("s3: bucket %s successfuly mounted to %s", volumeID, targetPath) + glog.V(4).Infof("s3: bucket %s successfuly mounted to %s", b.Name, targetPath) return &csi.NodePublishVolumeResponse{}, nil } func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { + volumeID := req.GetVolumeId() + targetPath := req.GetTargetPath() // Check arguments - if len(req.GetVolumeId()) == 0 { + if len(volumeID) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } - if len(req.GetTargetPath()) == 0 { + if len(targetPath) == 0 { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } - mounter, err := newMounter(req.GetVolumeId(), ns.s3.cfg) + b, err := ns.s3.client.getBucket(volumeID) if err != nil { return nil, err } - if err := mounter.Unmount(req.GetTargetPath()); err != nil { + mounter, err := newMounter(b, ns.s3.cfg) + if err != nil { + return nil, err + } + if err := mounter.Unmount(targetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) } - glog.V(4).Infof("s3: bucket %s has been unmounted.", req.GetVolumeId()) + glog.V(4).Infof("s3: bucket %s has been unmounted.", volumeID) return &csi.NodeUnpublishVolumeResponse{}, nil } func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { + volumeID := req.GetVolumeId() + stagingTargetPath := req.GetStagingTargetPath() // Check arguments - if len(req.GetVolumeId()) == 0 { + if len(volumeID) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } - if len(req.GetStagingTargetPath()) == 0 { + if len(stagingTargetPath) == 0 { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } @@ -124,20 +138,23 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Volume Capability must be provided") } - stagingPath := req.GetStagingTargetPath() - notMnt, err := checkMount(stagingPath) + notMnt, err := checkMount(stagingTargetPath) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } if !notMnt { return &csi.NodeStageVolumeResponse{}, nil } - - mounter, err := newMounter(req.GetVolumeId(), ns.s3.cfg) + b, err := ns.s3.client.getBucket(volumeID) if err != nil { return nil, err } - if err := mounter.Stage(stagingPath); err != nil { + + mounter, err := newMounter(b, ns.s3.cfg) + if err != nil { + return nil, err + } + if err := mounter.Stage(stagingTargetPath); err != nil { return nil, err } @@ -145,20 +162,26 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol } func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { + volumeID := req.GetVolumeId() + stagingTargetPath := req.GetStagingTargetPath() // Check arguments - if len(req.GetVolumeId()) == 0 { + if len(volumeID) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } - if len(req.GetStagingTargetPath()) == 0 { + if len(stagingTargetPath) == 0 { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } - mounter, err := newMounter(req.GetVolumeId(), ns.s3.cfg) + b, err := ns.s3.client.getBucket(volumeID) if err != nil { return nil, err } - if err := mounter.Unstage(req.GetStagingTargetPath()); err != nil { + mounter, err := newMounter(b, ns.s3.cfg) + if err != nil { + return nil, err + } + if err := mounter.Unstage(stagingTargetPath); err != nil { return nil, err } diff --git a/pkg/s3/s3-client.go b/pkg/s3/s3-client.go index 76713a3..69dc676 100644 --- a/pkg/s3/s3-client.go +++ b/pkg/s3/s3-client.go @@ -1,7 +1,10 @@ package s3 import ( + "bytes" + "encoding/json" "fmt" + "io" "net/url" "github.com/golang/glog" @@ -9,7 +12,7 @@ import ( ) const ( - metadataName = ".meta" + metadataName = ".metadata.json" ) type s3Client struct { @@ -17,6 +20,11 @@ type s3Client struct { minio *minio.Client } +type bucket struct { + Name string + CapacityBytes int64 +} + func newS3Client(cfg *Config) (*s3Client, error) { var client = &s3Client{} @@ -91,3 +99,32 @@ func (client *s3Client) emptyBucket(bucketName string) error { return nil } + +func (client *s3Client) setBucket(bucket *bucket) error { + b := new(bytes.Buffer) + json.NewEncoder(b).Encode(bucket) + opts := minio.PutObjectOptions{ContentType: "application/json"} + _, err := client.minio.PutObject(bucket.Name, metadataName, b, int64(b.Len()), opts) + return err +} + +func (client *s3Client) getBucket(bucketName string) (*bucket, error) { + opts := minio.GetObjectOptions{} + obj, err := client.minio.GetObject(bucketName, metadataName, opts) + if err != nil { + return &bucket{}, err + } + objInfo, err := obj.Stat() + if err != nil { + return &bucket{}, err + } + b := make([]byte, objInfo.Size) + _, err = obj.Read(b) + + if err != nil && err != io.EOF { + return &bucket{}, err + } + var meta bucket + err = json.Unmarshal(b, &meta) + return &meta, err +} diff --git a/pkg/s3/s3-driver_suite_test.go b/pkg/s3/s3-driver_suite_test.go index 451132d..d19fd58 100644 --- a/pkg/s3/s3-driver_suite_test.go +++ b/pkg/s3/s3-driver_suite_test.go @@ -12,8 +12,6 @@ import ( "github.com/kubernetes-csi/csi-test/pkg/sanity" ) -const () - var _ = Describe("S3Driver", func() { mntDir, _ := ioutil.TempDir("", "mnt") stagingDir, _ := ioutil.TempDir("", "staging") @@ -43,10 +41,9 @@ var _ = Describe("S3Driver", func() { Describe("CSI sanity", func() { sanityCfg := &sanity.Config{ - TargetPath: mntDir, - StagingPath: stagingDir, - Address: csiEndpoint, - TestVolumeSize: 1, + TargetPath: mntDir, + StagingPath: stagingDir, + Address: csiEndpoint, } sanity.GinkgoTest(sanityCfg) }) @@ -72,10 +69,9 @@ var _ = Describe("S3Driver", func() { Describe("CSI sanity", func() { sanityCfg := &sanity.Config{ - TargetPath: mntDir, - StagingPath: stagingDir, - Address: csiEndpoint, - TestVolumeSize: 1, + TargetPath: mntDir, + StagingPath: stagingDir, + Address: csiEndpoint, } sanity.GinkgoTest(sanityCfg) }) @@ -104,10 +100,9 @@ var _ = Describe("S3Driver", func() { Describe("CSI sanity", func() { sanityCfg := &sanity.Config{ - TargetPath: mntDir, - StagingPath: stagingDir, - Address: csiEndpoint, - TestVolumeSize: 1, + TargetPath: mntDir, + StagingPath: stagingDir, + Address: csiEndpoint, } sanity.GinkgoTest(sanityCfg) }) @@ -134,10 +129,9 @@ var _ = Describe("S3Driver", func() { Describe("CSI sanity", func() { sanityCfg := &sanity.Config{ - TargetPath: mntDir, - StagingPath: stagingDir, - Address: csiEndpoint, - TestVolumeSize: 1, + TargetPath: mntDir, + StagingPath: stagingDir, + Address: csiEndpoint, } sanity.GinkgoTest(sanityCfg) })