Peter Kurfer
647f602c79
- added Core CRD to manage DB migrations & configuration, PostgREST and GoTrue (auth) - added APIGateway CRD to manage Envoy proxy - added Dashboard CRD to manage (so far) pg-meta and (soon) studio deployments - implemented basic Envoy control plane based on K8s watcher
90 lines
2.4 KiB
Go
90 lines
2.4 KiB
Go
package controlplane
|
|
|
|
import (
	"sort"
	"time"

	clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
	corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
	endpointv3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
	"google.golang.org/protobuf/types/known/durationpb"
	discoveryv1 "k8s.io/api/discovery/v1"
)
|
|
|
|
type ServiceCluster struct {
|
|
ServiceEndpoints map[string]Endpoints
|
|
}
|
|
|
|
func (c *ServiceCluster) AddOrUpdateEndpoints(eps *discoveryv1.EndpointSlice) {
|
|
if c.ServiceEndpoints == nil {
|
|
c.ServiceEndpoints = make(map[string]Endpoints)
|
|
}
|
|
|
|
c.ServiceEndpoints[eps.Name] = newEndpointsFromSlice(eps)
|
|
}
|
|
|
|
func (c ServiceCluster) Cluster(name string, port uint32) *clusterv3.Cluster {
|
|
return &clusterv3.Cluster{
|
|
Name: name,
|
|
ConnectTimeout: durationpb.New(5 * time.Second),
|
|
ClusterDiscoveryType: &clusterv3.Cluster_Type{Type: clusterv3.Cluster_STATIC},
|
|
LbPolicy: clusterv3.Cluster_ROUND_ROBIN,
|
|
LoadAssignment: &endpointv3.ClusterLoadAssignment{
|
|
ClusterName: name,
|
|
Endpoints: c.endpoints(port),
|
|
},
|
|
}
|
|
}
|
|
|
|
func (c ServiceCluster) endpoints(port uint32) []*endpointv3.LocalityLbEndpoints {
|
|
eps := make([]*endpointv3.LocalityLbEndpoints, 0, len(c.ServiceEndpoints))
|
|
|
|
for _, sep := range c.ServiceEndpoints {
|
|
eps = append(eps, &endpointv3.LocalityLbEndpoints{
|
|
LbEndpoints: sep.LBEndpoints(port),
|
|
})
|
|
}
|
|
|
|
return eps
|
|
}
|
|
|
|
func newEndpointsFromSlice(eps *discoveryv1.EndpointSlice) Endpoints {
|
|
var result Endpoints
|
|
|
|
for _, ep := range eps.Endpoints {
|
|
if ep.Conditions.Ready != nil && *ep.Conditions.Ready {
|
|
result.Addresses = append(result.Addresses, ep.Addresses...)
|
|
}
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// Endpoints is a flat list of endpoint addresses that LBEndpoints turns into
// Envoy load-balancer endpoints.
type Endpoints struct {
	// Addresses holds the raw address strings, one per backend address.
	Addresses []string
}
|
|
|
|
func (e Endpoints) LBEndpoints(port uint32) []*endpointv3.LbEndpoint {
|
|
endpoints := make([]*endpointv3.LbEndpoint, 0, len(e.Addresses))
|
|
|
|
for _, ep := range e.Addresses {
|
|
endpoints = append(endpoints, &endpointv3.LbEndpoint{
|
|
HostIdentifier: &endpointv3.LbEndpoint_Endpoint{
|
|
Endpoint: &endpointv3.Endpoint{
|
|
Address: &corev3.Address{
|
|
Address: &corev3.Address_SocketAddress{
|
|
SocketAddress: &corev3.SocketAddress{
|
|
Address: ep,
|
|
Protocol: corev3.SocketAddress_TCP,
|
|
PortSpecifier: &corev3.SocketAddress_PortValue{
|
|
PortValue: port,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
return endpoints
|
|
}
|