Only delete bucket if the volume is the alone user

This commit is contained in:
Cyrill Troxler 2021-04-05 15:22:15 +02:00
parent 1e506ad2cc
commit b11261ebe7
3 changed files with 38 additions and 57 deletions

View file

@ -45,7 +45,8 @@ const (
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
params := req.GetParameters() params := req.GetParameters()
capacityBytes := int64(req.GetCapacityRange().GetRequiredBytes())
mounterType := params[mounter.TypeKey]
volumeID := sanitizeVolumeID(req.GetName()) volumeID := sanitizeVolumeID(req.GetName())
bucketName := volumeID bucketName := volumeID
prefix := "" prefix := ""
@ -70,58 +71,47 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities missing in request") return nil, status.Error(codes.InvalidArgument, "Volume Capabilities missing in request")
} }
capacityBytes := int64(req.GetCapacityRange().GetRequiredBytes())
mounter := params[mounter.TypeKey]
glog.V(4).Infof("Got a request to create volume %s", volumeID) glog.V(4).Infof("Got a request to create volume %s", volumeID)
meta := &s3.FSMeta{
BucketName: bucketName,
Prefix: prefix,
Mounter: mounterType,
CapacityBytes: capacityBytes,
FSPath: defaultFsPath,
}
client, err := s3.NewClientFromSecret(req.GetSecrets()) client, err := s3.NewClientFromSecret(req.GetSecrets())
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err) return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
} }
exists, err := client.BucketExists(bucketName) exists, err := client.BucketExists(bucketName)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to check if bucket %s exists: %v", volumeID, err) return nil, fmt.Errorf("failed to check if bucket %s exists: %v", volumeID, err)
} }
var meta *s3.FSMeta
if exists {
meta, err = client.GetFSMeta(bucketName, prefix)
if err != nil { if exists {
glog.Warningf("Bucket %s exists, but failed to get its metadata: %v", volumeID, err) // get meta, ignore errors as it could just mean meta does not exist yet
meta = &s3.FSMeta{ m, err := client.GetFSMeta(bucketName, prefix)
BucketName: bucketName, if err == nil {
Prefix: prefix,
Mounter: mounter,
CapacityBytes: capacityBytes,
FSPath: defaultFsPath,
CreatedByCsi: false,
}
} else {
// Check if volume capacity requested is bigger than the already existing capacity // Check if volume capacity requested is bigger than the already existing capacity
if capacityBytes > meta.CapacityBytes { if capacityBytes > m.CapacityBytes {
return nil, status.Error( return nil, status.Error(
codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", volumeID), codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", volumeID),
) )
} }
meta.Mounter = mounter
} }
} else { } else {
if err = client.CreateBucket(bucketName); err != nil { if err = client.CreateBucket(bucketName); err != nil {
return nil, fmt.Errorf("failed to create bucket %s: %v", bucketName, err) return nil, fmt.Errorf("failed to create bucket %s: %v", bucketName, err)
} }
if err = client.CreatePrefix(bucketName, path.Join(prefix, defaultFsPath)); err != nil {
return nil, fmt.Errorf("failed to create prefix %s: %v", path.Join(prefix, defaultFsPath), err)
}
meta = &s3.FSMeta{
BucketName: bucketName,
Prefix: prefix,
Mounter: mounter,
CapacityBytes: capacityBytes,
FSPath: defaultFsPath,
CreatedByCsi: !exists,
}
} }
if err = client.CreatePrefix(bucketName, path.Join(prefix, defaultFsPath)); err != nil {
return nil, fmt.Errorf("failed to create prefix %s: %v", path.Join(prefix, defaultFsPath), err)
}
if err := client.SetFSMeta(meta); err != nil { if err := client.SetFSMeta(meta); err != nil {
return nil, fmt.Errorf("error setting bucket metadata: %w", err) return nil, fmt.Errorf("error setting bucket metadata: %w", err)
} }
@ -155,38 +145,30 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err) return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
} }
exists, err := client.BucketExists(bucketName)
if err != nil { if _, err := client.GetFSMeta(bucketName, prefix); err != nil {
return nil, err glog.V(5).Infof("FSMeta of volume %s does not exist, ignoring delete request", volumeID)
return &csi.DeleteVolumeResponse{}, nil
} }
if exists {
meta, err := client.GetFSMeta(bucketName, prefix) if prefix == "" {
if err != nil { // prefix is empty, we delete the whole bucket
return nil, fmt.Errorf("failed to get metadata of buckect %s", volumeID) if err := client.RemoveBucket(bucketName); err != nil {
} glog.V(3).Infof("Failed to remove volume %s: %v", volumeID, err)
if prefix != "" { return nil, err
if err := client.RemovePrefix(bucketName, prefix); err != nil {
return nil, fmt.Errorf("unable to remove prefix: %w", err)
}
}
if meta.CreatedByCsi {
if err := client.RemoveBucket(bucketName); err != nil {
glog.V(3).Infof("Failed to remove volume %s: %v", volumeID, err)
return nil, err
}
glog.V(4).Infof("Bucket %s removed", volumeID)
} else {
glog.V(4).Infof("Bucket %s is not created by csi-s3, will not be deleted by csi-s3 automatically.", volumeID)
} }
glog.V(4).Infof("Bucket %s removed", bucketName)
} else { } else {
glog.V(5).Infof("Bucket %s does not exist, ignoring request", volumeID) if err := client.RemovePrefix(bucketName, prefix); err != nil {
return nil, fmt.Errorf("unable to remove prefix: %w", err)
}
glog.V(4).Infof("Prefix %s removed", prefix)
} }
return &csi.DeleteVolumeResponse{}, nil return &csi.DeleteVolumeResponse{}, nil
} }
func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
// Check arguments // Check arguments
if len(req.GetVolumeId()) == 0 { if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")

View file

@ -39,7 +39,6 @@ type FSMeta struct {
Mounter string `json:"Mounter"` Mounter string `json:"Mounter"`
FSPath string `json:"FSPath"` FSPath string `json:"FSPath"`
CapacityBytes int64 `json:"CapacityBytes"` CapacityBytes int64 `json:"CapacityBytes"`
CreatedByCsi bool `json:"CreatedByCsi"`
} }
func NewClient(cfg *Config) (*s3Client, error) { func NewClient(cfg *Config) (*s3Client, error) {

View file

@ -5,4 +5,4 @@ export MINIO_SECRET_KEY=DSG643HGDS
mkdir -p /tmp/minio mkdir -p /tmp/minio
minio server /tmp/minio &>/dev/null & minio server /tmp/minio &>/dev/null &
sleep 5 sleep 5
go test ./... -cover go test ./... -cover -ginkgo.noisySkippings=false