mirror of
https://github.com/TecharoHQ/anubis.git
synced 2026-02-08 21:09:41 -06:00
feat(lib/store): add bbolt store implementation
Signed-off-by: Xe Iaso <me@xeiaso.net>
This commit is contained in:
1
go.mod
1
go.mod
@@ -110,6 +110,7 @@ require (
|
||||
github.com/xanzy/ssh-agent v0.3.3 // indirect
|
||||
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
|
||||
gitlab.com/digitalxero/go-conventional-commit v1.0.7 // indirect
|
||||
go.etcd.io/bbolt v1.4.2 // indirect
|
||||
go.yaml.in/yaml/v2 v2.4.2 // indirect
|
||||
go.yaml.in/yaml/v3 v3.0.3 // indirect
|
||||
golang.org/x/crypto v0.39.0 // indirect
|
||||
|
||||
2
go.sum
2
go.sum
@@ -317,6 +317,8 @@ github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBi
|
||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||
gitlab.com/digitalxero/go-conventional-commit v1.0.7 h1:8/dO6WWG+98PMhlZowt/YjuiKhqhGlOCwlIV8SqqGh8=
|
||||
gitlab.com/digitalxero/go-conventional-commit v1.0.7/go.mod h1:05Xc2BFsSyC5tKhK0y+P3bs0AwUtNuTp+mTpbCU/DZ0=
|
||||
go.etcd.io/bbolt v1.4.2 h1:IrUHp260R8c+zYx/Tm8QZr04CX+qWS5PGfPdevhdm1I=
|
||||
go.etcd.io/bbolt v1.4.2/go.mod h1:Is8rSHO/b4f3XigBC0lL0+4FwAQv3HXEEIgFMuKHceM=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
|
||||
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
|
||||
"github.com/TecharoHQ/anubis/lib/store"
|
||||
_ "github.com/TecharoHQ/anubis/lib/store/all"
|
||||
)
|
||||
|
||||
var (
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
package config_test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/TecharoHQ/anubis/lib/policy/config"
|
||||
_ "github.com/TecharoHQ/anubis/lib/store/all"
|
||||
"github.com/TecharoHQ/anubis/lib/store/bbolt"
|
||||
)
|
||||
|
||||
func TestStoreValid(t *testing.T) {
|
||||
@@ -25,6 +26,21 @@ func TestStoreValid(t *testing.T) {
|
||||
Backend: "memory",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "bbolt backend",
|
||||
input: config.Store{
|
||||
Backend: "bbolt",
|
||||
Parameters: json.RawMessage(`{"path": "/tmp/foo", "bucket": "bar"}`),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "bbolt backend no path",
|
||||
input: config.Store{
|
||||
Backend: "bbolt",
|
||||
Parameters: json.RawMessage(`{"path": "", "bucket": "bar"}`),
|
||||
},
|
||||
err: bbolt.ErrMissingPath,
|
||||
},
|
||||
{
|
||||
name: "unknown backend",
|
||||
input: config.Store{
|
||||
|
||||
@@ -4,5 +4,6 @@
|
||||
package all
|
||||
|
||||
import (
|
||||
_ "github.com/TecharoHQ/anubis/lib/store/bbolt"
|
||||
_ "github.com/TecharoHQ/anubis/lib/store/memory"
|
||||
)
|
||||
|
||||
142
lib/store/bbolt/bbolt.go
Normal file
142
lib/store/bbolt/bbolt.go
Normal file
@@ -0,0 +1,142 @@
|
||||
package bbolt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/TecharoHQ/anubis/lib/store"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrBucketDoesNotExist = errors.New("bbolt: bucket does not exist")
|
||||
ErrNotExists = errors.New("bbolt: value does not exist in store")
|
||||
)
|
||||
|
||||
type Item struct {
|
||||
Data []byte `json:"data"`
|
||||
Expires time.Time `json:"expires"`
|
||||
}
|
||||
|
||||
type Store struct {
|
||||
bucket []byte
|
||||
bdb *bbolt.DB
|
||||
}
|
||||
|
||||
func (s *Store) Delete(ctx context.Context, key string) error {
|
||||
return s.bdb.Update(func(tx *bbolt.Tx) error {
|
||||
bkt := tx.Bucket(s.bucket)
|
||||
if bkt == nil {
|
||||
return fmt.Errorf("%w: %q", ErrBucketDoesNotExist, string(s.bucket))
|
||||
}
|
||||
|
||||
if bkt.Get([]byte(key)) == nil {
|
||||
return fmt.Errorf("%w: %q", ErrNotExists, key)
|
||||
}
|
||||
|
||||
return bkt.Delete([]byte(key))
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Store) Get(ctx context.Context, key string) ([]byte, error) {
|
||||
var i Item
|
||||
|
||||
if err := s.bdb.View(func(tx *bbolt.Tx) error {
|
||||
bkt := tx.Bucket(s.bucket)
|
||||
if bkt == nil {
|
||||
return fmt.Errorf("%w: %q", ErrBucketDoesNotExist, string(s.bucket))
|
||||
}
|
||||
|
||||
bucketData := bkt.Get([]byte(key))
|
||||
if bucketData == nil {
|
||||
return fmt.Errorf("%w: %q", store.ErrNotFound, key)
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(bucketData, &i); err != nil {
|
||||
return fmt.Errorf("%w: %w", store.ErrCantDecode, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if time.Now().After(i.Expires) {
|
||||
go s.Delete(context.Background(), key)
|
||||
return nil, fmt.Errorf("%w: %q", store.ErrNotFound, key)
|
||||
}
|
||||
|
||||
return i.Data, nil
|
||||
}
|
||||
|
||||
func (s *Store) Set(ctx context.Context, key string, value []byte, expiry time.Duration) error {
|
||||
i := Item{
|
||||
Data: value,
|
||||
Expires: time.Now().Add(expiry),
|
||||
}
|
||||
|
||||
data, err := json.Marshal(i)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: %w", store.ErrCantEncode, err)
|
||||
}
|
||||
|
||||
return s.bdb.Update(func(tx *bbolt.Tx) error {
|
||||
bkt := tx.Bucket(s.bucket)
|
||||
if bkt == nil {
|
||||
return fmt.Errorf("%w: %q", ErrBucketDoesNotExist, string(s.bucket))
|
||||
}
|
||||
|
||||
return bkt.Put([]byte(key), data)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Store) cleanup(ctx context.Context) error {
|
||||
now := time.Now()
|
||||
|
||||
return s.bdb.Update(func(tx *bbolt.Tx) error {
|
||||
bkt := tx.Bucket(s.bucket)
|
||||
if bkt == nil {
|
||||
return fmt.Errorf("cache bucket %q does not exist", string(s.bucket))
|
||||
}
|
||||
|
||||
return bkt.ForEach(func(k, v []byte) error {
|
||||
var i Item
|
||||
|
||||
data := bkt.Get(k)
|
||||
if data == nil {
|
||||
return fmt.Errorf("%s in Cache bucket does not exist???", string(k))
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(data, &i); err != nil {
|
||||
return fmt.Errorf("can't unmarshal data at key %s: %w", string(k), err)
|
||||
}
|
||||
|
||||
if now.After(i.Expires) {
|
||||
return bkt.Delete(k)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func (s *Store) cleanupThread(ctx context.Context) {
|
||||
t := time.NewTicker(5 * time.Minute)
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
if err := s.cleanup(ctx); err != nil {
|
||||
slog.Error("error during bbolt cleanup", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
23
lib/store/bbolt/bbolt_test.go
Normal file
23
lib/store/bbolt/bbolt_test.go
Normal file
@@ -0,0 +1,23 @@
|
||||
package bbolt
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/TecharoHQ/anubis/lib/store/storetest"
|
||||
)
|
||||
|
||||
func TestImpl(t *testing.T) {
|
||||
path := filepath.Join(t.TempDir(), "db")
|
||||
t.Log(path)
|
||||
data, err := json.Marshal(Config{
|
||||
Path: path,
|
||||
Bucket: "anubis",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
storetest.Common(t, Factory{}, json.RawMessage(data))
|
||||
}
|
||||
100
lib/store/bbolt/factory.go
Normal file
100
lib/store/bbolt/factory.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package bbolt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/TecharoHQ/anubis/lib/store"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrMissingPath = errors.New("bbolt: path is missing from config")
|
||||
ErrCantWriteToPath = errors.New("bbolt: can't write to path")
|
||||
)
|
||||
|
||||
func init() {
|
||||
store.Register("bbolt", Factory{})
|
||||
}
|
||||
|
||||
type Factory struct{}
|
||||
|
||||
func (Factory) Build(ctx context.Context, data json.RawMessage) (store.Interface, error) {
|
||||
var config Config
|
||||
if err := json.Unmarshal([]byte(data), &config); err != nil {
|
||||
return nil, fmt.Errorf("%w: %w", store.ErrBadConfig, err)
|
||||
}
|
||||
|
||||
if err := config.Valid(); err != nil {
|
||||
return nil, fmt.Errorf("%w: %w", store.ErrBadConfig, err)
|
||||
}
|
||||
|
||||
if config.Bucket == "" {
|
||||
config.Bucket = "anubis"
|
||||
}
|
||||
|
||||
bdb, err := bbolt.Open(config.Path, 0600, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't open bbolt database %s: %w", config.Path, err)
|
||||
}
|
||||
|
||||
if err := bdb.Update(func(tx *bbolt.Tx) error {
|
||||
if _, err := tx.CreateBucketIfNotExists([]byte(config.Bucket)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("can't create bbolt bucket %q: %w", config.Bucket, err)
|
||||
}
|
||||
|
||||
result := &Store{
|
||||
bdb: bdb,
|
||||
bucket: []byte(config.Bucket),
|
||||
}
|
||||
|
||||
go result.cleanupThread(ctx)
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (Factory) Valid(data json.RawMessage) error {
|
||||
var config Config
|
||||
if err := json.Unmarshal([]byte(data), &config); err != nil {
|
||||
return fmt.Errorf("%w: %w", store.ErrBadConfig, err)
|
||||
}
|
||||
|
||||
if err := config.Valid(); err != nil {
|
||||
return fmt.Errorf("%w: %w", store.ErrBadConfig, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Path string `json:"path"`
|
||||
Bucket string `json:"bucket,omitempty"`
|
||||
}
|
||||
|
||||
func (c Config) Valid() error {
|
||||
var errs []error
|
||||
|
||||
if c.Path == "" {
|
||||
errs = append(errs, ErrMissingPath)
|
||||
} else {
|
||||
dir := filepath.Dir(c.Path)
|
||||
if err := os.WriteFile(filepath.Join(dir, ".test-file"), []byte(""), 0600); err != nil {
|
||||
errs = append(errs, ErrCantWriteToPath)
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) != 0 {
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
50
lib/store/bbolt/factory_test.go
Normal file
50
lib/store/bbolt/factory_test.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package bbolt
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestFactoryValid(t *testing.T) {
|
||||
f := Factory{}
|
||||
|
||||
t.Run("bad config", func(t *testing.T) {
|
||||
if err := f.Valid(json.RawMessage(`}`)); err == nil {
|
||||
t.Error("wanted parsing failure but got a successful result")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("invalid config", func(t *testing.T) {
|
||||
for _, tt := range []struct {
|
||||
name string
|
||||
cfg Config
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "missing path",
|
||||
cfg: Config{},
|
||||
err: ErrMissingPath,
|
||||
},
|
||||
{
|
||||
name: "unwritable folder",
|
||||
cfg: Config{
|
||||
Path: filepath.Join("/", "testdb"),
|
||||
},
|
||||
err: ErrCantWriteToPath,
|
||||
},
|
||||
} {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
data, err := json.Marshal(tt.cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := f.Valid(json.RawMessage(data)); !errors.Is(err, tt.err) {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -20,6 +20,9 @@ var (
|
||||
// ErrCantEncode is returned when a store adaptor cannot encode the value into
|
||||
// the format that the store uses.
|
||||
ErrCantEncode = errors.New("store: can't encode value")
|
||||
|
||||
// ErrBadConfig is returned when a store adaptor's configuration is invalid.
|
||||
ErrBadConfig = errors.New("store: configuration is invalid")
|
||||
)
|
||||
|
||||
// Interface defines the calls that Anubis uses for storage in a local or remote
|
||||
@@ -36,9 +39,7 @@ type Interface interface {
|
||||
Set(ctx context.Context, key string, value []byte, expiry time.Duration) error
|
||||
}
|
||||
|
||||
func z[T any]() T {
|
||||
return *new(T)
|
||||
}
|
||||
func z[T any]() T { return *new(T) }
|
||||
|
||||
type JSON[T any] struct {
|
||||
Underlying Interface
|
||||
|
||||
@@ -7,5 +7,5 @@ import (
|
||||
)
|
||||
|
||||
func TestImpl(t *testing.T) {
|
||||
storetest.Common(t, New(t.Context()))
|
||||
storetest.Common(t, factory{}, nil)
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package storetest
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -9,7 +10,16 @@ import (
|
||||
"github.com/TecharoHQ/anubis/lib/store"
|
||||
)
|
||||
|
||||
func Common(t *testing.T, s store.Interface) {
|
||||
func Common(t *testing.T, f store.Factory, config json.RawMessage) {
|
||||
if err := f.Valid(config); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s, err := f.Build(t.Context(), config)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, tt := range []struct {
|
||||
name string
|
||||
doer func(t *testing.T, s store.Interface) error
|
||||
|
||||
Reference in New Issue
Block a user