diff --git a/api/proto/internal/rpc/audit.proto b/api/proto/internal/rpc/audit.proto new file mode 100644 index 0000000..aa07869 --- /dev/null +++ b/api/proto/internal/rpc/audit.proto @@ -0,0 +1,36 @@ +syntax = "proto3"; + +option go_package = "gitlab.com/inetmock/inetmock/internal/rpc"; +option java_multiple_files = true; +option java_package = "com.github.baez90.inetmock.rpc"; +option java_outer_classname = "AuditProto"; + +import 'pkg/audit/event_entity.proto'; + +package inetmock.rpc; + +message WatchEventsRequest { + string watcherName = 1; +} + +message RegisterFileSinkRequest { + string targetPath = 1; +} + +message RegisterFileSinkResponse { + +} + +message RemoveFileSinkRequest { + string targetPath = 1; +} + +message RemoveFileSinkResponse { + +} + +service Audit { + rpc WatchEvents (WatchEventsRequest) returns (stream inetmock.audit.EventEntity); + rpc RegisterFileSink (RegisterFileSinkRequest) returns (RegisterFileSinkResponse); + rpc RemoveFileSink (RemoveFileSinkRequest) returns (RemoveFileSinkResponse); +} \ No newline at end of file diff --git a/api/proto/internal/rpc/endpoints.proto b/api/proto/internal/rpc/endpoints.proto index 48d850b..db21768 100644 --- a/api/proto/internal/rpc/endpoints.proto +++ b/api/proto/internal/rpc/endpoints.proto @@ -7,11 +7,6 @@ option java_outer_classname = "EndpointsProto"; package inetmock.rpc; -service Endpoints { - rpc GetEndpoints (GetEndpointsRequest) returns (GetEndpointsResponse) { - } -} - message GetEndpointsRequest { } @@ -25,4 +20,9 @@ message Endpoint { string handler = 3; string listenAddress = 4; int32 port = 5; +} + +service Endpoints { + rpc GetEndpoints (GetEndpointsRequest) returns (GetEndpointsResponse) { + } } \ No newline at end of file diff --git a/internal/app/app.go b/internal/app/app.go index f906795..3540e6c 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -29,6 +29,7 @@ var ( type App interface { api.PluginContext + EventStream() audit.EventStream Config() config.Config Checker() health.Checker EndpointManager() endpoint.EndpointManager @@ -89,6 +90,10 @@ func (a app) Audit() audit.Emitter { return a.eventStream } +func (a app) EventStream() audit.EventStream { + return a.eventStream +} + func (a app) HandlerRegistry() api.HandlerRegistry { return a.registry } diff --git a/internal/cmd/audit_file.go b/internal/cmd/audit_file.go new file mode 100644 index 0000000..9f98069 --- /dev/null +++ b/internal/cmd/audit_file.go @@ -0,0 +1,59 @@ +package cmd + +import ( + "context" + "fmt" + "os" + + "github.com/spf13/cobra" + "gitlab.com/inetmock/inetmock/internal/rpc" + "google.golang.org/grpc" +) + +var ( + addFileCmd = &cobra.Command{ + Use: "addFile", + Short: "subscribe events to a file", + Args: cobra.ExactArgs(1), + RunE: runAddFile, + } + + removeFileCmd = &cobra.Command{ + Use: "removeFile", + Short: "remove file subscription", + Args: cobra.ExactArgs(1), + RunE: runRemoveFile, + } +) + +func runAddFile(_ *cobra.Command, args []string) (err error) { + var conn *grpc.ClientConn + + if conn, err = grpc.Dial(inetMockSocketPath, grpc.WithInsecure()); err != nil { + fmt.Printf("Failed to connecto INetMock socket: %v\n", err) + os.Exit(10) + } + + auditClient := rpc.NewAuditClient(conn) + ctx, cancel := context.WithTimeout(appCtx, grpcTimeout) + defer cancel() + + _, err = auditClient.RegisterFileSink(ctx, &rpc.RegisterFileSinkRequest{TargetPath: args[0]}) + return +} + +func runRemoveFile(_ *cobra.Command, args []string) (err error) { + var conn *grpc.ClientConn + + if conn, err = grpc.Dial(inetMockSocketPath, grpc.WithInsecure()); err != nil { + fmt.Printf("Failed to connecto INetMock socket: %v\n", err) + os.Exit(10) + } + + auditClient := rpc.NewAuditClient(conn) + ctx, cancel := context.WithTimeout(appCtx, grpcTimeout) + defer cancel() + + _, err = auditClient.RemoveFileSink(ctx, &rpc.RemoveFileSinkRequest{TargetPath: args[0]}) + return +} diff --git a/internal/cmd/audit_watch.go b/internal/cmd/audit_watch.go new file mode 100644 index 0000000..93e775e --- /dev/null +++ b/internal/cmd/audit_watch.go @@ -0,0 +1,62 @@ +package cmd + +import ( + "encoding/json" + "fmt" + "os" + + "github.com/spf13/cobra" + "gitlab.com/inetmock/inetmock/internal/rpc" + "gitlab.com/inetmock/inetmock/pkg/audit" + "google.golang.org/grpc" +) + +var ( + watchEventsCmd = &cobra.Command{ + Use: "watch", + Short: "Watch all audit events", + RunE: watchAuditEvents, + } + + auditCmd = &cobra.Command{ + Use: "audit", + Short: "Interact with the audit stream", + } + + listenerName string +) + +func watchAuditEvents(_ *cobra.Command, _ []string) (err error) { + var conn *grpc.ClientConn + + if conn, err = grpc.Dial(inetMockSocketPath, grpc.WithInsecure()); err != nil { + fmt.Printf("Failed to connecto INetMock socket: %v\n", err) + os.Exit(10) + } + + auditClient := rpc.NewAuditClient(conn) + + var watchClient rpc.Audit_WatchEventsClient + if watchClient, err = auditClient.WatchEvents(appCtx, &rpc.WatchEventsRequest{WatcherName: listenerName}); err != nil { + fmt.Println(err.Error()) + os.Exit(1) + } + + go func() { + var protoEv *audit.EventEntity + for protoEv, err = watchClient.Recv(); err == nil; protoEv, err = watchClient.Recv() { + ev := audit.NewEventFromProto(protoEv) + var out []byte + out, err = json.Marshal(ev) + if err != nil { + continue + } + fmt.Println(string(out)) + } + }() + + <-appCtx.Done() + err = watchClient.CloseSend() + + return +} diff --git a/internal/cmd/cli.go b/internal/cmd/cli.go index 68f5491..bf89ede 100644 --- a/internal/cmd/cli.go +++ b/internal/cmd/cli.go @@ -1,8 +1,14 @@ package cmd import ( + "context" + "os" + "os/signal" + "os/user" + "syscall" "time" + "github.com/google/uuid" "github.com/spf13/cobra" ) @@ -15,20 +21,54 @@ var ( inetMockSocketPath string outputFormat string grpcTimeout time.Duration + appCtx context.Context + appCancel context.CancelFunc ) func init() { - cliCmd.PersistentFlags().StringVar(&inetMockSocketPath, "socket-path", "./inetmock.sock", "Path to the INetMock socket file") + cliCmd.PersistentFlags().StringVar(&inetMockSocketPath, "socket-path", "unix:///var/run/inetmock.sock", "Path to the INetMock socket file") cliCmd.PersistentFlags().StringVarP(&outputFormat, "format", "f", "table", "Output format to use. Possible values: table, json, yaml") cliCmd.PersistentFlags().DurationVar(&grpcTimeout, "grpc-timeout", 5*time.Second, "Timeout to connect to the gRPC API") - cliCmd.AddCommand(endpointsCmd, handlerCmd, healthCmd) + cliCmd.AddCommand(endpointsCmd, handlerCmd, healthCmd, auditCmd) endpointsCmd.AddCommand(getEndpoints) handlerCmd.AddCommand(getHandlersCmd) healthCmd.AddCommand(generalHealthCmd) healthCmd.AddCommand(containerHealthCmd) + + currentUser := "" + if usr, err := user.Current(); err == nil { + currentUser = usr.Username + } else { + currentUser = uuid.New().String() + } + + watchEventsCmd.PersistentFlags().StringVar( + &listenerName, + "listener-name", + currentUser, + "set listener name - defaults to the current username, if the user cannot be determined a random UUID will be used", + ) + auditCmd.AddCommand(watchEventsCmd, addFileCmd, removeFileCmd) + + appCtx, appCancel = initAppContext() } func ExecuteClientCommand() error { + defer appCancel() return cliCmd.Execute() } + +func initAppContext() (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + + signals := make(chan os.Signal, 1) + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + + go func() { + <-signals + cancel() + }() + + return ctx, cancel +} diff --git a/internal/cmd/endpoints.go b/internal/cmd/endpoints.go index 3cd2749..f9dcd0c 100644 --- a/internal/cmd/endpoints.go +++ b/internal/cmd/endpoints.go @@ -15,7 +15,7 @@ var ( getEndpoints = &cobra.Command{ Use: "get", Short: "Get all running endpoints", - Run: runGetEndpoints, + RunE: runGetEndpoints, } endpointsCmd = &cobra.Command{ @@ -50,8 +50,7 @@ func fromEndpoints(eps []*rpc.Endpoint) (out []*printableEndpoint) { return } -func runGetEndpoints(_ *cobra.Command, _ []string) { - var err error +func runGetEndpoints(_ *cobra.Command, _ []string) (err error) { var conn *grpc.ClientConn if conn, err = grpc.Dial(inetMockSocketPath, grpc.WithInsecure()); err != nil { @@ -59,7 +58,8 @@ func runGetEndpoints(_ *cobra.Command, _ []string) { os.Exit(10) } endpointsClient := rpc.NewEndpointsClient(conn) - ctx, _ := context.WithTimeout(context.Background(), grpcTimeout) + ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout) + defer cancel() var endpointsResp *rpc.GetEndpointsResponse if endpointsResp, err = endpointsClient.GetEndpoints(ctx, &rpc.GetEndpointsRequest{}); err != nil { fmt.Printf("Failed to get the endpoints: %v", err) @@ -70,4 +70,5 @@ func runGetEndpoints(_ *cobra.Command, _ []string) { if err = writer.Write(fromEndpoints(endpointsResp.Endpoints)); err != nil { fmt.Printf("Error occurred during writing response values: %v\n", err) } + return } diff --git a/internal/format/table_writer.go b/internal/format/table_writer.go index feb3383..da75f16 100644 --- a/internal/format/table_writer.go +++ b/internal/format/table_writer.go @@ -75,6 +75,15 @@ func (t *tblWriter) getData(val reflect.Value, numberOfFields int) (data []strin } func value(val reflect.Value) string { + + if val.IsZero() { + return "" + } + + if stringer, isStringer := val.Interface().(fmt.Stringer); isStringer { + return stringer.String() + } + switch val.Kind() { case reflect.Ptr: return value(val.Elem()) @@ -84,6 +93,8 @@ func value(val reflect.Value) string { return strconv.FormatBool(val.Bool()) case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: return strconv.FormatInt(val.Int(), 10) + case reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + return strconv.FormatUint(val.Uint(), 10) case reflect.Float32, reflect.Float64: return strconv.FormatFloat(val.Float(), 'f', 6, 64) default: diff --git a/internal/rpc/audit_server.go b/internal/rpc/audit_server.go new file mode 100644 index 0000000..db8d1fa --- /dev/null +++ b/internal/rpc/audit_server.go @@ -0,0 +1,51 @@ +package rpc + +import ( + "context" + "io" + "os" + + "gitlab.com/inetmock/inetmock/internal/app" + "gitlab.com/inetmock/inetmock/pkg/audit" + "gitlab.com/inetmock/inetmock/pkg/audit/sink" + "go.uber.org/zap" +) + +type auditServer struct { + UnimplementedAuditServer + app app.App +} + +func (a *auditServer) WatchEvents(req *WatchEventsRequest, srv Audit_WatchEventsServer) (err error) { + a.app.Logger().Info("watcher attached", zap.String("name", req.WatcherName)) + err = a.app.EventStream().RegisterSink(sink.NewGRPCSink(srv.Context(), req.WatcherName, func(ev audit.Event) { + if err = srv.Send(ev.ProtoMessage()); err != nil { + return + } + })) + + if err != nil { + return + } + + <-srv.Context().Done() + a.app.Logger().Info("Watcher detached", zap.String("name", req.WatcherName)) + return +} + +func (a *auditServer) RegisterFileSink(_ context.Context, req *RegisterFileSinkRequest) (resp *RegisterFileSinkResponse, err error) { + var writer io.WriteCloser + if writer, err = os.OpenFile(req.TargetPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644); err != nil { + return + } + if err = a.app.EventStream().RegisterSink(sink.NewWriterSink(req.TargetPath, audit.NewEventWriter(writer))); err != nil { + return + } + resp = &RegisterFileSinkResponse{} + return +} + +func (a *auditServer) RemoveFileSink(_ context.Context, req *RemoveFileSinkRequest) (*RemoveFileSinkResponse, error) { + a.app.EventStream().RemoveSink(req.TargetPath) + return &RemoveFileSinkResponse{}, nil +} diff --git a/internal/rpc/grpc_api.go b/internal/rpc/grpc_api.go index cbcc87c..6c71d8c 100644 --- a/internal/rpc/grpc_api.go +++ b/internal/rpc/grpc_api.go @@ -52,6 +52,10 @@ func (i *inetmockAPI) StartServer() (err error) { app: i.app, }) + RegisterAuditServer(i.server, &auditServer{ + app: i.app, + }) + go i.startServerAsync(lis) return } diff --git a/internal/rpc/handlers_server.go b/internal/rpc/handlers_server.go index bd5491e..0af6e56 100644 --- a/internal/rpc/handlers_server.go +++ b/internal/rpc/handlers_server.go @@ -11,9 +11,6 @@ type handlersServer struct { registry api.HandlerRegistry } -func (h *handlersServer) mustEmbedUnimplementedHandlersServer() { -} - func (h *handlersServer) GetHandlers(_ context.Context, _ *GetHandlersRequest) (*GetHandlersResponse, error) { return &GetHandlersResponse{ Handlers: h.registry.AvailableHandlers(), diff --git a/internal/rpc/health_server.go b/internal/rpc/health_server.go index d92f344..17b0af7 100644 --- a/internal/rpc/health_server.go +++ b/internal/rpc/health_server.go @@ -3,12 +3,12 @@ package rpc import ( "context" - app2 "gitlab.com/inetmock/inetmock/internal/app" + "gitlab.com/inetmock/inetmock/internal/app" ) type healthServer struct { UnimplementedHealthServer - app app2.App + app app.App } func (h healthServer) GetHealth(_ context.Context, _ *HealthRequest) (resp *HealthResponse, err error) { diff --git a/pkg/audit/api.go b/pkg/audit/api.go index fa4ef7b..d269121 100644 --- a/pkg/audit/api.go +++ b/pkg/audit/api.go @@ -15,9 +15,11 @@ type Emitter interface { Emit(ev Event) } +type CloseHandle func() + type Sink interface { Name() string - OnSubscribe(evs <-chan Event) + OnSubscribe(evs <-chan Event, close CloseHandle) } type EventStream interface { @@ -25,4 +27,5 @@ type EventStream interface { Emitter RegisterSink(s Sink) error Sinks() []string + RemoveSink(name string) } diff --git a/pkg/audit/event.go b/pkg/audit/event.go index 0ec4aa5..5e2f29d 100644 --- a/pkg/audit/event.go +++ b/pkg/audit/event.go @@ -29,7 +29,7 @@ type Event struct { TLS *TLSDetails } -func (e *Event) ProtoMessage() proto.Message { +func (e *Event) ProtoMessage() *EventEntity { var sourceIP isEventEntity_SourceIP if ipv4 := e.SourceIP.To4(); ipv4 != nil { sourceIP = &EventEntity_SourceIPv4{SourceIPv4: ipv4ToUint32(ipv4)} diff --git a/pkg/audit/event_stream.go b/pkg/audit/event_stream.go index 51f6c7c..d3986b0 100644 --- a/pkg/audit/event_stream.go +++ b/pkg/audit/event_stream.go @@ -72,6 +72,20 @@ func (e *eventStream) Emit(ev Event) { } } +func (e *eventStream) RemoveSink(name string) { + e.lock.Lock() + defer e.lock.Unlock() + + sink, exists := e.sinks[name] + if !exists { + return + } + sink.lock.Lock() + defer sink.lock.Unlock() + delete(e.sinks, name) + close(sink.downstream) +} + func (e *eventStream) RegisterSink(s Sink) error { name := s.Name() @@ -83,7 +97,11 @@ func (e *eventStream) RegisterSink(s Sink) error { downstream: make(chan Event, e.sinkBufferSize), lock: new(sync.Mutex), } - s.OnSubscribe(rs.downstream) + + s.OnSubscribe(rs.downstream, func() { + e.RemoveSink(name) + }) + e.sinks[name] = rs return nil } diff --git a/pkg/audit/event_stream_test.go b/pkg/audit/event_stream_test.go index 1d47fdc..f89c981 100644 --- a/pkg/audit/event_stream_test.go +++ b/pkg/audit/event_stream_test.go @@ -61,7 +61,7 @@ func (t *testSink) Name() string { return t.name } -func (t *testSink) OnSubscribe(evs <-chan audit.Event) { +func (t *testSink) OnSubscribe(evs <-chan audit.Event, _ audit.CloseHandle) { go func() { for ev := range evs { if t.consumer != nil { diff --git a/pkg/audit/sink/grpc_sink.go b/pkg/audit/sink/grpc_sink.go new file mode 100644 index 0000000..de52156 --- /dev/null +++ b/pkg/audit/sink/grpc_sink.go @@ -0,0 +1,39 @@ +package sink + +import ( + "context" + + "gitlab.com/inetmock/inetmock/pkg/audit" +) + +func NewGRPCSink(ctx context.Context, name string, consumer func(ev audit.Event)) audit.Sink { + return &grpcSink{ + name: name, + ctx: ctx, + consumer: consumer, + } +} + +type grpcSink struct { + name string + ctx context.Context + consumer func(ev audit.Event) +} + +func (g grpcSink) Name() string { + return g.name +} + +func (g grpcSink) OnSubscribe(evs <-chan audit.Event, handle audit.CloseHandle) { + go func(ctx context.Context, consumer func(ev audit.Event), evs <-chan audit.Event, handle audit.CloseHandle) { + for { + select { + case ev := <-evs: + consumer(ev) + case <-ctx.Done(): + handle() + return + } + } + }(g.ctx, g.consumer, evs, handle) +} diff --git a/pkg/audit/sink/log_sink.go b/pkg/audit/sink/log_sink.go index 9b9de78..fb3c131 100644 --- a/pkg/audit/sink/log_sink.go +++ b/pkg/audit/sink/log_sink.go @@ -26,7 +26,7 @@ func (logSink) Name() string { return logSinkName } -func (l logSink) OnSubscribe(evs <-chan audit.Event) { +func (l logSink) OnSubscribe(evs <-chan audit.Event, _ audit.CloseHandle) { go func(logger logging.Logger, evs <-chan audit.Event) { for ev := range evs { eventLogger := logger diff --git a/pkg/audit/sink/writer_sink.go b/pkg/audit/sink/writer_sink.go index e0e2ba5..5409dde 100644 --- a/pkg/audit/sink/writer_sink.go +++ b/pkg/audit/sink/writer_sink.go @@ -33,7 +33,7 @@ func (f writerCloserSink) Name() string { return f.name } -func (f writerCloserSink) OnSubscribe(evs <-chan audit.Event) { +func (f writerCloserSink) OnSubscribe(evs <-chan audit.Event, _ audit.CloseHandle) { go func(target audit.Writer, closeOnExit bool, evs <-chan audit.Event) { for ev := range evs { _ = target.Write(&ev)