diff --git a/.github/actions/spelling/expect.txt b/.github/actions/spelling/expect.txt index 9cd15606..f5bb8ffb 100644 --- a/.github/actions/spelling/expect.txt +++ b/.github/actions/spelling/expect.txt @@ -1,4 +1,7 @@ acs +Actorified +actorifiedstore +actorify Aibrew alibaba alrest @@ -157,6 +160,7 @@ ifm Imagesift imgproxy impressum +inbox inp internets IPTo diff --git a/docs/docs/CHANGELOG.md b/docs/docs/CHANGELOG.md index 8f717c6d..a5be9d55 100644 --- a/docs/docs/CHANGELOG.md +++ b/docs/docs/CHANGELOG.md @@ -13,13 +13,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 -- Fix lock convoy problem in decaymap ([#1103](https://github.com/TecharoHQ/anubis/issues/1103)) -- Document missing environment variables in installation guide: `SLOG_LEVEL`, `COOKIE_PREFIX`, `FORCED_LANGUAGE`, and `TARGET_DISABLE_KEEPALIVE` ([#1086](https://github.com/TecharoHQ/anubis/pull/1086)) -- Add validation warning when persistent storage is used without setting signing keys -- Fixed `robots2policy` to properly group consecutive user agents into `any:` instead of only processing the last one ([#925](https://github.com/TecharoHQ/anubis/pull/925)) +- Fix lock convoy problem in decaymap ([#1103](https://github.com/TecharoHQ/anubis/issues/1103)). +- Fix lock convoy problem in bbolt by implementing the actor pattern ([#1103](https://github.com/TecharoHQ/anubis/issues/1103)). +- Document missing environment variables in installation guide: `SLOG_LEVEL`, `COOKIE_PREFIX`, `FORCED_LANGUAGE`, and `TARGET_DISABLE_KEEPALIVE` ([#1086](https://github.com/TecharoHQ/anubis/pull/1086)). +- Add validation warning when persistent storage is used without setting signing keys. +- Fixed `robots2policy` to properly group consecutive user agents into `any:` instead of only processing the last one ([#925](https://github.com/TecharoHQ/anubis/pull/925)). - Add the [`s3api` storage backend](./admin/policies.mdx#s3api) to allow Anubis to use S3 API compatible object storage as its storage backend. - Make `cmd/containerbuild` support commas for separating elements of the `--docker-tags` argument as well as newlines. -- Add the `DIFFICULTY_IN_JWT` option, which allows one to add the `difficulty` field in the JWT claims which indicates the difficulty of the token ([#1063](https://github.com/TecharoHQ/anubis/pull/1063)) +- Add the `DIFFICULTY_IN_JWT` option, which allows one to add the `difficulty` field in the JWT claims which indicates the difficulty of the token ([#1063](https://github.com/TecharoHQ/anubis/pull/1063)). - Ported the client-side JS to TypeScript to avoid egregious errors in the future. - Fixes concurrency problems with very old browsers ([#1082](https://github.com/TecharoHQ/anubis/issues/1082)). diff --git a/internal/actorify/actorify.go b/internal/actorify/actorify.go new file mode 100644 index 00000000..907a2cb9 --- /dev/null +++ b/internal/actorify/actorify.go @@ -0,0 +1,107 @@ +// Package actorify lets you transform a parallel operation into a serialized +// operation via the Actor pattern[1]. +// +// [1]: https://en.wikipedia.org/wiki/Actor_model +package actorify + +import ( + "context" + "errors" +) + +func z[Z any]() Z { + var z Z + return z +} + +var ( + // ErrActorDied is returned when the actor inbox or reply channel was closed. + ErrActorDied = errors.New("actorify: the actor inbox or reply channel was closed") +) + +// Handler is a function alias for the underlying logic the Actor should call. +type Handler[Input, Output any] func(ctx context.Context, input Input) (Output, error) + +// Actor is a serializing wrapper that runs a function in a background goroutine. +// Whenever the Call method is invoked, a message is sent to the actor's inbox and then +// the callee waits for a response. Depending on how busy the actor is, this may take +// a moment. +type Actor[Input, Output any] struct { + handler Handler[Input, Output] + inbox chan *message[Input, Output] +} + +type message[Input, Output any] struct { + ctx context.Context + arg Input + reply chan reply[Output] +} + +type reply[Output any] struct { + output Output + err error +} + +// New constructs a new Actor and starts its background thread. Cancel the context and you cancel +// the Actor. +func New[Input, Output any](ctx context.Context, handler Handler[Input, Output]) *Actor[Input, Output] { + result := &Actor[Input, Output]{ + handler: handler, + inbox: make(chan *message[Input, Output], 32), + } + + go result.handle(ctx) + + return result +} + +func (a *Actor[Input, Output]) handle(ctx context.Context) { + for { + select { + case <-ctx.Done(): + close(a.inbox) + return + case msg, ok := <-a.inbox: + if !ok { + if msg.reply != nil { + close(msg.reply) + } + + return + } + + result, err := a.handler(msg.ctx, msg.arg) + + reply := reply[Output]{ + output: result, + err: err, + } + + msg.reply <- reply + } + } +} + +// Call calls the Actor with a given Input and returns the handler's Output. +// +// This only works with unary functions by design. If you need to have more inputs, define +// a struct type to use as a container. +func (a *Actor[Input, Output]) Call(ctx context.Context, input Input) (Output, error) { + replyCh := make(chan reply[Output]) + + a.inbox <- &message[Input, Output]{ + arg: input, + reply: replyCh, + } + + select { + case reply, ok := <-replyCh: + if !ok { + return z[Output](), ErrActorDied + } + + return reply.output, reply.err + case <-ctx.Done(): + return z[Output](), context.Cause(ctx) + } +} diff --git a/lib/store/actorifiedstore.go b/lib/store/actorifiedstore.go new file mode 100644 index 00000000..d495736e --- /dev/null +++ b/lib/store/actorifiedstore.go @@ -0,0 +1,82 @@ +package store + +import ( + "context" + "time" + + "github.com/TecharoHQ/anubis/internal/actorify" +) + +type unit struct{} + +type ActorifiedStore struct { + Interface + + deleteActor *actorify.Actor[string, unit] + getActor *actorify.Actor[string, []byte] + setActor *actorify.Actor[*actorSetReq, unit] + cancel context.CancelFunc +} + +type actorSetReq struct { + key string + value []byte + expiry time.Duration +} + +func NewActorifiedStore(backend Interface) *ActorifiedStore { + ctx, cancel := context.WithCancel(context.Background()) + + result := &ActorifiedStore{ + Interface: backend, + cancel: cancel, + } + + result.deleteActor = actorify.New(ctx, result.actorDelete) + result.getActor = actorify.New(ctx, backend.Get) + result.setActor = actorify.New(ctx, result.actorSet) + + return result +} + +func (a *ActorifiedStore) Close() { a.cancel() } + +func (a *ActorifiedStore) Delete(ctx context.Context, key string) error { + if _, err := a.deleteActor.Call(ctx, key); err != nil { + return err + } + + return nil +} + +func (a *ActorifiedStore) Get(ctx context.Context, key string) ([]byte, error) { + return a.getActor.Call(ctx, key) +} + +func (a *ActorifiedStore) Set(ctx context.Context, key string, value []byte, expiry time.Duration) error { + if _, err := a.setActor.Call(ctx, &actorSetReq{ + key: key, + value: value, + expiry: expiry, + }); err != nil { + return err + } + + return nil +} + +func (a *ActorifiedStore) actorDelete(ctx context.Context, key string) (unit, error) { + if err := a.Interface.Delete(ctx, key); err != nil { + return unit{}, err + } + + return unit{}, nil +} + +func (a *ActorifiedStore) actorSet(ctx context.Context, req *actorSetReq) (unit, error) { + if err := a.Interface.Set(ctx, req.key, req.value, req.expiry); err != nil { + return unit{}, err + } + + return unit{}, nil +} diff --git a/lib/store/bbolt/factory.go b/lib/store/bbolt/factory.go index 74c924c3..04879b7e 100644 --- a/lib/store/bbolt/factory.go +++ b/lib/store/bbolt/factory.go @@ -48,7 +48,7 @@ func (Factory) Build(ctx context.Context, data json.RawMessage) (store.Interface go result.cleanupThread(ctx) - return result, nil + return store.NewActorifiedStore(result), nil } // Valid parses and validates the bbolt store Config or returns