diff --git a/internal/app/app.go b/internal/app/app.go index 6c4adfe..2b17ec1 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -12,6 +12,7 @@ import ( "gitlab.com/inetmock/inetmock/internal/endpoints" "gitlab.com/inetmock/inetmock/pkg/api" "gitlab.com/inetmock/inetmock/pkg/audit" + "gitlab.com/inetmock/inetmock/pkg/audit/sink" "gitlab.com/inetmock/inetmock/pkg/cert" "gitlab.com/inetmock/inetmock/pkg/config" "gitlab.com/inetmock/inetmock/pkg/health" @@ -168,7 +169,7 @@ func NewApp(registrations ...api.Registration) (inetmockApp App, err error) { return } - err = a.eventStream.RegisterSink(audit.NewLogSink(a.Logger().Named("LogSink"))) + err = a.eventStream.RegisterSink(sink.NewLogSink(a.Logger().Named("LogSink"))) return } diff --git a/pkg/audit/event_stream_test.go b/pkg/audit/event_stream_test.go index 8e25eca..1d47fdc 100644 --- a/pkg/audit/event_stream_test.go +++ b/pkg/audit/event_stream_test.go @@ -11,6 +11,7 @@ import ( "gitlab.com/inetmock/inetmock/pkg/audit" "gitlab.com/inetmock/inetmock/pkg/audit/details" "gitlab.com/inetmock/inetmock/pkg/logging" + "gitlab.com/inetmock/inetmock/pkg/wait" ) var ( @@ -215,13 +216,13 @@ func Test_eventStream_Emit(t *testing.T) { }(tt.args.evs, emittedWaitGroup) select { - case <-waitGroupDone(emittedWaitGroup): + case <-wait.ForWaitGroupDone(emittedWaitGroup): case <-time.After(100 * time.Millisecond): t.Errorf("not all events emitted in time") } select { - case <-waitGroupDone(receivedWaitGroup): + case <-wait.ForWaitGroupDone(receivedWaitGroup): case <-time.After(5 * time.Second): t.Errorf("did not get all expected events in time") } @@ -232,14 +233,3 @@ func Test_eventStream_Emit(t *testing.T) { t.Run(tt.name, scenario(tt)) } } - -func waitGroupDone(wg *sync.WaitGroup) <-chan struct{} { - done := make(chan struct{}) - - go func(wg *sync.WaitGroup) { - wg.Wait() - close(done) - }(wg) - - return done -} diff --git a/pkg/audit/log_sink.go b/pkg/audit/sink/log_sink.go similarity index 82% rename from pkg/audit/log_sink.go rename to pkg/audit/sink/log_sink.go index a6e07f1..9b9de78 100644 --- a/pkg/audit/log_sink.go +++ b/pkg/audit/sink/log_sink.go @@ -1,8 +1,9 @@ -package audit +package sink import ( "crypto/tls" + "gitlab.com/inetmock/inetmock/pkg/audit" "gitlab.com/inetmock/inetmock/pkg/logging" "go.uber.org/zap" ) @@ -11,7 +12,7 @@ const ( logSinkName = "logging" ) -func NewLogSink(logger logging.Logger) Sink { +func NewLogSink(logger logging.Logger) audit.Sink { return &logSink{ logger: logger, } @@ -25,8 +26,8 @@ func (logSink) Name() string { return logSinkName } -func (l logSink) OnSubscribe(evs <-chan Event) { - go func(logger logging.Logger, evs <-chan Event) { +func (l logSink) OnSubscribe(evs <-chan audit.Event) { + go func(logger logging.Logger, evs <-chan audit.Event) { for ev := range evs { eventLogger := logger diff --git a/pkg/audit/log_sink_test.go b/pkg/audit/sink/log_sink_test.go similarity index 91% rename from pkg/audit/log_sink_test.go rename to pkg/audit/sink/log_sink_test.go index f201aa5..c64f200 100644 --- a/pkg/audit/log_sink_test.go +++ b/pkg/audit/sink/log_sink_test.go @@ -1,4 +1,4 @@ -package audit_test +package sink_test import ( "sync" @@ -8,7 +8,9 @@ import ( "github.com/golang/mock/gomock" logging_mock "gitlab.com/inetmock/inetmock/internal/mock/logging" "gitlab.com/inetmock/inetmock/pkg/audit" + "gitlab.com/inetmock/inetmock/pkg/audit/sink" "gitlab.com/inetmock/inetmock/pkg/logging" + "gitlab.com/inetmock/inetmock/pkg/wait" "go.uber.org/zap" ) @@ -81,7 +83,7 @@ func Test_logSink_OnSubscribe(t *testing.T) { wg := new(sync.WaitGroup) wg.Add(len(tt.events)) - logSink := audit.NewLogSink(tt.fields.loggerSetup(t, wg)) + logSink := sink.NewLogSink(tt.fields.loggerSetup(t, wg)) var evs audit.EventStream var err error if evs, err = audit.NewEventStream(logging.CreateTestLogger(t)); err != nil { @@ -99,7 +101,7 @@ func Test_logSink_OnSubscribe(t *testing.T) { select { case <-time.After(100 * time.Millisecond): t.Errorf("not all events recorded in time") - case <-waitGroupDone(wg): + case <-wait.ForWaitGroupDone(wg): } } } diff --git a/pkg/audit/writer_sink.go b/pkg/audit/sink/writer_sink.go similarity index 63% rename from pkg/audit/writer_sink.go rename to pkg/audit/sink/writer_sink.go index 803d29a..e0e2ba5 100644 --- a/pkg/audit/writer_sink.go +++ b/pkg/audit/sink/writer_sink.go @@ -1,4 +1,6 @@ -package audit +package sink + +import "gitlab.com/inetmock/inetmock/pkg/audit" type WriterSinkOption func(sink *writerCloserSink) @@ -8,7 +10,7 @@ var ( } ) -func NewWriterSink(name string, target Writer, opts ...WriterSinkOption) Sink { +func NewWriterSink(name string, target audit.Writer, opts ...WriterSinkOption) audit.Sink { sink := &writerCloserSink{ name: name, target: target, @@ -23,20 +25,16 @@ func NewWriterSink(name string, target Writer, opts ...WriterSinkOption) Sink { type writerCloserSink struct { name string - target Writer + target audit.Writer closeOnExit bool } -type syncer interface { - Sync() error -} - func (f writerCloserSink) Name() string { return f.name } -func (f writerCloserSink) OnSubscribe(evs <-chan Event) { - go func(target Writer, closeOnExit bool, evs <-chan Event) { +func (f writerCloserSink) OnSubscribe(evs <-chan audit.Event) { + go func(target audit.Writer, closeOnExit bool, evs <-chan audit.Event) { for ev := range evs { _ = target.Write(&ev) } diff --git a/pkg/audit/writer_sink_test.go b/pkg/audit/sink/writer_sink_test.go similarity index 53% rename from pkg/audit/writer_sink_test.go rename to pkg/audit/sink/writer_sink_test.go index 483a5ac..0791369 100644 --- a/pkg/audit/writer_sink_test.go +++ b/pkg/audit/sink/writer_sink_test.go @@ -1,6 +1,9 @@ -package audit_test +package sink_test import ( + "crypto/tls" + "net" + "net/http" "sync" "testing" "time" @@ -8,7 +11,45 @@ import ( "github.com/golang/mock/gomock" audit_mock "gitlab.com/inetmock/inetmock/internal/mock/audit" "gitlab.com/inetmock/inetmock/pkg/audit" + "gitlab.com/inetmock/inetmock/pkg/audit/details" + "gitlab.com/inetmock/inetmock/pkg/audit/sink" "gitlab.com/inetmock/inetmock/pkg/logging" + "gitlab.com/inetmock/inetmock/pkg/wait" +) + +var ( + testEvents = []*audit.Event{ + { + Transport: audit.TransportProtocol_TCP, + Application: audit.AppProtocol_HTTP, + SourceIP: net.ParseIP("127.0.0.1").To4(), + DestinationIP: net.ParseIP("127.0.0.1").To4(), + SourcePort: 32344, + DestinationPort: 80, + TLS: &audit.TLSDetails{ + Version: tls.VersionTLS13, + CipherSuite: tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA, + ServerName: "localhost", + }, + ProtocolDetails: details.HTTP{ + Method: "GET", + Host: "localhost", + URI: "http://localhost/asdf", + Proto: "HTTP 1.1", + Headers: http.Header{ + "Accept": []string{"application/json"}, + }, + }, + }, + { + Transport: audit.TransportProtocol_TCP, + Application: audit.AppProtocol_DNS, + SourceIP: net.ParseIP("::1").To16(), + DestinationIP: net.ParseIP("::1").To16(), + SourcePort: 32344, + DestinationPort: 80, + }, + } ) func Test_writerCloserSink_OnSubscribe(t *testing.T) { @@ -42,7 +83,7 @@ func Test_writerCloserSink_OnSubscribe(t *testing.T) { }). Times(len(tt.events)) - writerCloserSink := audit.NewWriterSink("WriterMock", writerMock, audit.WithCloseOnExit) + writerCloserSink := sink.NewWriterSink("WriterMock", writerMock, sink.WithCloseOnExit) var evs audit.EventStream var err error @@ -61,7 +102,7 @@ func Test_writerCloserSink_OnSubscribe(t *testing.T) { select { case <-time.After(100 * time.Millisecond): t.Errorf("not all events recorded in time") - case <-waitGroupDone(wg): + case <-wait.ForWaitGroupDone(wg): } } } diff --git a/pkg/audit/writer.go b/pkg/audit/writer.go index 5a318b4..15424d7 100644 --- a/pkg/audit/writer.go +++ b/pkg/audit/writer.go @@ -52,6 +52,10 @@ type eventWriter struct { byteOrder binary.ByteOrder } +type syncer interface { + Sync() error +} + func (e eventWriter) Write(ev *Event) (err error) { if ev == nil { return ErrValueMostNotBeNil diff --git a/pkg/wait/wg.go b/pkg/wait/wg.go new file mode 100644 index 0000000..11adf3f --- /dev/null +++ b/pkg/wait/wg.go @@ -0,0 +1,14 @@ +package wait + +import "sync" + +func ForWaitGroupDone(wg *sync.WaitGroup) <-chan struct{} { + done := make(chan struct{}) + + go func(wg *sync.WaitGroup) { + wg.Wait() + close(done) + }(wg) + + return done +}