Audit API prototype
- watch events interactively - pipe events to files - remove file subscriptions
This commit is contained in:
parent
dc4a9b18a3
commit
af31b1166a
19 changed files with 348 additions and 22 deletions
36
api/proto/internal/rpc/audit.proto
Normal file
36
api/proto/internal/rpc/audit.proto
Normal file
|
@ -0,0 +1,36 @@
|
||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
option go_package = "gitlab.com/inetmock/inetmock/internal/rpc";
|
||||||
|
option java_multiple_files = true;
|
||||||
|
option java_package = "com.github.baez90.inetmock.rpc";
|
||||||
|
option java_outer_classname = "AuditProto";
|
||||||
|
|
||||||
|
import 'pkg/audit/event_entity.proto';
|
||||||
|
|
||||||
|
package inetmock.rpc;
|
||||||
|
|
||||||
|
message WatchEventsRequest {
|
||||||
|
string watcherName = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message RegisterFileSinkRequest {
|
||||||
|
string targetPath = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message RegisterFileSinkResponse {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
message RemoveFileSinkRequest {
|
||||||
|
string targetPath = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message RemoveFileSinkResponse {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
service Audit {
|
||||||
|
rpc WatchEvents (WatchEventsRequest) returns (stream inetmock.audit.EventEntity);
|
||||||
|
rpc RegisterFileSink (RegisterFileSinkRequest) returns (RegisterFileSinkResponse);
|
||||||
|
rpc RemoveFileSink (RemoveFileSinkRequest) returns (RemoveFileSinkResponse);
|
||||||
|
}
|
|
@ -7,11 +7,6 @@ option java_outer_classname = "EndpointsProto";
|
||||||
|
|
||||||
package inetmock.rpc;
|
package inetmock.rpc;
|
||||||
|
|
||||||
service Endpoints {
|
|
||||||
rpc GetEndpoints (GetEndpointsRequest) returns (GetEndpointsResponse) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
message GetEndpointsRequest {
|
message GetEndpointsRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,3 +21,8 @@ message Endpoint {
|
||||||
string listenAddress = 4;
|
string listenAddress = 4;
|
||||||
int32 port = 5;
|
int32 port = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
service Endpoints {
|
||||||
|
rpc GetEndpoints (GetEndpointsRequest) returns (GetEndpointsResponse) {
|
||||||
|
}
|
||||||
|
}
|
|
@ -29,6 +29,7 @@ var (
|
||||||
|
|
||||||
type App interface {
|
type App interface {
|
||||||
api.PluginContext
|
api.PluginContext
|
||||||
|
EventStream() audit.EventStream
|
||||||
Config() config.Config
|
Config() config.Config
|
||||||
Checker() health.Checker
|
Checker() health.Checker
|
||||||
EndpointManager() endpoint.EndpointManager
|
EndpointManager() endpoint.EndpointManager
|
||||||
|
@ -89,6 +90,10 @@ func (a app) Audit() audit.Emitter {
|
||||||
return a.eventStream
|
return a.eventStream
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a app) EventStream() audit.EventStream {
|
||||||
|
return a.eventStream
|
||||||
|
}
|
||||||
|
|
||||||
func (a app) HandlerRegistry() api.HandlerRegistry {
|
func (a app) HandlerRegistry() api.HandlerRegistry {
|
||||||
return a.registry
|
return a.registry
|
||||||
}
|
}
|
||||||
|
|
59
internal/cmd/audit_file.go
Normal file
59
internal/cmd/audit_file.go
Normal file
|
@ -0,0 +1,59 @@
|
||||||
|
package cmd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
"gitlab.com/inetmock/inetmock/internal/rpc"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
addFileCmd = &cobra.Command{
|
||||||
|
Use: "addFile",
|
||||||
|
Short: "subscribe events to a file",
|
||||||
|
Args: cobra.ExactArgs(1),
|
||||||
|
RunE: runAddFile,
|
||||||
|
}
|
||||||
|
|
||||||
|
removeFileCmd = &cobra.Command{
|
||||||
|
Use: "removeFile",
|
||||||
|
Short: "remove file subscription",
|
||||||
|
Args: cobra.ExactArgs(1),
|
||||||
|
RunE: runRemoveFile,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func runAddFile(_ *cobra.Command, args []string) (err error) {
|
||||||
|
var conn *grpc.ClientConn
|
||||||
|
|
||||||
|
if conn, err = grpc.Dial(inetMockSocketPath, grpc.WithInsecure()); err != nil {
|
||||||
|
fmt.Printf("Failed to connecto INetMock socket: %v\n", err)
|
||||||
|
os.Exit(10)
|
||||||
|
}
|
||||||
|
|
||||||
|
auditClient := rpc.NewAuditClient(conn)
|
||||||
|
ctx, cancel := context.WithTimeout(appCtx, grpcTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
_, err = auditClient.RegisterFileSink(ctx, &rpc.RegisterFileSinkRequest{TargetPath: args[0]})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func runRemoveFile(_ *cobra.Command, args []string) (err error) {
|
||||||
|
var conn *grpc.ClientConn
|
||||||
|
|
||||||
|
if conn, err = grpc.Dial(inetMockSocketPath, grpc.WithInsecure()); err != nil {
|
||||||
|
fmt.Printf("Failed to connecto INetMock socket: %v\n", err)
|
||||||
|
os.Exit(10)
|
||||||
|
}
|
||||||
|
|
||||||
|
auditClient := rpc.NewAuditClient(conn)
|
||||||
|
ctx, cancel := context.WithTimeout(appCtx, grpcTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
_, err = auditClient.RemoveFileSink(ctx, &rpc.RemoveFileSinkRequest{TargetPath: args[0]})
|
||||||
|
return
|
||||||
|
}
|
62
internal/cmd/audit_watch.go
Normal file
62
internal/cmd/audit_watch.go
Normal file
|
@ -0,0 +1,62 @@
|
||||||
|
package cmd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
"gitlab.com/inetmock/inetmock/internal/rpc"
|
||||||
|
"gitlab.com/inetmock/inetmock/pkg/audit"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
watchEventsCmd = &cobra.Command{
|
||||||
|
Use: "watch",
|
||||||
|
Short: "Watch all audit events",
|
||||||
|
RunE: watchAuditEvents,
|
||||||
|
}
|
||||||
|
|
||||||
|
auditCmd = &cobra.Command{
|
||||||
|
Use: "audit",
|
||||||
|
Short: "Interact with the audit stream",
|
||||||
|
}
|
||||||
|
|
||||||
|
listenerName string
|
||||||
|
)
|
||||||
|
|
||||||
|
func watchAuditEvents(_ *cobra.Command, _ []string) (err error) {
|
||||||
|
var conn *grpc.ClientConn
|
||||||
|
|
||||||
|
if conn, err = grpc.Dial(inetMockSocketPath, grpc.WithInsecure()); err != nil {
|
||||||
|
fmt.Printf("Failed to connecto INetMock socket: %v\n", err)
|
||||||
|
os.Exit(10)
|
||||||
|
}
|
||||||
|
|
||||||
|
auditClient := rpc.NewAuditClient(conn)
|
||||||
|
|
||||||
|
var watchClient rpc.Audit_WatchEventsClient
|
||||||
|
if watchClient, err = auditClient.WatchEvents(appCtx, &rpc.WatchEventsRequest{WatcherName: listenerName}); err != nil {
|
||||||
|
fmt.Println(err.Error())
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
var protoEv *audit.EventEntity
|
||||||
|
for protoEv, err = watchClient.Recv(); err == nil; protoEv, err = watchClient.Recv() {
|
||||||
|
ev := audit.NewEventFromProto(protoEv)
|
||||||
|
var out []byte
|
||||||
|
out, err = json.Marshal(ev)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fmt.Println(string(out))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-appCtx.Done()
|
||||||
|
err = watchClient.CloseSend()
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
|
@ -1,8 +1,14 @@
|
||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"os/user"
|
||||||
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -15,20 +21,54 @@ var (
|
||||||
inetMockSocketPath string
|
inetMockSocketPath string
|
||||||
outputFormat string
|
outputFormat string
|
||||||
grpcTimeout time.Duration
|
grpcTimeout time.Duration
|
||||||
|
appCtx context.Context
|
||||||
|
appCancel context.CancelFunc
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
cliCmd.PersistentFlags().StringVar(&inetMockSocketPath, "socket-path", "./inetmock.sock", "Path to the INetMock socket file")
|
cliCmd.PersistentFlags().StringVar(&inetMockSocketPath, "socket-path", "unix:///var/run/inetmock.sock", "Path to the INetMock socket file")
|
||||||
cliCmd.PersistentFlags().StringVarP(&outputFormat, "format", "f", "table", "Output format to use. Possible values: table, json, yaml")
|
cliCmd.PersistentFlags().StringVarP(&outputFormat, "format", "f", "table", "Output format to use. Possible values: table, json, yaml")
|
||||||
cliCmd.PersistentFlags().DurationVar(&grpcTimeout, "grpc-timeout", 5*time.Second, "Timeout to connect to the gRPC API")
|
cliCmd.PersistentFlags().DurationVar(&grpcTimeout, "grpc-timeout", 5*time.Second, "Timeout to connect to the gRPC API")
|
||||||
|
|
||||||
cliCmd.AddCommand(endpointsCmd, handlerCmd, healthCmd)
|
cliCmd.AddCommand(endpointsCmd, handlerCmd, healthCmd, auditCmd)
|
||||||
endpointsCmd.AddCommand(getEndpoints)
|
endpointsCmd.AddCommand(getEndpoints)
|
||||||
handlerCmd.AddCommand(getHandlersCmd)
|
handlerCmd.AddCommand(getHandlersCmd)
|
||||||
healthCmd.AddCommand(generalHealthCmd)
|
healthCmd.AddCommand(generalHealthCmd)
|
||||||
healthCmd.AddCommand(containerHealthCmd)
|
healthCmd.AddCommand(containerHealthCmd)
|
||||||
|
|
||||||
|
currentUser := ""
|
||||||
|
if usr, err := user.Current(); err == nil {
|
||||||
|
currentUser = usr.Username
|
||||||
|
} else {
|
||||||
|
currentUser = uuid.New().String()
|
||||||
|
}
|
||||||
|
|
||||||
|
watchEventsCmd.PersistentFlags().StringVar(
|
||||||
|
&listenerName,
|
||||||
|
"listener-name",
|
||||||
|
currentUser,
|
||||||
|
"set listener name - defaults to the current username, if the user cannot be determined a random UUID will be used",
|
||||||
|
)
|
||||||
|
auditCmd.AddCommand(watchEventsCmd, addFileCmd, removeFileCmd)
|
||||||
|
|
||||||
|
appCtx, appCancel = initAppContext()
|
||||||
}
|
}
|
||||||
|
|
||||||
func ExecuteClientCommand() error {
|
func ExecuteClientCommand() error {
|
||||||
|
defer appCancel()
|
||||||
return cliCmd.Execute()
|
return cliCmd.Execute()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func initAppContext() (context.Context, context.CancelFunc) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
signals := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
<-signals
|
||||||
|
cancel()
|
||||||
|
}()
|
||||||
|
|
||||||
|
return ctx, cancel
|
||||||
|
}
|
||||||
|
|
|
@ -15,7 +15,7 @@ var (
|
||||||
getEndpoints = &cobra.Command{
|
getEndpoints = &cobra.Command{
|
||||||
Use: "get",
|
Use: "get",
|
||||||
Short: "Get all running endpoints",
|
Short: "Get all running endpoints",
|
||||||
Run: runGetEndpoints,
|
RunE: runGetEndpoints,
|
||||||
}
|
}
|
||||||
|
|
||||||
endpointsCmd = &cobra.Command{
|
endpointsCmd = &cobra.Command{
|
||||||
|
@ -50,8 +50,7 @@ func fromEndpoints(eps []*rpc.Endpoint) (out []*printableEndpoint) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func runGetEndpoints(_ *cobra.Command, _ []string) {
|
func runGetEndpoints(_ *cobra.Command, _ []string) (err error) {
|
||||||
var err error
|
|
||||||
var conn *grpc.ClientConn
|
var conn *grpc.ClientConn
|
||||||
|
|
||||||
if conn, err = grpc.Dial(inetMockSocketPath, grpc.WithInsecure()); err != nil {
|
if conn, err = grpc.Dial(inetMockSocketPath, grpc.WithInsecure()); err != nil {
|
||||||
|
@ -59,7 +58,8 @@ func runGetEndpoints(_ *cobra.Command, _ []string) {
|
||||||
os.Exit(10)
|
os.Exit(10)
|
||||||
}
|
}
|
||||||
endpointsClient := rpc.NewEndpointsClient(conn)
|
endpointsClient := rpc.NewEndpointsClient(conn)
|
||||||
ctx, _ := context.WithTimeout(context.Background(), grpcTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout)
|
||||||
|
defer cancel()
|
||||||
var endpointsResp *rpc.GetEndpointsResponse
|
var endpointsResp *rpc.GetEndpointsResponse
|
||||||
if endpointsResp, err = endpointsClient.GetEndpoints(ctx, &rpc.GetEndpointsRequest{}); err != nil {
|
if endpointsResp, err = endpointsClient.GetEndpoints(ctx, &rpc.GetEndpointsRequest{}); err != nil {
|
||||||
fmt.Printf("Failed to get the endpoints: %v", err)
|
fmt.Printf("Failed to get the endpoints: %v", err)
|
||||||
|
@ -70,4 +70,5 @@ func runGetEndpoints(_ *cobra.Command, _ []string) {
|
||||||
if err = writer.Write(fromEndpoints(endpointsResp.Endpoints)); err != nil {
|
if err = writer.Write(fromEndpoints(endpointsResp.Endpoints)); err != nil {
|
||||||
fmt.Printf("Error occurred during writing response values: %v\n", err)
|
fmt.Printf("Error occurred during writing response values: %v\n", err)
|
||||||
}
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,6 +75,15 @@ func (t *tblWriter) getData(val reflect.Value, numberOfFields int) (data []strin
|
||||||
}
|
}
|
||||||
|
|
||||||
func value(val reflect.Value) string {
|
func value(val reflect.Value) string {
|
||||||
|
|
||||||
|
if val.IsZero() {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
if stringer, isStringer := val.Interface().(fmt.Stringer); isStringer {
|
||||||
|
return stringer.String()
|
||||||
|
}
|
||||||
|
|
||||||
switch val.Kind() {
|
switch val.Kind() {
|
||||||
case reflect.Ptr:
|
case reflect.Ptr:
|
||||||
return value(val.Elem())
|
return value(val.Elem())
|
||||||
|
@ -84,6 +93,8 @@ func value(val reflect.Value) string {
|
||||||
return strconv.FormatBool(val.Bool())
|
return strconv.FormatBool(val.Bool())
|
||||||
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
|
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
|
||||||
return strconv.FormatInt(val.Int(), 10)
|
return strconv.FormatInt(val.Int(), 10)
|
||||||
|
case reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
|
||||||
|
return strconv.FormatUint(val.Uint(), 10)
|
||||||
case reflect.Float32, reflect.Float64:
|
case reflect.Float32, reflect.Float64:
|
||||||
return strconv.FormatFloat(val.Float(), 'f', 6, 64)
|
return strconv.FormatFloat(val.Float(), 'f', 6, 64)
|
||||||
default:
|
default:
|
||||||
|
|
51
internal/rpc/audit_server.go
Normal file
51
internal/rpc/audit_server.go
Normal file
|
@ -0,0 +1,51 @@
|
||||||
|
package rpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"gitlab.com/inetmock/inetmock/internal/app"
|
||||||
|
"gitlab.com/inetmock/inetmock/pkg/audit"
|
||||||
|
"gitlab.com/inetmock/inetmock/pkg/audit/sink"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type auditServer struct {
|
||||||
|
UnimplementedAuditServer
|
||||||
|
app app.App
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *auditServer) WatchEvents(req *WatchEventsRequest, srv Audit_WatchEventsServer) (err error) {
|
||||||
|
a.app.Logger().Info("watcher attached", zap.String("name", req.WatcherName))
|
||||||
|
err = a.app.EventStream().RegisterSink(sink.NewGRPCSink(srv.Context(), req.WatcherName, func(ev audit.Event) {
|
||||||
|
if err = srv.Send(ev.ProtoMessage()); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
<-srv.Context().Done()
|
||||||
|
a.app.Logger().Info("Watcher detached", zap.String("name", req.WatcherName))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *auditServer) RegisterFileSink(_ context.Context, req *RegisterFileSinkRequest) (resp *RegisterFileSinkResponse, err error) {
|
||||||
|
var writer io.WriteCloser
|
||||||
|
if writer, err = os.OpenFile(req.TargetPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err = a.app.EventStream().RegisterSink(sink.NewWriterSink(req.TargetPath, audit.NewEventWriter(writer))); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resp = &RegisterFileSinkResponse{}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *auditServer) RemoveFileSink(_ context.Context, req *RemoveFileSinkRequest) (*RemoveFileSinkResponse, error) {
|
||||||
|
a.app.EventStream().RemoveSink(req.TargetPath)
|
||||||
|
return &RemoveFileSinkResponse{}, nil
|
||||||
|
}
|
|
@ -52,6 +52,10 @@ func (i *inetmockAPI) StartServer() (err error) {
|
||||||
app: i.app,
|
app: i.app,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
RegisterAuditServer(i.server, &auditServer{
|
||||||
|
app: i.app,
|
||||||
|
})
|
||||||
|
|
||||||
go i.startServerAsync(lis)
|
go i.startServerAsync(lis)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,9 +11,6 @@ type handlersServer struct {
|
||||||
registry api.HandlerRegistry
|
registry api.HandlerRegistry
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handlersServer) mustEmbedUnimplementedHandlersServer() {
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *handlersServer) GetHandlers(_ context.Context, _ *GetHandlersRequest) (*GetHandlersResponse, error) {
|
func (h *handlersServer) GetHandlers(_ context.Context, _ *GetHandlersRequest) (*GetHandlersResponse, error) {
|
||||||
return &GetHandlersResponse{
|
return &GetHandlersResponse{
|
||||||
Handlers: h.registry.AvailableHandlers(),
|
Handlers: h.registry.AvailableHandlers(),
|
||||||
|
|
|
@ -3,12 +3,12 @@ package rpc
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
app2 "gitlab.com/inetmock/inetmock/internal/app"
|
"gitlab.com/inetmock/inetmock/internal/app"
|
||||||
)
|
)
|
||||||
|
|
||||||
type healthServer struct {
|
type healthServer struct {
|
||||||
UnimplementedHealthServer
|
UnimplementedHealthServer
|
||||||
app app2.App
|
app app.App
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h healthServer) GetHealth(_ context.Context, _ *HealthRequest) (resp *HealthResponse, err error) {
|
func (h healthServer) GetHealth(_ context.Context, _ *HealthRequest) (resp *HealthResponse, err error) {
|
||||||
|
|
|
@ -15,9 +15,11 @@ type Emitter interface {
|
||||||
Emit(ev Event)
|
Emit(ev Event)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type CloseHandle func()
|
||||||
|
|
||||||
type Sink interface {
|
type Sink interface {
|
||||||
Name() string
|
Name() string
|
||||||
OnSubscribe(evs <-chan Event)
|
OnSubscribe(evs <-chan Event, close CloseHandle)
|
||||||
}
|
}
|
||||||
|
|
||||||
type EventStream interface {
|
type EventStream interface {
|
||||||
|
@ -25,4 +27,5 @@ type EventStream interface {
|
||||||
Emitter
|
Emitter
|
||||||
RegisterSink(s Sink) error
|
RegisterSink(s Sink) error
|
||||||
Sinks() []string
|
Sinks() []string
|
||||||
|
RemoveSink(name string)
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,7 @@ type Event struct {
|
||||||
TLS *TLSDetails
|
TLS *TLSDetails
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Event) ProtoMessage() proto.Message {
|
func (e *Event) ProtoMessage() *EventEntity {
|
||||||
var sourceIP isEventEntity_SourceIP
|
var sourceIP isEventEntity_SourceIP
|
||||||
if ipv4 := e.SourceIP.To4(); ipv4 != nil {
|
if ipv4 := e.SourceIP.To4(); ipv4 != nil {
|
||||||
sourceIP = &EventEntity_SourceIPv4{SourceIPv4: ipv4ToUint32(ipv4)}
|
sourceIP = &EventEntity_SourceIPv4{SourceIPv4: ipv4ToUint32(ipv4)}
|
||||||
|
|
|
@ -72,6 +72,20 @@ func (e *eventStream) Emit(ev Event) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *eventStream) RemoveSink(name string) {
|
||||||
|
e.lock.Lock()
|
||||||
|
defer e.lock.Unlock()
|
||||||
|
|
||||||
|
sink, exists := e.sinks[name]
|
||||||
|
if !exists {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sink.lock.Lock()
|
||||||
|
defer sink.lock.Unlock()
|
||||||
|
delete(e.sinks, name)
|
||||||
|
close(sink.downstream)
|
||||||
|
}
|
||||||
|
|
||||||
func (e *eventStream) RegisterSink(s Sink) error {
|
func (e *eventStream) RegisterSink(s Sink) error {
|
||||||
name := s.Name()
|
name := s.Name()
|
||||||
|
|
||||||
|
@ -83,7 +97,11 @@ func (e *eventStream) RegisterSink(s Sink) error {
|
||||||
downstream: make(chan Event, e.sinkBufferSize),
|
downstream: make(chan Event, e.sinkBufferSize),
|
||||||
lock: new(sync.Mutex),
|
lock: new(sync.Mutex),
|
||||||
}
|
}
|
||||||
s.OnSubscribe(rs.downstream)
|
|
||||||
|
s.OnSubscribe(rs.downstream, func() {
|
||||||
|
e.RemoveSink(name)
|
||||||
|
})
|
||||||
|
|
||||||
e.sinks[name] = rs
|
e.sinks[name] = rs
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,7 +61,7 @@ func (t *testSink) Name() string {
|
||||||
return t.name
|
return t.name
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *testSink) OnSubscribe(evs <-chan audit.Event) {
|
func (t *testSink) OnSubscribe(evs <-chan audit.Event, _ audit.CloseHandle) {
|
||||||
go func() {
|
go func() {
|
||||||
for ev := range evs {
|
for ev := range evs {
|
||||||
if t.consumer != nil {
|
if t.consumer != nil {
|
||||||
|
|
39
pkg/audit/sink/grpc_sink.go
Normal file
39
pkg/audit/sink/grpc_sink.go
Normal file
|
@ -0,0 +1,39 @@
|
||||||
|
package sink
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"gitlab.com/inetmock/inetmock/pkg/audit"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewGRPCSink(ctx context.Context, name string, consumer func(ev audit.Event)) audit.Sink {
|
||||||
|
return &grpcSink{
|
||||||
|
name: name,
|
||||||
|
ctx: ctx,
|
||||||
|
consumer: consumer,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type grpcSink struct {
|
||||||
|
name string
|
||||||
|
ctx context.Context
|
||||||
|
consumer func(ev audit.Event)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g grpcSink) Name() string {
|
||||||
|
return g.name
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g grpcSink) OnSubscribe(evs <-chan audit.Event, handle audit.CloseHandle) {
|
||||||
|
go func(ctx context.Context, consumer func(ev audit.Event), evs <-chan audit.Event, handle audit.CloseHandle) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case ev := <-evs:
|
||||||
|
consumer(ev)
|
||||||
|
case <-ctx.Done():
|
||||||
|
handle()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(g.ctx, g.consumer, evs, handle)
|
||||||
|
}
|
|
@ -26,7 +26,7 @@ func (logSink) Name() string {
|
||||||
return logSinkName
|
return logSinkName
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l logSink) OnSubscribe(evs <-chan audit.Event) {
|
func (l logSink) OnSubscribe(evs <-chan audit.Event, _ audit.CloseHandle) {
|
||||||
go func(logger logging.Logger, evs <-chan audit.Event) {
|
go func(logger logging.Logger, evs <-chan audit.Event) {
|
||||||
for ev := range evs {
|
for ev := range evs {
|
||||||
eventLogger := logger
|
eventLogger := logger
|
||||||
|
|
|
@ -33,7 +33,7 @@ func (f writerCloserSink) Name() string {
|
||||||
return f.name
|
return f.name
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f writerCloserSink) OnSubscribe(evs <-chan audit.Event) {
|
func (f writerCloserSink) OnSubscribe(evs <-chan audit.Event, _ audit.CloseHandle) {
|
||||||
go func(target audit.Writer, closeOnExit bool, evs <-chan audit.Event) {
|
go func(target audit.Writer, closeOnExit bool, evs <-chan audit.Event) {
|
||||||
for ev := range evs {
|
for ev := range evs {
|
||||||
_ = target.Write(&ev)
|
_ = target.Write(&ev)
|
||||||
|
|
Loading…
Reference in a new issue