2021-12-22 10:52:02 +00:00
|
|
|
package api
|
|
|
|
|
|
|
|
import (
|
2022-03-15 13:53:24 +00:00
|
|
|
"bytes"
|
2021-12-22 10:52:02 +00:00
|
|
|
"encoding/json"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
2022-03-15 13:53:24 +00:00
|
|
|
"net/http"
|
2021-12-22 10:52:02 +00:00
|
|
|
"time"
|
|
|
|
|
2022-03-15 13:53:24 +00:00
|
|
|
"github.com/julienschmidt/httprouter"
|
2021-12-22 10:52:02 +00:00
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
|
2022-09-25 12:23:22 +00:00
|
|
|
"code.icb4dc0.de/prskr/goveal/events"
|
2021-12-22 10:52:02 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// ContentEventHandler is a channel of content events. Its OnEvent method
// sends an event into the channel and its Close method closes the channel.
type ContentEventHandler chan events.ContentEvent
|
|
|
|
|
|
|
|
func (h ContentEventHandler) OnEvent(ce events.ContentEvent) error {
|
|
|
|
const enqueueTimeout = 50 * time.Millisecond
|
|
|
|
select {
|
|
|
|
case h <- ce:
|
|
|
|
return nil
|
|
|
|
case <-time.After(enqueueTimeout):
|
|
|
|
return errors.New("failed to enqueue due to timeout")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-03-15 13:53:24 +00:00
|
|
|
func (h ContentEventHandler) Close() (err error) {
|
|
|
|
defer func() {
|
|
|
|
if rec := recover(); rec != nil {
|
|
|
|
err = fmt.Errorf("failed to close event handler: %v", rec)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
close(h)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2021-12-22 10:52:02 +00:00
|
|
|
type Events struct {
|
|
|
|
logger *log.Logger
|
|
|
|
hub *events.EventHub
|
|
|
|
}
|
|
|
|
|
2022-03-15 13:53:24 +00:00
|
|
|
func RegisterEventsAPI(router *httprouter.Router, hub *events.EventHub, logger *log.Logger) {
|
2021-12-22 10:52:02 +00:00
|
|
|
ev := &Events{hub: hub, logger: logger}
|
2022-03-15 13:53:24 +00:00
|
|
|
router.GET("/api/v1/events", ev.EventHandler)
|
2021-12-22 10:52:02 +00:00
|
|
|
}
|
|
|
|
|
2022-03-15 13:53:24 +00:00
|
|
|
func (e *Events) EventHandler(writer http.ResponseWriter, req *http.Request, _ httprouter.Params) {
|
|
|
|
writer.Header().Set("Content-Type", "text/event-stream")
|
|
|
|
writer.Header().Set("Cache-Control", "no-cache")
|
|
|
|
writer.Header().Set("Connection", "keep-alive")
|
|
|
|
writer.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
|
writer.Header().Set("Transfer-Encoding", "chunked")
|
|
|
|
writer.Header().Set("Access-Control-Allow-Headers", "Cache-Control")
|
|
|
|
writer.Header().Set("Access-Control-Allow-Credentials", "true")
|
|
|
|
|
2021-12-22 10:52:02 +00:00
|
|
|
var (
|
2022-03-15 13:53:24 +00:00
|
|
|
handler = make(ContentEventHandler)
|
|
|
|
clientID = e.hub.Subscribe(handler)
|
|
|
|
buf = new(bytes.Buffer)
|
|
|
|
enc = json.NewEncoder(buf)
|
2021-12-22 10:52:02 +00:00
|
|
|
)
|
|
|
|
|
2022-03-15 13:53:24 +00:00
|
|
|
defer func() {
|
|
|
|
if err := e.hub.Unsubscribe(clientID); err != nil {
|
|
|
|
e.logger.Warnf("Error occurred while unsubscribing: %v", err)
|
|
|
|
}
|
|
|
|
}()
|
2021-12-22 10:52:02 +00:00
|
|
|
|
2022-03-15 13:53:24 +00:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case ev := <-handler:
|
|
|
|
if err := enc.Encode(ev); err != nil {
|
2021-12-22 10:52:02 +00:00
|
|
|
e.logger.Errorf("Failed to marshal to JSON: %v", err)
|
|
|
|
continue
|
2022-03-15 13:53:24 +00:00
|
|
|
} else if _, err = fmt.Fprintf(writer, "data: %s\n\n", buf.String()); err != nil {
|
2021-12-22 10:52:02 +00:00
|
|
|
e.logger.Errorf("Failed to write to client: %v", err)
|
2022-03-15 13:53:24 +00:00
|
|
|
return
|
|
|
|
} else if f, ok := writer.(http.Flusher); !ok {
|
|
|
|
e.logger.Errorf("Cannot flush data")
|
|
|
|
writer.WriteHeader(http.StatusBadRequest)
|
|
|
|
return
|
|
|
|
} else {
|
|
|
|
f.Flush()
|
2021-12-22 10:52:02 +00:00
|
|
|
}
|
2022-03-15 13:53:24 +00:00
|
|
|
case <-req.Context().Done():
|
|
|
|
return
|
2021-12-22 10:52:02 +00:00
|
|
|
}
|
2022-03-15 13:53:24 +00:00
|
|
|
}
|
2021-12-22 10:52:02 +00:00
|
|
|
}
|