156 lines
3.2 KiB
Go
156 lines
3.2 KiB
Go
package events
|
|
|
|
import (
|
|
"encoding/hex"
|
|
"fmt"
|
|
"hash"
|
|
"io"
|
|
"path"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/google/uuid"
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
"code.icb4dc0.de/prskr/goveal/fs"
|
|
)
|
|
|
|
// baseDecimal is the numeric base passed to strconv when event timestamps
// are rendered as strings.
const baseDecimal = 10
|
|
|
|
type (
|
|
EventMutation interface {
|
|
OnEvent(in ContentEvent, ev fs.Event) (out ContentEvent)
|
|
}
|
|
ContentEvent struct {
|
|
File string `json:"file"`
|
|
FileNameHash string `json:"fileNameHash"`
|
|
Timestamp string `json:"ts"`
|
|
ForceReload bool `json:"forceReload"`
|
|
ReloadConfig bool `json:"reloadConfig"`
|
|
}
|
|
EventSource interface {
|
|
io.Closer
|
|
fs.FS
|
|
Events() chan fs.Event
|
|
}
|
|
EventHandler interface {
|
|
OnEvent(ev ContentEvent) error
|
|
Close() error
|
|
}
|
|
EventHandlerFunc func(ev ContentEvent) error
|
|
MutationReloadForFile string
|
|
MutationConfigReloadForFile string
|
|
)
|
|
|
|
func (t MutationReloadForFile) OnEvent(in ContentEvent, ev fs.Event) (out ContentEvent) {
|
|
if strings.EqualFold(filepath.Base(ev.File), filepath.Base(string(t))) {
|
|
in.ForceReload = true
|
|
}
|
|
return in
|
|
}
|
|
|
|
func (t MutationConfigReloadForFile) OnEvent(in ContentEvent, ev fs.Event) (out ContentEvent) {
|
|
if strings.EqualFold(filepath.Base(ev.File), string(t)) {
|
|
in.ReloadConfig = true
|
|
}
|
|
return in
|
|
}
|
|
|
|
func (f EventHandlerFunc) OnEvent(ev ContentEvent) error {
|
|
return f(ev)
|
|
}
|
|
|
|
func NewEventHub(logger *log.Logger, eventSource EventSource, fileNameHash hash.Hash, mutations ...EventMutation) *EventHub {
|
|
hub := &EventHub{
|
|
Logger: logger,
|
|
FileNameHash: fileNameHash,
|
|
mutations: mutations,
|
|
source: eventSource,
|
|
subscriptions: make(map[uuid.UUID]EventHandler),
|
|
done: make(chan struct{}),
|
|
}
|
|
|
|
go hub.processEvents()
|
|
|
|
return hub
|
|
}
|
|
|
|
type EventHub struct {
|
|
FileNameHash hash.Hash
|
|
Logger *log.Logger
|
|
mutations []EventMutation
|
|
lock sync.RWMutex
|
|
done chan struct{}
|
|
source EventSource
|
|
subscriptions map[uuid.UUID]EventHandler
|
|
}
|
|
|
|
func (h *EventHub) Subscribe(handler EventHandler) (id uuid.UUID) {
|
|
h.lock.Lock()
|
|
defer h.lock.Unlock()
|
|
|
|
clientID := uuid.New()
|
|
h.subscriptions[clientID] = handler
|
|
|
|
return clientID
|
|
}
|
|
|
|
func (h *EventHub) Unsubscribe(id uuid.UUID) (err error) {
|
|
h.lock.Lock()
|
|
defer h.lock.Unlock()
|
|
|
|
if handler, ok := h.subscriptions[id]; ok {
|
|
err = handler.Close()
|
|
delete(h.subscriptions, id)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *EventHub) Close() error {
|
|
close(h.done)
|
|
return h.source.Close()
|
|
}
|
|
|
|
func (h *EventHub) processEvents() {
|
|
events := h.source.Events()
|
|
for {
|
|
select {
|
|
case ev, more := <-events:
|
|
if !more {
|
|
return
|
|
}
|
|
h.notifySubscribers(ev)
|
|
case _, more := <-h.done:
|
|
if !more {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *EventHub) notifySubscribers(ev fs.Event) {
|
|
h.lock.RLock()
|
|
defer h.lock.RUnlock()
|
|
|
|
ce := ContentEvent{
|
|
File: fmt.Sprintf("/%s", ev.File),
|
|
Timestamp: strconv.FormatInt(ev.Timestamp.Unix(), baseDecimal),
|
|
FileNameHash: hex.EncodeToString(h.FileNameHash.Sum([]byte(path.Base(ev.File)))),
|
|
}
|
|
|
|
for idx := range h.mutations {
|
|
ce = h.mutations[idx].OnEvent(ce, ev)
|
|
}
|
|
|
|
for _, handler := range h.subscriptions {
|
|
if err := handler.OnEvent(ce); err != nil {
|
|
h.Logger.Errorf("Failed to propagate event: %v", err)
|
|
}
|
|
}
|
|
}
|