searcherside/core/services/file_index_curator.go

216 lines
4.1 KiB
Go

package services
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"maps"
"os"
"sync"
"code.icb4dc0.de/prskr/searcherside/core/ports"
)
// Compile-time assertion that *FileIndexCurator implements ports.IndexCurator.
var _ ports.IndexCurator = (*FileIndexCurator)(nil)
// NewFileIndexCurator wires a FileIndexCurator to the given indexer and
// archiver and loads any state previously persisted at filePath.
//
// It returns an error if loading the state fails; in that case no curator is
// returned.
func NewFileIndexCurator(
	filePath string,
	indexer ports.Indexer,
	archiver ports.Archiver,
) (*FileIndexCurator, error) {
	c := new(FileIndexCurator)
	c.FilePath = filePath
	c.Indexer = indexer
	c.Archiver = archiver
	// The searcher cache must be non-nil before Searcher stores into it.
	c.searchers = map[string]ports.Searcher{}

	if loadErr := c.load(); loadErr != nil {
		return nil, loadErr
	}

	return c, nil
}
// FileIndexCurator keeps track of ingested indices, persists that bookkeeping
// as JSON in a file and hands out searchers for the current index of a key.
type FileIndexCurator struct {
// lock is acquired by Ingest and Searcher around access to state and searchers.
lock sync.RWMutex
// FilePath is the file snapshot writes the JSON state to and load reads it from.
FilePath string
// Indexer is called by Ingest to ingest the requested index.
Indexer ports.Indexer
// Archiver is called by Ingest to archive the freshly ingested index.
Archiver ports.Archiver
// state maps index keys to their current hash and hashes to ingest results.
state indexState
// searchers caches opened searchers, keyed by index hash.
searchers map[string]ports.Searcher
}
// Ingest runs the indexer for request, archives the resulting index, records
// it as the current index for request's module/instance and persists the
// updated state.
//
// The write lock is held for the whole operation. The first error from the
// indexer, the archiver or the state snapshot is returned unchanged.
func (f *FileIndexCurator) Ingest(ctx context.Context, request ports.IngestIndexRequest) error {
	f.lock.Lock()
	defer f.lock.Unlock()

	ingested, err := f.Indexer.IngestIndex(ctx, request)
	if err != nil {
		return err
	}

	archiveRequest := ports.ArchiveIndexRequest{
		Path: ingested.Path,
		Hash: request.Hash,
		Type: ingested.Type,
	}
	if err := f.Archiver.ArchiveIndex(archiveRequest); err != nil {
		return err
	}

	// Only record the index once it has been ingested and archived.
	f.state.Add(
		ports.IndexKey{Module: request.Module, Instance: request.Instance},
		request.Hash,
		ingested,
	)

	return f.snapshot()
}
// Searcher returns the searcher for the index currently registered under key.
//
// Searchers are opened lazily and cached by index hash. A cached searcher is
// returned while holding only the read lock; opening and caching a new one
// happens under the write lock, because it mutates the searchers map.
//
// An error is returned if no index is known for key, if the state holds no
// ingest result for the key's current hash, if the index type has no searcher
// implementation, or if opening the searcher fails. Failed attempts are never
// cached, so a later call retries.
func (f *FileIndexCurator) Searcher(key ports.IndexKey) (searcher ports.Searcher, err error) {
	// Fast path: look for an already opened searcher under the read lock.
	f.lock.RLock()
	var (
		cached ports.Searcher
		hit    bool
	)
	if hash, known := f.state.Current[key]; known {
		cached, hit = f.searchers[hash]
	}
	f.lock.RUnlock()
	if hit {
		return cached, nil
	}

	// Slow path: the cache is about to be written, so take the write lock.
	f.lock.Lock()
	defer f.lock.Unlock()

	idxHash, ok := f.state.Current[key]
	if !ok {
		return nil, fmt.Errorf("no known index for key %s/%s", key.Module, key.Instance)
	}

	// Re-check: another goroutine may have opened the searcher (or Ingest may
	// have switched the current hash) between releasing the read lock and
	// acquiring the write lock.
	if instance, ok := f.searchers[idxHash]; ok {
		return instance, nil
	}

	idxResult, ok := f.state.Indices[idxHash]
	if !ok {
		return nil, fmt.Errorf("no index result for last indexed hash: %s", idxHash)
	}

	switch idxResult.Type {
	case ports.IndexTypeBleve:
		searcher, err = NewBleveSearcher(idxResult.Path)
	default:
		return nil, fmt.Errorf("no searcher for index type: %s", idxResult.Type)
	}
	if err != nil {
		// Return a literal nil so callers never see a half-initialised searcher.
		return nil, err
	}

	// Cache only successfully opened searchers.
	f.searchers[idxHash] = searcher
	return searcher, nil
}
// Close closes every cached searcher that implements io.Closer and empties
// the searcher cache.
//
// The write lock is held while doing so because the cache is shared with
// Searcher. Clearing the cache ensures a closed searcher is never handed out
// afterwards and that calling Close again does not close a searcher twice.
// All close errors are joined into the returned error; nil is returned if
// none occurred.
func (f *FileIndexCurator) Close() error {
	f.lock.Lock()
	defer f.lock.Unlock()

	var err error
	for _, searcher := range f.searchers {
		if closer, ok := searcher.(io.Closer); ok {
			err = errors.Join(err, closer.Close())
		}
	}
	clear(f.searchers)

	return err
}
// snapshot writes the current state as JSON to f.FilePath.
//
// The file is created (or truncated, if it already exists) before encoding
// starts. An error from closing the file is joined with any encoding error.
// The method does no locking of its own.
func (f *FileIndexCurator) snapshot() (err error) {
	out, createErr := os.Create(f.FilePath)
	if createErr != nil {
		return createErr
	}

	// Fold the close error into the named result so it is not lost.
	defer func() {
		closeErr := out.Close()
		err = errors.Join(err, closeErr)
	}()

	err = json.NewEncoder(out).Encode(f.state)
	return err
}
// load restores the state from the JSON file at f.FilePath.
//
// A missing file and an existing but empty file both mean "no state yet" and
// leave the state untouched without reporting an error. Any other failure to
// open or decode the file is returned, wrapped with the file path for context.
func (f *FileIndexCurator) load() error {
	stateFile, err := os.Open(f.FilePath)
	if err != nil {
		if errors.Is(err, os.ErrNotExist) {
			return nil
		}
		return fmt.Errorf("opening index state file %q: %w", f.FilePath, err)
	}
	defer func() {
		// The file was only read, so a close error cannot lose data.
		_ = stateFile.Close()
	}()

	if err := json.NewDecoder(stateFile).Decode(&f.state); err != nil {
		// Decode reports io.EOF only when the input holds no JSON value at
		// all; a truncated document yields a different error and still fails.
		if errors.Is(err, io.EOF) {
			return nil
		}
		return fmt.Errorf("decoding index state file %q: %w", f.FilePath, err)
	}

	return nil
}
// Compile-time assertions that *indexState provides custom JSON encoding and
// decoding.
var (
_ json.Marshaler = (*indexState)(nil)
_ json.Unmarshaler = (*indexState)(nil)
)
// indexState is the bookkeeping that gets persisted as JSON: which index hash
// is current for each key, and the ingest result recorded for each hash.
type indexState struct {
// Current maps an index key to the hash most recently added for it.
Current map[ports.IndexKey]string
// Indices maps an index hash to the ingest result recorded for it.
Indices map[string]ports.IngestIndexResult
}
// MarshalJSON encodes the state as a JSON object with the fields "Current"
// and "Indices". The keys of Current are rendered through their MarshalText
// method so they can serve as JSON object keys.
//
// It fails if any key cannot be marshalled to text or if the JSON encoding
// itself fails.
func (s indexState) MarshalJSON() ([]byte, error) {
	current := make(map[string]string, len(s.Current))
	for key, hash := range s.Current {
		text, err := key.MarshalText()
		if err != nil {
			return nil, err
		}
		current[string(text)] = hash
	}

	// Indices is only read during encoding, so it is passed through as is.
	return json.Marshal(struct {
		Current map[string]string
		Indices map[string]ports.IngestIndexResult
	}{
		Current: current,
		Indices: s.Indices,
	})
}
// UnmarshalJSON decodes the JSON object produced by MarshalJSON. The string
// keys of "Current" are parsed back into index keys via UnmarshalText.
//
// Current is reset to an empty map before decoding starts. An error is
// returned if the JSON is malformed or a key cannot be parsed; entries
// converted before a failing key remain in s.
func (s *indexState) UnmarshalJSON(bytes []byte) error {
	var raw struct {
		Current map[string]string
		Indices map[string]ports.IngestIndexResult
	}

	s.Current = map[ports.IndexKey]string{}
	if err := json.Unmarshal(bytes, &raw); err != nil {
		return err
	}

	s.Indices = maps.Clone(raw.Indices)
	for text, hash := range raw.Current {
		var key ports.IndexKey
		if err := key.UnmarshalText([]byte(text)); err != nil {
			return err
		}
		s.Current[key] = hash
	}

	return nil
}
// Add records result under hash and makes hash the current index for key.
// Both maps are allocated on first use, so Add works on a zero indexState.
func (s *indexState) Add(key ports.IndexKey, hash string, result ports.IngestIndexResult) {
	if s.Indices == nil {
		s.Indices = map[string]ports.IngestIndexResult{}
	}
	if s.Current == nil {
		s.Current = map[ports.IndexKey]string{}
	}

	s.Indices[hash] = result
	s.Current[key] = hash
}