reva-bump-2.41.0 (#2032)

This commit is contained in:
Viktor Scharf
2025-12-15 17:03:58 +01:00
committed by GitHub
parent 4dcecbf5c0
commit 8aac5f6318
14 changed files with 370 additions and 69 deletions

2
go.mod
View File

@@ -64,7 +64,7 @@ require (
github.com/open-policy-agent/opa v1.10.1
github.com/opencloud-eu/icap-client v0.0.0-20250930132611-28a2afe62d89
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76
github.com/opencloud-eu/reva/v2 v2.40.1
github.com/opencloud-eu/reva/v2 v2.41.0
github.com/opensearch-project/opensearch-go/v4 v4.5.0
github.com/orcaman/concurrent-map v1.0.0
github.com/pkg/errors v0.9.1

4
go.sum
View File

@@ -963,8 +963,8 @@ github.com/opencloud-eu/inotifywaitgo v0.0.0-20251111171128-a390bae3c5e9 h1:dIft
github.com/opencloud-eu/inotifywaitgo v0.0.0-20251111171128-a390bae3c5e9/go.mod h1:JWyDC6H+5oZRdUJUgKuaye+8Ph5hEs6HVzVoPKzWSGI=
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76 h1:vD/EdfDUrv4omSFjrinT8Mvf+8D7f9g4vgQ2oiDrVUI=
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76/go.mod h1:pzatilMEHZFT3qV7C/X3MqOa3NlRQuYhlRhZTL+hN6Q=
github.com/opencloud-eu/reva/v2 v2.40.1 h1:QwMkbGMhwDSwfk2WxbnTpIig2BugPBaVFjWcy2DSU3U=
github.com/opencloud-eu/reva/v2 v2.40.1/go.mod h1:DGH08n2mvtsQLkt8o15FV6m51FwSJJGhjR8Ty+iIJww=
github.com/opencloud-eu/reva/v2 v2.41.0 h1:oie8+sxcA+drREXRTqm0LmfUdy/mmaa6pA6wkdF6tF4=
github.com/opencloud-eu/reva/v2 v2.41.0/go.mod h1:DGH08n2mvtsQLkt8o15FV6m51FwSJJGhjR8Ty+iIJww=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040=

View File

@@ -46,10 +46,27 @@ type Options struct {
WatchRoot string `mapstructure:"watch_root"` // base directory for the watch. events will be considered relative to this path
WatchNotificationBrokers string `mapstructure:"watch_notification_brokers"`
NatsWatcher NatsWatcherConfig `mapstructure:"natswatcher"`
// InotifyWatcher specific options
InotifyStatsFrequency time.Duration `mapstructure:"inotify_stats_frequency"`
}
// NatsWatcherConfig is the configuration needed for a NATS watcher event stream.
type NatsWatcherConfig struct {
Endpoint string `mapstructure:"address"`
Cluster string `mapstructure:"clusterID"`
Stream string `mapstructure:"stream"`
Durable string `mapstructure:"durable-name"`
TLSInsecure bool `mapstructure:"tls-insecure"`
TLSRootCACertificate string `mapstructure:"tls-root-ca-cert"`
EnableTLS bool `mapstructure:"enable-tls"`
AuthUsername string `mapstructure:"username"`
AuthPassword string `mapstructure:"password"`
MaxAckPending int `mapstructure:"max-ack-pending"`
AckWait time.Duration `mapstructure:"ack-wait"`
}
// New returns a new Options instance for the given configuration
func New(m map[string]interface{}) (*Options, error) {
// default to hybrid metadatabackend for posixfs

View File

@@ -21,6 +21,7 @@ package trashbin
import (
"context"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
@@ -589,6 +590,10 @@ func (tb *Trashbin) IsEmpty(ctx context.Context, spaceID string) bool {
}
dirItems, err := trash.ReadDir(1)
if err != nil {
if err == io.EOF {
// empty trash
return true
}
// if we cannot read the trash, we assume there are no trashed items
tb.log.Error().Err(err).Str("spaceID", spaceID).Msg("trashbin: error reading trash directory")
return true

View File

@@ -39,6 +39,7 @@ import (
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/opencloud-eu/reva/v2/pkg/events"
"github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher"
"github.com/opencloud-eu/reva/v2/pkg/storage/pkg/decomposedfs/metadata"
"github.com/opencloud-eu/reva/v2/pkg/storage/pkg/decomposedfs/metadata/prefixes"
"github.com/opencloud-eu/reva/v2/pkg/storage/pkg/decomposedfs/node"
@@ -54,16 +55,6 @@ type ScanDebouncer struct {
mutex sync.Mutex
}
type EventAction int
const (
ActionCreate EventAction = iota
ActionUpdate
ActionMove
ActionDelete
ActionMoveFrom
)
type queueItem struct {
item scanItem
timer *time.Timer
@@ -190,10 +181,10 @@ func (t *Tree) workScanQueue() {
}
// Scan scans the given path and updates the id chache
func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
func (t *Tree) Scan(path string, action watcher.EventAction, isDir bool) error {
// cases:
switch action {
case ActionCreate:
case watcher.ActionCreate:
t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionCreate)")
if !isDir {
// 1. New file (could be emitted as part of a new directory)
@@ -225,7 +216,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
})
}
case ActionUpdate:
case watcher.ActionUpdate:
t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionUpdate)")
// 3. Updated file
// -> update file unless parent directory is being rescanned
@@ -241,7 +232,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
AssimilationCounter.WithLabelValues(_labelDir, _labelUpdated).Inc()
}
case ActionMove:
case watcher.ActionMove:
t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionMove)")
// 4. Moved file
// -> update file
@@ -258,7 +249,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
AssimilationCounter.WithLabelValues(_labelDir, _labelMoved).Inc()
}
case ActionMoveFrom:
case watcher.ActionMoveFrom:
t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionMoveFrom)")
// 6. file/directory moved out of the watched directory
// -> remove from caches
@@ -279,7 +270,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
// We do not do metrics here because this has been handled in `ActionMove`
case ActionDelete:
case watcher.ActionDelete:
t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("handling deleted item")
// 7. Deleted file or directory
@@ -426,6 +417,15 @@ func (t *Tree) assimilate(item scanItem) error {
}
}
fi, err := os.Lstat(item.Path)
if err != nil {
return err
}
if !fi.IsDir() && !fi.Mode().IsRegular() {
t.log.Trace().Str("path", item.Path).Msg("skipping non-regular file")
return nil
}
if id != "" {
// the file has an id set, we already know it from the past
@@ -451,20 +451,10 @@ func (t *Tree) assimilate(item scanItem) error {
// compare metadata mtime with actual mtime. if it matches AND the path hasn't changed (move operation)
// we can skip the assimilation because the file was handled by us
fi, err := os.Lstat(item.Path)
if err != nil {
return err
}
if previousPath == item.Path && mtime.Equal(fi.ModTime()) {
return nil
}
if !fi.IsDir() && !fi.Mode().IsRegular() {
t.log.Trace().Str("path", item.Path).Msg("skipping non-regular file")
return nil
}
// was it moved or copied/restored with a clashing id?
if ok && len(parentID) > 0 && previousPath != item.Path {
_, err := os.Stat(previousPath)
@@ -675,6 +665,7 @@ assimilate:
}
var n *node.Node
sizeDiff := int64(0)
if fi.IsDir() {
// The Space's name attribute might not match the directory name. Use the name as
// it was set before. Also the space root doesn't have a 'type' attribute
@@ -712,44 +703,46 @@ assimilate:
n.SpaceRoot = &node.Node{BaseNode: node.BaseNode{SpaceID: spaceID, ID: spaceID}}
prevBlobSize, err := previousAttribs.Int64(prefixes.BlobsizeAttr)
if err == nil && prevBlobSize != fi.Size() {
// file size changed, trigger propagation of tree size changes
err = t.Propagate(context.Background(), n, fi.Size()-prevBlobSize)
if err != nil {
t.log.Error().Err(err).Str("path", path).Msg("could not propagate tree size changes")
}
if err != nil || prevBlobSize < 0 {
prevBlobSize = 0
}
if prevBlobSize != fi.Size() {
sizeDiff = fi.Size() - prevBlobSize
}
}
attributes.SetTime(prefixes.MTimeAttr, fi.ModTime())
n.SpaceRoot = &node.Node{BaseNode: node.BaseNode{SpaceID: spaceID, ID: spaceID}}
if t.options.EnableFSRevisions {
if !fi.IsDir() && t.options.EnableFSRevisions {
go func() {
// Copy the previous current version to a revision
currentNode := node.NewBaseNode(n.SpaceID, n.ID+node.CurrentIDDelimiter, t.lookup)
currentPath := currentNode.InternalPath()
stat, err := os.Stat(currentPath)
if err != nil {
t.log.Error().Err(err).Str("path", path).Str("currentPath", currentPath).Msg("could not stat current path")
return
}
revisionPath := t.lookup.VersionPath(n.SpaceID, n.ID, stat.ModTime().UTC().Format(time.RFC3339Nano))
if err == nil {
revisionPath := t.lookup.VersionPath(n.SpaceID, n.ID, stat.ModTime().UTC().Format(time.RFC3339Nano))
err = os.Rename(currentPath, revisionPath)
if err != nil {
t.log.Error().Err(err).Str("path", path).Str("revisionPath", revisionPath).Msg("could not create revision")
return
err = os.Rename(currentPath, revisionPath)
if err != nil {
t.log.Error().Err(err).Str("path", path).Str("revisionPath", revisionPath).Msg("could not create revision")
return
}
}
// Copy the new version to the current version
if err := os.MkdirAll(filepath.Dir(currentPath), 0700); err != nil {
t.log.Error().Err(err).Str("path", path).Str("currentPath", currentPath).Msg("could not create base path for current file")
return
}
w, err := os.OpenFile(currentPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
if err != nil {
t.log.Error().Err(err).Str("path", path).Str("currentPath", currentPath).Msg("could not open current path for writing")
return
}
defer w.Close()
r, err := os.OpenFile(n.InternalPath(), os.O_RDONLY, 0600)
r, err := os.OpenFile(path, os.O_RDONLY, 0600)
if err != nil {
t.log.Error().Err(err).Str("path", path).Msg("could not open file for reading")
return
@@ -775,7 +768,7 @@ assimilate:
}()
}
err = t.Propagate(context.Background(), n, 0)
err = t.Propagate(context.Background(), n, sizeDiff)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to propagate")
}

View File

@@ -8,6 +8,7 @@ import (
"encoding/json"
"path/filepath"
"github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
kafka "github.com/segmentio/kafka-go"
@@ -97,17 +98,17 @@ func (w *CephFSWatcher) Watch(topic string) {
go func() {
switch {
case mask&CEPH_MDS_NOTIFY_DELETE > 0:
err = w.tree.Scan(path, ActionDelete, isDir)
err = w.tree.Scan(path, watcher.ActionDelete, isDir)
case mask&CEPH_MDS_NOTIFY_MOVED_TO > 0:
if ev.SrcMask > 0 {
// This is a move, clean up the old path
err = w.tree.Scan(filepath.Join(w.tree.options.WatchRoot, ev.SrcPath), ActionMoveFrom, isDir)
err = w.tree.Scan(filepath.Join(w.tree.options.WatchRoot, ev.SrcPath), watcher.ActionMoveFrom, isDir)
}
err = w.tree.Scan(path, ActionMove, isDir)
err = w.tree.Scan(path, watcher.ActionMove, isDir)
case mask&CEPH_MDS_NOTIFY_CREATE > 0:
err = w.tree.Scan(path, ActionCreate, isDir)
err = w.tree.Scan(path, watcher.ActionCreate, isDir)
case mask&CEPH_MDS_NOTIFY_CLOSE_WRITE > 0:
err = w.tree.Scan(path, ActionUpdate, isDir)
err = w.tree.Scan(path, watcher.ActionUpdate, isDir)
case mask&CEPH_MDS_NOTIFY_CLOSE > 0:
// ignore, already handled by CLOSE_WRITE
default:

View File

@@ -26,6 +26,7 @@ import (
"strconv"
"time"
"github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher"
"github.com/rs/zerolog"
)
@@ -88,15 +89,15 @@ start:
go func() {
switch ev.Event {
case "CREATE":
err = w.tree.Scan(ev.Path, ActionCreate, false)
err = w.tree.Scan(ev.Path, watcher.ActionCreate, false)
case "CLOSE":
var bytesWritten int
bytesWritten, err = strconv.Atoi(ev.BytesWritten)
if err == nil && bytesWritten > 0 {
err = w.tree.Scan(ev.Path, ActionUpdate, false)
err = w.tree.Scan(ev.Path, watcher.ActionUpdate, false)
}
case "RENAME":
err = w.tree.Scan(ev.Path, ActionMove, false)
err = w.tree.Scan(ev.Path, watcher.ActionMove, false)
if warmupErr := w.tree.WarmupIDCache(ev.Path, false, false); warmupErr != nil {
w.log.Error().Err(warmupErr).Str("path", ev.Path).Msg("error warming up id cache")
}

View File

@@ -26,6 +26,7 @@ import (
"strconv"
"strings"
"github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher"
"github.com/rs/zerolog"
kafka "github.com/segmentio/kafka-go"
)
@@ -77,21 +78,21 @@ func (w *GpfsWatchFolderWatcher) Watch(topic string) {
var err error
switch {
case strings.Contains(lwev.Event, "IN_DELETE"):
err = w.tree.Scan(path, ActionDelete, isDir)
err = w.tree.Scan(path, watcher.ActionDelete, isDir)
case strings.Contains(lwev.Event, "IN_MOVE_FROM"):
err = w.tree.Scan(path, ActionMoveFrom, isDir)
err = w.tree.Scan(path, watcher.ActionMoveFrom, isDir)
case strings.Contains(lwev.Event, "IN_CREATE"):
err = w.tree.Scan(path, ActionCreate, isDir)
err = w.tree.Scan(path, watcher.ActionCreate, isDir)
case strings.Contains(lwev.Event, "IN_CLOSE_WRITE"):
bytesWritten, convErr := strconv.Atoi(lwev.BytesWritten)
if convErr == nil && bytesWritten > 0 {
err = w.tree.Scan(path, ActionUpdate, isDir)
err = w.tree.Scan(path, watcher.ActionUpdate, isDir)
}
case strings.Contains(lwev.Event, "IN_MOVED_TO"):
err = w.tree.Scan(path, ActionMove, isDir)
err = w.tree.Scan(path, watcher.ActionMove, isDir)
}
if err != nil {
w.log.Error().Err(err).Str("path", path).Msg("error scanning path")

View File

@@ -30,6 +30,7 @@ import (
"time"
"github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/options"
"github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher"
"github.com/pablodz/inotifywaitgo/inotifywaitgo"
"github.com/rs/zerolog"
slogzerolog "github.com/samber/slog-zerolog/v2"
@@ -96,15 +97,15 @@ func (iw *InotifyWatcher) Watch(path string) {
var err error
switch e {
case inotifywaitgo.DELETE:
err = iw.tree.Scan(event.Filename, ActionDelete, event.IsDir)
err = iw.tree.Scan(event.Filename, watcher.ActionDelete, event.IsDir)
case inotifywaitgo.MOVED_FROM:
err = iw.tree.Scan(event.Filename, ActionMoveFrom, event.IsDir)
err = iw.tree.Scan(event.Filename, watcher.ActionMoveFrom, event.IsDir)
case inotifywaitgo.MOVED_TO:
err = iw.tree.Scan(event.Filename, ActionMove, event.IsDir)
err = iw.tree.Scan(event.Filename, watcher.ActionMove, event.IsDir)
case inotifywaitgo.CREATE:
err = iw.tree.Scan(event.Filename, ActionCreate, event.IsDir)
err = iw.tree.Scan(event.Filename, watcher.ActionCreate, event.IsDir)
case inotifywaitgo.CLOSE_WRITE:
err = iw.tree.Scan(event.Filename, ActionUpdate, event.IsDir)
err = iw.tree.Scan(event.Filename, watcher.ActionUpdate, event.IsDir)
case inotifywaitgo.CLOSE:
// ignore, already handled by CLOSE_WRITE
default:

View File

@@ -47,6 +47,7 @@ import (
"github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/lookup"
"github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/options"
"github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/trashbin"
"github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher/natswatcher"
"github.com/opencloud-eu/reva/v2/pkg/storage/pkg/decomposedfs"
"github.com/opencloud-eu/reva/v2/pkg/storage/pkg/decomposedfs/metadata"
"github.com/opencloud-eu/reva/v2/pkg/storage/pkg/decomposedfs/metadata/prefixes"
@@ -147,6 +148,11 @@ func New(lu node.PathLookup, bs node.Blobstore, um usermapper.Mapper, trashbin *
if err != nil {
return nil, err
}
case "natswatcher":
t.watcher, err = natswatcher.New(context.TODO(), t, o.NatsWatcher, o.WatchRoot, log)
if err != nil {
return nil, err
}
default:
t.watcher, err = NewInotifyWatcher(t, o, log)
if err != nil {
@@ -499,8 +505,18 @@ func (t *Tree) ListFolder(ctx context.Context, n *node.Node) ([]*node.Node, erro
_, nodeID, err := t.lookup.IDsForPath(ctx, path)
if err != nil {
t.log.Error().Err(err).Str("path", path).Msg("failed to get ids for entry")
continue
// we don't know about this node yet for some reason, assimilate it on the fly
t.log.Info().Err(err).Str("path", path).Msg("encountered unknown entity while listing the directory. Assimilate.")
err = t.assimilate(scanItem{Path: path})
if err != nil {
t.log.Error().Err(err).Str("path", path).Msg("failed to assimilate node")
continue
}
_, nodeID, err = t.lookup.IDsForPath(ctx, path)
if err != nil || nodeID == "" {
t.log.Error().Err(err).Str("path", path).Msg("still could not resolve node after assimilation")
continue
}
}
child, err := node.ReadNode(ctx, t.lookup, n.SpaceID, nodeID, false, n.SpaceRoot, true)
@@ -708,9 +724,23 @@ func (t *Tree) createDirNode(ctx context.Context, n *node.Node) (err error) {
t.log.Error().Err(err).Str("spaceID", n.SpaceID).Str("id", n.ID).Str("path", path).Msg("could not cache id")
}
// Write mtime from filesystem to metadata to preven re-assimilation
d, err := os.Open(path)
if err != nil {
return err
}
fi, err := d.Stat()
if err != nil {
return err
}
mtime := fi.ModTime()
attributes := n.NodeMetadata(ctx)
attributes[prefixes.MTimeAttr] = []byte(mtime.UTC().Format(time.RFC3339Nano))
attributes[prefixes.IDAttr] = []byte(n.ID)
attributes[prefixes.TreesizeAttr] = []byte("0") // initialize as empty, TODO why bother? if it is not set we could treat it as 0?
if t.options.TreeTimeAccounting || t.options.TreeSizeAccounting {
attributes[prefixes.PropagationAttr] = []byte("1") // mark the node for propagation
}

View File

@@ -0,0 +1,11 @@
package watcher
type EventAction int
const (
ActionCreate EventAction = iota
ActionUpdate
ActionMove
ActionDelete
ActionMoveFrom
)

View File

@@ -0,0 +1,236 @@
package natswatcher
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"path/filepath"
"time"
"github.com/cenkalti/backoff"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/options"
"github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher"
"github.com/rs/zerolog"
"github.com/vmihailenco/msgpack/v5"
)
// natsEvent represents the event encoded in MessagePack.
// we abbreviate the the properties to save some space
type natsEvent struct {
Event string `msgpack:"e"`
Path string `msgpack:"p,omitempty"`
ToPath string `msgpack:"t,omitempty"`
IsDir bool `msgpack:"d,omitempty"`
}
// NatsWatcher consumes filesystem-style events from NATS JetStream.
type NatsWatcher struct {
ctx context.Context
tree Scannable
log *zerolog.Logger
watchRoot string
config options.NatsWatcherConfig
}
type Scannable interface {
Scan(path string, action watcher.EventAction, isDir bool) error
}
// NewNatsWatcher creates a new NATS watcher.
func New(ctx context.Context, tree Scannable, cfg options.NatsWatcherConfig, watchRoot string, log *zerolog.Logger) (*NatsWatcher, error) {
return &NatsWatcher{
ctx: ctx,
tree: tree,
log: log,
watchRoot: watchRoot,
config: cfg,
}, nil
}
// Watch starts consuming events from a NATS JetStream subject
func (w *NatsWatcher) Watch(path string) {
w.log.Info().Str("stream", w.config.Stream).Msg("starting NATS watcher with auto-reconnect")
for {
select {
case <-w.ctx.Done():
w.log.Debug().Msg("context cancelled, stopping NATS watcher")
return
default:
}
// Try to connect with exponential backoff
nc, js, err := w.connectWithBackoff()
if err != nil {
w.log.Error().Err(err).Msg("failed to establish NATS connection after retries")
time.Sleep(5 * time.Second)
continue
}
if err := w.consume(js); err != nil {
w.log.Error().Err(err).Msg("NATS consumer exited with error, reconnecting")
}
_ = nc.Drain()
nc.Close()
time.Sleep(2 * time.Second)
}
}
// connectWithBackoff repeatedly attempts to connect to NATS JetStream with exponential backoff.
func (w *NatsWatcher) connectWithBackoff() (*nats.Conn, jetstream.JetStream, error) {
var nc *nats.Conn
var js jetstream.JetStream
b := backoff.NewExponentialBackOff()
b.InitialInterval = 1 * time.Second
b.MaxInterval = 30 * time.Second
b.MaxElapsedTime = 0 // never stop
connect := func() error {
select {
case <-w.ctx.Done():
return backoff.Permanent(w.ctx.Err())
default:
}
var err error
nc, err = w.connect()
if err != nil {
w.log.Warn().Err(err).Msg("failed to connect to NATS, retrying")
return err
}
js, err = jetstream.New(nc)
if err != nil {
nc.Close()
w.log.Warn().Err(err).Msg("failed to create jetstream context, retrying")
return err
}
w.log.Info().Str("endpoint", w.config.Endpoint).Msg("connected to NATS JetStream")
return nil
}
if err := backoff.Retry(connect, backoff.WithContext(b, w.ctx)); err != nil {
return nil, nil, err
}
return nc, js, nil
}
// consume subscribes to JetStream and handles messages.
func (w *NatsWatcher) consume(js jetstream.JetStream) error {
stream, err := js.Stream(w.ctx, w.config.Stream)
if err != nil {
return fmt.Errorf("failed to get stream: %w", err)
}
consumer, err := stream.CreateOrUpdateConsumer(w.ctx, jetstream.ConsumerConfig{
Durable: w.config.Durable,
AckPolicy: jetstream.AckExplicitPolicy,
MaxAckPending: w.config.MaxAckPending,
AckWait: w.config.AckWait,
})
if err != nil {
return fmt.Errorf("failed to create consumer: %w", err)
}
w.log.Info().
Str("stream", w.config.Stream).
Msg("started consuming from JetStream")
_, err = consumer.Consume(func(msg jetstream.Msg) {
defer func() {
if ackErr := msg.Ack(); ackErr != nil {
w.log.Warn().Err(ackErr).Msg("failed to ack message")
}
}()
var ev natsEvent
if err := msgpack.Unmarshal(msg.Data(), &ev); err != nil {
w.log.Error().Err(err).Msg("failed to decode MessagePack event")
return
}
w.handleEvent(ev)
})
if err != nil {
return fmt.Errorf("consumer error: %w", err)
}
<-w.ctx.Done()
return w.ctx.Err()
}
// connect establishes a single NATS connection with optional TLS and auth.
func (w *NatsWatcher) connect() (*nats.Conn, error) {
var tlsConf *tls.Config
if w.config.EnableTLS {
var rootCAPool *x509.CertPool
if w.config.TLSRootCACertificate != "" {
rootCrtFile, err := os.ReadFile(w.config.TLSRootCACertificate)
if err != nil {
return nil, fmt.Errorf("failed to read root CA: %w", err)
}
rootCAPool = x509.NewCertPool()
rootCAPool.AppendCertsFromPEM(rootCrtFile)
w.config.TLSInsecure = false
}
tlsConf = &tls.Config{
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: w.config.TLSInsecure,
RootCAs: rootCAPool,
}
}
opts := []nats.Option{nats.Name("opencloud-posixfs-natswatcher")}
if tlsConf != nil {
opts = append(opts, nats.Secure(tlsConf))
}
if w.config.AuthUsername != "" && w.config.AuthPassword != "" {
opts = append(opts, nats.UserInfo(w.config.AuthUsername, w.config.AuthPassword))
}
return nats.Connect(w.config.Endpoint, opts...)
}
// handleEvent applies the event to the local tree.
func (w *NatsWatcher) handleEvent(ev natsEvent) {
var err error
// Determine the relevant path
path := filepath.Join(w.watchRoot, ev.Path)
switch ev.Event {
case "CREATE":
err = w.tree.Scan(path, watcher.ActionCreate, ev.IsDir)
case "MOVED_TO":
err = w.tree.Scan(path, watcher.ActionMove, ev.IsDir)
case "MOVE_FROM":
err = w.tree.Scan(path, watcher.ActionMoveFrom, ev.IsDir)
case "MOVE": // support event with source and target path
err = w.tree.Scan(path, watcher.ActionMoveFrom, ev.IsDir)
if err == nil {
w.log.Error().Err(err).Interface("event", ev).Msg("error processing event")
}
tgt := filepath.Join(w.watchRoot, ev.ToPath)
if tgt == "" {
w.log.Warn().Interface("event", ev).Msg("MOVE event missing target path")
} else {
err = w.tree.Scan(tgt, watcher.ActionMove, ev.IsDir)
}
case "CLOSE_WRITE":
err = w.tree.Scan(path, watcher.ActionUpdate, ev.IsDir)
case "DELETE":
err = w.tree.Scan(path, watcher.ActionDelete, ev.IsDir)
default:
w.log.Warn().Str("event", ev.Event).Msg("unhandled event type")
}
if err != nil {
w.log.Error().Err(err).Interface("event", ev).Msg("error processing event")
}
}

View File

@@ -2,6 +2,7 @@ package metadata
import (
"context"
"fmt"
"io"
"io/fs"
"os"
@@ -292,17 +293,19 @@ func (b HybridBackend) SetMultiple(ctx context.Context, n MetadataNode, attribs
}
}
xerrs := 0
total := 0
var xerr error
// error handling: Count if there are errors while setting the attribs.
// if there were any, return an error.
for key, val := range attribs {
total++
if xerr = xattr.Set(path, key, val); xerr != nil {
// log
xerrs++
}
}
if xerrs > 0 {
return errors.Wrap(xerr, "Failed to set all xattrs")
return fmt.Errorf("failed to set %d/%d xattrs: %w", xerrs, total, xerr)
}
attribs, err = b.getAll(ctx, n, true, false, false)

4
vendor/modules.txt vendored
View File

@@ -1368,7 +1368,7 @@ github.com/opencloud-eu/icap-client
# github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76
## explicit; go 1.18
github.com/opencloud-eu/libre-graph-api-go
# github.com/opencloud-eu/reva/v2 v2.40.1
# github.com/opencloud-eu/reva/v2 v2.41.0
## explicit; go 1.24.1
github.com/opencloud-eu/reva/v2/cmd/revad/internal/grace
github.com/opencloud-eu/reva/v2/cmd/revad/runtime
@@ -1682,6 +1682,8 @@ github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/options
github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/timemanager
github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/trashbin
github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree
github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher
github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher/natswatcher
github.com/opencloud-eu/reva/v2/pkg/storage/fs/registry
github.com/opencloud-eu/reva/v2/pkg/storage/fs/s3ng
github.com/opencloud-eu/reva/v2/pkg/storage/fs/s3ng/blobstore