goveal/events/event_hub.go

153 lines
3 KiB
Go

package events
import (
"encoding/hex"
"fmt"
"hash"
"io"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"github.com/google/uuid"
"github.com/baez90/goveal/fs"
)
const (
baseDecimal = 10
)
type (
EventMutation interface {
OnEvent(in ContentEvent, ev fs.Event) (out ContentEvent)
}
ContentEvent struct {
File string `json:"file"`
FileNameHash string `json:"fileNameHash"`
Timestamp string `json:"ts"`
ForceReload bool `json:"forceReload"`
ReloadConfig bool `json:"reloadConfig"`
}
EventSource interface {
io.Closer
fs.FS
Events() chan fs.Event
}
EventHandler interface {
OnEvent(ev ContentEvent) error
}
EventHandlerFunc func(ev ContentEvent) error
subscription struct {
EventHandler
OnError chan error
}
MutationReloadForFile string
MutationConfigReloadForFile string
)
func (t MutationReloadForFile) OnEvent(in ContentEvent, ev fs.Event) (out ContentEvent) {
if strings.EqualFold(filepath.Base(ev.File), string(t)) {
in.ForceReload = true
}
return in
}
func (t MutationConfigReloadForFile) OnEvent(in ContentEvent, ev fs.Event) (out ContentEvent) {
if strings.EqualFold(filepath.Base(ev.File), string(t)) {
in.ReloadConfig = true
}
return in
}
func (f EventHandlerFunc) OnEvent(ev ContentEvent) error {
return f(ev)
}
func NewEventHub(eventSource EventSource, fileNameHash hash.Hash, mutations ...EventMutation) *EventHub {
hub := &EventHub{
FileNameHash: fileNameHash,
mutations: mutations,
source: eventSource,
subscriptions: make(map[uuid.UUID]*subscription),
done: make(chan struct{}),
}
go hub.processEvents()
return hub
}
type EventHub struct {
FileNameHash hash.Hash
mutations []EventMutation
lock sync.RWMutex
done chan struct{}
source EventSource
subscriptions map[uuid.UUID]*subscription
}
func (h *EventHub) Subscribe(handler EventHandler) (id uuid.UUID, onError <-chan error) {
h.lock.Lock()
defer h.lock.Unlock()
s := &subscription{
EventHandler: handler,
OnError: make(chan error),
}
clientID := uuid.New()
h.subscriptions[clientID] = s
return clientID, s.OnError
}
func (h *EventHub) Unsubscribe(id uuid.UUID) {
h.lock.Lock()
defer h.lock.Unlock()
delete(h.subscriptions, id)
}
func (h *EventHub) Close() error {
close(h.done)
return h.source.Close()
}
func (h *EventHub) processEvents() {
events := h.source.Events()
for {
select {
case ev, more := <-events:
if !more {
return
}
h.notifySubscribers(ev)
case _, more := <-h.done:
if !more {
return
}
}
}
}
func (h *EventHub) notifySubscribers(ev fs.Event) {
h.lock.RLock()
defer h.lock.RUnlock()
ce := ContentEvent{
File: fmt.Sprintf("/%s", ev.File),
Timestamp: strconv.FormatInt(ev.Timestamp.Unix(), baseDecimal),
FileNameHash: hex.EncodeToString(h.FileNameHash.Sum([]byte(path.Base(ev.File)))),
}
for idx := range h.mutations {
ce = h.mutations[idx].OnEvent(ce, ev)
}
for _, handler := range h.subscriptions {
if err := handler.OnEvent(ce); err != nil {
handler.OnError <- err
}
}
}