fix(envoy): version not handled properly
This commit is contained in:
parent
0fccef973f
commit
867daaa375
9 changed files with 198 additions and 182 deletions
|
@ -25,6 +25,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
|
||||
clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3"
|
||||
discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
|
||||
endpointservice "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"
|
||||
|
@ -33,11 +34,11 @@ import (
|
|||
runtimeservice "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3"
|
||||
secretservice "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3"
|
||||
cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
|
||||
"github.com/envoyproxy/go-control-plane/pkg/log"
|
||||
"github.com/envoyproxy/go-control-plane/pkg/server/v3"
|
||||
"google.golang.org/grpc/credentials"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
|
||||
"google.golang.org/grpc"
|
||||
grpchealth "google.golang.org/grpc/health"
|
||||
"google.golang.org/grpc/health/grpc_health_v1"
|
||||
|
@ -79,7 +80,7 @@ type controlPlane struct {
|
|||
Namespace string `name:"namespace" env:"CONTROL_PLANE_NAMESPACE" default:"" required:"" help:"Namespace where the controller is running, ideally set via downward API"`
|
||||
}
|
||||
|
||||
func (cp *controlPlane) Run(ctx context.Context) error {
|
||||
func (cp *controlPlane) Run(ctx context.Context, logger logr.Logger) error {
|
||||
var tlsOpts []func(*tls.Config)
|
||||
|
||||
// if the enable-http2 flag is false (the default), http/2 should be disabled
|
||||
|
@ -133,14 +134,15 @@ func (cp *controlPlane) Run(ctx context.Context) error {
|
|||
return fmt.Errorf("unable to start control plane: %w", err)
|
||||
}
|
||||
|
||||
envoySnapshotCache := cachev3.NewSnapshotCache(false, cachev3.IDHash{}, nil)
|
||||
cacheLoggerInst := cacheLogger(logger.WithName("envoy-snapshot-cache"))
|
||||
envoySnapshotCache := cachev3.NewSnapshotCache(true, cachev3.IDHash{}, cacheLoggerInst)
|
||||
|
||||
serverCert, err := cp.ensureControlPlaneTlsCert(ctx, bootstrapClient)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to ensure control plane TLS cert: %w", err)
|
||||
}
|
||||
|
||||
envoySrv, err := cp.envoyServer(ctx, envoySnapshotCache, serverCert)
|
||||
envoySrv, err := cp.envoyServer(ctx, logger, envoySnapshotCache, serverCert)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -188,6 +190,7 @@ func (cp *controlPlane) AfterApply() (err error) {
|
|||
|
||||
func (cp *controlPlane) envoyServer(
|
||||
ctx context.Context,
|
||||
logger logr.Logger,
|
||||
cache cachev3.SnapshotCache,
|
||||
serverCert tls.Certificate,
|
||||
) (runnable mgr.Runnable, err error) {
|
||||
|
@ -197,11 +200,8 @@ func (cp *controlPlane) envoyServer(
|
|||
grpcKeepaliveMinTime = 30 * time.Second
|
||||
grpcMaxConcurrentStreams = 1000000
|
||||
)
|
||||
|
||||
var (
|
||||
logger = ctrl.Log.WithName("control-plane")
|
||||
srv = server.NewServer(ctx, cache, nil)
|
||||
)
|
||||
srv := server.NewServer(ctx, cache, xdsServerCallbacks(logger))
|
||||
logger = logger.WithName("control-plane")
|
||||
|
||||
// gRPC golang library sets a very small upper bound for the number gRPC/h2
|
||||
// streams over a single TCP connection. If a proxy multiplexes requests over
|
||||
|
@ -214,10 +214,6 @@ func (cp *controlPlane) envoyServer(
|
|||
return nil, fmt.Errorf("failed to create TLS config: %w", err)
|
||||
}
|
||||
|
||||
loggingOpts := []logging.Option{
|
||||
logging.WithLogOnEvents(logging.StartCall, logging.FinishCall),
|
||||
}
|
||||
|
||||
grpcOptions := append(make([]grpc.ServerOption, 0, 4),
|
||||
grpc.Creds(credentials.NewTLS(tlsCfg)),
|
||||
grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams),
|
||||
|
@ -229,12 +225,6 @@ func (cp *controlPlane) envoyServer(
|
|||
MinTime: grpcKeepaliveMinTime,
|
||||
PermitWithoutStream: true,
|
||||
}),
|
||||
grpc.ChainUnaryInterceptor(
|
||||
logging.UnaryServerInterceptor(InterceptorLogger(ctrl.Log), loggingOpts...),
|
||||
),
|
||||
grpc.ChainStreamInterceptor(
|
||||
logging.StreamServerInterceptor(InterceptorLogger(ctrl.Log), loggingOpts...),
|
||||
),
|
||||
)
|
||||
grpcServer := grpc.NewServer(grpcOptions...)
|
||||
|
||||
|
@ -268,7 +258,10 @@ func (cp *controlPlane) envoyServer(
|
|||
}), nil
|
||||
}
|
||||
|
||||
func (cp *controlPlane) ensureControlPlaneTlsCert(ctx context.Context, k8sClient client.Client) (tls.Certificate, error) {
|
||||
func (cp *controlPlane) ensureControlPlaneTlsCert(
|
||||
ctx context.Context,
|
||||
k8sClient client.Client,
|
||||
) (tls.Certificate, error) {
|
||||
var (
|
||||
controlPlaneServerCert = &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
|
@ -347,10 +340,86 @@ func (cp *controlPlane) tlsConfig(serverCert tls.Certificate) (*tls.Config, erro
|
|||
return tlsCfg, nil
|
||||
}
|
||||
|
||||
// InterceptorLogger adapts slog logger to interceptor logger.
|
||||
// This code is simple enough to be copied and not imported.
|
||||
func InterceptorLogger(l logr.Logger) logging.Logger {
|
||||
return logging.LoggerFunc(func(ctx context.Context, lvl logging.Level, msg string, fields ...any) {
|
||||
l.Info(msg, fields...)
|
||||
})
|
||||
func xdsServerCallbacks(logger logr.Logger) server.Callbacks {
|
||||
return server.CallbackFuncs{
|
||||
StreamOpenFunc: func(ctx context.Context, streamId int64, nodeId string) error {
|
||||
logger.Info("Stream opened", "stream-id", streamId, "node-id", nodeId)
|
||||
return nil
|
||||
},
|
||||
StreamClosedFunc: func(streamId int64, node *corev3.Node) {
|
||||
logger.Info("Stream closed", "stream-id", streamId,
|
||||
"node.id", node.Id,
|
||||
"node.cluster", node.Cluster,
|
||||
)
|
||||
},
|
||||
StreamRequestFunc: func(streamId int64, request *discoverygrpc.DiscoveryRequest) error {
|
||||
logger.Info("Stream request",
|
||||
"stream-id", streamId,
|
||||
"request.node.id", request.Node.Id,
|
||||
"request.node.cluster", request.Node.Cluster,
|
||||
"request.version", request.VersionInfo,
|
||||
"request.error", request.ErrorDetail,
|
||||
)
|
||||
return nil
|
||||
},
|
||||
StreamResponseFunc: func(
|
||||
ctx context.Context,
|
||||
streamId int64,
|
||||
request *discoverygrpc.DiscoveryRequest,
|
||||
response *discoverygrpc.DiscoveryResponse,
|
||||
) {
|
||||
logger.Info("Stream delta response",
|
||||
"stream-id", streamId,
|
||||
"request.node.id", request.Node.Id,
|
||||
"request.node.cluster", request.Node.Cluster,
|
||||
)
|
||||
},
|
||||
DeltaStreamOpenFunc: func(ctx context.Context, streamId int64, nodeId string) error {
|
||||
logger.Info("Delta stream opened", "stream-id", streamId, "node-id", nodeId)
|
||||
return nil
|
||||
},
|
||||
|
||||
DeltaStreamClosedFunc: func(streamId int64, node *corev3.Node) {
|
||||
logger.Info("Delta stream closed",
|
||||
"stream-id", streamId,
|
||||
"node.id", node.Id,
|
||||
"node.cluster", node.Cluster,
|
||||
)
|
||||
},
|
||||
StreamDeltaRequestFunc: func(i int64, request *discoverygrpc.DeltaDiscoveryRequest) error {
|
||||
logger.Info("Stream delta request",
|
||||
"stream-id", i,
|
||||
"request.node.id", request.Node.Id,
|
||||
"request.node.cluster", request.Node.Cluster,
|
||||
"request.error", request.ErrorDetail,
|
||||
)
|
||||
return nil
|
||||
},
|
||||
StreamDeltaResponseFunc: func(
|
||||
i int64,
|
||||
request *discoverygrpc.DeltaDiscoveryRequest,
|
||||
response *discoverygrpc.DeltaDiscoveryResponse,
|
||||
) {
|
||||
logger.Info("Stream delta response",
|
||||
"stream-id", i,
|
||||
"request.node", request.Node,
|
||||
"response.resources", response.Resources,
|
||||
)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func cacheLogger(logger logr.Logger) log.Logger {
|
||||
wrapper := func(delegate func(msg string, keysAndValues ...any)) func(string, ...any) {
|
||||
return func(s string, i ...any) {
|
||||
delegate(fmt.Sprintf(s, i...))
|
||||
}
|
||||
}
|
||||
|
||||
return log.LoggerFuncs{
|
||||
DebugFunc: nil, // enable for debug info
|
||||
InfoFunc: wrapper(logger.Info),
|
||||
WarnFunc: wrapper(logger.Info),
|
||||
ErrorFunc: wrapper(logger.Info),
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue