Apply comments
This commit is contained in:
parent
38293bb8c4
commit
37c87adaf3
6 changed files with 21 additions and 14 deletions
|
@ -26,7 +26,7 @@ message RemoveFileSinkRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
message RemoveFileSinkResponse {
|
message RemoveFileSinkResponse {
|
||||||
|
bool SinkGotRemoved = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
service Audit {
|
service Audit {
|
||||||
|
|
|
@ -51,7 +51,7 @@ func fromEndpoints(eps []*rpc.Endpoint) (out []*printableEndpoint) {
|
||||||
|
|
||||||
func runGetEndpoints(_ *cobra.Command, _ []string) (err error) {
|
func runGetEndpoints(_ *cobra.Command, _ []string) (err error) {
|
||||||
endpointsClient := rpc.NewEndpointsClient(conn)
|
endpointsClient := rpc.NewEndpointsClient(conn)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout)
|
ctx, cancel := context.WithTimeout(appCtx, grpcTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
var endpointsResp *rpc.GetEndpointsResponse
|
var endpointsResp *rpc.GetEndpointsResponse
|
||||||
if endpointsResp, err = endpointsClient.GetEndpoints(ctx, &rpc.GetEndpointsRequest{}); err != nil {
|
if endpointsResp, err = endpointsClient.GetEndpoints(ctx, &rpc.GetEndpointsRequest{}); err != nil {
|
||||||
|
|
|
@ -5,20 +5,21 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"gitlab.com/inetmock/inetmock/internal/app"
|
|
||||||
"gitlab.com/inetmock/inetmock/pkg/audit"
|
"gitlab.com/inetmock/inetmock/pkg/audit"
|
||||||
"gitlab.com/inetmock/inetmock/pkg/audit/sink"
|
"gitlab.com/inetmock/inetmock/pkg/audit/sink"
|
||||||
|
"gitlab.com/inetmock/inetmock/pkg/logging"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type auditServer struct {
|
type auditServer struct {
|
||||||
UnimplementedAuditServer
|
UnimplementedAuditServer
|
||||||
app app.App
|
logger logging.Logger
|
||||||
|
eventStream audit.EventStream
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *auditServer) WatchEvents(req *WatchEventsRequest, srv Audit_WatchEventsServer) (err error) {
|
func (a *auditServer) WatchEvents(req *WatchEventsRequest, srv Audit_WatchEventsServer) (err error) {
|
||||||
a.app.Logger().Info("watcher attached", zap.String("name", req.WatcherName))
|
a.logger.Info("watcher attached", zap.String("name", req.WatcherName))
|
||||||
err = a.app.EventStream().RegisterSink(sink.NewGRPCSink(srv.Context(), req.WatcherName, func(ev audit.Event) {
|
err = a.eventStream.RegisterSink(sink.NewGRPCSink(srv.Context(), req.WatcherName, func(ev audit.Event) {
|
||||||
if err = srv.Send(ev.ProtoMessage()); err != nil {
|
if err = srv.Send(ev.ProtoMessage()); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -29,7 +30,7 @@ func (a *auditServer) WatchEvents(req *WatchEventsRequest, srv Audit_WatchEvents
|
||||||
}
|
}
|
||||||
|
|
||||||
<-srv.Context().Done()
|
<-srv.Context().Done()
|
||||||
a.app.Logger().Info("Watcher detached", zap.String("name", req.WatcherName))
|
a.logger.Info("Watcher detached", zap.String("name", req.WatcherName))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,7 +39,7 @@ func (a *auditServer) RegisterFileSink(_ context.Context, req *RegisterFileSinkR
|
||||||
if writer, err = os.OpenFile(req.TargetPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644); err != nil {
|
if writer, err = os.OpenFile(req.TargetPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err = a.app.EventStream().RegisterSink(sink.NewWriterSink(req.TargetPath, audit.NewEventWriter(writer))); err != nil {
|
if err = a.eventStream.RegisterSink(sink.NewWriterSink(req.TargetPath, audit.NewEventWriter(writer))); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
resp = &RegisterFileSinkResponse{}
|
resp = &RegisterFileSinkResponse{}
|
||||||
|
@ -46,6 +47,8 @@ func (a *auditServer) RegisterFileSink(_ context.Context, req *RegisterFileSinkR
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *auditServer) RemoveFileSink(_ context.Context, req *RemoveFileSinkRequest) (*RemoveFileSinkResponse, error) {
|
func (a *auditServer) RemoveFileSink(_ context.Context, req *RemoveFileSinkRequest) (*RemoveFileSinkResponse, error) {
|
||||||
a.app.EventStream().RemoveSink(req.TargetPath)
|
gotRemoved := a.eventStream.RemoveSink(req.TargetPath)
|
||||||
return &RemoveFileSinkResponse{}, nil
|
return &RemoveFileSinkResponse{
|
||||||
|
SinkGotRemoved: gotRemoved,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,7 +53,8 @@ func (i *inetmockAPI) StartServer() (err error) {
|
||||||
})
|
})
|
||||||
|
|
||||||
RegisterAuditServer(i.server, &auditServer{
|
RegisterAuditServer(i.server, &auditServer{
|
||||||
app: i.app,
|
logger: i.app.Logger(),
|
||||||
|
eventStream: i.app.EventStream(),
|
||||||
})
|
})
|
||||||
|
|
||||||
go i.startServerAsync(lis)
|
go i.startServerAsync(lis)
|
||||||
|
|
|
@ -27,5 +27,5 @@ type EventStream interface {
|
||||||
Emitter
|
Emitter
|
||||||
RegisterSink(s Sink) error
|
RegisterSink(s Sink) error
|
||||||
Sinks() []string
|
Sinks() []string
|
||||||
RemoveSink(name string)
|
RemoveSink(name string) (exists bool)
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,11 +72,12 @@ func (e *eventStream) Emit(ev Event) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *eventStream) RemoveSink(name string) {
|
func (e *eventStream) RemoveSink(name string) (exists bool) {
|
||||||
e.lock.Lock()
|
e.lock.Lock()
|
||||||
defer e.lock.Unlock()
|
defer e.lock.Unlock()
|
||||||
|
|
||||||
sink, exists := e.sinks[name]
|
var sink *registeredSink
|
||||||
|
sink, exists = e.sinks[name]
|
||||||
if !exists {
|
if !exists {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -84,6 +85,8 @@ func (e *eventStream) RemoveSink(name string) {
|
||||||
defer sink.lock.Unlock()
|
defer sink.lock.Unlock()
|
||||||
delete(e.sinks, name)
|
delete(e.sinks, name)
|
||||||
close(sink.downstream)
|
close(sink.downstream)
|
||||||
|
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *eventStream) RegisterSink(s Sink) error {
|
func (e *eventStream) RegisterSink(s Sink) error {
|
||||||
|
|
Loading…
Reference in a new issue