Supply credentials using volume secrets instead of cli config

This commit is contained in:
Cyrill Troxler 2019-03-10 12:19:02 +01:00
parent 59fd15b628
commit a670d7fb4d
18 changed files with 136 additions and 146 deletions

View file

@ -29,12 +29,11 @@ import (
"google.golang.org/grpc/status"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/drivers/pkg/csi-common"
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
)
type controllerServer struct {
*csicommon.DefaultControllerServer
*s3
}
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
@ -59,13 +58,17 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
glog.V(4).Infof("Got a request to create volume %s", volumeID)
exists, err := cs.s3.client.bucketExists(volumeID)
s3, err := newS3ClientFromSecrets(req.GetSecrets())
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
exists, err := s3.bucketExists(volumeID)
if err != nil {
return nil, fmt.Errorf("failed to check if bucket %s exists: %v", volumeID, err)
}
if exists {
var b *bucket
b, err = cs.s3.client.getBucket(volumeID)
b, err = s3.getBucket(volumeID)
if err != nil {
return nil, fmt.Errorf("failed to get bucket metadata of bucket %s: %v", volumeID, err)
}
@ -74,10 +77,10 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", volumeID))
}
} else {
if err = cs.s3.client.createBucket(volumeID); err != nil {
if err = s3.createBucket(volumeID); err != nil {
return nil, fmt.Errorf("failed to create volume %s: %v", volumeID, err)
}
if err = cs.s3.client.createPrefix(volumeID, fsPrefix); err != nil {
if err = s3.createPrefix(volumeID, fsPrefix); err != nil {
return nil, fmt.Errorf("failed to create prefix %s: %v", fsPrefix, err)
}
}
@ -87,7 +90,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
CapacityBytes: capacityBytes,
FSPath: fsPrefix,
}
if err := cs.s3.client.setBucket(b); err != nil {
if err := s3.setBucket(b); err != nil {
return nil, fmt.Errorf("Error setting bucket metadata: %v", err)
}
@ -118,12 +121,16 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
}
glog.V(4).Infof("Deleting volume %s", volumeID)
exists, err := cs.s3.client.bucketExists(volumeID)
s3, err := newS3ClientFromSecrets(req.GetSecrets())
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
exists, err := s3.bucketExists(volumeID)
if err != nil {
return nil, err
}
if exists {
if err := cs.s3.client.removeBucket(volumeID); err != nil {
if err := s3.removeBucket(volumeID); err != nil {
glog.V(3).Infof("Failed to remove volume: %v", err)
return nil, err
}
@ -144,7 +151,11 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request")
}
exists, err := cs.s3.client.bucketExists(req.GetVolumeId())
s3, err := newS3ClientFromSecrets(req.GetSecrets())
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
exists, err := s3.bucketExists(req.GetVolumeId())
if err != nil {
return nil, err
}

View file

@ -17,10 +17,9 @@ limitations under the License.
package s3
import (
"github.com/kubernetes-csi/drivers/pkg/csi-common"
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
)
type identityServer struct {
*csicommon.DefaultIdentityServer
*s3
}

View file

@ -15,7 +15,6 @@ type Mounter interface {
Stage(stagePath string) error
Unstage(stagePath string) error
Mount(source string, target string) error
Unmount(target string) error
}
const (
@ -67,12 +66,12 @@ func fuseMount(path string, command string, args []string) error {
return waitForMount(path, 10*time.Second)
}
func fuseUnmount(path string, command string) error {
func fuseUnmount(path string) error {
if err := mount.New("").Unmount(path); err != nil {
return err
}
// as fuse quits immediately, we will try to wait until the process is done
process, err := findFuseMountProcess(path, command)
process, err := findFuseMountProcess(path)
if err != nil {
glog.Errorf("Error getting PID of fuse mount: %s", err)
return nil

View file

@ -69,7 +69,3 @@ func (goofys *goofysMounter) Mount(source string, target string) error {
}
return nil
}
func (goofys *goofysMounter) Unmount(targetPath string) error {
return fuseUnmount(targetPath, goofysCmd)
}

View file

@ -54,7 +54,3 @@ func (rclone *rcloneMounter) Mount(source string, target string) error {
os.Setenv("AWS_SECRET_ACCESS_KEY", rclone.secretAccessKey)
return fuseMount(target, rcloneCmd, args)
}
func (rclone *rcloneMounter) Unmount(target string) error {
return fuseUnmount(target, rcloneCmd)
}

View file

@ -71,14 +71,14 @@ func (s3backer *s3backerMounter) Stage(stageTarget string) error {
// ensure 'file' device is formatted
err := formatFs(s3backerFsType, path.Join(stageTarget, s3backerDevice))
if err != nil {
fuseUnmount(stageTarget, s3backerCmd)
fuseUnmount(stageTarget)
}
return err
}
func (s3backer *s3backerMounter) Unstage(stageTarget string) error {
// Unmount the s3backer fuse mount
return fuseUnmount(stageTarget, s3backerCmd)
return fuseUnmount(stageTarget)
}
func (s3backer *s3backerMounter) Mount(source string, target string) error {
@ -87,17 +87,12 @@ func (s3backer *s3backerMounter) Mount(source string, target string) error {
err := mount.New("").Mount(device, target, s3backerFsType, []string{})
if err != nil {
// cleanup fuse mount
fuseUnmount(target, s3backerCmd)
fuseUnmount(target)
return err
}
return nil
}
func (s3backer *s3backerMounter) Unmount(targetPath string) error {
// Unmount the filesystem first
return mount.New("").Unmount(targetPath)
}
func (s3backer *s3backerMounter) mountInit(path string) error {
args := []string{
fmt.Sprintf("--blockSize=%s", s3backerBlockSize),

View file

@ -51,10 +51,6 @@ func (s3fs *s3fsMounter) Mount(source string, target string) error {
return fuseMount(target, s3fsCmd, args)
}
func (s3fs *s3fsMounter) Unmount(target string) error {
return fuseUnmount(target, s3fsCmd)
}
func writes3fsPass(pwFileContent string) error {
pwFileName := fmt.Sprintf("%s/.passwd-s3fs", os.Getenv("HOME"))
pwFile, err := os.OpenFile(pwFileName, os.O_RDWR|os.O_CREATE, 0600)

View file

@ -93,10 +93,6 @@ func (s3ql *s3qlMounter) Mount(source string, target string) error {
return fuseMount(target, s3qlCmdMount, append(args, s3ql.options...))
}
func (s3ql *s3qlMounter) Unmount(target string) error {
return fuseUnmount(target, s3qlCmdMount)
}
func (s3ql *s3qlMounter) writeConfig() error {
s3qlIni := ini.Empty()
section, err := s3qlIni.NewSection("s3ql")

View file

@ -17,6 +17,7 @@ limitations under the License.
package s3
import (
"fmt"
"os"
"github.com/golang/glog"
@ -27,12 +28,11 @@ import (
"google.golang.org/grpc/status"
"k8s.io/kubernetes/pkg/util/mount"
"github.com/kubernetes-csi/drivers/pkg/csi-common"
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
)
type nodeServer struct {
*csicommon.DefaultNodeServer
*s3
}
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
@ -76,12 +76,16 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
glog.V(4).Infof("target %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n",
targetPath, deviceID, readOnly, volumeID, attrib, mountFlags)
b, err := ns.s3.client.getBucket(volumeID)
s3, err := newS3ClientFromSecrets(req.GetSecrets())
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
b, err := s3.getBucket(volumeID)
if err != nil {
return nil, err
}
mounter, err := newMounter(b, ns.s3.cfg)
mounter, err := newMounter(b, s3.cfg)
if err != nil {
return nil, err
}
@ -106,15 +110,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
b, err := ns.s3.client.getBucket(volumeID)
if err != nil {
return nil, err
}
mounter, err := newMounter(b, ns.s3.cfg)
if err != nil {
return nil, err
}
if err := mounter.Unmount(targetPath); err != nil {
if err := fuseUnmount(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
glog.V(4).Infof("s3: bucket %s has been unmounted.", volumeID)
@ -146,12 +142,15 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
if !notMnt {
return &csi.NodeStageVolumeResponse{}, nil
}
b, err := ns.s3.client.getBucket(volumeID)
s3, err := newS3ClientFromSecrets(req.GetSecrets())
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
b, err := s3.getBucket(volumeID)
if err != nil {
return nil, err
}
mounter, err := newMounter(b, ns.s3.cfg)
mounter, err := newMounter(b, s3.cfg)
if err != nil {
return nil, err
}
@ -174,18 +173,6 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
b, err := ns.s3.client.getBucket(volumeID)
if err != nil {
return nil, err
}
mounter, err := newMounter(b, ns.s3.cfg)
if err != nil {
return nil, err
}
if err := mounter.Unstage(stagingTargetPath); err != nil {
return nil, err
}
return &csi.NodeUnstageVolumeResponse{}, nil
}

View file

@ -49,6 +49,18 @@ func newS3Client(cfg *Config) (*s3Client, error) {
return client, nil
}
func newS3ClientFromSecrets(secrets map[string]string) (*s3Client, error) {
return newS3Client(&Config{
AccessKeyID: secrets["accessKeyID"],
SecretAccessKey: secrets["secretAccessKey"],
Region: secrets["region"],
Endpoint: secrets["endpoint"],
EncryptionKey: secrets["encryptionKey"],
// Mounter is set in the volume preferences, not secrets
Mounter: "",
})
}
func (client *s3Client) bucketExists(bucketName string) (bool, error) {
return client.minio.BucketExists(bucketName)
}

View file

@ -20,14 +20,12 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
"github.com/kubernetes-csi/drivers/pkg/csi-common"
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
)
type s3 struct {
driver *csicommon.CSIDriver
client *s3Client
endpoint string
cfg *Config
ids *identityServer
ns *nodeServer
@ -47,22 +45,15 @@ var (
)
// NewS3 initializes the driver
func NewS3(nodeID string, endpoint string, cfg *Config) (*s3, error) {
func NewS3(nodeID string, endpoint string) (*s3, error) {
driver := csicommon.NewCSIDriver(driverName, vendorVersion, nodeID)
if driver == nil {
glog.Fatalln("Failed to initialize CSI Driver.")
}
client, err := newS3Client(cfg)
if err != nil {
glog.V(3).Infof("Failed to create s3 client: %v", err)
return nil, err
}
s3Driver := &s3{
endpoint: endpoint,
driver: driver,
client: client,
cfg: cfg,
}
return s3Driver, nil
}
@ -70,21 +61,18 @@ func NewS3(nodeID string, endpoint string, cfg *Config) (*s3, error) {
func (s3 *s3) newIdentityServer(d *csicommon.CSIDriver) *identityServer {
return &identityServer{
DefaultIdentityServer: csicommon.NewDefaultIdentityServer(d),
s3: s3,
}
}
func (s3 *s3) newControllerServer(d *csicommon.CSIDriver) *controllerServer {
return &controllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
s3: s3,
}
}
func (s3 *s3) newNodeServer(d *csicommon.CSIDriver) *nodeServer {
return &nodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d),
s3: s3,
}
}

View file

@ -24,16 +24,10 @@ var _ = Describe("S3Driver", func() {
Context("goofys", func() {
socket := "/tmp/csi-goofys.sock"
csiEndpoint := "unix://" + socket
cfg := &s3.Config{
AccessKeyID: "FJDSJ",
SecretAccessKey: "DSG643HGDS",
Endpoint: "http://127.0.0.1:9000",
Mounter: "goofys",
}
if err := os.Remove(socket); err != nil && !os.IsNotExist(err) {
Expect(err).NotTo(HaveOccurred())
}
driver, err := s3.NewS3("test-node", csiEndpoint, cfg)
driver, err := s3.NewS3("test-node", csiEndpoint)
if err != nil {
log.Fatal(err)
}
@ -44,6 +38,10 @@ var _ = Describe("S3Driver", func() {
TargetPath: mntDir,
StagingPath: stagingDir,
Address: csiEndpoint,
SecretsFile: "../../test/secret.yaml",
TestVolumeParameters: map[string]string{
"mounter": "goofys",
},
}
sanity.GinkgoTest(sanityCfg)
})
@ -52,16 +50,10 @@ var _ = Describe("S3Driver", func() {
Context("s3fs", func() {
socket := "/tmp/csi-s3fs.sock"
csiEndpoint := "unix://" + socket
cfg := &s3.Config{
AccessKeyID: "FJDSJ",
SecretAccessKey: "DSG643HGDS",
Endpoint: "http://127.0.0.1:9000",
Mounter: "s3fs",
}
if err := os.Remove(socket); err != nil && !os.IsNotExist(err) {
Expect(err).NotTo(HaveOccurred())
}
driver, err := s3.NewS3("test-node", csiEndpoint, cfg)
driver, err := s3.NewS3("test-node", csiEndpoint)
if err != nil {
log.Fatal(err)
}
@ -72,6 +64,10 @@ var _ = Describe("S3Driver", func() {
TargetPath: mntDir,
StagingPath: stagingDir,
Address: csiEndpoint,
SecretsFile: "../../test/secret.yaml",
TestVolumeParameters: map[string]string{
"mounter": "s3fs",
},
}
sanity.GinkgoTest(sanityCfg)
})
@ -81,16 +77,10 @@ var _ = Describe("S3Driver", func() {
socket := "/tmp/csi-s3ql.sock"
csiEndpoint := "unix://" + socket
cfg := &s3.Config{
AccessKeyID: "FJDSJ",
SecretAccessKey: "DSG643HGDS",
Endpoint: "http://127.0.0.1:9000",
Mounter: "s3ql",
}
if err := os.Remove(socket); err != nil && !os.IsNotExist(err) {
Expect(err).NotTo(HaveOccurred())
}
driver, err := s3.NewS3("test-node", csiEndpoint, cfg)
driver, err := s3.NewS3("test-node", csiEndpoint)
if err != nil {
log.Fatal(err)
}
@ -103,6 +93,10 @@ var _ = Describe("S3Driver", func() {
TargetPath: mntDir,
StagingPath: stagingDir,
Address: csiEndpoint,
SecretsFile: "../../test/secret.yaml",
TestVolumeParameters: map[string]string{
"mounter": "s3ql",
},
}
sanity.GinkgoTest(sanityCfg)
})
@ -112,18 +106,12 @@ var _ = Describe("S3Driver", func() {
socket := "/tmp/csi-s3backer.sock"
csiEndpoint := "unix://" + socket
cfg := &s3.Config{
AccessKeyID: "FJDSJ",
SecretAccessKey: "DSG643HGDS",
Endpoint: "http://127.0.0.1:9000",
Mounter: "s3backer",
}
if err := os.Remove(socket); err != nil && !os.IsNotExist(err) {
Expect(err).NotTo(HaveOccurred())
}
// Clear loop device so we cover the creation of it
os.Remove(s3.S3backerLoopDevice)
driver, err := s3.NewS3("test-node", csiEndpoint, cfg)
driver, err := s3.NewS3("test-node", csiEndpoint)
if err != nil {
log.Fatal(err)
}
@ -134,6 +122,10 @@ var _ = Describe("S3Driver", func() {
TargetPath: mntDir,
StagingPath: stagingDir,
Address: csiEndpoint,
SecretsFile: "../../test/secret.yaml",
TestVolumeParameters: map[string]string{
"mounter": "s3backer",
},
}
sanity.GinkgoTest(sanityCfg)
})
@ -143,16 +135,10 @@ var _ = Describe("S3Driver", func() {
socket := "/tmp/csi-rclone.sock"
csiEndpoint := "unix://" + socket
cfg := &s3.Config{
AccessKeyID: "FJDSJ",
SecretAccessKey: "DSG643HGDS",
Endpoint: "http://127.0.0.1:9000",
Mounter: "rclone",
}
if err := os.Remove(socket); err != nil && !os.IsNotExist(err) {
Expect(err).NotTo(HaveOccurred())
}
driver, err := s3.NewS3("test-node", csiEndpoint, cfg)
driver, err := s3.NewS3("test-node", csiEndpoint)
if err != nil {
log.Fatal(err)
}
@ -163,6 +149,10 @@ var _ = Describe("S3Driver", func() {
TargetPath: mntDir,
StagingPath: stagingDir,
Address: csiEndpoint,
SecretsFile: "../../test/secret.yaml",
TestVolumeParameters: map[string]string{
"mounter": "rclone",
},
}
sanity.GinkgoTest(sanityCfg)
})

View file

@ -60,22 +60,20 @@ func waitForMount(path string, timeout time.Duration) error {
}
}
func findFuseMountProcess(path string, name string) (*os.Process, error) {
func findFuseMountProcess(path string) (*os.Process, error) {
processes, err := ps.Processes()
if err != nil {
return nil, err
}
for _, p := range processes {
if strings.Contains(p.Executable(), name) {
cmdLine, err := getCmdLine(p.Pid())
if err != nil {
glog.Errorf("Unable to get cmdline of PID %v: %s", p.Pid(), err)
continue
}
if strings.Contains(cmdLine, path) {
glog.Infof("Found matching pid %v on path %s", p.Pid(), path)
return os.FindProcess(p.Pid())
}
cmdLine, err := getCmdLine(p.Pid())
if err != nil {
glog.Errorf("Unable to get cmdline of PID %v: %s", p.Pid(), err)
continue
}
if strings.Contains(cmdLine, path) {
glog.Infof("Found matching pid %v on path %s", p.Pid(), path)
return os.FindProcess(p.Pid())
}
}
return nil, nil