feat(reva): bump reva

Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
jkoberg
2024-11-11 14:32:34 +01:00
parent 4f42e344ea
commit 15d687dfe7
17 changed files with 295 additions and 1016 deletions

View File

@@ -0,0 +1,5 @@
Bugfix: Bump Reva
bumps reva version
https://github.com/owncloud/ocis/pull/10539

4
go.mod
View File

@@ -15,7 +15,7 @@ require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/coreos/go-oidc/v3 v3.11.0
github.com/cs3org/go-cs3apis v0.0.0-20241105092511-3ad35d174fc1
github.com/cs3org/reva/v2 v2.26.4
github.com/cs3org/reva/v2 v2.26.5-0.20241111162950-e77dd61e7edb
github.com/davidbyttow/govips/v2 v2.15.0
github.com/dhowden/tag v0.0.0-20240417053706-3d75831295e8
github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e
@@ -32,7 +32,6 @@ require (
github.com/go-micro/plugins/v4/server/grpc v1.2.0
github.com/go-micro/plugins/v4/server/http v1.2.2
github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20240726082623-6831adfdcdc4
github.com/go-micro/plugins/v4/wrapper/breaker/gobreaker v1.2.0
github.com/go-micro/plugins/v4/wrapper/monitoring/prometheus v1.2.0
github.com/go-micro/plugins/v4/wrapper/trace/opentelemetry v1.2.0
github.com/go-playground/validator/v10 v10.22.1
@@ -301,7 +300,6 @@ require (
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect
github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546 // indirect
github.com/skeema/knownhosts v1.2.1 // indirect
github.com/sony/gobreaker v0.5.0 // indirect
github.com/spacewander/go-suffix-tree v0.0.0-20191010040751-0865e368c784 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.2 // indirect

8
go.sum
View File

@@ -255,8 +255,8 @@ github.com/crewjam/saml v0.4.14 h1:g9FBNx62osKusnFzs3QTN5L9CVA/Egfgm+stJShzw/c=
github.com/crewjam/saml v0.4.14/go.mod h1:UVSZCf18jJkk6GpWNVqcyQJMD5HsRugBPf4I1nl2mME=
github.com/cs3org/go-cs3apis v0.0.0-20241105092511-3ad35d174fc1 h1:RU6LT6mkD16xZs011+8foU7T3LrPvTTSWeTQ9OgfhkA=
github.com/cs3org/go-cs3apis v0.0.0-20241105092511-3ad35d174fc1/go.mod h1:DedpcqXl193qF/08Y04IO0PpxyyMu8+GrkD6kWK2MEQ=
github.com/cs3org/reva/v2 v2.26.4 h1:wUmNSkXglIHrn+yxwJtHDvlSzxadFPANENGnwmG+5wI=
github.com/cs3org/reva/v2 v2.26.4/go.mod h1:KP0Zomt3dNIr/kU2M1mXzTIVFOtxBVS4qmBDMRCfrOQ=
github.com/cs3org/reva/v2 v2.26.5-0.20241111162950-e77dd61e7edb h1:owRv9x5GlKKdqCCM70kZKCsLAcDkFPkyOb129Jmklt0=
github.com/cs3org/reva/v2 v2.26.5-0.20241111162950-e77dd61e7edb/go.mod h1:KP0Zomt3dNIr/kU2M1mXzTIVFOtxBVS4qmBDMRCfrOQ=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg=
github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=
@@ -413,8 +413,6 @@ github.com/go-micro/plugins/v4/store/redis v1.2.1 h1:d9kwr9bSpoK9vkHkqcv+isQUbgB
github.com/go-micro/plugins/v4/store/redis v1.2.1/go.mod h1:MbCG0YiyPqETTtm7uHFmxQNCaW1o9hBoYtFwhbVjLUg=
github.com/go-micro/plugins/v4/transport/grpc v1.1.0 h1:mXfDYfFQLnVDzjGY3o84oe4prfux9h8txsnA19dKsj8=
github.com/go-micro/plugins/v4/transport/grpc v1.1.0/go.mod h1:J5xMp70xXZzm8yafICrDrWaUDd8Gwy8vt0xif7NcOPg=
github.com/go-micro/plugins/v4/wrapper/breaker/gobreaker v1.2.0 h1:EQj4l7fuOSz8ueUYhFlpZPp9+tN4JeONL32ARRKXW/U=
github.com/go-micro/plugins/v4/wrapper/breaker/gobreaker v1.2.0/go.mod h1:JR9Ox/iJIrcXm8nCWdAEBsyG7Q7lyMLzsTZPfXrqvwo=
github.com/go-micro/plugins/v4/wrapper/monitoring/prometheus v1.2.0 h1:UWBUYtMXCxQ9bIGOYcbLOjtPv8ovvCRjWWM6tHhB4S8=
github.com/go-micro/plugins/v4/wrapper/monitoring/prometheus v1.2.0/go.mod h1:8BYxs/wEE4ZJayHZQffw4A8s9rcPumyoNms0hYoNocM=
github.com/go-micro/plugins/v4/wrapper/trace/opentelemetry v1.2.0 h1:e2hgtWMNqJ3DmbMt9ZxzmH/BkVAw9Xg23l6CHrXQfKw=
@@ -1046,8 +1044,6 @@ github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:s
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smartystreets/gunit v1.0.4/go.mod h1:EH5qMBab2UclzXUcpR8b93eHsIlp9u+pDQIRp5DZNzQ=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg=
github.com/sony/gobreaker v0.5.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/spacewander/go-suffix-tree v0.0.0-20191010040751-0865e368c784 h1:0jjO3HdJfOn6gYHD/ZNZh0LLMxEAqkYX7xoDPQReEgs=
github.com/spacewander/go-suffix-tree v0.0.0-20191010040751-0865e368c784/go.mod h1:ff/5myEGgtsAwf26goQCO905GrEm5ugEZSd6OWTsrhM=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=

View File

@@ -59,6 +59,7 @@ const (
ActionUpdate
ActionMove
ActionDelete
ActionMoveFrom
)
type queueItem struct {
@@ -160,14 +161,14 @@ func (t *Tree) workScanQueue() {
}
// Scan scans the given path and updates the id chache
func (t *Tree) Scan(path string, action EventAction, isDir bool, recurse bool) error {
func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
// cases:
switch action {
case ActionCreate:
if !isDir {
// 1. New file (could be emitted as part of a new directory)
// -> assimilate file
// -> scan parent directory recursively
// -> scan parent directory recursively to update tree size and catch nodes that weren't covered by an event
if !t.scanDebouncer.InProgress(filepath.Dir(path)) {
t.scanDebouncer.Debounce(scanItem{
Path: path,
@@ -216,8 +217,24 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool, recurse bool) e
Recurse: isDir,
})
case ActionMoveFrom:
// 6. file/directory moved out of the watched directory
// -> update directory
if err := t.setDirty(filepath.Dir(path), true); err != nil {
return err
}
go func() { _ = t.WarmupIDCache(filepath.Dir(path), false, true) }()
case ActionDelete:
_ = t.HandleFileDelete(path)
// 7. Deleted file or directory
// -> update parent and all children
t.scanDebouncer.Debounce(scanItem{
Path: filepath.Dir(path),
ForceRescan: true,
Recurse: true,
})
}
return nil
@@ -593,6 +610,7 @@ func (t *Tree) WarmupIDCache(root string, assimilate, onlyDirty bool) error {
if !dirty {
return filepath.SkipDir
}
sizes[path] += 0 // Make sure to set the size to 0 for empty directories
}
attribs, err := t.lookup.MetadataBackend().All(context.Background(), path)

View File

@@ -1,3 +1,21 @@
// Copyright 2018-2024 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package tree
import (
@@ -64,15 +82,15 @@ start:
}
switch ev.Event {
case "CREATE":
go func() { _ = w.tree.Scan(ev.Path, ActionCreate, false, false) }()
go func() { _ = w.tree.Scan(ev.Path, ActionCreate, false) }()
case "CLOSE":
bytesWritten, err := strconv.Atoi(ev.BytesWritten)
if err == nil && bytesWritten > 0 {
go func() { _ = w.tree.Scan(ev.Path, ActionUpdate, false, true) }()
go func() { _ = w.tree.Scan(ev.Path, ActionUpdate, false) }()
}
case "RENAME":
go func() {
_ = w.tree.Scan(ev.Path, ActionMove, false, true)
_ = w.tree.Scan(ev.Path, ActionMove, false)
_ = w.tree.WarmupIDCache(ev.Path, false, false)
}()
}

View File

@@ -1,3 +1,21 @@
// Copyright 2018-2024 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package tree
import (
@@ -29,13 +47,13 @@ func (w *GpfsWatchFolderWatcher) Watch(topic string) {
Topic: topic,
})
lwev := &lwe{}
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
lwev := &lwe{}
err = json.Unmarshal(m.Value, lwev)
if err != nil {
continue
@@ -45,22 +63,28 @@ func (w *GpfsWatchFolderWatcher) Watch(topic string) {
continue
}
isDir := strings.Contains(lwev.Event, "IN_ISDIR")
go func() {
isDir := strings.Contains(lwev.Event, "IN_ISDIR")
switch {
case strings.Contains(lwev.Event, "IN_CREATE"):
go func() { _ = w.tree.Scan(lwev.Path, ActionCreate, isDir, false) }()
switch {
case strings.Contains(lwev.Event, "IN_DELETE"):
_ = w.tree.Scan(lwev.Path, ActionDelete, isDir)
case strings.Contains(lwev.Event, "IN_CLOSE_WRITE"):
bytesWritten, err := strconv.Atoi(lwev.BytesWritten)
if err == nil && bytesWritten > 0 {
go func() { _ = w.tree.Scan(lwev.Path, ActionUpdate, isDir, true) }()
case strings.Contains(lwev.Event, "IN_MOVE_FROM"):
_ = w.tree.Scan(lwev.Path, ActionMoveFrom, isDir)
case strings.Contains(lwev.Event, "IN_CREATE"):
_ = w.tree.Scan(lwev.Path, ActionCreate, isDir)
case strings.Contains(lwev.Event, "IN_CLOSE_WRITE"):
bytesWritten, err := strconv.Atoi(lwev.BytesWritten)
if err == nil && bytesWritten > 0 {
_ = w.tree.Scan(lwev.Path, ActionUpdate, isDir)
}
case strings.Contains(lwev.Event, "IN_MOVED_TO"):
_ = w.tree.Scan(lwev.Path, ActionMove, isDir)
}
case strings.Contains(lwev.Event, "IN_MOVED_TO"):
go func() {
_ = w.tree.Scan(lwev.Path, ActionMove, isDir, true)
}()
}
}()
}
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)

View File

@@ -1,3 +1,21 @@
// Copyright 2018-2024 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package tree
import (
@@ -30,6 +48,7 @@ func (iw *InotifyWatcher) Watch(path string) {
Events: []inotifywaitgo.EVENT{
inotifywaitgo.CREATE,
inotifywaitgo.MOVED_TO,
inotifywaitgo.MOVED_FROM,
inotifywaitgo.CLOSE_WRITE,
inotifywaitgo.DELETE,
},
@@ -45,18 +64,18 @@ func (iw *InotifyWatcher) Watch(path string) {
if isLockFile(event.Filename) || isTrash(event.Filename) || iw.tree.isUpload(event.Filename) {
continue
}
switch e {
case inotifywaitgo.DELETE:
go func() { _ = iw.tree.HandleFileDelete(event.Filename) }()
case inotifywaitgo.CREATE:
go func() { _ = iw.tree.Scan(event.Filename, ActionCreate, event.IsDir, false) }()
case inotifywaitgo.MOVED_TO:
go func() {
_ = iw.tree.Scan(event.Filename, ActionMove, event.IsDir, true)
}()
case inotifywaitgo.CLOSE_WRITE:
go func() { _ = iw.tree.Scan(event.Filename, ActionUpdate, event.IsDir, true) }()
}
go func() {
switch e {
case inotifywaitgo.DELETE:
_ = iw.tree.Scan(event.Filename, ActionDelete, event.IsDir)
case inotifywaitgo.MOVED_FROM:
_ = iw.tree.Scan(event.Filename, ActionMoveFrom, event.IsDir)
case inotifywaitgo.CREATE, inotifywaitgo.MOVED_TO:
_ = iw.tree.Scan(event.Filename, ActionCreate, event.IsDir)
case inotifywaitgo.CLOSE_WRITE:
_ = iw.tree.Scan(event.Filename, ActionUpdate, event.IsDir)
}
}()
}
case err := <-errors:

View File

@@ -310,6 +310,12 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node)
return errors.Wrap(err, "Decomposedfs: could not move child")
}
// update the id cache
if newNode.ID == "" {
newNode.ID = oldNode.ID
}
_ = t.lookup.(*lookup.Lookup).CacheID(ctx, newNode.SpaceID, newNode.ID, filepath.Join(newNode.ParentPath(), newNode.Name))
// rename the lock (if it exists)
if _, err := os.Stat(oldNode.LockFilePath()); err == nil {
err = os.Rename(
@@ -321,11 +327,6 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node)
}
}
// update the id cache
if newNode.ID == "" {
newNode.ID = oldNode.ID
}
_ = t.lookup.(*lookup.Lookup).CacheID(ctx, newNode.SpaceID, newNode.ID, filepath.Join(newNode.ParentPath(), newNode.Name))
// update id cache for the moved subtree.
if oldNode.IsDir(ctx) {
err = t.WarmupIDCache(filepath.Join(newNode.ParentPath(), newNode.Name), false, false)
@@ -334,11 +335,23 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node)
}
}
err = t.Propagate(ctx, oldNode, 0)
// the size diff is the current treesize or blobsize of the old/source node
var sizeDiff int64
if oldNode.IsDir(ctx) {
treeSize, err := oldNode.GetTreeSize(ctx)
if err != nil {
return err
}
sizeDiff = int64(treeSize)
} else {
sizeDiff = oldNode.Blobsize
}
err = t.Propagate(ctx, oldNode, -sizeDiff)
if err != nil {
return errors.Wrap(err, "Decomposedfs: Move: could not propagate old node")
}
err = t.Propagate(ctx, newNode, 0)
err = t.Propagate(ctx, newNode, sizeDiff)
if err != nil {
return errors.Wrap(err, "Decomposedfs: Move: could not propagate new node")
}

View File

@@ -30,6 +30,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/rogpeppe/go-internal/lockedfile"
"github.com/rs/zerolog"
)
// SyncPropagator implements synchronous treetime & treesize propagation
@@ -72,137 +73,137 @@ func (p SyncPropagator) Propagate(ctx context.Context, n *node.Node, sizeDiff in
sTime := time.Now().UTC()
// we loop until we reach the root
var err error
for err == nil && n.ID != root.ID {
sublog.Debug().Msg("propagating")
var (
err error
stop bool
)
attrs := node.Attributes{}
var f *lockedfile.File
// lock parent before reading treesize or tree time
_, subspan := tracer.Start(ctx, "lockedfile.OpenFile")
parentFilename := p.lookup.MetadataBackend().LockfilePath(n.ParentPath())
f, err = lockedfile.OpenFile(parentFilename, os.O_RDWR|os.O_CREATE, 0600)
subspan.End()
if err != nil {
sublog.Error().Err(err).
Str("parent filename", parentFilename).
Msg("Propagation failed. Could not open metadata for parent with lock.")
return err
}
// always log error if closing node fails
defer func() {
// ignore already closed error
cerr := f.Close()
if err == nil && cerr != nil && !errors.Is(cerr, os.ErrClosed) {
err = cerr // only overwrite err with en error from close if the former was nil
}
_ = os.Remove(f.Name())
}()
if n, err = n.Parent(ctx); err != nil {
sublog.Error().Err(err).
Msg("Propagation failed. Could not read parent node.")
return err
}
if !n.HasPropagation(ctx) {
sublog.Debug().Str("attr", prefixes.PropagationAttr).Msg("propagation attribute not set or unreadable, not propagating")
// if the attribute is not set treat it as false / none / no propagation
return nil
}
sublog = sublog.With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger()
if p.treeTimeAccounting {
// update the parent tree time if it is older than the nodes mtime
updateSyncTime := false
var tmTime time.Time
tmTime, err = n.GetTMTime(ctx)
switch {
case err != nil:
// missing attribute, or invalid format, overwrite
sublog.Debug().Err(err).
Msg("could not read tmtime attribute, overwriting")
updateSyncTime = true
case tmTime.Before(sTime):
sublog.Debug().
Time("tmtime", tmTime).
Time("stime", sTime).
Msg("parent tmtime is older than node mtime, updating")
updateSyncTime = true
default:
sublog.Debug().
Time("tmtime", tmTime).
Time("stime", sTime).
Dur("delta", sTime.Sub(tmTime)).
Msg("parent tmtime is younger than node mtime, not updating")
}
if updateSyncTime {
// update the tree time of the parent node
attrs.SetString(prefixes.TreeMTimeAttr, sTime.UTC().Format(time.RFC3339Nano))
}
attrs.SetString(prefixes.TmpEtagAttr, "")
}
// size accounting
if p.treeSizeAccounting && sizeDiff != 0 {
var newSize uint64
// read treesize
treeSize, err := n.GetTreeSize(ctx)
switch {
case metadata.IsAttrUnset(err):
// fallback to calculating the treesize
sublog.Warn().Msg("treesize attribute unset, falling back to calculating the treesize")
newSize, err = calculateTreeSize(ctx, p.lookup, n.InternalPath())
if err != nil {
return err
}
case err != nil:
sublog.Error().Err(err).
Msg("Faild to propagate treesize change. Error when reading the treesize attribute from parent")
return err
case sizeDiff > 0:
newSize = treeSize + uint64(sizeDiff)
case uint64(-sizeDiff) > treeSize:
// The sizeDiff is larger than the current treesize. Which would result in
// a negative new treesize. Something must have gone wrong with the accounting.
// Reset the current treesize to 0.
sublog.Error().Uint64("treeSize", treeSize).Int64("sizeDiff", sizeDiff).
Msg("Error when updating treesize of parent node. Updated treesize < 0. Reestting to 0")
newSize = 0
default:
newSize = treeSize - uint64(-sizeDiff)
}
// update the tree size of the node
attrs.SetString(prefixes.TreesizeAttr, strconv.FormatUint(newSize, 10))
sublog.Debug().Uint64("newSize", newSize).Msg("updated treesize of parent node")
}
if err = n.SetXattrsWithContext(ctx, attrs, false); err != nil {
sublog.Error().Err(err).Msg("Failed to update extend attributes of parent node")
return err
}
// Release node lock early, ignore already closed error
_, subspan = tracer.Start(ctx, "f.Close")
cerr := f.Close()
subspan.End()
if cerr != nil && !errors.Is(cerr, os.ErrClosed) {
sublog.Error().Err(cerr).Msg("Failed to close parent node and release lock")
return cerr
}
for err == nil && !stop && n.ID != root.ID {
n, stop, err = p.propagateItem(ctx, n, sTime, sizeDiff, sublog)
}
if err != nil {
sublog.Error().Err(err).Msg("error propagating")
return err
}
return nil
}
func (p SyncPropagator) propagateItem(ctx context.Context, n *node.Node, sTime time.Time, sizeDiff int64, log zerolog.Logger) (*node.Node, bool, error) {
log.Debug().Msg("propagating")
attrs := node.Attributes{}
var f *lockedfile.File
// lock parent before reading treesize or tree time
_, subspan := tracer.Start(ctx, "lockedfile.OpenFile")
parentFilename := p.lookup.MetadataBackend().LockfilePath(n.ParentPath())
f, err := lockedfile.OpenFile(parentFilename, os.O_RDWR|os.O_CREATE, 0600)
subspan.End()
if err != nil {
log.Error().Err(err).
Str("parent filename", parentFilename).
Msg("Propagation failed. Could not open metadata for parent with lock.")
return nil, true, err
}
// always log error if closing node fails
defer func() {
// ignore already closed error
cerr := f.Close()
if err == nil && cerr != nil && !errors.Is(cerr, os.ErrClosed) {
err = cerr // only overwrite err with en error from close if the former was nil
}
}()
if n, err = n.Parent(ctx); err != nil {
log.Error().Err(err).
Msg("Propagation failed. Could not read parent node.")
return n, true, err
}
if !n.HasPropagation(ctx) {
log.Debug().Str("attr", prefixes.PropagationAttr).Msg("propagation attribute not set or unreadable, not propagating")
// if the attribute is not set treat it as false / none / no propagation
return n, true, nil
}
log = log.With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger()
if p.treeTimeAccounting {
// update the parent tree time if it is older than the nodes mtime
updateSyncTime := false
var tmTime time.Time
tmTime, err = n.GetTMTime(ctx)
switch {
case err != nil:
// missing attribute, or invalid format, overwrite
log.Debug().Err(err).
Msg("could not read tmtime attribute, overwriting")
updateSyncTime = true
case tmTime.Before(sTime):
log.Debug().
Time("tmtime", tmTime).
Time("stime", sTime).
Msg("parent tmtime is older than node mtime, updating")
updateSyncTime = true
default:
log.Debug().
Time("tmtime", tmTime).
Time("stime", sTime).
Dur("delta", sTime.Sub(tmTime)).
Msg("parent tmtime is younger than node mtime, not updating")
}
if updateSyncTime {
// update the tree time of the parent node
attrs.SetString(prefixes.TreeMTimeAttr, sTime.UTC().Format(time.RFC3339Nano))
}
attrs.SetString(prefixes.TmpEtagAttr, "")
}
// size accounting
if p.treeSizeAccounting && sizeDiff != 0 {
var newSize uint64
// read treesize
treeSize, err := n.GetTreeSize(ctx)
switch {
case metadata.IsAttrUnset(err):
// fallback to calculating the treesize
log.Warn().Msg("treesize attribute unset, falling back to calculating the treesize")
newSize, err = calculateTreeSize(ctx, p.lookup, n.InternalPath())
if err != nil {
return n, true, err
}
case err != nil:
log.Error().Err(err).
Msg("Faild to propagate treesize change. Error when reading the treesize attribute from parent")
return n, true, err
case sizeDiff > 0:
newSize = treeSize + uint64(sizeDiff)
case uint64(-sizeDiff) > treeSize:
// The sizeDiff is larger than the current treesize. Which would result in
// a negative new treesize. Something must have gone wrong with the accounting.
// Reset the current treesize to 0.
log.Error().Uint64("treeSize", treeSize).Int64("sizeDiff", sizeDiff).
Msg("Error when updating treesize of parent node. Updated treesize < 0. Reestting to 0")
newSize = 0
default:
newSize = treeSize - uint64(-sizeDiff)
}
// update the tree size of the node
attrs.SetString(prefixes.TreesizeAttr, strconv.FormatUint(newSize, 10))
log.Debug().Uint64("newSize", newSize).Msg("updated treesize of parent node")
}
if err = n.SetXattrsWithContext(ctx, attrs, false); err != nil {
log.Error().Err(err).Msg("Failed to update extend attributes of parent node")
return n, true, err
}
return n, false, nil
}

View File

@@ -26,10 +26,11 @@ import (
"strings"
"time"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/google/uuid"
"github.com/pkg/errors"
tusd "github.com/tus/tusd/v2/pkg/handler"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
@@ -40,7 +41,6 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/pkg/errors"
)
// Upload uploads data to the given resource
@@ -90,7 +90,7 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u
}
}
if err := session.FinishUpload(ctx); err != nil {
if err := session.FinishUploadDecomposed(ctx); err != nil {
return &provider.ResourceInfo{}, err
}
@@ -321,7 +321,7 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere
if uploadLength == 0 {
// Directly finish this upload
err = session.FinishUpload(ctx)
err = session.FinishUploadDecomposed(ctx)
if err != nil {
return nil, err
}

View File

@@ -33,6 +33,12 @@ import (
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/golang-jwt/jwt"
"github.com/pkg/errors"
tusd "github.com/tus/tusd/v2/pkg/handler"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"github.com/cs3org/reva/v2/pkg/appctx"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
@@ -41,11 +47,6 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/golang-jwt/jwt"
"github.com/pkg/errors"
tusd "github.com/tus/tusd/v2/pkg/handler"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)
var (
@@ -106,7 +107,25 @@ func (session *OcisSession) GetReader(ctx context.Context) (io.ReadCloser, error
}
// FinishUpload finishes an upload and moves the file to the internal destination
// implements tusd.DataStore interface
// returns tusd errors
func (session *OcisSession) FinishUpload(ctx context.Context) error {
err := session.FinishUploadDecomposed(ctx)
// we need to return a tusd error here to make the tusd handler return the correct status code
switch err.(type) {
case errtypes.AlreadyExists:
return tusd.NewError("ERR_ALREADY_EXISTS", err.Error(), http.StatusConflict)
case errtypes.Aborted:
return tusd.NewError("ERR_PRECONDITION_FAILED", err.Error(), http.StatusPreconditionFailed)
default:
return err
}
}
// FinishUploadDecomposed finishes an upload and moves the file to the internal destination
// retures errtypes errors
func (session *OcisSession) FinishUploadDecomposed(ctx context.Context) error {
ctx, span := tracer.Start(session.Context(ctx), "FinishUpload")
defer span.End()
log := appctx.GetLogger(ctx)
@@ -167,16 +186,7 @@ func (session *OcisSession) FinishUpload(ctx context.Context) error {
n, err := session.store.CreateNodeForUpload(session, attrs)
if err != nil {
session.store.Cleanup(ctx, session, true, false, false)
// we need to return a tusd error here to make the tusd handler return the correct status code
switch err.(type) {
case errtypes.AlreadyExists:
return ErrAlreadyExists
case errtypes.Aborted:
return tusd.NewError("ERR_PRECONDITION_FAILED", err.Error(), http.StatusPreconditionFailed)
default:
return err
}
return err
}
// increase the processing counter for every started processing
// will be decreased in Cleanup()

View File

@@ -1,191 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
Copyright 2015 Asim Aslam.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@@ -1,93 +0,0 @@
package gobreaker
import (
"context"
"sync"
"github.com/sony/gobreaker"
"go-micro.dev/v4/client"
"go-micro.dev/v4/errors"
)
type BreakerMethod int
const (
BreakService BreakerMethod = iota
BreakServiceEndpoint
)
type clientWrapper struct {
bs gobreaker.Settings
bm BreakerMethod
cbs map[string]*gobreaker.TwoStepCircuitBreaker
mu sync.Mutex
client.Client
}
func (c *clientWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
var svc string
switch c.bm {
case BreakService:
svc = req.Service()
case BreakServiceEndpoint:
svc = req.Service() + "." + req.Endpoint()
}
c.mu.Lock()
cb, ok := c.cbs[svc]
if !ok {
cb = gobreaker.NewTwoStepCircuitBreaker(c.bs)
c.cbs[svc] = cb
}
c.mu.Unlock()
cbAllow, err := cb.Allow()
if err != nil {
return errors.New(req.Service(), err.Error(), 502)
}
if err = c.Client.Call(ctx, req, rsp, opts...); err == nil {
cbAllow(true)
return nil
}
merr := errors.Parse(err.Error())
switch {
case merr.Code == 0:
merr.Code = 503
case len(merr.Id) == 0:
merr.Id = req.Service()
}
if merr.Code >= 500 {
cbAllow(false)
} else {
cbAllow(true)
}
return merr
}
// NewClientWrapper returns a client Wrapper.
func NewClientWrapper() client.Wrapper {
return func(c client.Client) client.Client {
w := &clientWrapper{}
w.bs = gobreaker.Settings{}
w.cbs = make(map[string]*gobreaker.TwoStepCircuitBreaker)
w.Client = c
return w
}
}
// NewCustomClientWrapper takes a gobreaker.Settings and BreakerMethod. Returns a client Wrapper.
func NewCustomClientWrapper(bs gobreaker.Settings, bm BreakerMethod) client.Wrapper {
return func(c client.Client) client.Client {
w := &clientWrapper{}
w.bm = bm
w.bs = bs
w.cbs = make(map[string]*gobreaker.TwoStepCircuitBreaker)
w.Client = c
return w
}
}

View File

@@ -1,21 +0,0 @@
The MIT License (MIT)
Copyright 2015 Sony Corporation
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@@ -1,132 +0,0 @@
gobreaker
=========
[![GoDoc](https://godoc.org/github.com/sony/gobreaker?status.svg)](http://godoc.org/github.com/sony/gobreaker)
[gobreaker][repo-url] implements the [Circuit Breaker pattern](https://msdn.microsoft.com/en-us/library/dn589784.aspx) in Go.
Installation
------------
```
go get github.com/sony/gobreaker
```
Usage
-----
The struct `CircuitBreaker` is a state machine to prevent sending requests that are likely to fail.
The function `NewCircuitBreaker` creates a new `CircuitBreaker`.
```go
func NewCircuitBreaker(st Settings) *CircuitBreaker
```
You can configure `CircuitBreaker` by the struct `Settings`:
```go
type Settings struct {
Name string
MaxRequests uint32
Interval time.Duration
Timeout time.Duration
ReadyToTrip func(counts Counts) bool
OnStateChange func(name string, from State, to State)
IsSuccessful func(err error) bool
}
```
- `Name` is the name of the `CircuitBreaker`.
- `MaxRequests` is the maximum number of requests allowed to pass through
when the `CircuitBreaker` is half-open.
If `MaxRequests` is 0, `CircuitBreaker` allows only 1 request.
- `Interval` is the cyclic period of the closed state
for `CircuitBreaker` to clear the internal `Counts`, described later in this section.
If `Interval` is 0, `CircuitBreaker` doesn't clear the internal `Counts` during the closed state.
- `Timeout` is the period of the open state,
after which the state of `CircuitBreaker` becomes half-open.
If `Timeout` is 0, the timeout value of `CircuitBreaker` is set to 60 seconds.
- `ReadyToTrip` is called with a copy of `Counts` whenever a request fails in the closed state.
If `ReadyToTrip` returns true, `CircuitBreaker` will be placed into the open state.
If `ReadyToTrip` is `nil`, default `ReadyToTrip` is used.
Default `ReadyToTrip` returns true when the number of consecutive failures is more than 5.
- `OnStateChange` is called whenever the state of `CircuitBreaker` changes.
- `IsSuccessful` is called with the error returned from a request.
If `IsSuccessful` returns true, the error is counted as a success.
Otherwise the error is counted as a failure.
If `IsSuccessful` is nil, default `IsSuccessful` is used, which returns false for all non-nil errors.
The struct `Counts` holds the numbers of requests and their successes/failures:
```go
type Counts struct {
Requests uint32
TotalSuccesses uint32
TotalFailures uint32
ConsecutiveSuccesses uint32
ConsecutiveFailures uint32
}
```
`CircuitBreaker` clears the internal `Counts` either
on the change of the state or at the closed-state intervals.
`Counts` ignores the results of the requests sent before clearing.
`CircuitBreaker` can wrap any function to send a request:
```go
func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error)
```
The method `Execute` runs the given request if `CircuitBreaker` accepts it.
`Execute` returns an error instantly if `CircuitBreaker` rejects the request.
Otherwise, `Execute` returns the result of the request.
If a panic occurs in the request, `CircuitBreaker` handles it as an error
and causes the same panic again.
Example
-------
```go
var cb *breaker.CircuitBreaker
func Get(url string) ([]byte, error) {
body, err := cb.Execute(func() (interface{}, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return body, nil
})
if err != nil {
return nil, err
}
return body.([]byte), nil
}
```
See [example](https://github.com/sony/gobreaker/blob/master/example) for details.
License
-------
The MIT License (MIT)
See [LICENSE](https://github.com/sony/gobreaker/blob/master/LICENSE) for details.
[repo-url]: https://github.com/sony/gobreaker

View File

@@ -1,380 +0,0 @@
// Package gobreaker implements the Circuit Breaker pattern.
// See https://msdn.microsoft.com/en-us/library/dn589784.aspx.
package gobreaker
import (
"errors"
"fmt"
"sync"
"time"
)
// State is a type that represents a state of CircuitBreaker.
type State int
// These constants are states of CircuitBreaker.
const (
StateClosed State = iota
StateHalfOpen
StateOpen
)
var (
// ErrTooManyRequests is returned when the CB state is half open and the requests count is over the cb maxRequests
ErrTooManyRequests = errors.New("too many requests")
// ErrOpenState is returned when the CB state is open
ErrOpenState = errors.New("circuit breaker is open")
)
// String implements stringer interface.
func (s State) String() string {
switch s {
case StateClosed:
return "closed"
case StateHalfOpen:
return "half-open"
case StateOpen:
return "open"
default:
return fmt.Sprintf("unknown state: %d", s)
}
}
// Counts holds the numbers of requests and their successes/failures.
// CircuitBreaker clears the internal Counts either
// on the change of the state or at the closed-state intervals.
// Counts ignores the results of the requests sent before clearing.
type Counts struct {
Requests uint32
TotalSuccesses uint32
TotalFailures uint32
ConsecutiveSuccesses uint32
ConsecutiveFailures uint32
}
func (c *Counts) onRequest() {
c.Requests++
}
func (c *Counts) onSuccess() {
c.TotalSuccesses++
c.ConsecutiveSuccesses++
c.ConsecutiveFailures = 0
}
func (c *Counts) onFailure() {
c.TotalFailures++
c.ConsecutiveFailures++
c.ConsecutiveSuccesses = 0
}
func (c *Counts) clear() {
c.Requests = 0
c.TotalSuccesses = 0
c.TotalFailures = 0
c.ConsecutiveSuccesses = 0
c.ConsecutiveFailures = 0
}
// Settings configures CircuitBreaker:
//
// Name is the name of the CircuitBreaker.
//
// MaxRequests is the maximum number of requests allowed to pass through
// when the CircuitBreaker is half-open.
// If MaxRequests is 0, the CircuitBreaker allows only 1 request.
//
// Interval is the cyclic period of the closed state
// for the CircuitBreaker to clear the internal Counts.
// If Interval is less than or equal to 0, the CircuitBreaker doesn't clear internal Counts during the closed state.
//
// Timeout is the period of the open state,
// after which the state of the CircuitBreaker becomes half-open.
// If Timeout is less than or equal to 0, the timeout value of the CircuitBreaker is set to 60 seconds.
//
// ReadyToTrip is called with a copy of Counts whenever a request fails in the closed state.
// If ReadyToTrip returns true, the CircuitBreaker will be placed into the open state.
// If ReadyToTrip is nil, default ReadyToTrip is used.
// Default ReadyToTrip returns true when the number of consecutive failures is more than 5.
//
// OnStateChange is called whenever the state of the CircuitBreaker changes.
//
// IsSuccessful is called with the error returned from a request.
// If IsSuccessful returns true, the error is counted as a success.
// Otherwise the error is counted as a failure.
// If IsSuccessful is nil, default IsSuccessful is used, which returns false for all non-nil errors.
type Settings struct {
Name string
MaxRequests uint32
Interval time.Duration
Timeout time.Duration
ReadyToTrip func(counts Counts) bool
OnStateChange func(name string, from State, to State)
IsSuccessful func(err error) bool
}
// CircuitBreaker is a state machine to prevent sending requests that are likely to fail.
type CircuitBreaker struct {
name string
maxRequests uint32
interval time.Duration
timeout time.Duration
readyToTrip func(counts Counts) bool
isSuccessful func(err error) bool
onStateChange func(name string, from State, to State)
mutex sync.Mutex
state State
generation uint64
counts Counts
expiry time.Time
}
// TwoStepCircuitBreaker is like CircuitBreaker but instead of surrounding a function
// with the breaker functionality, it only checks whether a request can proceed and
// expects the caller to report the outcome in a separate step using a callback.
type TwoStepCircuitBreaker struct {
cb *CircuitBreaker
}
// NewCircuitBreaker returns a new CircuitBreaker configured with the given Settings.
func NewCircuitBreaker(st Settings) *CircuitBreaker {
cb := new(CircuitBreaker)
cb.name = st.Name
cb.onStateChange = st.OnStateChange
if st.MaxRequests == 0 {
cb.maxRequests = 1
} else {
cb.maxRequests = st.MaxRequests
}
if st.Interval <= 0 {
cb.interval = defaultInterval
} else {
cb.interval = st.Interval
}
if st.Timeout <= 0 {
cb.timeout = defaultTimeout
} else {
cb.timeout = st.Timeout
}
if st.ReadyToTrip == nil {
cb.readyToTrip = defaultReadyToTrip
} else {
cb.readyToTrip = st.ReadyToTrip
}
if st.IsSuccessful == nil {
cb.isSuccessful = defaultIsSuccessful
} else {
cb.isSuccessful = st.IsSuccessful
}
cb.toNewGeneration(time.Now())
return cb
}
// NewTwoStepCircuitBreaker returns a new TwoStepCircuitBreaker configured with the given Settings.
func NewTwoStepCircuitBreaker(st Settings) *TwoStepCircuitBreaker {
return &TwoStepCircuitBreaker{
cb: NewCircuitBreaker(st),
}
}
const defaultInterval = time.Duration(0) * time.Second
const defaultTimeout = time.Duration(60) * time.Second
func defaultReadyToTrip(counts Counts) bool {
return counts.ConsecutiveFailures > 5
}
func defaultIsSuccessful(err error) bool {
return err == nil
}
// Name returns the name of the CircuitBreaker.
func (cb *CircuitBreaker) Name() string {
return cb.name
}
// State returns the current state of the CircuitBreaker.
func (cb *CircuitBreaker) State() State {
cb.mutex.Lock()
defer cb.mutex.Unlock()
now := time.Now()
state, _ := cb.currentState(now)
return state
}
// Counts returns internal counters
func (cb *CircuitBreaker) Counts() Counts {
cb.mutex.Lock()
defer cb.mutex.Unlock()
return cb.counts
}
// Execute runs the given request if the CircuitBreaker accepts it.
// Execute returns an error instantly if the CircuitBreaker rejects the request.
// Otherwise, Execute returns the result of the request.
// If a panic occurs in the request, the CircuitBreaker handles it as an error
// and causes the same panic again.
func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) {
generation, err := cb.beforeRequest()
if err != nil {
return nil, err
}
defer func() {
e := recover()
if e != nil {
cb.afterRequest(generation, false)
panic(e)
}
}()
result, err := req()
cb.afterRequest(generation, cb.isSuccessful(err))
return result, err
}
// Name returns the name of the TwoStepCircuitBreaker.
func (tscb *TwoStepCircuitBreaker) Name() string {
return tscb.cb.Name()
}
// State returns the current state of the TwoStepCircuitBreaker.
func (tscb *TwoStepCircuitBreaker) State() State {
return tscb.cb.State()
}
// Counts returns internal counters
func (tscb *TwoStepCircuitBreaker) Counts() Counts {
return tscb.cb.Counts()
}
// Allow checks if a new request can proceed. It returns a callback that should be used to
// register the success or failure in a separate step. If the circuit breaker doesn't allow
// requests, it returns an error.
func (tscb *TwoStepCircuitBreaker) Allow() (done func(success bool), err error) {
generation, err := tscb.cb.beforeRequest()
if err != nil {
return nil, err
}
return func(success bool) {
tscb.cb.afterRequest(generation, success)
}, nil
}
func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
cb.mutex.Lock()
defer cb.mutex.Unlock()
now := time.Now()
state, generation := cb.currentState(now)
if state == StateOpen {
return generation, ErrOpenState
} else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests {
return generation, ErrTooManyRequests
}
cb.counts.onRequest()
return generation, nil
}
func (cb *CircuitBreaker) afterRequest(before uint64, success bool) {
cb.mutex.Lock()
defer cb.mutex.Unlock()
now := time.Now()
state, generation := cb.currentState(now)
if generation != before {
return
}
if success {
cb.onSuccess(state, now)
} else {
cb.onFailure(state, now)
}
}
func (cb *CircuitBreaker) onSuccess(state State, now time.Time) {
switch state {
case StateClosed:
cb.counts.onSuccess()
case StateHalfOpen:
cb.counts.onSuccess()
if cb.counts.ConsecutiveSuccesses >= cb.maxRequests {
cb.setState(StateClosed, now)
}
}
}
func (cb *CircuitBreaker) onFailure(state State, now time.Time) {
switch state {
case StateClosed:
cb.counts.onFailure()
if cb.readyToTrip(cb.counts) {
cb.setState(StateOpen, now)
}
case StateHalfOpen:
cb.setState(StateOpen, now)
}
}
func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) {
switch cb.state {
case StateClosed:
if !cb.expiry.IsZero() && cb.expiry.Before(now) {
cb.toNewGeneration(now)
}
case StateOpen:
if cb.expiry.Before(now) {
cb.setState(StateHalfOpen, now)
}
}
return cb.state, cb.generation
}
func (cb *CircuitBreaker) setState(state State, now time.Time) {
if cb.state == state {
return
}
prev := cb.state
cb.state = state
cb.toNewGeneration(now)
if cb.onStateChange != nil {
cb.onStateChange(cb.name, prev, state)
}
}
func (cb *CircuitBreaker) toNewGeneration(now time.Time) {
cb.generation++
cb.counts.clear()
var zero time.Time
switch cb.state {
case StateClosed:
if cb.interval == 0 {
cb.expiry = zero
} else {
cb.expiry = now.Add(cb.interval)
}
case StateOpen:
cb.expiry = now.Add(cb.timeout)
default: // StateHalfOpen
cb.expiry = zero
}
}

8
vendor/modules.txt vendored
View File

@@ -367,7 +367,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1
github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1
github.com/cs3org/go-cs3apis/cs3/tx/v1beta1
github.com/cs3org/go-cs3apis/cs3/types/v1beta1
# github.com/cs3org/reva/v2 v2.26.4
# github.com/cs3org/reva/v2 v2.26.5-0.20241111162950-e77dd61e7edb
## explicit; go 1.22.0
github.com/cs3org/reva/v2/cmd/revad/internal/grace
github.com/cs3org/reva/v2/cmd/revad/runtime
@@ -961,9 +961,6 @@ github.com/go-micro/plugins/v4/store/nats-js-kv
# github.com/go-micro/plugins/v4/store/redis v1.2.1
## explicit; go 1.17
github.com/go-micro/plugins/v4/store/redis
# github.com/go-micro/plugins/v4/wrapper/breaker/gobreaker v1.2.0
## explicit; go 1.17
github.com/go-micro/plugins/v4/wrapper/breaker/gobreaker
# github.com/go-micro/plugins/v4/wrapper/monitoring/prometheus v1.2.0
## explicit; go 1.17
github.com/go-micro/plugins/v4/wrapper/monitoring/prometheus
@@ -1798,9 +1795,6 @@ github.com/sirupsen/logrus
# github.com/skeema/knownhosts v1.2.1
## explicit; go 1.17
github.com/skeema/knownhosts
# github.com/sony/gobreaker v0.5.0
## explicit; go 1.12
github.com/sony/gobreaker
# github.com/spacewander/go-suffix-tree v0.0.0-20191010040751-0865e368c784
## explicit
github.com/spacewander/go-suffix-tree