refactoring to latest versions

This commit is contained in:
Ulrich Daniele, B2B-BAN-ABP-CEB 2023-03-31 23:05:26 +02:00
parent 33d7e1b2b5
commit 20b2b5f078
9 changed files with 2083 additions and 86 deletions

View file

@ -1,4 +1,4 @@
FROM golang:1.16-alpine as gobuild
FROM golang:1.19-alpine as gobuild
WORKDIR /build
ADD go.mod go.sum /build/
@ -8,10 +8,13 @@ ADD pkg /build/pkg
RUN go get -d -v ./...
RUN CGO_ENABLED=0 GOOS=linux go build -a -ldflags '-extldflags "-static"' -o ./s3driver ./cmd/s3driver
FROM alpine:3.16
FROM alpine:3.17
LABEL maintainers="Vitaliy Filippov <vitalif@yourcmc.ru>"
LABEL description="csi-s3 slim image"
RUN apk update
RUN apk upgrade
# apk add temporarily broken:
#ERROR: unable to select packages:
# so:libcrypto.so.3 (no such package):

40
go.mod
View file

@ -3,27 +3,27 @@ module github.com/yandex-cloud/k8s-csi-s3
go 1.15
require (
github.com/container-storage-interface/spec v1.1.0
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/godbus/dbus/v5 v5.0.4 // indirect
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/protobuf v1.1.0 // indirect
github.com/kubernetes-csi/csi-lib-utils v0.6.1 // indirect
github.com/container-storage-interface/spec v1.8.0
github.com/coreos/go-systemd/v22 v22.5.0
github.com/godbus/dbus/v5 v5.1.0
github.com/golang/glog v1.1.1
github.com/klauspost/compress v1.16.3 // indirect
github.com/klauspost/cpuid v1.3.1 // indirect
github.com/kubernetes-csi/csi-lib-utils v0.13.0 // indirect
github.com/kubernetes-csi/csi-test v2.0.0+incompatible
github.com/kubernetes-csi/drivers v1.0.2
github.com/minio/minio-go/v7 v7.0.5
github.com/mitchellh/go-ps v0.0.0-20170309133038-4fdf99ab2936
github.com/onsi/ginkgo v1.5.0
github.com/onsi/gomega v1.4.0
github.com/spf13/afero v1.2.1 // indirect
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a // indirect
golang.org/x/net v0.0.0-20200707034311-ab3426394381
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 // indirect
golang.org/x/sys v0.0.0-20200922070232-aee5d888a860 // indirect
google.golang.org/genproto v0.0.0-20180716172848-2731d4fa720b // indirect
google.golang.org/grpc v1.13.0
k8s.io/apimachinery v0.0.0-20180714051327-705cfa51a97f // indirect
k8s.io/klog v0.2.0 // indirect
github.com/minio/minio-go/v7 v7.0.50
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/go-ps v1.0.0
github.com/onsi/ginkgo v1.16.4
github.com/onsi/gomega v1.23.0
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/net v0.8.0
google.golang.org/genproto v0.0.0-20230330200707-38013875ee22 // indirect
google.golang.org/grpc v1.54.0
k8s.io/apimachinery v0.26.3 // indirect
k8s.io/klog v1.0.0 // indirect
k8s.io/kubernetes v1.13.4
k8s.io/utils v0.0.0-20180703210027-ab9069044f32 // indirect
k8s.io/utils v0.0.0-20230313181309-38a27ef9d749 // indirect
)

1740
go.sum

File diff suppressed because it is too large Load diff

View file

@ -32,14 +32,13 @@ import (
"google.golang.org/grpc/status"
"github.com/container-storage-interface/spec/lib/go/csi"
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
)
type controllerServer struct {
*csicommon.DefaultControllerServer
type ControllerServer struct {
Driver *driver
}
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
params := req.GetParameters()
capacityBytes := int64(req.GetCapacityRange().GetRequiredBytes())
volumeID := sanitizeVolumeID(req.GetName())
@ -53,10 +52,10 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
volumeID = path.Join(bucketName, prefix)
}
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
glog.V(3).Infof("invalid create volume req: %v", req)
return nil, err
}
//if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
// glog.V(3).Infof("invalid create volume req: %v", req)
// return nil, err
//}
// Check arguments
if len(volumeID) == 0 {
@ -105,7 +104,21 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}, nil
}
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
func (cs *ControllerServer) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
volumeID := req.GetVolumeId()
bucketName, prefix := volumeIDToBucketPrefix(volumeID)
@ -114,10 +127,11 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
glog.V(3).Infof("Invalid delete volume req: %v", req)
return nil, err
}
//if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
// glog.V(3).Infof("Invalid delete volume req: %v", req)
// return nil, err
//}
glog.V(4).Infof("Deleting volume %s", volumeID)
client, err := s3.NewClientFromSecret(req.GetSecrets())
@ -146,7 +160,7 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return &csi.DeleteVolumeResponse{}, nil
}
func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
// Check arguments
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
@ -191,7 +205,7 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
}, nil
}
func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
return &csi.ControllerExpandVolumeResponse{}, status.Error(codes.Unimplemented, "ControllerExpandVolume is not implemented")
}
@ -217,3 +231,31 @@ func volumeIDToBucketPrefix(volumeID string) (string, string) {
return volumeID, ""
}
func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
glog.V(5).Infof("Using default ControllerGetCapabilities")
return &csi.ControllerGetCapabilitiesResponse{
Capabilities: cs.Driver.cscap,
}, nil
}
func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (cs *ControllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (cs *ControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

View file

@ -19,17 +19,20 @@ package driver
import (
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
)
type driver struct {
driver *csicommon.CSIDriver
ids *IdentityServer
cs *ControllerServer
name string
nodeID string
version string
endpoint string
ids *identityServer
ns *nodeServer
cs *controllerServer
ns *NodeServer
cap []*csi.VolumeCapability_AccessMode
cscap []*csi.ControllerServiceCapability
}
var (
@ -39,50 +42,58 @@ var (
// New initializes the driver
func New(nodeID string, endpoint string) (*driver, error) {
d := csicommon.NewCSIDriver(driverName, vendorVersion, nodeID)
if d == nil {
glog.Fatalln("Failed to initialize CSI Driver.")
}
glog.Infof("Driver: %v version: %v", driverName, vendorVersion)
s3Driver := &driver{
d := &driver{
name: driverName,
version: vendorVersion,
nodeID: nodeID,
endpoint: endpoint,
driver: d,
}
return s3Driver, nil
return d, nil
}
func (s3 *driver) newIdentityServer(d *csicommon.CSIDriver) *identityServer {
return &identityServer{
DefaultIdentityServer: csicommon.NewDefaultIdentityServer(d),
func (d *driver) AddVolumeCapabilityAccessModes(vc []csi.VolumeCapability_AccessMode_Mode) []*csi.VolumeCapability_AccessMode {
var vca []*csi.VolumeCapability_AccessMode
for _, c := range vc {
glog.Infof("Enabling volume access mode: %v", c.String())
vca = append(vca, &csi.VolumeCapability_AccessMode{Mode: c})
}
d.cap = vca
return vca
}
func (s3 *driver) newControllerServer(d *csicommon.CSIDriver) *controllerServer {
return &controllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
}
}
func (d *driver) AddControllerServiceCapabilities(cl []csi.ControllerServiceCapability_RPC_Type) {
var csc []*csi.ControllerServiceCapability
func (s3 *driver) newNodeServer(d *csicommon.CSIDriver) *nodeServer {
return &nodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d),
for _, c := range cl {
glog.Infof("Enabling controller service capability: %v", c.String())
csc = append(csc, NewControllerServiceCapability(c))
}
d.cscap = csc
return
}
func (s3 *driver) Run() {
glog.Infof("Driver: %v ", driverName)
glog.Infof("Version: %v ", vendorVersion)
// Initialize default library driver
s3.driver.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME})
s3.driver.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER})
s3.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME})
s3.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER})
// Create GRPC servers
s3.ids = s3.newIdentityServer(s3.driver)
s3.ns = s3.newNodeServer(s3.driver)
s3.cs = s3.newControllerServer(s3.driver)
s := csicommon.NewNonBlockingGRPCServer()
s.Start(s3.endpoint, s3.ids, s3.cs, s3.ns)
s3.ids = NewDefaultIdentityServer(s3)
s3.ns = NewNodeServer(s3)
s3.cs = NewControllerServer(s3)
s := NewNonBlockingGRPCServer()
s.Start(s3.endpoint,
s3.ids,
s3.cs,
s3.ns)
s.Wait()
}

View file

@ -17,9 +17,49 @@ limitations under the License.
package driver
import (
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type identityServer struct {
*csicommon.DefaultIdentityServer
type IdentityServer struct {
Driver *driver
}
func (ids *IdentityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
glog.V(5).Infof("Using default GetPluginInfo")
if ids.Driver.name == "" {
return nil, status.Error(codes.Unavailable, "Driver name not configured")
}
if ids.Driver.version == "" {
return nil, status.Error(codes.Unavailable, "Driver is missing version")
}
return &csi.GetPluginInfoResponse{
Name: ids.Driver.name,
VendorVersion: ids.Driver.version,
}, nil
}
func (ids *IdentityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
return &csi.ProbeResponse{}, nil
}
func (ids *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
glog.V(5).Infof("Using default capabilities")
return &csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
},
},
},
},
}, nil
}

View file

@ -32,12 +32,10 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/kubernetes/pkg/util/mount"
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
)
type nodeServer struct {
*csicommon.DefaultNodeServer
type NodeServer struct {
Driver *driver
}
func getMeta(bucketName, prefix string, context map[string]string) *s3.FSMeta {
@ -65,7 +63,7 @@ func getMeta(bucketName, prefix string, context map[string]string) *s3.FSMeta {
}
}
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()
stagingTargetPath := req.GetStagingTargetPath()
@ -113,7 +111,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return &csi.NodePublishVolumeResponse{}, nil
}
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()
@ -133,7 +131,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return &csi.NodeUnpublishVolumeResponse{}, nil
}
func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
volumeID := req.GetVolumeId()
stagingTargetPath := req.GetStagingTargetPath()
bucketName, prefix := volumeIDToBucketPrefix(volumeID)
@ -175,7 +173,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return &csi.NodeStageVolumeResponse{}, nil
}
func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
volumeID := req.GetVolumeId()
stagingTargetPath := req.GetStagingTargetPath()
@ -207,7 +205,7 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
}
// NodeGetCapabilities returns the supported capabilities of the node server
func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
// currently there is a single NodeServer capability according to the spec
nscap := &csi.NodeServiceCapability{
Type: &csi.NodeServiceCapability_Rpc{
@ -224,7 +222,7 @@ func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC
}, nil
}
func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
return &csi.NodeExpandVolumeResponse{}, status.Error(codes.Unimplemented, "NodeExpandVolume is not implemented")
}
@ -242,3 +240,16 @@ func checkMount(targetPath string) (bool, error) {
}
return notMnt, nil
}
func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
glog.V(5).Infof("Using default NodeGetInfo")
return &csi.NodeGetInfoResponse{
NodeId: ns.Driver.nodeID,
}, nil
}
func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

96
pkg/driver/server.go Normal file
View file

@ -0,0 +1,96 @@
package driver
import (
"net"
"os"
"sync"
"github.com/golang/glog"
"google.golang.org/grpc"
"github.com/container-storage-interface/spec/lib/go/csi"
)
// Defines Non blocking GRPC server interfaces
type NonBlockingGRPCServer interface {
// Start services at the endpoint
Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer)
// Waits for the service to stop
Wait()
// Stops the service gracefully
Stop()
// Stops the service forcefully
ForceStop()
}
func NewNonBlockingGRPCServer() NonBlockingGRPCServer {
return &nonBlockingGRPCServer{}
}
// NonBlocking server
type nonBlockingGRPCServer struct {
wg sync.WaitGroup
server *grpc.Server
}
func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
s.wg.Add(1)
go s.serve(endpoint, ids, cs, ns)
return
}
func (s *nonBlockingGRPCServer) Wait() {
s.wg.Wait()
}
func (s *nonBlockingGRPCServer) Stop() {
s.server.GracefulStop()
}
func (s *nonBlockingGRPCServer) ForceStop() {
s.server.Stop()
}
func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
proto, addr, err := ParseEndpoint(endpoint)
if err != nil {
glog.Fatal(err.Error())
}
if proto == "unix" {
addr = "/" + addr
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
glog.Fatalf("Failed to remove %s, error: %s", addr, err.Error())
}
}
listener, err := net.Listen(proto, addr)
if err != nil {
glog.Fatalf("Failed to listen: %v", err)
}
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(logGRPC),
}
server := grpc.NewServer(opts...)
s.server = server
if ids != nil {
csi.RegisterIdentityServer(server, ids)
}
if cs != nil {
csi.RegisterControllerServer(server, cs)
}
if ns != nil {
csi.RegisterNodeServer(server, ns)
}
glog.Infof("Listening for connections on address: %#v", listener.Addr())
server.Serve(listener)
}

62
pkg/driver/utils.go Normal file
View file

@ -0,0 +1,62 @@
package driver
import (
"fmt"
"strings"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
func NewNodeServer(d *driver) *NodeServer {
return &NodeServer{
Driver: d,
}
}
func NewDefaultIdentityServer(d *driver) *IdentityServer {
return &IdentityServer{
Driver: d,
}
}
func NewControllerServer(d *driver) *ControllerServer {
return &ControllerServer{
Driver: d,
}
}
func NewControllerServiceCapability(cap csi.ControllerServiceCapability_RPC_Type) *csi.ControllerServiceCapability {
return &csi.ControllerServiceCapability{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: cap,
},
},
}
}
func ParseEndpoint(ep string) (string, string, error) {
if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") {
s := strings.SplitN(ep, "://", 2)
if s[1] != "" {
return s[0], s[1], nil
}
}
return "", "", fmt.Errorf("Invalid endpoint: %v", ep)
}
func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
glog.V(3).Infof("GRPC call: %s", info.FullMethod)
glog.V(5).Infof("GRPC request: %s", protosanitizer.StripSecrets(req))
resp, err := handler(ctx, req)
if err != nil {
glog.Errorf("GRPC error: %v", err)
} else {
glog.V(5).Infof("GRPC response: %s", protosanitizer.StripSecrets(resp))
}
return resp, err
}