goveal/events/event_hub.go

157 lines
3.2 KiB
Go
Raw Normal View History

package events
import (
"encoding/hex"
"fmt"
"hash"
"io"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"github.com/google/uuid"
2022-03-15 13:53:24 +00:00
log "github.com/sirupsen/logrus"
2022-09-25 12:23:22 +00:00
"code.icb4dc0.de/prskr/goveal/fs"
)
const (
baseDecimal = 10
)
type (
2021-12-22 19:19:05 +00:00
EventMutation interface {
OnEvent(in ContentEvent, ev fs.Event) (out ContentEvent)
}
ContentEvent struct {
File string `json:"file"`
FileNameHash string `json:"fileNameHash"`
Timestamp string `json:"ts"`
ForceReload bool `json:"forceReload"`
2021-12-22 19:19:05 +00:00
ReloadConfig bool `json:"reloadConfig"`
}
EventSource interface {
io.Closer
fs.FS
Events() chan fs.Event
}
EventHandler interface {
OnEvent(ev ContentEvent) error
2022-03-15 13:53:24 +00:00
Close() error
}
2022-03-15 13:53:24 +00:00
EventHandlerFunc func(ev ContentEvent) error
2021-12-22 19:19:05 +00:00
MutationReloadForFile string
MutationConfigReloadForFile string
)
2021-12-22 19:19:05 +00:00
func (t MutationReloadForFile) OnEvent(in ContentEvent, ev fs.Event) (out ContentEvent) {
2022-03-15 13:53:24 +00:00
if strings.EqualFold(filepath.Base(ev.File), filepath.Base(string(t))) {
2021-12-22 19:19:05 +00:00
in.ForceReload = true
}
return in
}
2021-12-22 19:19:05 +00:00
func (t MutationConfigReloadForFile) OnEvent(in ContentEvent, ev fs.Event) (out ContentEvent) {
if strings.EqualFold(filepath.Base(ev.File), string(t)) {
in.ReloadConfig = true
}
return in
}
func (f EventHandlerFunc) OnEvent(ev ContentEvent) error {
return f(ev)
}
2022-03-15 13:53:24 +00:00
func NewEventHub(logger *log.Logger, eventSource EventSource, fileNameHash hash.Hash, mutations ...EventMutation) *EventHub {
hub := &EventHub{
2022-03-15 13:53:24 +00:00
Logger: logger,
2021-12-22 19:19:05 +00:00
FileNameHash: fileNameHash,
mutations: mutations,
source: eventSource,
2022-03-15 13:53:24 +00:00
subscriptions: make(map[uuid.UUID]EventHandler),
2021-12-22 19:19:05 +00:00
done: make(chan struct{}),
}
go hub.processEvents()
return hub
}
type EventHub struct {
2021-12-22 19:19:05 +00:00
FileNameHash hash.Hash
2022-03-15 13:53:24 +00:00
Logger *log.Logger
2021-12-22 19:19:05 +00:00
mutations []EventMutation
lock sync.RWMutex
done chan struct{}
source EventSource
2022-03-15 13:53:24 +00:00
subscriptions map[uuid.UUID]EventHandler
}
2022-03-15 13:53:24 +00:00
func (h *EventHub) Subscribe(handler EventHandler) (id uuid.UUID) {
h.lock.Lock()
defer h.lock.Unlock()
clientID := uuid.New()
2022-03-15 13:53:24 +00:00
h.subscriptions[clientID] = handler
2022-03-15 13:53:24 +00:00
return clientID
}
2022-03-15 13:53:24 +00:00
func (h *EventHub) Unsubscribe(id uuid.UUID) (err error) {
h.lock.Lock()
defer h.lock.Unlock()
2022-03-15 13:53:24 +00:00
if handler, ok := h.subscriptions[id]; ok {
err = handler.Close()
delete(h.subscriptions, id)
return err
}
return nil
}
func (h *EventHub) Close() error {
close(h.done)
return h.source.Close()
}
func (h *EventHub) processEvents() {
events := h.source.Events()
for {
select {
case ev, more := <-events:
if !more {
return
}
h.notifySubscribers(ev)
case _, more := <-h.done:
if !more {
return
}
}
}
}
func (h *EventHub) notifySubscribers(ev fs.Event) {
h.lock.RLock()
defer h.lock.RUnlock()
ce := ContentEvent{
File: fmt.Sprintf("/%s", ev.File),
Timestamp: strconv.FormatInt(ev.Timestamp.Unix(), baseDecimal),
FileNameHash: hex.EncodeToString(h.FileNameHash.Sum([]byte(path.Base(ev.File)))),
}
2021-12-22 19:19:05 +00:00
for idx := range h.mutations {
ce = h.mutations[idx].OnEvent(ce, ev)
}
for _, handler := range h.subscriptions {
if err := handler.OnEvent(ce); err != nil {
2022-03-15 13:53:24 +00:00
h.Logger.Errorf("Failed to propagate event: %v", err)
}
}
}