This commit is contained in:
Daniele Ulrich, B2B-BAN-ABP-CEB 2023-03-31 23:20:49 +02:00
parent f4d01e12c7
commit c9bb189353
13 changed files with 3106 additions and 107 deletions

View file

@ -1,4 +1,4 @@
FROM golang:1.16-alpine as gobuild
FROM golang:1.19-alpine as gobuild
WORKDIR /build
ADD go.mod go.sum /build/
@ -8,10 +8,13 @@ ADD pkg /build/pkg
RUN go get -d -v ./...
RUN CGO_ENABLED=0 GOOS=linux go build -a -ldflags '-extldflags "-static"' -o ./s3driver ./cmd/s3driver
FROM alpine:3.16
FROM alpine:3.17
LABEL maintainers="Vitaliy Filippov <vitalif@yourcmc.ru>"
LABEL description="csi-s3 slim image"
RUN apk update
RUN apk upgrade
# apk add temporarily broken:
#ERROR: unable to select packages:
# so:libcrypto.so.3 (no such package):

View file

@ -4,9 +4,9 @@ metadata:
namespace: kube-system
name: csi-s3-secret
stringData:
accessKeyID: YOUR_ACCESS_KEY_ID
secretAccessKey: YOUR_SECRET_ACCESS_KEY
accessKeyID: AKIA9BBFD8B80EDE95981AE0C110BEZ10
secretAccessKey: 2djRg7pgU35oxWgwiM2CVD8fmTMwcX7HTOmq5ptW
# For AWS set it to "https://s3.<region>.amazonaws.com", for example https://s3.eu-central-1.amazonaws.com
endpoint: https://storage.yandexcloud.net
endpoint: https://ds11s3.swisscom.com
# For AWS set it to AWS region
#region: ""

40
go.mod
View file

@ -3,27 +3,27 @@ module github.com/yandex-cloud/k8s-csi-s3
go 1.15
require (
github.com/container-storage-interface/spec v1.1.0
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/godbus/dbus/v5 v5.0.4 // indirect
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/protobuf v1.1.0 // indirect
github.com/kubernetes-csi/csi-lib-utils v0.6.1 // indirect
github.com/container-storage-interface/spec v1.8.0
github.com/coreos/go-systemd/v22 v22.5.0
github.com/godbus/dbus/v5 v5.1.0
github.com/golang/glog v1.1.1
github.com/klauspost/compress v1.16.3 // indirect
github.com/klauspost/cpuid v1.3.1 // indirect
github.com/kubernetes-csi/csi-lib-utils v0.13.0 // indirect
github.com/kubernetes-csi/csi-test v2.0.0+incompatible
github.com/kubernetes-csi/drivers v1.0.2
github.com/minio/minio-go/v7 v7.0.5
github.com/mitchellh/go-ps v0.0.0-20170309133038-4fdf99ab2936
github.com/onsi/ginkgo v1.5.0
github.com/onsi/gomega v1.4.0
github.com/spf13/afero v1.2.1 // indirect
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a // indirect
golang.org/x/net v0.0.0-20200707034311-ab3426394381
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 // indirect
golang.org/x/sys v0.0.0-20200922070232-aee5d888a860 // indirect
google.golang.org/genproto v0.0.0-20180716172848-2731d4fa720b // indirect
google.golang.org/grpc v1.13.0
k8s.io/apimachinery v0.0.0-20180714051327-705cfa51a97f // indirect
k8s.io/klog v0.2.0 // indirect
github.com/minio/minio-go/v7 v7.0.50
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/go-ps v1.0.0
github.com/onsi/ginkgo v1.16.4
github.com/onsi/gomega v1.23.0
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/net v0.8.0
google.golang.org/genproto v0.0.0-20230330200707-38013875ee22 // indirect
google.golang.org/grpc v1.54.0
k8s.io/apimachinery v0.26.3 // indirect
k8s.io/klog v1.0.0 // indirect
k8s.io/kubernetes v1.13.4
k8s.io/utils v0.0.0-20180703210027-ab9069044f32 // indirect
k8s.io/utils v0.0.0-20230313181309-38a27ef9d749 // indirect
)

1740
go.sum

File diff suppressed because it is too large Load diff

20
k8s-csi-s3.code-workspace Normal file
View file

@ -0,0 +1,20 @@
{
"folders": [
{
"path": "."
},
{
"path": "../s3-provisioner"
},
{
"path": "../camunda-platform-8"
},
{
"path": "../csi-driver-iscsi"
},
{
"path": "../geesefs"
}
],
"settings": {}
}

1000
log.txt Normal file

File diff suppressed because it is too large Load diff

View file

@ -23,23 +23,20 @@ import (
"io"
"path"
"strings"
"github.com/yandex-cloud/k8s-csi-s3/pkg/mounter"
"github.com/yandex-cloud/k8s-csi-s3/pkg/s3"
"github.com/golang/glog"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/container-storage-interface/spec/lib/go/csi"
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
)
type controllerServer struct {
*csicommon.DefaultControllerServer
type ControllerServer struct {
Driver *driver
}
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
params := req.GetParameters()
capacityBytes := int64(req.GetCapacityRange().GetRequiredBytes())
volumeID := sanitizeVolumeID(req.GetName())
@ -53,10 +50,10 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
volumeID = path.Join(bucketName, prefix)
}
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
glog.V(3).Infof("invalid create volume req: %v", req)
return nil, err
}
//if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
// glog.V(3).Infof("invalid create volume req: %v", req)
// return nil, err
//}
// Check arguments
if len(volumeID) == 0 {
@ -105,7 +102,21 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}, nil
}
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
func (cs *ControllerServer) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
volumeID := req.GetVolumeId()
bucketName, prefix := volumeIDToBucketPrefix(volumeID)
@ -114,10 +125,11 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
glog.V(3).Infof("Invalid delete volume req: %v", req)
return nil, err
}
//if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
// glog.V(3).Infof("Invalid delete volume req: %v", req)
// return nil, err
//}
glog.V(4).Infof("Deleting volume %s", volumeID)
client, err := s3.NewClientFromSecret(req.GetSecrets())
@ -146,7 +158,7 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return &csi.DeleteVolumeResponse{}, nil
}
func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
// Check arguments
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
@ -191,7 +203,7 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
}, nil
}
func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
return &csi.ControllerExpandVolumeResponse{}, status.Error(codes.Unimplemented, "ControllerExpandVolume is not implemented")
}
@ -217,3 +229,31 @@ func volumeIDToBucketPrefix(volumeID string) (string, string) {
return volumeID, ""
}
func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
glog.V(5).Infof("Using default ControllerGetCapabilities")
return &csi.ControllerGetCapabilitiesResponse{
Capabilities: cs.Driver.cscap,
}, nil
}
func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (cs *ControllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (cs *ControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

View file

@ -19,17 +19,20 @@ package driver
import (
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
)
type driver struct {
driver *csicommon.CSIDriver
ids *IdentityServer
cs *ControllerServer
name string
nodeID string
version string
endpoint string
ids *identityServer
ns *nodeServer
cs *controllerServer
ns *NodeServer
cap []*csi.VolumeCapability_AccessMode
cscap []*csi.ControllerServiceCapability
}
var (
@ -39,50 +42,58 @@ var (
// New initializes the driver
func New(nodeID string, endpoint string) (*driver, error) {
d := csicommon.NewCSIDriver(driverName, vendorVersion, nodeID)
if d == nil {
glog.Fatalln("Failed to initialize CSI Driver.")
}
glog.Infof("Driver: %v version: %v", driverName, vendorVersion)
s3Driver := &driver{
d := &driver{
name: driverName,
version: vendorVersion,
nodeID: nodeID,
endpoint: endpoint,
driver: d,
}
return s3Driver, nil
return d, nil
}
func (s3 *driver) newIdentityServer(d *csicommon.CSIDriver) *identityServer {
return &identityServer{
DefaultIdentityServer: csicommon.NewDefaultIdentityServer(d),
func (d *driver) AddVolumeCapabilityAccessModes(vc []csi.VolumeCapability_AccessMode_Mode) []*csi.VolumeCapability_AccessMode {
var vca []*csi.VolumeCapability_AccessMode
for _, c := range vc {
glog.Infof("Enabling volume access mode: %v", c.String())
vca = append(vca, &csi.VolumeCapability_AccessMode{Mode: c})
}
d.cap = vca
return vca
}
func (s3 *driver) newControllerServer(d *csicommon.CSIDriver) *controllerServer {
return &controllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
}
}
func (d *driver) AddControllerServiceCapabilities(cl []csi.ControllerServiceCapability_RPC_Type) {
var csc []*csi.ControllerServiceCapability
func (s3 *driver) newNodeServer(d *csicommon.CSIDriver) *nodeServer {
return &nodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d),
for _, c := range cl {
glog.Infof("Enabling controller service capability: %v", c.String())
csc = append(csc, NewControllerServiceCapability(c))
}
d.cscap = csc
return
}
func (s3 *driver) Run() {
glog.Infof("Driver: %v ", driverName)
glog.Infof("Version: %v ", vendorVersion)
// Initialize default library driver
s3.driver.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME})
s3.driver.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER})
s3.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME})
s3.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER})
// Create GRPC servers
s3.ids = s3.newIdentityServer(s3.driver)
s3.ns = s3.newNodeServer(s3.driver)
s3.cs = s3.newControllerServer(s3.driver)
s := csicommon.NewNonBlockingGRPCServer()
s.Start(s3.endpoint, s3.ids, s3.cs, s3.ns)
s3.ids = NewDefaultIdentityServer(s3)
s3.ns = NewNodeServer(s3)
s3.cs = NewControllerServer(s3)
s := NewNonBlockingGRPCServer()
s.Start(s3.endpoint,
s3.ids,
s3.cs,
s3.ns)
s.Wait()
}

View file

@ -1,25 +1,49 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver
import (
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type identityServer struct {
*csicommon.DefaultIdentityServer
type IdentityServer struct {
Driver *driver
}
func (ids *IdentityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
glog.V(5).Infof("Using default GetPluginInfo")
if ids.Driver.name == "" {
return nil, status.Error(codes.Unavailable, "Driver name not configured")
}
if ids.Driver.version == "" {
return nil, status.Error(codes.Unavailable, "Driver is missing version")
}
return &csi.GetPluginInfoResponse{
Name: ids.Driver.name,
VendorVersion: ids.Driver.version,
}, nil
}
func (ids *IdentityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
return &csi.ProbeResponse{}, nil
}
func (ids *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
glog.V(5).Infof("Using default capabilities")
return &csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
},
},
},
},
}, nil
}

View file

@ -32,12 +32,10 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/kubernetes/pkg/util/mount"
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
)
type nodeServer struct {
*csicommon.DefaultNodeServer
type NodeServer struct {
Driver *driver
}
func getMeta(bucketName, prefix string, context map[string]string) *s3.FSMeta {
@ -65,7 +63,7 @@ func getMeta(bucketName, prefix string, context map[string]string) *s3.FSMeta {
}
}
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()
stagingTargetPath := req.GetStagingTargetPath()
@ -113,7 +111,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return &csi.NodePublishVolumeResponse{}, nil
}
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()
@ -133,7 +131,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return &csi.NodeUnpublishVolumeResponse{}, nil
}
func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
volumeID := req.GetVolumeId()
stagingTargetPath := req.GetStagingTargetPath()
bucketName, prefix := volumeIDToBucketPrefix(volumeID)
@ -175,7 +173,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return &csi.NodeStageVolumeResponse{}, nil
}
func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
volumeID := req.GetVolumeId()
stagingTargetPath := req.GetStagingTargetPath()
@ -207,7 +205,7 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
}
// NodeGetCapabilities returns the supported capabilities of the node server
func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
// currently there is a single NodeServer capability according to the spec
nscap := &csi.NodeServiceCapability{
Type: &csi.NodeServiceCapability_Rpc{
@ -224,7 +222,7 @@ func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC
}, nil
}
func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
return &csi.NodeExpandVolumeResponse{}, status.Error(codes.Unimplemented, "NodeExpandVolume is not implemented")
}
@ -242,3 +240,16 @@ func checkMount(targetPath string) (bool, error) {
}
return notMnt, nil
}
func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
glog.V(5).Infof("Using default NodeGetInfo")
return &csi.NodeGetInfoResponse{
NodeId: ns.Driver.nodeID,
}, nil
}
func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

96
pkg/driver/server.go Normal file
View file

@ -0,0 +1,96 @@
package driver
import (
"net"
"os"
"sync"
"github.com/golang/glog"
"google.golang.org/grpc"
"github.com/container-storage-interface/spec/lib/go/csi"
)
// Defines Non blocking GRPC server interfaces
type NonBlockingGRPCServer interface {
// Start services at the endpoint
Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer)
// Waits for the service to stop
Wait()
// Stops the service gracefully
Stop()
// Stops the service forcefully
ForceStop()
}
func NewNonBlockingGRPCServer() NonBlockingGRPCServer {
return &nonBlockingGRPCServer{}
}
// NonBlocking server
type nonBlockingGRPCServer struct {
wg sync.WaitGroup
server *grpc.Server
}
func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
s.wg.Add(1)
go s.serve(endpoint, ids, cs, ns)
return
}
func (s *nonBlockingGRPCServer) Wait() {
s.wg.Wait()
}
func (s *nonBlockingGRPCServer) Stop() {
s.server.GracefulStop()
}
func (s *nonBlockingGRPCServer) ForceStop() {
s.server.Stop()
}
func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
proto, addr, err := ParseEndpoint(endpoint)
if err != nil {
glog.Fatal(err.Error())
}
if proto == "unix" {
addr = "/" + addr
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
glog.Fatalf("Failed to remove %s, error: %s", addr, err.Error())
}
}
listener, err := net.Listen(proto, addr)
if err != nil {
glog.Fatalf("Failed to listen: %v", err)
}
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(logGRPC),
}
server := grpc.NewServer(opts...)
s.server = server
if ids != nil {
csi.RegisterIdentityServer(server, ids)
}
if cs != nil {
csi.RegisterControllerServer(server, cs)
}
if ns != nil {
csi.RegisterNodeServer(server, ns)
}
glog.Infof("Listening for connections on address: %#v", listener.Addr())
server.Serve(listener)
}

62
pkg/driver/utils.go Normal file
View file

@ -0,0 +1,62 @@
package driver
import (
"fmt"
"strings"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
func NewNodeServer(d *driver) *NodeServer {
return &NodeServer{
Driver: d,
}
}
func NewDefaultIdentityServer(d *driver) *IdentityServer {
return &IdentityServer{
Driver: d,
}
}
func NewControllerServer(d *driver) *ControllerServer {
return &ControllerServer{
Driver: d,
}
}
func NewControllerServiceCapability(cap csi.ControllerServiceCapability_RPC_Type) *csi.ControllerServiceCapability {
return &csi.ControllerServiceCapability{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: cap,
},
},
}
}
func ParseEndpoint(ep string) (string, string, error) {
if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") {
s := strings.SplitN(ep, "://", 2)
if s[1] != "" {
return s[0], s[1], nil
}
}
return "", "", fmt.Errorf("Invalid endpoint: %v", ep)
}
func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
glog.V(3).Infof("GRPC call: %s", info.FullMethod)
glog.V(5).Infof("GRPC request: %s", protosanitizer.StripSecrets(req))
resp, err := handler(ctx, req)
if err != nil {
glog.Errorf("GRPC error: %v", err)
} else {
glog.V(5).Infof("GRPC response: %s", protosanitizer.StripSecrets(resp))
}
return resp, err
}

BIN
s3driver Executable file

Binary file not shown.