refactor: rework plugins to track their state
Some checks failed
continuous-integration/drone/push Build is failing

- introduce individual commands to manage plugins
- store plugin in state to skip loading them to memory every time
This commit is contained in:
Peter 2023-06-29 20:14:52 +02:00
parent 1261932bdc
commit 5af8ddceab
No known key found for this signature in database
24 changed files with 751 additions and 179 deletions

View file

@ -3,7 +3,8 @@ buildr {
out_dir = ".buildr/out"
logs_dir = ".buildr/logs"
plugin {
url = "https://code.icb4dc0.de/api/packages/buildr/generic/hello_world/0.0.1/hello_world.wasm"
plugin "hello_world" {
url = "https://code.icb4dc0.de/api/packages/buildr/generic/hello_world/0.0.1/hello_world.wasm"
checksum = "6c1999d4838c49a9a3a4bc604956f32878eb8b6f672186653824608edd0049ed"
}
}

View file

@ -71,6 +71,11 @@ type VaultCommander interface {
Remove(ctx context.Context, key string) error
}
type PluginCommander interface {
ListPlugins(ctx context.Context, writer io.Writer) error
UpdatePlugins(ctx context.Context) error
}
type ServerCommander interface {
ServeAPI(ctx context.Context, cfg *rpc.GrpcConfig) error
}

View file

@ -61,6 +61,10 @@ func NewApp() *App {
SilenceUsage: true,
SilenceErrors: true,
},
buildrCfg: new(config.Buildr),
pluginMgr: &plugins.Manager{
Downloader: plugins.Getter{},
},
loggingCfg: logging.NewConfig(),
}
@ -94,10 +98,11 @@ func NewApp() *App {
})...,
)
app.rootCmd.AddCommand(
ModulesCommand(app, app, manApp, app),
PluginsCommand(NewPluginApp(app.buildrCfg, app.pluginMgr, app), app),
VaultCommand(NewVaultApp(app, app, app)),
ServerCommand(NewServerApp(app, app)),
EnvCommand(NewEnvApp(app, app)),
ModulesCommand(app, app, manApp, app),
VersionCommand(),
)
@ -189,17 +194,18 @@ func (a *App) Init(ctx context.Context) (err error) {
return err
}
var s state.Store
var db *state.DB
if s, err = a.appCfg.InitState(ctx); err != nil {
if db, err = a.appCfg.InitState(ctx); err != nil {
return err
}
a.Collection, err = services.NewCollection(
services.WithTypeRegistry(registry),
services.WithIgnorer(ignorer),
services.WithStateStore(s),
services.WithCache(state.NewStateCache(a.appCfg.Cache.TTL, s)),
services.WithDB(db),
services.WithStateStore(db.State),
services.WithCache(state.NewStateCache(a.appCfg.Cache.TTL, db.State)),
services.WithDockerClientFromEnv(ctx),
)
@ -245,6 +251,9 @@ func (a *App) BootstrapModule(ctx context.Context, cat modules.Category, typeNam
func (a *App) RunModule(ctx context.Context, cat modules.Category, name string) error {
if err := a.InitAt(ctx, InitLevelParseConfig); err != nil {
if errors.Is(err, modules.ErrNoSuchModule) {
return fmt.Errorf("%w, did you run 'buildr plugins update'", err)
}
return err
}
@ -294,13 +303,13 @@ func (a *App) initBuildRConfig(ctx context.Context) error {
return err
}
a.buildrCfg.LogToStderr = a.appCfg.Execution.LogToStderr
buildrCfg := struct {
Remainder hcl2.Body `hcl:",remain"`
config.Buildr `hcl:"buildr,block"`
Remainder hcl2.Body `hcl:",remain"`
*config.Buildr `hcl:"buildr,block"`
}{
Buildr: config.Buildr{
LogToStderr: a.appCfg.Execution.LogToStderr,
},
Buildr: a.buildrCfg,
}
a.parsingState.currentEvalCtx = hcl.BasicContext(a.Vault())
@ -314,14 +323,9 @@ func (a *App) initBuildRConfig(ctx context.Context) error {
return err
}
a.buildrCfg = &buildrCfg.Buildr
a.pluginMgr = plugins.NewManager(a.StateStore(), buildrCfg.CacheDirectory)
a.pluginMgr.Init(a.PluginsRepo(), buildrCfg.CacheDirectory)
if pluginURLs, err := buildrCfg.PluginURLs(); err != nil {
return err
} else if err := a.pluginMgr.Add(ctx, pluginURLs...); err != nil {
return err
} else if err := a.pluginMgr.Register(ctx, a.TypeRegistry()); err != nil {
if err := a.pluginMgr.Register(ctx, a.TypeRegistry()); err != nil {
return err
}

View file

@ -0,0 +1,51 @@
package cmd
import (
"context"
"io"
"code.icb4dc0.de/buildr/buildr/internal/config"
"code.icb4dc0.de/buildr/buildr/internal/plugins"
"code.icb4dc0.de/buildr/buildr/internal/services"
"github.com/olekukonko/tablewriter"
)
var _ PluginCommander = (*PluginApp)(nil)
type PluginAppServiceAccess interface {
services.PluginsRepoAccessor
}
func NewPluginApp(buildrCfg *config.Buildr, pluginMgr *plugins.Manager, svcAcc PluginAppServiceAccess) *PluginApp {
return &PluginApp{
buildrCfg: buildrCfg,
pluginMgr: pluginMgr,
svcAcc: svcAcc,
}
}
type PluginApp struct {
buildrCfg *config.Buildr
pluginMgr *plugins.Manager
svcAcc PluginAppServiceAccess
}
func (p PluginApp) ListPlugins(ctx context.Context, writer io.Writer) error {
knownPlugins, err := p.svcAcc.PluginsRepo().List(ctx)
if err != nil {
return err
}
table := tablewriter.NewWriter(writer)
table.SetHeader([]string{"Name"})
for _, plugin := range knownPlugins {
table.Append([]string{plugin.Name})
}
table.Render()
return nil
}
func (p PluginApp) UpdatePlugins(ctx context.Context) error {
return p.pluginMgr.UpdatePlugins(ctx, p.buildrCfg.Plugins...)
}

View file

@ -100,9 +100,9 @@ func (c *AppConfig) InitPaths(from string) (err error) {
return nil
}
func (c *AppConfig) InitState(ctx context.Context) (state.Store, error) {
func (c *AppConfig) InitState(ctx context.Context) (*state.DB, error) {
if stateFilePath := c.State.FilePath; stateFilePath != "" {
return state.NewEntStore(ctx, c.State.FilePath)
return state.NewDB(ctx, c.State.FilePath)
}
return nil, nil

View file

@ -18,8 +18,11 @@ func ModulesCommand(
moduleCmder BootstrapModuleCommander,
) *cobra.Command {
cmd := &cobra.Command{
Use: "modules",
Short: "Interact with modules",
Use: "modules",
Aliases: []string{"mod", "mods"},
Short: "Interact with modules",
SilenceUsage: true,
SilenceErrors: true,
}
cmd.AddCommand(

48
internal/cmd/plugins.go Normal file
View file

@ -0,0 +1,48 @@
package cmd
import (
"os"
"github.com/spf13/cobra"
)
func PluginListCommand(cmder PluginCommander) *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Short: "List plugins",
Aliases: []string{"ls", "dir"},
RunE: func(cmd *cobra.Command, args []string) error {
return cmder.ListPlugins(cmd.Context(), os.Stdout)
},
}
return cmd
}
func PluginUpdateCommand(cmder PluginCommander, initializer LevelInitializer) *cobra.Command {
cmd := &cobra.Command{
Use: "update",
Short: "Update plugins",
RunE: func(cmd *cobra.Command, args []string) error {
if err := initializer.InitAt(cmd.Context(), InitLevelBuildRConfig); err != nil {
return err
}
return cmder.UpdatePlugins(cmd.Context())
},
}
return cmd
}
func PluginsCommand(cmder PluginCommander, initializer LevelInitializer) *cobra.Command {
cmd := &cobra.Command{
Use: "plugins",
Short: "Manage plugins",
SilenceUsage: true,
SilenceErrors: true,
}
cmd.AddCommand(PluginListCommand(cmder), PluginUpdateCommand(cmder, initializer))
return cmd
}

View file

@ -9,7 +9,9 @@ import (
const defaultDirectoryPermissions = 0o755
type PluginReference struct {
URL string `hcl:"url"`
Name string `hcl:",label"`
URL string `hcl:"url"`
Checksum *string `hcl:"checksum,optional"`
}
type Buildr struct {

View file

@ -15,7 +15,7 @@ func NewPlanFor(
) (*Plan, error) {
target := repo.Module(moduleType, moduleName)
if target == nil {
return nil, fmt.Errorf("no such module registered: %s/%s", moduleType, moduleName)
return nil, fmt.Errorf("%w: %s/%s", modules.ErrNoSuchModule, moduleType, moduleName)
}
entryPoint, err := factory.TaskForModule(target.ID(), repo)

View file

@ -0,0 +1,20 @@
package plugins
import (
"context"
"net/url"
"github.com/hashicorp/go-getter"
)
type Downloader interface {
Download(ctx context.Context, target string, src *url.URL) error
}
var _ Downloader = (*Getter)(nil)
type Getter struct{}
func (Getter) Download(ctx context.Context, target string, src *url.URL) error {
return getter.GetFile(target, src.String(), getter.WithContext(ctx), getter.WithMode(getter.ClientModeFile))
}

View file

@ -3,162 +3,245 @@ package plugins
import (
"bytes"
"context"
"crypto/md5"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
"net/url"
"os"
"path"
"path/filepath"
"sync"
"code.icb4dc0.de/buildr/buildr/internal/config"
"code.icb4dc0.de/buildr/buildr/internal/slices"
"github.com/go-git/go-git/v5/plumbing/hash"
"github.com/google/uuid"
"golang.org/x/exp/slog"
"golang.org/x/sync/errgroup"
"code.icb4dc0.de/buildr/buildr/internal/ioutils"
"code.icb4dc0.de/buildr/buildr/modules"
"code.icb4dc0.de/buildr/buildr/modules/plugin"
"github.com/hashicorp/go-getter"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
rpcv1 "code.icb4dc0.de/buildr/buildr/internal/generated/rpc/v1"
"code.icb4dc0.de/buildr/buildr/modules/state"
)
func NewManager(stateStore state.Store, cacheDir string) *Manager {
return &Manager{
stateStore: stateStore,
cacheDir: cacheDir,
}
}
type Manager struct {
stateStore state.Store
Downloader Downloader
lock sync.RWMutex
plugins state.Plugins
cacheDir string
}
func (p *Manager) Init(plugins state.Plugins, cacheDir string) {
p.cacheDir = cacheDir
p.plugins = plugins
}
func (p *Manager) Register(ctx context.Context, registry *modules.TypeRegistry) error {
pluginFiles, err := filepath.Glob(filepath.Join(p.cacheDir, "*"))
plugins, err := p.plugins.List(ctx)
if err != nil {
return err
}
grp, grpCtx := errgroup.WithContext(ctx)
for i := range pluginFiles {
f := pluginFiles[i]
grp, grpcCtx := errgroup.WithContext(ctx)
for i := range plugins {
registeredPlugin := plugins[i]
grp.Go(func() error {
return p.registerPlugin(grpCtx, registry, f)
knownModules, err := p.plugins.ModulesForPlugin(grpcCtx, *registeredPlugin.ID)
if err != nil {
return err
}
for _, module := range knownModules {
registry.RegisterModule(modules.ModuleFactoryFunc(func() modules.ModuleWithMeta {
return &modules.Metadata[plugin.Module]{
Module: plugin.Module{
PluginPayload: &plugin.PayloadFile{Path: registeredPlugin.LocalPath},
PluginCategory: modules.Category(module.Category),
PluginType: module.Type,
ModuleSpec: make(map[string]any),
},
}
}))
}
return nil
})
}
return grp.Wait()
}
func (p *Manager) Add(ctx context.Context, pluginURLs ...*url.URL) error {
grp, grpCtx := errgroup.WithContext(ctx)
for i := range pluginURLs {
pluginURL := pluginURLs[i]
grp.Go(func() error {
return p.fetchPlugin(grpCtx, pluginURL)
})
func (p *Manager) UpdatePlugins(ctx context.Context, refs ...config.PluginReference) error {
knownPlugins, err := p.plugins.List(ctx)
if err != nil {
return err
}
return grp.Wait()
}
pluginsByName := make(map[string]state.Plugin, len(knownPlugins))
for _, p := range knownPlugins {
pluginsByName[p.Name] = p
}
func (p *Manager) fetchPlugin(ctx context.Context, pluginURL *url.URL) error {
var shouldDownload bool
_, file := path.Split(pluginURL.Path)
stateKey := state.KeyOfStrings("plugins", pluginURL.String())
//nolint:gocritic // branches are not identical
if _, err := os.Stat(filepath.Join(p.cacheDir, file)); err != nil {
if os.IsNotExist(err) {
shouldDownload = true
for _, pluginReference := range refs {
var requiredDownload bool
if knownPlugin, ok := pluginsByName[pluginReference.Name]; !ok {
requiredDownload = true
} else {
return err
delete(pluginsByName, pluginReference.Name)
if requiredDownload, err = p.validateKnownPlugin(knownPlugin, pluginReference); err != nil {
return err
}
}
if requiredDownload {
slog.Info("Downloading plugin", slog.String("plugin", pluginReference.Name))
if downloadedPlugin, payload, err := p.downloadPlugin(ctx, pluginReference); err != nil {
return err
} else if pluginID, err := p.plugins.UpsertPlugin(ctx, *downloadedPlugin); err != nil {
return err
} else if inventory, err := plugin.DiscoverInventory(ctx, payload); err != nil {
return err
} else {
pluginModules := slices.Map(inventory.Modules, func(in *rpcv1.ModuleReference) state.PluginModule {
return moduleReferenceToPluginModule(pluginID, in)
})
if err := p.plugins.UpsertModules(ctx, pluginModules); err != nil {
return err
}
}
}
} else if _, _, err = p.stateStore.Get(ctx, stateKey); err != nil {
shouldDownload = true
} else {
shouldDownload = true
}
if shouldDownload {
if err := getter.GetFile(filepath.Join(p.cacheDir, file), pluginURL.String(), getter.WithContext(ctx)); err != nil {
return err
for _, obsoletePlugin := range pluginsByName {
slog.Info("Removing obsolete plugin", slog.String("plugin", obsoletePlugin.Name))
if err := os.Remove(obsoletePlugin.LocalPath); err != nil {
if !errors.Is(err, os.ErrNotExist) {
return err
}
}
return p.stateStore.Set(ctx, stateKey, []byte("downloaded"))
if err := p.plugins.Remove(ctx, *obsoletePlugin.ID); err != nil {
return err
}
}
return nil
}
func (p *Manager) registerPlugin(ctx context.Context, registry *modules.TypeRegistry, pluginFile string) error {
f, err := os.Open(pluginFile)
func (p *Manager) downloadPlugin(ctx context.Context, ref config.PluginReference) (pl *state.Plugin, payload []byte, err error) {
pluginURL, err := url.Parse(ref.URL)
if err != nil {
return err
return nil, nil, err
}
targetPath := filepath.Join(p.cacheDir, ref.Name)
if err := p.Downloader.Download(ctx, targetPath, pluginURL); err != nil {
return nil, nil, err
}
computedHash, payload, err := hashFile(targetPath, sha256.New())
if err != nil {
return nil, nil, err
}
if ref.Checksum != nil {
expectedHash, err := hex.DecodeString(*ref.Checksum)
if err != nil {
return nil, nil, err
}
if !bytes.Equal(expectedHash, computedHash) {
return nil, nil, fmt.Errorf("plugin checksum mismatch - expected %s, got %s", *ref.Checksum, hex.EncodeToString(computedHash))
}
}
return &state.Plugin{
Name: ref.Name,
URL: pluginURL,
LocalPath: targetPath,
Hash: computedHash,
}, payload, nil
}
func (p *Manager) validateKnownPlugin(knownPlugin state.Plugin, ref config.PluginReference) (requireUpdate bool, err error) {
// check if URL is valid
_, err = url.Parse(ref.URL)
if err != nil {
return false, err
}
// expected file does not exist
if _, err := os.Stat(knownPlugin.LocalPath); err != nil {
return true, nil
}
// by default require update if URL was changed
if ref.URL != knownPlugin.URL.String() {
return true, nil
}
// check if SHA256 is valid
computedHash, equal, err := validateHashForFile(knownPlugin.LocalPath, knownPlugin.Hash)
if err != nil {
return false, err
} else if !equal {
return true, nil
}
// if checksum differs from expected, require update
if ref.Checksum != nil {
expectedChecksum, err := hex.DecodeString(*ref.Checksum)
if err != nil {
return false, err
}
if !bytes.Equal(computedHash, expectedChecksum) {
return true, nil
}
}
return false, nil
}
func validateHashForFile(filePath string, hash []byte) (computedHash []byte, equal bool, err error) {
computedHash, _, err = hashFile(filePath, sha256.New())
if !bytes.Equal(hash, computedHash) {
return nil, false, nil
}
return computedHash, true, nil
}
func hashFile(filePath string, h hash.Hash) (hash []byte, payload []byte, err error) {
f, err := os.Open(filePath)
if err != nil {
return nil, nil, err
}
defer func() {
_ = f.Close()
}()
var (
pluginHash = md5.New()
buf = bytes.NewBuffer(nil)
)
buf := bytes.NewBuffer(nil)
w := io.MultiWriter(h, buf)
if _, err = ioutils.CopyWithPooledBuffer(io.MultiWriter(pluginHash, buf), f); err != nil {
return err
if _, err := ioutils.CopyWithPooledBuffer(w, f); err != nil {
return nil, nil, err
}
var (
inventory *rpcv1.PluginInventory
stateKey = state.KeyOfStrings("plugins", hex.EncodeToString(pluginHash.Sum(nil)))
)
if val, _, err := p.stateStore.Get(ctx, stateKey); err == nil && len(val) > 0 {
inventory = new(rpcv1.PluginInventory)
if err := proto.Unmarshal(val, inventory); err != nil {
return err
}
} else {
inventory, err = plugin.DiscoverInventory(ctx, buf.Bytes())
if err != nil {
return err
}
if inventoryData, err := proto.Marshal(inventory); err == nil {
_ = p.stateStore.Set(ctx, stateKey, inventoryData)
}
}
for _, factory := range factoriesForInventory(inventory, buf.Bytes()) {
registry.RegisterModule(factory)
}
return nil
return h.Sum(nil), buf.Bytes(), nil
}
func factoriesForInventory(inventory *rpcv1.PluginInventory, wasiPayload []byte) (factories []modules.Factory) {
factories = make([]modules.Factory, 0, len(inventory.Modules))
for i := range inventory.Modules {
m := inventory.Modules[i]
factories = append(factories, modules.ModuleFactoryFunc(func() modules.ModuleWithMeta {
return &modules.Metadata[plugin.PluginModule]{
Module: plugin.PluginModule{
PluginPayload: wasiPayload,
PluginCategory: modules.Category(m.ModuleCategory),
PluginType: m.ModuleType,
ModuleSpec: make(map[string]any),
},
}
}))
func moduleReferenceToPluginModule(pluginID uuid.UUID, in *rpcv1.ModuleReference) state.PluginModule {
return state.PluginModule{
PluginID: pluginID,
Type: in.ModuleType,
Category: in.ModuleCategory,
}
return factories
}

View file

@ -13,7 +13,7 @@ func TestPluginModule_UnmarshalJSON(t *testing.T) {
data []byte
}
tests := []struct {
want func(m *plugin.PluginModule) error
want func(m *plugin.Module) error
name string
args args
wantErr bool
@ -30,7 +30,7 @@ func TestPluginModule_UnmarshalJSON(t *testing.T) {
args: args{
data: []byte(`{"plugin_type": "test", "plugin_category": "task"}`),
},
want: func(m *plugin.PluginModule) error {
want: func(m *plugin.Module) error {
if m.PluginCategory == "" {
return errors.New("category is empty")
}
@ -48,7 +48,7 @@ func TestPluginModule_UnmarshalJSON(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
var m plugin.PluginModule
var m plugin.Module
if err := m.UnmarshalJSON(tt.args.data); (err != nil) != tt.wantErr {
t.Errorf("UnmarshalJSON() error = %v, wantErr %v", err, tt.wantErr)
}

View file

@ -1,6 +1,7 @@
package services
import (
"code.icb4dc0.de/buildr/buildr/modules/state"
"github.com/docker/docker/client"
"code.icb4dc0.de/buildr/buildr/internal/ignore"
@ -28,4 +29,8 @@ type (
DockerClientAccessor interface {
DockerClient() *client.Client
}
PluginsRepoAccessor interface {
PluginsRepo() state.Plugins
}
)

View file

@ -2,6 +2,7 @@ package services
import (
"context"
"errors"
"github.com/hashicorp/hcl/v2"
@ -45,6 +46,13 @@ func WithIgnorer(ignorer *ignore.Ignorer) CollectionOption {
})
}
func WithDB(db *state.DB) CollectionOption {
return collectionOptionFunc(func(svc *Collection) error {
svc.stateDb = db
return nil
})
}
func WithStateStore(store state.Store) CollectionOption {
return collectionOptionFunc(func(svc *Collection) error {
svc.stateStore = store
@ -101,6 +109,7 @@ type Collection struct {
dockerClient *client.Client
ignorer *ignore.Ignorer
stateStore state.Store
stateDb *state.DB
cache state.Cache
diagsWriter hcl.DiagnosticWriter
}
@ -143,10 +152,19 @@ func (c *Collection) DiagsWriter() hcl.DiagnosticWriter {
return c.diagsWriter
}
func (c *Collection) PluginsRepo() state.Plugins {
return c.stateDb.Plugins
}
func (c *Collection) Close() error {
var err error
if c.dockerClient != nil {
return c.dockerClient.Close()
err = c.dockerClient.Close()
}
return nil
if c.stateDb != nil {
err = errors.Join(c.stateDb.Close())
}
return err
}

53
modules/plugin/payload.go Normal file
View file

@ -0,0 +1,53 @@
package plugin
import (
"bytes"
"io"
"os"
"sync"
)
type PayloadReader interface {
Reader() io.Reader
Bytes() ([]byte, error)
}
var (
_ PayloadReader = (*PayloadFile)(nil)
_ PayloadReader = (*MemoryPayload)(nil)
)
type MemoryPayload []byte
func (m MemoryPayload) Reader() io.Reader {
return bytes.NewReader(m)
}
func (m MemoryPayload) Bytes() ([]byte, error) {
return m, nil
}
type PayloadFile struct {
readOnce sync.Once
payload []byte
Path string
}
func (f *PayloadFile) Reader() io.Reader {
return bytes.NewReader(f.payload)
}
func (f *PayloadFile) Bytes() ([]byte, error) {
if err := f.readPayloadFile(); err != nil {
return nil, err
}
return f.payload, nil
}
func (f *PayloadFile) readPayloadFile() (err error) {
f.readOnce.Do(func() {
f.payload, err = os.ReadFile(f.Path)
})
return
}

View file

@ -9,6 +9,8 @@ import (
"io"
"time"
"code.icb4dc0.de/buildr/buildr/internal/ioutils"
"code.icb4dc0.de/buildr/buildr/internal/hcl"
"code.icb4dc0.de/buildr/buildr/modules"
@ -28,10 +30,10 @@ import (
)
var (
_ modules.Module = (*PluginModule)(nil)
_ hcl.Marshaler = (*PluginModule)(nil)
_ json.Marshaler = (*PluginModule)(nil)
_ json.Unmarshaler = (*PluginModule)(nil)
_ modules.Module = (*Module)(nil)
_ hcl.Marshaler = (*Module)(nil)
_ json.Marshaler = (*Module)(nil)
_ json.Unmarshaler = (*Module)(nil)
)
type jsonPluginModule struct {
@ -41,29 +43,34 @@ type jsonPluginModule struct {
PluginPayload []byte `json:"plugin_payload"`
}
type PluginModule struct {
type Module struct {
ModuleSpec map[string]any `hcl:",remain"`
unwrappedSpec map[string]any
PluginCategory modules.Category
PluginType string
PluginPayload []byte
PluginPayload PayloadReader
}
func (m PluginModule) MarshalHCL(*hclwrite.Block) error {
func (m Module) MarshalHCL(*hclwrite.Block) error {
return errors.New("plugin modules cannot be marshaled to HCL right now")
}
func (m PluginModule) WithSpec(spec map[string]any) PluginModule {
func (m Module) WithSpec(spec map[string]any) Module {
m.unwrappedSpec = spec
m.ModuleSpec = spec
return m
}
func (m PluginModule) Execute(ctx modules.ExecutionContext) (err error) {
func (m Module) Execute(ctx modules.ExecutionContext) (err error) {
runtimeConfig := wazero.NewRuntimeConfig().
WithCloseOnContextDone(true)
pluginpayload, err := m.PluginPayload.Bytes()
if err != nil {
return err
}
r := wazero.NewRuntimeWithConfig(ctx, runtimeConfig)
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -107,7 +114,7 @@ func (m PluginModule) Execute(ctx modules.ExecutionContext) (err error) {
WithStderr(ctx.StdErr()).
WithFSConfig(config)
mod, err := r.InstantiateWithConfig(ctx, m.PluginPayload, moduleConfig)
mod, err := r.InstantiateWithConfig(ctx, pluginpayload, moduleConfig)
if err != nil {
return fmt.Errorf("failed to instantiate WASI module: %w", err)
}
@ -162,15 +169,15 @@ func (m PluginModule) Execute(ctx modules.ExecutionContext) (err error) {
return err
}
func (m PluginModule) Category() modules.Category {
func (m Module) Category() modules.Category {
return m.PluginCategory
}
func (m PluginModule) Type() string {
func (m Module) Type() string {
return m.PluginType
}
func (m PluginModule) Init(hclCtx *hcl2.EvalContext) (modules.Module, error) {
func (m Module) Init(hclCtx *hcl2.EvalContext) (modules.Module, error) {
m.unwrappedSpec = make(map[string]any)
for k, v := range m.ModuleSpec {
@ -198,7 +205,7 @@ func (m PluginModule) Init(hclCtx *hcl2.EvalContext) (modules.Module, error) {
return m, nil
}
func (m *PluginModule) UnmarshalJSON(data []byte) (err error) {
func (m *Module) UnmarshalJSON(data []byte) (err error) {
var jpm jsonPluginModule
if err := json.Unmarshal(data, &jpm); err != nil {
@ -207,8 +214,10 @@ func (m *PluginModule) UnmarshalJSON(data []byte) (err error) {
reader := s2.NewReader(bytes.NewReader(jpm.PluginPayload))
if m.PluginPayload, err = io.ReadAll(reader); err != nil {
if plainData, err := io.ReadAll(reader); err != nil {
return err
} else {
m.PluginPayload = MemoryPayload(plainData)
}
m.PluginCategory = jpm.PluginCategory
@ -218,11 +227,11 @@ func (m *PluginModule) UnmarshalJSON(data []byte) (err error) {
return nil
}
func (m PluginModule) MarshalJSON() ([]byte, error) {
func (m Module) MarshalJSON() ([]byte, error) {
buf := bytes.NewBuffer(nil)
writer := s2.NewWriter(buf)
if _, err := writer.Write(m.PluginPayload); err != nil {
if _, err := ioutils.CopyWithPooledBuffer(writer, m.PluginPayload.Reader()); err != nil {
return nil, err
}

View file

@ -4,6 +4,8 @@ import (
"context"
"time"
"github.com/google/uuid"
"code.icb4dc0.de/buildr/buildr/modules/state/ent"
)
@ -41,3 +43,11 @@ type Cache interface {
CacheWriter
StoreReader
}
type Plugins interface {
List(ctx context.Context) ([]Plugin, error)
Remove(ctx context.Context, id uuid.UUID) error
ModulesForPlugin(ctx context.Context, id uuid.UUID) ([]PluginModule, error)
UpsertPlugin(ctx context.Context, plugin Plugin) (pluginId uuid.UUID, err error)
UpsertModules(ctx context.Context, modules []PluginModule) error
}

44
modules/state/db.go Normal file
View file

@ -0,0 +1,44 @@
package state
import (
"context"
"fmt"
"code.icb4dc0.de/buildr/buildr/modules/state/ent"
"entgo.io/ent/dialect"
)
func NewDB(ctx context.Context, stateFilePath string) (*DB, error) {
client, err := ent.Open(dialect.SQLite, fmt.Sprintf("file:%s?_fk=1&_pragma=foreign_keys(1)", stateFilePath))
if err != nil {
return nil, fmt.Errorf("failed to open SQLite database: %w at %s", err, stateFilePath)
}
if err := client.Schema.Create(ctx); err != nil {
return nil, fmt.Errorf("failed to create schema: %w at %s", err, stateFilePath)
}
db := &DB{
client: client,
Plugins: NewEntPluginsRepo(client),
}
if stateStore, err := NewEntStore(ctx, client); err != nil {
return nil, err
} else {
db.State = stateStore
}
return db, nil
}
type DB struct {
client *ent.Client
State Store
Plugins Plugins
}
func (db *DB) Close() error {
return db.client.Close()
}

38
modules/state/models.go Normal file
View file

@ -0,0 +1,38 @@
package state
import (
"code.icb4dc0.de/buildr/buildr/modules/state/ent"
"net/url"
"github.com/google/uuid"
)
type Plugin struct {
ID *uuid.UUID
Name string
URL *url.URL
LocalPath string
Hash []byte
}
type PluginModule struct {
ID *uuid.UUID
PluginID uuid.UUID
Type string
Category string
}
func pluginFromEntity(p *ent.Plugin) (Plugin, error) {
u, err := url.Parse(p.URL)
if err != nil {
return Plugin{}, err
}
return Plugin{
ID: &p.ID,
Name: p.Name,
URL: u,
LocalPath: p.LocalPath,
Hash: p.Hash,
}, nil
}

100
modules/state/plugins.go Normal file
View file

@ -0,0 +1,100 @@
package state
import (
"context"
"code.icb4dc0.de/buildr/buildr/modules/state/ent"
"code.icb4dc0.de/buildr/buildr/modules/state/ent/plugin"
"code.icb4dc0.de/buildr/buildr/modules/state/ent/pluginmodule"
"github.com/google/uuid"
)
var _ Plugins = (*EntPluginsRepo)(nil)
func NewEntPluginsRepo(client *ent.Client) *EntPluginsRepo {
return &EntPluginsRepo{
client: client,
}
}
type EntPluginsRepo struct {
client *ent.Client
}
func (e EntPluginsRepo) Remove(ctx context.Context, id uuid.UUID) error {
return e.client.Plugin.DeleteOneID(id).Exec(ctx)
}
func (e EntPluginsRepo) ModulesForPlugin(ctx context.Context, id uuid.UUID) ([]PluginModule, error) {
modules, err := e.client.PluginModule.
Query().
Where(pluginmodule.HasPluginWith(plugin.ID(id))).
All(ctx)
if err != nil {
return nil, err
}
result := make([]PluginModule, 0, len(modules))
for _, module := range modules {
result = append(result, PluginModule{
ID: &module.ID,
PluginID: id,
Type: module.Type,
Category: module.Category,
})
}
return result, nil
}
func (e EntPluginsRepo) List(ctx context.Context) ([]Plugin, error) {
result, err := e.client.Plugin.Query().All(ctx)
if err != nil {
return nil, err
}
plugins := make([]Plugin, 0, len(result))
for _, pluginEntity := range result {
if p, err := pluginFromEntity(pluginEntity); err != nil {
return nil, err
} else {
plugins = append(plugins, p)
}
}
return plugins, nil
}
func (e EntPluginsRepo) UpsertPlugin(ctx context.Context, plugin Plugin) (pluginId uuid.UUID, err error) {
return e.client.Plugin.
Create().
SetNillableID(plugin.ID).
SetName(plugin.Name).
SetLocalPath(plugin.LocalPath).
SetURL(plugin.URL.String()).
SetHash(plugin.Hash).
OnConflict().
UpdateNewValues().
ID(ctx)
}
func (e EntPluginsRepo) UpsertModules(ctx context.Context, modules []PluginModule) error {
for _, module := range modules {
upsertErr := e.client.PluginModule.
Create().
SetNillableID(module.ID).
SetPluginID(module.PluginID).
SetType(module.Type).
SetCategory(module.Category).
OnConflict().
UpdateNewValues().
Exec(ctx)
if upsertErr != nil {
return upsertErr
}
}
return nil
}

View file

@ -0,0 +1,49 @@
package schema
import (
"entgo.io/ent"
"entgo.io/ent/dialect/entsql"
"entgo.io/ent/schema/edge"
"entgo.io/ent/schema/field"
"entgo.io/ent/schema/index"
"github.com/google/uuid"
)
// Plugin holds the schema definition for the Plugin entity.
type Plugin struct {
ent.Schema
}
// Fields of the Plugin.
func (Plugin) Fields() []ent.Field {
return []ent.Field{
field.UUID("id", uuid.UUID{}).
Default(uuid.New).
Unique(),
field.String("name").
Unique().
Immutable().
MaxLen(150),
field.String("url"),
field.String("local_path").
Unique(),
field.Bytes("hash").
Unique().
MaxLen(256),
}
}
func (Plugin) Indexes() []ent.Index {
return []ent.Index{
index.Fields("name").
Unique(),
}
}
// Edges of the Plugin.
func (Plugin) Edges() []ent.Edge {
return []ent.Edge{
edge.To("plugin_modules", PluginModule.Type).
Annotations(entsql.OnDelete(entsql.Cascade)),
}
}

View file

@ -0,0 +1,44 @@
package schema
import (
"entgo.io/ent"
"entgo.io/ent/dialect/entsql"
"entgo.io/ent/schema/edge"
"entgo.io/ent/schema/field"
"entgo.io/ent/schema/index"
"github.com/google/uuid"
)
// PluginModule holds the schema definition for the PluginModule entity.
type PluginModule struct {
ent.Schema
}
// Fields of the PluginModule.
func (PluginModule) Fields() []ent.Field {
return []ent.Field{
field.UUID("id", uuid.UUID{}).
Default(uuid.New).
Unique(),
field.String("type").
MinLen(1),
field.String("category").
MinLen(1),
}
}
func (PluginModule) Indexes() []ent.Index {
return []ent.Index{
index.Fields("type", "category").Unique(),
}
}
// Edges of the PluginModule.
func (PluginModule) Edges() []ent.Edge {
return []ent.Edge{
edge.From("plugin", Plugin.Type).
Ref("plugin_modules").
Unique().
Annotations(entsql.OnDelete(entsql.Cascade)),
}
}

View file

@ -9,44 +9,32 @@ import (
"code.icb4dc0.de/buildr/buildr/modules/state/ent"
"code.icb4dc0.de/buildr/buildr/modules/state/ent/kventry"
"entgo.io/ent/dialect"
_ "modernc.org/sqlite"
)
var ErrEmptyKey = errors.New("key may not be empty")
func NewEntStore(ctx context.Context, stateFilePath string) (*EntStore, error) {
client, err := ent.Open(dialect.SQLite, fmt.Sprintf("file:%s?_fk=1&_pragma=foreign_keys(1)", stateFilePath))
if err != nil {
return nil, fmt.Errorf("failed to open SQLite database: %w at %s", err, stateFilePath)
}
if err := client.Schema.Create(ctx); err != nil {
return nil, fmt.Errorf("failed to create schema: %w at %s", err, stateFilePath)
}
_, err = client.KVEntry.
func NewEntStore(ctx context.Context, client *ent.Client) (*EntStateRepository, error) {
_, err := client.KVEntry.
Delete().
Where(kventry.TTLLTE(time.Now().UTC())).
Exec(ctx)
if err != nil {
return nil, fmt.Errorf("failed to delete expired entries: %w at %s", err, stateFilePath)
return nil, fmt.Errorf("failed to delete expired entries: %w", err)
}
return &EntStore{
return &EntStateRepository{
client: client,
}, nil
}
var _ Store = (*EntStore)(nil)
var _ Store = (*EntStateRepository)(nil)
type EntStore struct {
type EntStateRepository struct {
client *ent.Client
}
func (s *EntStore) Get(ctx context.Context, key Key) (state []byte, meta Metadata, err error) {
func (s *EntStateRepository) Get(ctx context.Context, key Key) (state []byte, meta Metadata, err error) {
keyHash := key.Bytes()
if len(keyHash) == 0 {
return nil, Metadata{}, ErrEmptyKey
@ -70,7 +58,7 @@ func (s *EntStore) Get(ctx context.Context, key Key) (state []byte, meta Metadat
return kvEntry.State, metadataForEntry(*kvEntry), nil
}
func (s *EntStore) Set(ctx context.Context, key Key, state []byte, opts ...EntryOption) error {
func (s *EntStateRepository) Set(ctx context.Context, key Key, state []byte, opts ...EntryOption) error {
keyHash := key.Bytes()
if len(keyHash) == 0 {
return ErrEmptyKey
@ -92,10 +80,6 @@ func (s *EntStore) Set(ctx context.Context, key Key, state []byte, opts ...Entry
Exec(ctx)
}
func (s *EntStore) Close() error {
return s.client.Close()
}
func metadataForEntry(entry ent.KVEntry) Metadata {
return Metadata{
ModifiedAt: entry.ModifiedAt,

View file

@ -52,13 +52,13 @@ func TestEntStore_Set(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
store, err := state.NewEntStore(ctx, filepath.Join(t.TempDir(), "store.sqlite"))
db, err := state.NewDB(ctx, filepath.Join(t.TempDir(), "store.sqlite"))
if err != nil {
t.Errorf("NewEntStore() error = %v", err)
return
}
if err := store.Set(ctx, tt.args.key, tt.args.state); (err != nil) != tt.wantErr {
if err := db.State.Set(ctx, tt.args.key, tt.args.state); (err != nil) != tt.wantErr {
t.Errorf("Set() error = %v, wantErr %v", err, tt.wantErr)
}
})
@ -70,18 +70,19 @@ func TestEntStore_Get_NonExisting(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
store, err := state.NewEntStore(ctx, filepath.Join(t.TempDir(), "store.sqlite"))
db, err := state.NewDB(ctx, filepath.Join(t.TempDir(), "store.sqlite"))
if err != nil {
t.Errorf("NewEntStore() error = %v", err)
return
}
key := state.KeyOfStrings(
"script_state",
modules.CategoryTask,
"go_test",
)
gotState, _, err := store.Get(ctx, key)
gotState, _, err := db.State.Get(ctx, key)
if err != nil {
t.Errorf("Get() error = %v", err)
return
@ -97,7 +98,7 @@ func TestEntStore_SetGet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
store, err := state.NewEntStore(ctx, filepath.Join(t.TempDir(), "store.sqlite"))
db, err := state.NewDB(ctx, filepath.Join(t.TempDir(), "store.sqlite"))
if err != nil {
t.Errorf("NewEntStore() error = %v", err)
return
@ -108,12 +109,12 @@ func TestEntStore_SetGet(t *testing.T) {
"go_test",
)
if err := store.Set(ctx, key, []byte(`{}`)); err != nil {
if err := db.State.Set(ctx, key, []byte(`{}`)); err != nil {
t.Errorf("Set() error = %v", err)
return
}
gotState, gotMeta, err := store.Get(ctx, key)
gotState, gotMeta, err := db.State.Get(ctx, key)
if err != nil {
t.Errorf("Get() error = %v", err)
return