Implement a metadata file and correct sizing
As the controller does not mount/create the fs we have to store the capacity somewhere so the node knows about it.
This commit is contained in:
parent
db0fbf77dd
commit
1caf469966
9 changed files with 150 additions and 73 deletions
|
@ -34,50 +34,71 @@ type controllerServer struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
|
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
|
||||||
|
volumeID := req.GetName()
|
||||||
|
|
||||||
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
|
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
|
||||||
glog.V(3).Infof("invalid create volume req: %v", req)
|
glog.V(3).Infof("invalid create volume req: %v", req)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check arguments
|
// Check arguments
|
||||||
if len(req.GetName()) == 0 {
|
if len(volumeID) == 0 {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Name missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Name missing in request")
|
||||||
}
|
}
|
||||||
if req.GetVolumeCapabilities() == nil {
|
if req.GetVolumeCapabilities() == nil {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities missing in request")
|
||||||
}
|
}
|
||||||
|
|
||||||
volumeID := req.GetName()
|
capacityBytes := int64(req.GetCapacityRange().GetRequiredBytes())
|
||||||
|
|
||||||
glog.V(5).Infof("Got a request to create bucket %s", volumeID)
|
glog.V(5).Infof("Got a request to create bucket %s", volumeID)
|
||||||
|
|
||||||
exists, err := cs.s3.client.bucketExists(volumeID)
|
exists, err := cs.s3.client.bucketExists(volumeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if !exists {
|
if exists {
|
||||||
|
var b *bucket
|
||||||
|
b, err = cs.s3.client.getBucket(volumeID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Check if volume capacity requested is bigger than the already existing capacity
|
||||||
|
if capacityBytes > b.CapacityBytes {
|
||||||
|
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", volumeID))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
if err = cs.s3.client.createBucket(volumeID); err != nil {
|
if err = cs.s3.client.createBucket(volumeID); err != nil {
|
||||||
glog.V(3).Infof("failed to create volume: %v", err)
|
glog.V(3).Infof("failed to create volume: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
b := &bucket{
|
||||||
|
Name: volumeID,
|
||||||
|
CapacityBytes: capacityBytes,
|
||||||
|
}
|
||||||
|
if err := cs.s3.client.setBucket(b); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
glog.V(4).Infof("create volume %s", volumeID)
|
glog.V(4).Infof("create volume %s", volumeID)
|
||||||
s3Vol := s3Volume{}
|
s3Vol := s3Volume{}
|
||||||
s3Vol.VolName = req.GetName()
|
s3Vol.VolName = volumeID
|
||||||
s3Vol.VolID = volumeID
|
s3Vol.VolID = volumeID
|
||||||
return &csi.CreateVolumeResponse{
|
return &csi.CreateVolumeResponse{
|
||||||
Volume: &csi.Volume{
|
Volume: &csi.Volume{
|
||||||
Id: volumeID,
|
Id: volumeID,
|
||||||
CapacityBytes: 1,
|
CapacityBytes: capacityBytes,
|
||||||
Attributes: req.GetParameters(),
|
Attributes: req.GetParameters(),
|
||||||
},
|
},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
|
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
|
||||||
|
volumeID := req.GetVolumeId()
|
||||||
|
|
||||||
// Check arguments
|
// Check arguments
|
||||||
if len(req.GetVolumeId()) == 0 {
|
if len(volumeID) == 0 {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,7 +106,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||||
glog.V(3).Infof("Invalid delete volume req: %v", req)
|
glog.V(3).Infof("Invalid delete volume req: %v", req)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
volumeID := req.VolumeId
|
|
||||||
glog.V(4).Infof("Deleting volume %s", volumeID)
|
glog.V(4).Infof("Deleting volume %s", volumeID)
|
||||||
|
|
||||||
exists, err := cs.s3.client.bucketExists(volumeID)
|
exists, err := cs.s3.client.bucketExists(volumeID)
|
||||||
|
|
|
@ -25,7 +25,7 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
// newMounter returns a new mounter depending on the mounterType parameter
|
// newMounter returns a new mounter depending on the mounterType parameter
|
||||||
func newMounter(bucket string, cfg *Config) (Mounter, error) {
|
func newMounter(bucket *bucket, cfg *Config) (Mounter, error) {
|
||||||
switch cfg.Mounter {
|
switch cfg.Mounter {
|
||||||
case s3fsMounterType:
|
case s3fsMounterType:
|
||||||
return newS3fsMounter(bucket, cfg)
|
return newS3fsMounter(bucket, cfg)
|
||||||
|
@ -40,7 +40,7 @@ func newMounter(bucket string, cfg *Config) (Mounter, error) {
|
||||||
return newS3backerMounter(bucket, cfg)
|
return newS3backerMounter(bucket, cfg)
|
||||||
|
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("Error mounting bucket %s, invalid mounter specified: %s", bucket, cfg.Mounter)
|
return nil, fmt.Errorf("Error mounting bucket %s, invalid mounter specified: %s", bucket.Name, cfg.Mounter)
|
||||||
}
|
}
|
||||||
|
|
||||||
func fuseMount(path string, command string, args []string) error {
|
func fuseMount(path string, command string, args []string) error {
|
||||||
|
|
|
@ -16,21 +16,21 @@ const (
|
||||||
|
|
||||||
// Implements Mounter
|
// Implements Mounter
|
||||||
type goofysMounter struct {
|
type goofysMounter struct {
|
||||||
bucket string
|
bucket *bucket
|
||||||
endpoint string
|
endpoint string
|
||||||
region string
|
region string
|
||||||
accessKeyID string
|
accessKeyID string
|
||||||
secretAccessKey string
|
secretAccessKey string
|
||||||
}
|
}
|
||||||
|
|
||||||
func newGoofysMounter(bucket string, cfg *Config) (Mounter, error) {
|
func newGoofysMounter(b *bucket, cfg *Config) (Mounter, error) {
|
||||||
region := cfg.Region
|
region := cfg.Region
|
||||||
// if endpoint is set we need a default region
|
// if endpoint is set we need a default region
|
||||||
if region == "" && cfg.Endpoint != "" {
|
if region == "" && cfg.Endpoint != "" {
|
||||||
region = defaultRegion
|
region = defaultRegion
|
||||||
}
|
}
|
||||||
return &goofysMounter{
|
return &goofysMounter{
|
||||||
bucket: bucket,
|
bucket: b,
|
||||||
endpoint: cfg.Endpoint,
|
endpoint: cfg.Endpoint,
|
||||||
region: region,
|
region: region,
|
||||||
accessKeyID: cfg.AccessKeyID,
|
accessKeyID: cfg.AccessKeyID,
|
||||||
|
@ -61,7 +61,7 @@ func (goofys *goofysMounter) Mount(source string, target string) error {
|
||||||
os.Setenv("AWS_ACCESS_KEY_ID", goofys.accessKeyID)
|
os.Setenv("AWS_ACCESS_KEY_ID", goofys.accessKeyID)
|
||||||
os.Setenv("AWS_SECRET_ACCESS_KEY", goofys.secretAccessKey)
|
os.Setenv("AWS_SECRET_ACCESS_KEY", goofys.secretAccessKey)
|
||||||
|
|
||||||
_, _, err := goofysApi.Mount(context.Background(), goofys.bucket, goofysCfg)
|
_, _, err := goofysApi.Mount(context.Background(), goofys.bucket.Name, goofysCfg)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Error mounting via goofys: %s", err)
|
return fmt.Errorf("Error mounting via goofys: %s", err)
|
||||||
|
|
|
@ -12,12 +12,11 @@ import (
|
||||||
|
|
||||||
// Implements Mounter
|
// Implements Mounter
|
||||||
type s3backerMounter struct {
|
type s3backerMounter struct {
|
||||||
bucket string
|
bucket *bucket
|
||||||
url string
|
url string
|
||||||
region string
|
region string
|
||||||
accessKeyID string
|
accessKeyID string
|
||||||
secretAccessKey string
|
secretAccessKey string
|
||||||
size int64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -26,23 +25,27 @@ const (
|
||||||
s3backerDevice = "file"
|
s3backerDevice = "file"
|
||||||
// blockSize to use in k
|
// blockSize to use in k
|
||||||
s3backerBlockSize = "128k"
|
s3backerBlockSize = "128k"
|
||||||
|
s3backerDefaultSize = 1024 * 1024 * 1024 // 1GiB
|
||||||
)
|
)
|
||||||
|
|
||||||
func newS3backerMounter(bucket string, cfg *Config) (Mounter, error) {
|
func newS3backerMounter(bucket *bucket, cfg *Config) (Mounter, error) {
|
||||||
|
// s3backer cannot work with 0 size volumes
|
||||||
|
if bucket.CapacityBytes == 0 {
|
||||||
|
bucket.CapacityBytes = s3backerDefaultSize
|
||||||
|
}
|
||||||
s3backer := &s3backerMounter{
|
s3backer := &s3backerMounter{
|
||||||
bucket: bucket,
|
bucket: bucket,
|
||||||
url: cfg.Endpoint,
|
url: cfg.Endpoint,
|
||||||
region: cfg.Region,
|
region: cfg.Region,
|
||||||
accessKeyID: cfg.AccessKeyID,
|
accessKeyID: cfg.AccessKeyID,
|
||||||
secretAccessKey: cfg.SecretAccessKey,
|
secretAccessKey: cfg.SecretAccessKey,
|
||||||
size: 1024 * 1024 * 1024 * 10,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return s3backer, s3backer.writePasswd()
|
return s3backer, s3backer.writePasswd()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s3backer *s3backerMounter) String() string {
|
func (s3backer *s3backerMounter) String() string {
|
||||||
return s3backer.bucket
|
return s3backer.bucket.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s3backer *s3backerMounter) Stage(stageTarget string) error {
|
func (s3backer *s3backerMounter) Stage(stageTarget string) error {
|
||||||
|
@ -86,9 +89,9 @@ func (s3backer *s3backerMounter) mountInit(path string) error {
|
||||||
// baseURL must end with /
|
// baseURL must end with /
|
||||||
fmt.Sprintf("--baseURL=%s/", s3backer.url),
|
fmt.Sprintf("--baseURL=%s/", s3backer.url),
|
||||||
fmt.Sprintf("--blockSize=%v", s3backerBlockSize),
|
fmt.Sprintf("--blockSize=%v", s3backerBlockSize),
|
||||||
fmt.Sprintf("--size=%v", s3backer.size),
|
fmt.Sprintf("--size=%v", s3backer.bucket.CapacityBytes),
|
||||||
"--listBlocks",
|
"--listBlocks",
|
||||||
s3backer.bucket,
|
s3backer.bucket.Name,
|
||||||
path,
|
path,
|
||||||
}
|
}
|
||||||
if s3backer.region != "" {
|
if s3backer.region != "" {
|
||||||
|
|
|
@ -7,7 +7,7 @@ import (
|
||||||
|
|
||||||
// Implements Mounter
|
// Implements Mounter
|
||||||
type s3fsMounter struct {
|
type s3fsMounter struct {
|
||||||
bucket string
|
bucket *bucket
|
||||||
url string
|
url string
|
||||||
region string
|
region string
|
||||||
pwFileContent string
|
pwFileContent string
|
||||||
|
@ -17,9 +17,9 @@ const (
|
||||||
s3fsCmd = "s3fs"
|
s3fsCmd = "s3fs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func newS3fsMounter(bucket string, cfg *Config) (Mounter, error) {
|
func newS3fsMounter(b *bucket, cfg *Config) (Mounter, error) {
|
||||||
return &s3fsMounter{
|
return &s3fsMounter{
|
||||||
bucket: bucket,
|
bucket: b,
|
||||||
url: cfg.Endpoint,
|
url: cfg.Endpoint,
|
||||||
region: cfg.Region,
|
region: cfg.Region,
|
||||||
pwFileContent: cfg.AccessKeyID + ":" + cfg.SecretAccessKey,
|
pwFileContent: cfg.AccessKeyID + ":" + cfg.SecretAccessKey,
|
||||||
|
@ -39,7 +39,7 @@ func (s3fs *s3fsMounter) Mount(source string, target string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
args := []string{
|
args := []string{
|
||||||
fmt.Sprintf("%s", s3fs.bucket),
|
fmt.Sprintf("%s", s3fs.bucket.Name),
|
||||||
fmt.Sprintf("%s", target),
|
fmt.Sprintf("%s", target),
|
||||||
"-o", "sigv2",
|
"-o", "sigv2",
|
||||||
"-o", "use_path_request_style",
|
"-o", "use_path_request_style",
|
||||||
|
|
|
@ -14,7 +14,7 @@ import (
|
||||||
|
|
||||||
// Implements Mounter
|
// Implements Mounter
|
||||||
type s3qlMounter struct {
|
type s3qlMounter struct {
|
||||||
bucket string
|
bucket *bucket
|
||||||
url string
|
url string
|
||||||
bucketURL string
|
bucketURL string
|
||||||
login string
|
login string
|
||||||
|
@ -31,7 +31,7 @@ const (
|
||||||
s3qlCmdUnmount = "umount.s3ql"
|
s3qlCmdUnmount = "umount.s3ql"
|
||||||
)
|
)
|
||||||
|
|
||||||
func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) {
|
func newS3qlMounter(b *bucket, cfg *Config) (Mounter, error) {
|
||||||
url, err := url.Parse(cfg.Endpoint)
|
url, err := url.Parse(cfg.Endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -41,7 +41,7 @@ func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) {
|
||||||
url.Scheme = "s3c"
|
url.Scheme = "s3c"
|
||||||
}
|
}
|
||||||
s3ql := &s3qlMounter{
|
s3ql := &s3qlMounter{
|
||||||
bucket: bucket,
|
bucket: b,
|
||||||
url: url.String(),
|
url: url.String(),
|
||||||
login: cfg.AccessKeyID,
|
login: cfg.AccessKeyID,
|
||||||
password: cfg.SecretAccessKey,
|
password: cfg.SecretAccessKey,
|
||||||
|
@ -49,7 +49,7 @@ func newS3qlMounter(bucket string, cfg *Config) (Mounter, error) {
|
||||||
ssl: ssl,
|
ssl: ssl,
|
||||||
}
|
}
|
||||||
|
|
||||||
url.Path = path.Join(url.Path, bucket)
|
url.Path = path.Join(url.Path, b.Name)
|
||||||
s3ql.bucketURL = url.String()
|
s3ql.bucketURL = url.String()
|
||||||
|
|
||||||
if !ssl {
|
if !ssl {
|
||||||
|
|
|
@ -36,23 +36,24 @@ type nodeServer struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
|
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
|
||||||
|
volumeID := req.GetVolumeId()
|
||||||
|
targetPath := req.GetTargetPath()
|
||||||
|
stagingTargetPath := req.GetStagingTargetPath()
|
||||||
|
|
||||||
// Check arguments
|
// Check arguments
|
||||||
if req.GetVolumeCapability() == nil {
|
if req.GetVolumeCapability() == nil {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
|
||||||
}
|
}
|
||||||
if len(req.GetVolumeId()) == 0 {
|
if len(volumeID) == 0 {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
||||||
}
|
}
|
||||||
if len(req.GetStagingTargetPath()) == 0 {
|
if len(stagingTargetPath) == 0 {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Staging Target path missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Staging Target path missing in request")
|
||||||
}
|
}
|
||||||
if len(req.GetTargetPath()) == 0 {
|
if len(targetPath) == 0 {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
|
||||||
}
|
}
|
||||||
|
|
||||||
targetPath := req.GetTargetPath()
|
|
||||||
stagingPath := req.GetStagingTargetPath()
|
|
||||||
notMnt, err := checkMount(targetPath)
|
notMnt, err := checkMount(targetPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
|
@ -66,57 +67,70 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
||||||
deviceID = req.GetPublishInfo()[deviceID]
|
deviceID = req.GetPublishInfo()[deviceID]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Implement readOnly & mountFlags
|
||||||
readOnly := req.GetReadonly()
|
readOnly := req.GetReadonly()
|
||||||
volumeID := req.GetVolumeId()
|
|
||||||
attrib := req.GetVolumeAttributes()
|
attrib := req.GetVolumeAttributes()
|
||||||
mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()
|
mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()
|
||||||
|
|
||||||
glog.V(4).Infof("target %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n",
|
glog.V(4).Infof("target %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n",
|
||||||
targetPath, deviceID, readOnly, volumeID, attrib, mountFlags)
|
targetPath, deviceID, readOnly, volumeID, attrib, mountFlags)
|
||||||
|
|
||||||
mounter, err := newMounter(volumeID, ns.s3.cfg)
|
b, err := ns.s3.client.getBucket(volumeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := mounter.Mount(stagingPath, targetPath); err != nil {
|
|
||||||
|
mounter, err := newMounter(b, ns.s3.cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := mounter.Mount(stagingTargetPath, targetPath); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(4).Infof("s3: bucket %s successfuly mounted to %s", volumeID, targetPath)
|
glog.V(4).Infof("s3: bucket %s successfuly mounted to %s", b.Name, targetPath)
|
||||||
|
|
||||||
return &csi.NodePublishVolumeResponse{}, nil
|
return &csi.NodePublishVolumeResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
|
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
|
||||||
|
volumeID := req.GetVolumeId()
|
||||||
|
targetPath := req.GetTargetPath()
|
||||||
|
|
||||||
// Check arguments
|
// Check arguments
|
||||||
if len(req.GetVolumeId()) == 0 {
|
if len(volumeID) == 0 {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
||||||
}
|
}
|
||||||
if len(req.GetTargetPath()) == 0 {
|
if len(targetPath) == 0 {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
|
||||||
}
|
}
|
||||||
|
|
||||||
mounter, err := newMounter(req.GetVolumeId(), ns.s3.cfg)
|
b, err := ns.s3.client.getBucket(volumeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := mounter.Unmount(req.GetTargetPath()); err != nil {
|
mounter, err := newMounter(b, ns.s3.cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := mounter.Unmount(targetPath); err != nil {
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
glog.V(4).Infof("s3: bucket %s has been unmounted.", req.GetVolumeId())
|
glog.V(4).Infof("s3: bucket %s has been unmounted.", volumeID)
|
||||||
|
|
||||||
return &csi.NodeUnpublishVolumeResponse{}, nil
|
return &csi.NodeUnpublishVolumeResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
|
func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
|
||||||
|
volumeID := req.GetVolumeId()
|
||||||
|
stagingTargetPath := req.GetStagingTargetPath()
|
||||||
|
|
||||||
// Check arguments
|
// Check arguments
|
||||||
if len(req.GetVolumeId()) == 0 {
|
if len(volumeID) == 0 {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(req.GetStagingTargetPath()) == 0 {
|
if len(stagingTargetPath) == 0 {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -124,20 +138,23 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
||||||
return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Volume Capability must be provided")
|
return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Volume Capability must be provided")
|
||||||
}
|
}
|
||||||
|
|
||||||
stagingPath := req.GetStagingTargetPath()
|
notMnt, err := checkMount(stagingTargetPath)
|
||||||
notMnt, err := checkMount(stagingPath)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
if !notMnt {
|
if !notMnt {
|
||||||
return &csi.NodeStageVolumeResponse{}, nil
|
return &csi.NodeStageVolumeResponse{}, nil
|
||||||
}
|
}
|
||||||
|
b, err := ns.s3.client.getBucket(volumeID)
|
||||||
mounter, err := newMounter(req.GetVolumeId(), ns.s3.cfg)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := mounter.Stage(stagingPath); err != nil {
|
|
||||||
|
mounter, err := newMounter(b, ns.s3.cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := mounter.Stage(stagingTargetPath); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -145,20 +162,26 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
|
func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
|
||||||
|
volumeID := req.GetVolumeId()
|
||||||
|
stagingTargetPath := req.GetStagingTargetPath()
|
||||||
|
|
||||||
// Check arguments
|
// Check arguments
|
||||||
if len(req.GetVolumeId()) == 0 {
|
if len(volumeID) == 0 {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
||||||
}
|
}
|
||||||
if len(req.GetStagingTargetPath()) == 0 {
|
if len(stagingTargetPath) == 0 {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
|
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
|
||||||
}
|
}
|
||||||
|
|
||||||
mounter, err := newMounter(req.GetVolumeId(), ns.s3.cfg)
|
b, err := ns.s3.client.getBucket(volumeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := mounter.Unstage(req.GetStagingTargetPath()); err != nil {
|
mounter, err := newMounter(b, ns.s3.cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := mounter.Unstage(stagingTargetPath); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
package s3
|
package s3
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
|
@ -9,7 +12,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
metadataName = ".meta"
|
metadataName = ".metadata.json"
|
||||||
)
|
)
|
||||||
|
|
||||||
type s3Client struct {
|
type s3Client struct {
|
||||||
|
@ -17,6 +20,11 @@ type s3Client struct {
|
||||||
minio *minio.Client
|
minio *minio.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type bucket struct {
|
||||||
|
Name string
|
||||||
|
CapacityBytes int64
|
||||||
|
}
|
||||||
|
|
||||||
func newS3Client(cfg *Config) (*s3Client, error) {
|
func newS3Client(cfg *Config) (*s3Client, error) {
|
||||||
var client = &s3Client{}
|
var client = &s3Client{}
|
||||||
|
|
||||||
|
@ -91,3 +99,32 @@ func (client *s3Client) emptyBucket(bucketName string) error {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (client *s3Client) setBucket(bucket *bucket) error {
|
||||||
|
b := new(bytes.Buffer)
|
||||||
|
json.NewEncoder(b).Encode(bucket)
|
||||||
|
opts := minio.PutObjectOptions{ContentType: "application/json"}
|
||||||
|
_, err := client.minio.PutObject(bucket.Name, metadataName, b, int64(b.Len()), opts)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (client *s3Client) getBucket(bucketName string) (*bucket, error) {
|
||||||
|
opts := minio.GetObjectOptions{}
|
||||||
|
obj, err := client.minio.GetObject(bucketName, metadataName, opts)
|
||||||
|
if err != nil {
|
||||||
|
return &bucket{}, err
|
||||||
|
}
|
||||||
|
objInfo, err := obj.Stat()
|
||||||
|
if err != nil {
|
||||||
|
return &bucket{}, err
|
||||||
|
}
|
||||||
|
b := make([]byte, objInfo.Size)
|
||||||
|
_, err = obj.Read(b)
|
||||||
|
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
return &bucket{}, err
|
||||||
|
}
|
||||||
|
var meta bucket
|
||||||
|
err = json.Unmarshal(b, &meta)
|
||||||
|
return &meta, err
|
||||||
|
}
|
||||||
|
|
|
@ -12,8 +12,6 @@ import (
|
||||||
"github.com/kubernetes-csi/csi-test/pkg/sanity"
|
"github.com/kubernetes-csi/csi-test/pkg/sanity"
|
||||||
)
|
)
|
||||||
|
|
||||||
const ()
|
|
||||||
|
|
||||||
var _ = Describe("S3Driver", func() {
|
var _ = Describe("S3Driver", func() {
|
||||||
mntDir, _ := ioutil.TempDir("", "mnt")
|
mntDir, _ := ioutil.TempDir("", "mnt")
|
||||||
stagingDir, _ := ioutil.TempDir("", "staging")
|
stagingDir, _ := ioutil.TempDir("", "staging")
|
||||||
|
@ -46,7 +44,6 @@ var _ = Describe("S3Driver", func() {
|
||||||
TargetPath: mntDir,
|
TargetPath: mntDir,
|
||||||
StagingPath: stagingDir,
|
StagingPath: stagingDir,
|
||||||
Address: csiEndpoint,
|
Address: csiEndpoint,
|
||||||
TestVolumeSize: 1,
|
|
||||||
}
|
}
|
||||||
sanity.GinkgoTest(sanityCfg)
|
sanity.GinkgoTest(sanityCfg)
|
||||||
})
|
})
|
||||||
|
@ -75,7 +72,6 @@ var _ = Describe("S3Driver", func() {
|
||||||
TargetPath: mntDir,
|
TargetPath: mntDir,
|
||||||
StagingPath: stagingDir,
|
StagingPath: stagingDir,
|
||||||
Address: csiEndpoint,
|
Address: csiEndpoint,
|
||||||
TestVolumeSize: 1,
|
|
||||||
}
|
}
|
||||||
sanity.GinkgoTest(sanityCfg)
|
sanity.GinkgoTest(sanityCfg)
|
||||||
})
|
})
|
||||||
|
@ -107,7 +103,6 @@ var _ = Describe("S3Driver", func() {
|
||||||
TargetPath: mntDir,
|
TargetPath: mntDir,
|
||||||
StagingPath: stagingDir,
|
StagingPath: stagingDir,
|
||||||
Address: csiEndpoint,
|
Address: csiEndpoint,
|
||||||
TestVolumeSize: 1,
|
|
||||||
}
|
}
|
||||||
sanity.GinkgoTest(sanityCfg)
|
sanity.GinkgoTest(sanityCfg)
|
||||||
})
|
})
|
||||||
|
@ -137,7 +132,6 @@ var _ = Describe("S3Driver", func() {
|
||||||
TargetPath: mntDir,
|
TargetPath: mntDir,
|
||||||
StagingPath: stagingDir,
|
StagingPath: stagingDir,
|
||||||
Address: csiEndpoint,
|
Address: csiEndpoint,
|
||||||
TestVolumeSize: 1,
|
|
||||||
}
|
}
|
||||||
sanity.GinkgoTest(sanityCfg)
|
sanity.GinkgoTest(sanityCfg)
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue