217 lines
4.1 KiB
Go
217 lines
4.1 KiB
Go
|
package services
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"encoding/json"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"maps"
|
||
|
"os"
|
||
|
"sync"
|
||
|
|
||
|
"code.icb4dc0.de/prskr/searcherside/core/ports"
|
||
|
)
|
||
|
|
||
|
var _ ports.IndexCurator = (*FileIndexCurator)(nil)
|
||
|
|
||
|
func NewFileIndexCurator(
|
||
|
filePath string,
|
||
|
indexer ports.Indexer,
|
||
|
archiver ports.Archiver,
|
||
|
) (*FileIndexCurator, error) {
|
||
|
curator := &FileIndexCurator{
|
||
|
FilePath: filePath,
|
||
|
Indexer: indexer,
|
||
|
Archiver: archiver,
|
||
|
searchers: make(map[string]ports.Searcher),
|
||
|
}
|
||
|
|
||
|
if err := curator.load(); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return curator, nil
|
||
|
}
|
||
|
|
||
|
type FileIndexCurator struct {
|
||
|
lock sync.RWMutex
|
||
|
FilePath string
|
||
|
Indexer ports.Indexer
|
||
|
Archiver ports.Archiver
|
||
|
state indexState
|
||
|
searchers map[string]ports.Searcher
|
||
|
}
|
||
|
|
||
|
func (f *FileIndexCurator) Ingest(ctx context.Context, request ports.IngestIndexRequest) error {
|
||
|
f.lock.Lock()
|
||
|
defer f.lock.Unlock()
|
||
|
|
||
|
result, err := f.Indexer.IngestIndex(ctx, request)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
archiveErr := f.Archiver.ArchiveIndex(ports.ArchiveIndexRequest{
|
||
|
Path: result.Path,
|
||
|
Hash: request.Hash,
|
||
|
Type: result.Type,
|
||
|
})
|
||
|
|
||
|
if archiveErr != nil {
|
||
|
return archiveErr
|
||
|
}
|
||
|
|
||
|
idxKey := ports.IndexKey{
|
||
|
Module: request.Module,
|
||
|
Instance: request.Instance,
|
||
|
}
|
||
|
|
||
|
f.state.Add(idxKey, request.Hash, result)
|
||
|
|
||
|
return f.snapshot()
|
||
|
}
|
||
|
|
||
|
func (f *FileIndexCurator) Searcher(key ports.IndexKey) (searcher ports.Searcher, err error) {
|
||
|
f.lock.RLock()
|
||
|
defer f.lock.RUnlock()
|
||
|
|
||
|
idxHash, ok := f.state.Current[key]
|
||
|
if !ok {
|
||
|
return nil, fmt.Errorf("no known index for key %s/%s", key.Module, key.Instance)
|
||
|
}
|
||
|
|
||
|
if instance, ok := f.searchers[idxHash]; ok {
|
||
|
f.searchers[idxHash] = instance
|
||
|
return instance, nil
|
||
|
}
|
||
|
|
||
|
idxResult, ok := f.state.Indices[idxHash]
|
||
|
if !ok {
|
||
|
return nil, fmt.Errorf("no index result for last indexed hash: %s", idxHash)
|
||
|
}
|
||
|
|
||
|
defer func() {
|
||
|
f.searchers[idxHash] = searcher
|
||
|
}()
|
||
|
|
||
|
switch idxResult.Type {
|
||
|
case ports.IndexTypeBleve:
|
||
|
return NewBleveSearcher(idxResult.Path)
|
||
|
default:
|
||
|
return nil, fmt.Errorf("no searcher for index type: %s", idxResult.Type)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (f *FileIndexCurator) Close() error {
|
||
|
var err error
|
||
|
for _, searcher := range f.searchers {
|
||
|
if closer, ok := searcher.(io.Closer); ok {
|
||
|
err = errors.Join(err, closer.Close())
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (f *FileIndexCurator) snapshot() (err error) {
|
||
|
stateFile, err := os.Create(f.FilePath)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
defer func() {
|
||
|
err = errors.Join(err, stateFile.Close())
|
||
|
}()
|
||
|
|
||
|
encoder := json.NewEncoder(stateFile)
|
||
|
|
||
|
return encoder.Encode(f.state)
|
||
|
}
|
||
|
|
||
|
func (f *FileIndexCurator) load() error {
|
||
|
stateFile, err := os.Open(f.FilePath)
|
||
|
if err != nil {
|
||
|
if os.IsNotExist(err) {
|
||
|
return nil
|
||
|
}
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
defer func() {
|
||
|
_ = stateFile.Close()
|
||
|
}()
|
||
|
|
||
|
decoder := json.NewDecoder(stateFile)
|
||
|
|
||
|
return decoder.Decode(&f.state)
|
||
|
}
|
||
|
|
||
|
var (
|
||
|
_ json.Marshaler = (*indexState)(nil)
|
||
|
_ json.Unmarshaler = (*indexState)(nil)
|
||
|
)
|
||
|
|
||
|
type indexState struct {
|
||
|
Current map[ports.IndexKey]string
|
||
|
Indices map[string]ports.IngestIndexResult
|
||
|
}
|
||
|
|
||
|
func (s indexState) MarshalJSON() ([]byte, error) {
|
||
|
tmp := struct {
|
||
|
Current map[string]string
|
||
|
Indices map[string]ports.IngestIndexResult
|
||
|
}{
|
||
|
Current: make(map[string]string),
|
||
|
}
|
||
|
|
||
|
tmp.Indices = maps.Clone(s.Indices)
|
||
|
for k, v := range s.Current {
|
||
|
if marshalledKey, err := k.MarshalText(); err != nil {
|
||
|
return nil, err
|
||
|
} else {
|
||
|
tmp.Current[string(marshalledKey)] = v
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return json.Marshal(tmp)
|
||
|
}
|
||
|
|
||
|
func (s *indexState) UnmarshalJSON(bytes []byte) error {
|
||
|
tmp := struct {
|
||
|
Current map[string]string
|
||
|
Indices map[string]ports.IngestIndexResult
|
||
|
}{}
|
||
|
|
||
|
s.Current = make(map[ports.IndexKey]string)
|
||
|
|
||
|
if err := json.Unmarshal(bytes, &tmp); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
s.Indices = maps.Clone(tmp.Indices)
|
||
|
for k, v := range tmp.Current {
|
||
|
var idxKey ports.IndexKey
|
||
|
if err := idxKey.UnmarshalText([]byte(k)); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
s.Current[idxKey] = v
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *indexState) Add(key ports.IndexKey, hash string, result ports.IngestIndexResult) {
|
||
|
if s.Current == nil {
|
||
|
s.Current = make(map[ports.IndexKey]string)
|
||
|
}
|
||
|
|
||
|
if s.Indices == nil {
|
||
|
s.Indices = make(map[string]ports.IngestIndexResult)
|
||
|
}
|
||
|
|
||
|
s.Current[key] = hash
|
||
|
s.Indices[hash] = result
|
||
|
}
|