goveal/events/event_hub.go

156 lines
3.2 KiB
Go

package events
import (
"encoding/hex"
"fmt"
"hash"
"io"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"github.com/baez90/goveal/fs"
)
// baseDecimal is the numeric base handed to strconv.FormatInt when an event
// timestamp is rendered as a string.
const baseDecimal = 10
// EventMutation rewrites a ContentEvent, given the fs.Event it was built
// from, and returns the (possibly modified) event.
type EventMutation interface {
	OnEvent(in ContentEvent, ev fs.Event) (out ContentEvent)
}

// ContentEvent is the JSON-serialisable notification handed to every
// subscribed EventHandler.
type ContentEvent struct {
	// File is the path reported by the source, prefixed with "/".
	File string `json:"file"`
	// FileNameHash is a hex string derived from the file's base name and the
	// hub's FileNameHash.
	FileNameHash string `json:"fileNameHash"`
	// Timestamp is the event time in Unix seconds, formatted as a decimal string.
	Timestamp string `json:"ts"`
	// ForceReload is switched on by MutationReloadForFile.
	ForceReload bool `json:"forceReload"`
	// ReloadConfig is switched on by MutationConfigReloadForFile.
	ReloadConfig bool `json:"reloadConfig"`
}

// EventSource is a closable fs.FS that also exposes a channel of fs.Event
// values.
type EventSource interface {
	io.Closer
	fs.FS
	Events() chan fs.Event
}

// EventHandler receives ContentEvents from an EventHub. Close is invoked by
// EventHub.Unsubscribe when the handler is removed.
type EventHandler interface {
	OnEvent(ev ContentEvent) error
	Close() error
}

// EventHandlerFunc adapts a plain function to the OnEvent method.
type EventHandlerFunc func(ev ContentEvent) error

// MutationReloadForFile is an EventMutation whose value names the file for
// which ForceReload should be set.
type MutationReloadForFile string

// MutationConfigReloadForFile is an EventMutation whose value names the file
// for which ReloadConfig should be set.
type MutationConfigReloadForFile string
// OnEvent returns a copy of in with ForceReload switched on when the base
// name of the changed file equals the base name of t, ignoring case. Every
// other field, and an already-set ForceReload, is passed through untouched.
func (t MutationReloadForFile) OnEvent(in ContentEvent, ev fs.Event) (out ContentEvent) {
	out = in
	changed := filepath.Base(ev.File)
	watched := filepath.Base(string(t))
	out.ForceReload = in.ForceReload || strings.EqualFold(changed, watched)
	return out
}
// OnEvent returns in with ReloadConfig switched on when the base name of the
// changed file equals the base name of t, ignoring case. Every other field is
// passed through untouched.
//
// Both sides are reduced with filepath.Base, mirroring
// MutationReloadForFile.OnEvent, so a mutation built from a path with a
// directory component (for example "conf/goveal.yaml") still matches.
func (t MutationConfigReloadForFile) OnEvent(in ContentEvent, ev fs.Event) (out ContentEvent) {
	// An empty name must keep matching nothing: filepath.Base("") is ".",
	// which would otherwise equal the base name of an empty or "." event path.
	if t == "" {
		return in
	}
	if strings.EqualFold(filepath.Base(ev.File), filepath.Base(string(t))) {
		in.ReloadConfig = true
	}
	return in
}
// OnEvent calls f(ev) and returns its result.
func (f EventHandlerFunc) OnEvent(ev ContentEvent) error {
	return f(ev)
}

// Close is a no-op that always returns nil. EventHandler requires a Close
// method in addition to OnEvent; without it an EventHandlerFunc could not be
// passed to EventHub.Subscribe.
func (EventHandlerFunc) Close() error {
	return nil
}
// NewEventHub builds an EventHub that reads from eventSource, applies the
// given mutations to every event and logs handler failures through logger.
// Before returning it starts a goroutine running processEvents; that
// goroutine ends when the source's event channel or the hub's done channel
// is closed.
func NewEventHub(logger *log.Logger, eventSource EventSource, fileNameHash hash.Hash, mutations ...EventMutation) *EventHub {
	h := &EventHub{
		source:        eventSource,
		mutations:     mutations,
		subscriptions: map[uuid.UUID]EventHandler{},
		done:          make(chan struct{}),
		Logger:        logger,
		FileNameHash:  fileNameHash,
	}

	go h.processEvents()

	return h
}
// EventHub reads fs.Events from an EventSource, converts each into a
// ContentEvent and passes it to every subscribed EventHandler.
type EventHub struct {
	// Logger receives an error entry whenever a handler's OnEvent fails.
	Logger *log.Logger
	// FileNameHash is used to derive ContentEvent.FileNameHash.
	FileNameHash hash.Hash

	// source supplies the events; it is closed by Close.
	source EventSource
	// mutations are applied, in order, to every outgoing ContentEvent.
	mutations []EventMutation
	// done is closed by Close to stop processEvents.
	done chan struct{}

	// lock guards subscriptions: Subscribe and Unsubscribe take the write
	// lock, notifySubscribers takes the read lock.
	lock          sync.RWMutex
	subscriptions map[uuid.UUID]EventHandler
}
// Subscribe registers handler under a freshly generated UUID and returns that
// UUID, which is the key to pass to Unsubscribe later. The registration
// happens under the hub's write lock.
func (h *EventHub) Subscribe(handler EventHandler) (id uuid.UUID) {
	h.lock.Lock()
	defer h.lock.Unlock()

	id = uuid.New()
	h.subscriptions[id] = handler
	return id
}
// Unsubscribe closes and removes the handler registered under id, returning
// whatever the handler's Close returned. The handler is removed even when
// Close fails. An unknown id is not an error: nil is returned and nothing
// changes.
func (h *EventHub) Unsubscribe(id uuid.UUID) (err error) {
	h.lock.Lock()
	defer h.lock.Unlock()

	handler, found := h.subscriptions[id]
	if !found {
		return nil
	}

	err = handler.Close()
	delete(h.subscriptions, id)
	return err
}
// Close signals processEvents to stop by closing the done channel, then
// closes the event source and returns the source's error.
//
// A later call, made after an earlier one has closed done, returns nil
// without touching the source again; previously it panicked on closing an
// already-closed channel. The check does not take the hub lock, so concurrent
// Close calls are still unsynchronised and callers must serialise them.
// Subscribed handlers are not closed here.
func (h *EventHub) Close() error {
	select {
	case <-h.done:
		// done is already closed: an earlier Close has run.
		return nil
	default:
	}

	close(h.done)
	return h.source.Close()
}
// processEvents is the hub's event loop. It forwards every event received
// from the source to notifySubscribers and returns as soon as either the
// source's event channel or the hub's done channel is closed.
func (h *EventHub) processEvents() {
	incoming := h.source.Events()
	for {
		select {
		case ev, ok := <-incoming:
			if !ok {
				return
			}
			h.notifySubscribers(ev)
		case _, open := <-h.done:
			if open {
				continue
			}
			return
		}
	}
}
// notifySubscribers turns ev into a ContentEvent, runs it through every
// configured mutation in order and passes the result to each subscribed
// handler. A handler error is logged and does not stop delivery to the
// remaining handlers. The read lock is held for the whole call.
func (h *EventHub) notifySubscribers(ev fs.Event) {
	h.lock.RLock()
	defer h.lock.RUnlock()

	file := fmt.Sprintf("/%s", ev.File)
	timestamp := strconv.FormatInt(ev.Timestamp.Unix(), baseDecimal)

	// NOTE(review): hash.Hash.Sum(b) appends the current digest to b; it does
	// not hash b. The value below is therefore the hex of the base name
	// followed by the digest of whatever was previously written to
	// FileNameHash. Presumably a hash of the name was intended — confirm
	// against whatever consumes fileNameHash before changing it.
	nameSum := h.FileNameHash.Sum([]byte(path.Base(ev.File)))

	event := ContentEvent{
		File:         file,
		Timestamp:    timestamp,
		FileNameHash: hex.EncodeToString(nameSum),
	}

	for _, mutation := range h.mutations {
		event = mutation.OnEvent(event, ev)
	}

	for _, subscriber := range h.subscriptions {
		err := subscriber.OnEvent(event)
		if err != nil {
			h.Logger.Errorf("Failed to propagate event: %v", err)
		}
	}
}