feat(ocis): benchmark revision listing possibilities

Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
jkoberg
2024-08-21 16:41:42 +02:00
parent 585bd82192
commit e548f29b84
4 changed files with 250 additions and 143 deletions

View File

@@ -0,0 +1,5 @@
Enhancement: Improve revisions purge
The `revisions purge` command would time out on big spaces. We have improved performance by parallelizing the process.
https://github.com/owncloud/ocis/pull/9891

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"path/filepath"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
ocisbs "github.com/cs3org/reva/v2/pkg/storage/fs/ocis/blobstore"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup"
s3bs "github.com/cs3org/reva/v2/pkg/storage/fs/s3ng/blobstore"
@@ -19,7 +20,7 @@ import (
var (
// _nodesGlobPattern is the glob pattern to find all nodes
_nodesGlobPattern = "spaces/*/*/*/*/*/*/*/*"
_nodesGlobPattern = "spaces/*/*/nodes/"
)
// RevisionsCommand is the entrypoint for the revisions command.
@@ -30,7 +31,7 @@ func RevisionsCommand(cfg *config.Config) *cli.Command {
Subcommands: []*cli.Command{
PurgeRevisionsCommand(cfg),
},
Before: func(c *cli.Context) error {
Before: func(_ *cli.Context) error {
return configlog.ReturnError(parser.ParseConfig(cfg, true))
},
Action: func(_ *cli.Context) error {
@@ -74,6 +75,11 @@ func PurgeRevisionsCommand(cfg *config.Config) *cli.Command {
Aliases: []string{"r"},
Usage: "purge all revisions of this file/space. If not set, all revisions will be purged",
},
// glob-mechanism selects how revision nodes are discovered; the Action
// below forces "glob" whenever the resource ID carries an opaque ID.
&cli.StringFlag{
	Name: "glob-mechanism",
	Usage: "the glob mechanism to find all nodes. Can be 'glob', 'list' or 'workers'. 'glob' uses globbing with a single worker. 'workers' spawns multiple goroutines, accelerating the command drastically but causing high cpu and ram usage. 'list' looks for references by listing directories with multiple workers. Default is 'glob'",
	Value: "glob",
},
},
Action: func(c *cli.Context) error {
basePath := c.String("basepath")
@@ -108,43 +114,72 @@ func PurgeRevisionsCommand(cfg *config.Config) *cli.Command {
return err
}
p, err := generatePath(basePath, c.String("resource-id"))
if err != nil {
fmt.Printf("❌ Error parsing resourceID: %s", err)
return err
var rid *provider.ResourceId
resid, err := storagespace.ParseID(c.String("resource-id"))
if err == nil {
rid = &resid
}
if err := revisions.PurgeRevisions(p, bs, c.Bool("dry-run"), c.Bool("verbose")); err != nil {
fmt.Printf("❌ Error purging revisions: %s", err)
return err
mechanism := c.String("glob-mechanism")
if rid.GetOpaqueId() != "" {
mechanism = "glob"
}
var ch <-chan string
switch mechanism {
default:
fallthrough
case "glob":
p := generatePath(basePath, rid)
if rid.GetOpaqueId() == "" {
p = filepath.Join(p, "*/*/*/*/*")
}
ch = revisions.Glob(p)
case "workers":
p := generatePath(basePath, rid)
ch = revisions.GlobWorkers(p, "/*", "/*/*/*/*")
case "list":
p := filepath.Join(basePath, "spaces")
if rid != nil {
p = generatePath(basePath, rid)
}
ch = revisions.List(p, 10)
}
files, blobs, revisions := revisions.PurgeRevisions(ch, bs, c.Bool("dry-run"), c.Bool("verbose"))
printResults(files, blobs, revisions, c.Bool("dry-run"))
return nil
},
}
}
func generatePath(basePath string, resourceID string) (string, error) {
if resourceID == "" {
return filepath.Join(basePath, _nodesGlobPattern), nil
// printResults writes a one-line summary of a purge run to stdout.
//
// When all three counters are zero it reports that nothing was found,
// regardless of dryRun. Otherwise it reports the number of revisions, files
// and blobs, phrased as "Would delete" for a dry run and "Deleted" otherwise.
func printResults(countFiles, countBlobs, countRevisions int, dryRun bool) {
	nothingFound := countFiles == 0 && countRevisions == 0 && countBlobs == 0
	if nothingFound {
		fmt.Println("❎ No revisions found. Storage provider is clean.")
		return
	}

	if dryRun {
		fmt.Printf("👉 Would delete %d revisions (%d files / %d blobs)\n", countRevisions, countFiles, countBlobs)
		return
	}

	fmt.Printf("✅ Deleted %d revisions (%d files / %d blobs)\n", countRevisions, countFiles, countBlobs)
}
rid, err := storagespace.ParseID(resourceID)
if err != nil {
return "", err
func generatePath(basePath string, rid *provider.ResourceId) string {
if rid == nil {
return filepath.Join(basePath, _nodesGlobPattern)
}
sid := lookup.Pathify(rid.GetSpaceId(), 1, 2)
if sid == "" {
sid = "*/*"
return ""
}
nid := lookup.Pathify(rid.GetOpaqueId(), 4, 2)
if nid == "" {
nid = "*/*/*/*/"
return filepath.Join(basePath, "spaces", sid, "nodes")
}
return filepath.Join(basePath, "spaces", sid, "nodes", nid+"*"), nil
return filepath.Join(basePath, "spaces", sid, "nodes", nid+"*")
}
func init() {

View File

@@ -7,6 +7,7 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/shamaton/msgpack/v2"
@@ -25,16 +26,12 @@ type DelBlobstore interface {
Delete(node *node.Node) error
}
// PurgeRevisionsGlob removes all revisions from a storage provider using globbing.
func PurgeRevisionsGlob(pattern string, bs DelBlobstore, dryRun bool, verbose bool) (int, int, int) {
if verbose {
fmt.Println("Looking for nodes in", pattern)
}
// Glob uses globbing to find all revision nodes in a storage provider.
func Glob(pattern string) <-chan string {
ch := make(chan string)
go func() {
defer close(ch)
nodes, err := filepath.Glob(pattern)
nodes, err := filepath.Glob(filepath.Join(pattern))
if err != nil {
fmt.Println("error globbing", pattern, err)
return
@@ -52,11 +49,51 @@ func PurgeRevisionsGlob(pattern string, bs DelBlobstore, dryRun bool, verbose bo
}
}()
return purgeRevisions(ch, bs, dryRun, verbose)
return ch
}
// PurgeRevisionsWalk removes all revisions from a storage provider using walking.
func PurgeRevisionsWalk(base string, bs DelBlobstore, dryRun bool, verbose bool) (int, int, int) {
// GlobWorkers finds revision nodes with a two-stage glob.
//
// The first stage globs pattern+depth in a single goroutine. Every match then
// gets its own goroutine that globs match+remainder and sends each result
// matching _versionRegex on the returned channel. The channel is closed once
// all second-stage goroutines have finished, or immediately if the first
// stage fails or finds nothing (both cases are reported on stdout).
func GlobWorkers(pattern string, depth string, remainder string) <-chan string {
	out := make(chan string)

	go func() {
		defer close(out)

		prefixes, err := filepath.Glob(pattern + depth)
		switch {
		case err != nil:
			fmt.Println("error globbing", pattern, err)
			return
		case len(prefixes) == 0:
			fmt.Println("no nodes found. Double check storage path")
			return
		}

		// One goroutine per first-stage match; there is no upper bound.
		var pending sync.WaitGroup
		pending.Add(len(prefixes))
		for _, prefix := range prefixes {
			go func(prefix string) {
				defer pending.Done()

				matches, err := filepath.Glob(prefix + remainder)
				if err != nil {
					fmt.Println("error globbing", prefix, err)
					return
				}
				for _, match := range matches {
					if !_versionRegex.MatchString(match) {
						continue
					}
					out <- match
				}
			}(prefix)
		}

		// Keep the channel open until every worker has delivered its matches.
		pending.Wait()
	}()

	return out
}
// Walk walks the storage provider to find all revision nodes.
func Walk(base string) <-chan string {
ch := make(chan string)
go func() {
defer close(ch)
@@ -79,58 +116,25 @@ func PurgeRevisionsWalk(base string, bs DelBlobstore, dryRun bool, verbose bool)
}
}()
return purgeRevisions(ch, bs, dryRun, verbose)
return ch
}
// PurgeRevisionsList removes all revisions from a storage provider using listing.
func PurgeRevisionsList(base string, bs DelBlobstore, dryRun bool, verbose bool) (int, int, int) {
// List uses directory listing to find all revision nodes in a storage provider.
func List(base string, workers int) <-chan string {
ch := make(chan string)
go func() {
defer close(ch)
if err := listFolder(base, ch); err != nil {
if err := listFolder(base, ch, make(chan struct{}, workers)); err != nil {
fmt.Println("error listing", base, err)
return
}
}()
return purgeRevisions(ch, bs, dryRun, verbose)
return ch
}
func listFolder(path string, ch chan<- string) error {
children, err := os.ReadDir(path)
if err != nil {
return err
}
for _, child := range children {
if child.IsDir() {
if err := listFolder(filepath.Join(path, child.Name()), ch); err != nil {
return err
}
}
if _versionRegex.MatchString(child.Name()) {
ch <- filepath.Join(path, child.Name())
}
}
return nil
}
// PrintResults prints the results
func PrintResults(countFiles, countBlobs, countRevisions int, dryRun bool) error {
switch {
case countFiles == 0 && countRevisions == 0 && countBlobs == 0:
fmt.Println("❎ No revisions found. Storage provider is clean.")
case !dryRun:
fmt.Printf("✅ Deleted %d revisions (%d files / %d blobs)\n", countRevisions, countFiles, countBlobs)
default:
fmt.Printf("👉 Would delete %d revisions (%d files / %d blobs)\n", countRevisions, countFiles, countBlobs)
}
return nil
}
func purgeRevisions(nodes <-chan string, bs DelBlobstore, dryRun, verbose bool) (int, int, int) {
// PurgeRevisions removes all revisions from a storage provider.
func PurgeRevisions(nodes <-chan string, bs DelBlobstore, dryRun, verbose bool) (int, int, int) {
countFiles := 0
countBlobs := 0
countRevisions := 0
@@ -201,6 +205,37 @@ func purgeRevisions(nodes <-chan string, bs DelBlobstore, dryRun, verbose bool)
return countFiles, countBlobs, countRevisions
}
// listFolder recursively lists path and sends the full path of every entry
// whose name matches _versionRegex to ch.
//
// workers is a buffered channel used as a semaphore: a slot is taken before
// the directory is read and given back once its entries have been processed.
// Each sub-directory is handled in its own goroutine. Errors from
// sub-directories are printed and do not abort the listing; only a failure to
// read path itself is returned.
func listFolder(path string, ch chan<- string, workers chan struct{}) error {
	workers <- struct{}{}

	children, err := os.ReadDir(path)
	if err != nil {
		<-workers
		return err
	}

	wg := sync.WaitGroup{}
	for _, child := range children {
		// Resolve the path once, outside the goroutine: the closure must not
		// capture the loop variable, which is shared between iterations on
		// toolchains older than Go 1.22.
		childPath := filepath.Join(path, child.Name())

		if child.IsDir() {
			wg.Add(1)
			go func(dir string) {
				defer wg.Done()
				if err := listFolder(dir, ch, workers); err != nil {
					// Report the directory that actually failed, not its parent.
					fmt.Println("error listing", dir, err)
				}
			}(childPath)
		}

		if _versionRegex.MatchString(child.Name()) {
			ch <- childPath
		}
	}

	// Give the slot back before waiting: a parent that kept its slot while
	// blocked in Wait could starve the children it is waiting for.
	<-workers
	wg.Wait()
	return nil
}
func getBlobID(path string) (string, error) {
b, err := os.ReadFile(path)
if err != nil {

View File

@@ -14,7 +14,7 @@ import (
)
var (
_basePath = "test_temp/spaces/8f/638374-6ea8-4f0d-80c4-66d9b49830a5/nodes/"
_basePath = "/spaces/8f/638374-6ea8-4f0d-80c4-66d9b49830a5/nodes/"
)
// func TestInit(t *testing.T) {
@@ -22,117 +22,149 @@ var (
// defer os.RemoveAll("test_temp")
// }
func TestGlob30(t *testing.T) { testGlob(t, 10, 2) }
func TestGlob80(t *testing.T) { testGlob(t, 20, 3) }
func TestGlob250(t *testing.T) { testGlob(t, 50, 4) }
func TestGlob600(t *testing.T) { testGlob(t, 100, 5) }
// Glob tests. Arguments to test are (numNodes, numRevisions, finder); the
// numeric suffix equals numNodes + numNodes*numRevisions, i.e. the number of
// node and revision files created for the run.
func TestGlob30(t *testing.T) { test(t, 10, 2, glob) }
func TestGlob80(t *testing.T) { test(t, 20, 3, glob) }
func TestGlob250(t *testing.T) { test(t, 50, 4, glob) }
func TestGlob600(t *testing.T) { test(t, 100, 5, glob) }
func TestWalk30(t *testing.T) { testWalk(t, 10, 2) }
func TestWalk80(t *testing.T) { testWalk(t, 20, 3) }
func TestWalk250(t *testing.T) { testWalk(t, 50, 4) }
func TestWalk600(t *testing.T) { testWalk(t, 100, 5) }
// Walk tests, using the same (numNodes, numRevisions) fixtures as the Glob
// tests but finding revisions through the walk helper.
func TestWalk30(t *testing.T) { test(t, 10, 2, walk) }
func TestWalk80(t *testing.T) { test(t, 20, 3, walk) }
func TestWalk250(t *testing.T) { test(t, 50, 4, walk) }
func TestWalk600(t *testing.T) { test(t, 100, 5, walk) }
func TestList30(t *testing.T) { testList(t, 10, 2) }
func TestList80(t *testing.T) { testList(t, 20, 3) }
func TestList250(t *testing.T) { testList(t, 50, 4) }
func TestList600(t *testing.T) { testList(t, 100, 5) }
// List tests. The finder varies between list2, list10 and list20, so the
// listing is exercised with 2, 10 and 20 workers.
func TestList30(t *testing.T) { test(t, 10, 2, list2) }
func TestList80(t *testing.T) { test(t, 20, 3, list10) }
func TestList250(t *testing.T) { test(t, 50, 4, list20) }
func TestList600(t *testing.T) { test(t, 100, 5, list2) }
func BenchmarkGlob30(b *testing.B) { benchmarkGlob(b, 10, 2) }
func BenchmarkWalk30(b *testing.B) { benchmarkWalk(b, 10, 2) }
func BenchmarkList30(b *testing.B) { benchmarkList(b, 10, 2) }
// GlobWorkers tests. The D1/D2/D4 finders differ in how many path segments
// are globbed in the first stage before the work is fanned out.
func TestGlobWorkers30(t *testing.T) { test(t, 10, 2, globWorkersD1) }
func TestGlobWorkers80(t *testing.T) { test(t, 20, 3, globWorkersD2) }
func TestGlobWorkers250(t *testing.T) { test(t, 50, 4, globWorkersD4) }
func TestGlobWorkers600(t *testing.T) { test(t, 100, 5, globWorkersD2) }
func BenchmarkGlob80(b *testing.B) { benchmarkGlob(b, 20, 3) }
func BenchmarkWalk80(b *testing.B) { benchmarkWalk(b, 20, 3) }
func BenchmarkList80(b *testing.B) { benchmarkList(b, 20, 3) }
// Benchmarks on the smallest fixture: 10 nodes with 2 revisions each.
func BenchmarkGlob30(b *testing.B) { benchmark(b, 10, 2, glob) }
func BenchmarkWalk30(b *testing.B) { benchmark(b, 10, 2, walk) }
func BenchmarkList30(b *testing.B) { benchmark(b, 10, 2, list2) }
func BenchmarkGlobWorkers30(b *testing.B) { benchmark(b, 10, 2, globWorkersD2) }
func BenchmarkGlob250(b *testing.B) { benchmarkGlob(b, 50, 4) }
func BenchmarkWalk250(b *testing.B) { benchmarkWalk(b, 50, 4) }
func BenchmarkList250(b *testing.B) { benchmarkList(b, 50, 4) }
// Benchmarks on 20 nodes with 3 revisions each.
func BenchmarkGlob80(b *testing.B) { benchmark(b, 20, 3, glob) }
func BenchmarkWalk80(b *testing.B) { benchmark(b, 20, 3, walk) }
func BenchmarkList80(b *testing.B) { benchmark(b, 20, 3, list2) }
func BenchmarkGlobWorkers80(b *testing.B) { benchmark(b, 20, 3, globWorkersD2) }
func BenchmarkGlob600(b *testing.B) { benchmarkGlob(b, 100, 5) }
func BenchmarkWalk600(b *testing.B) { benchmarkWalk(b, 100, 5) }
func BenchmarkList600(b *testing.B) { benchmarkList(b, 100, 5) }
// Benchmarks on 50 nodes with 4 revisions each.
func BenchmarkGlob250(b *testing.B) { benchmark(b, 50, 4, glob) }
func BenchmarkWalk250(b *testing.B) { benchmark(b, 50, 4, walk) }
func BenchmarkList250(b *testing.B) { benchmark(b, 50, 4, list2) }
func BenchmarkGlobWorkers250(b *testing.B) { benchmark(b, 50, 4, globWorkersD2) }
func BenchmarkGlob11000(b *testing.B) { benchmarkGlob(b, 1000, 10) }
func BenchmarkWalk11000(b *testing.B) { benchmarkWalk(b, 1000, 10) }
func BenchmarkList11000(b *testing.B) { benchmarkList(b, 1000, 10) }
// Benchmarks on 100 nodes with 5 revisions each, covering every finder
// variant: glob, walk, list with 2/10/20 workers, and GlobWorkers with a
// first-stage depth of 1, 2 and 4 path segments.
func BenchmarkGlobAT600(b *testing.B) { benchmark(b, 100, 5, glob) }
func BenchmarkWalkAT600(b *testing.B) { benchmark(b, 100, 5, walk) }
func BenchmarkList2AT600(b *testing.B) { benchmark(b, 100, 5, list2) }
func BenchmarkList10AT600(b *testing.B) { benchmark(b, 100, 5, list10) }
func BenchmarkList20AT600(b *testing.B) { benchmark(b, 100, 5, list20) }
func BenchmarkGlobWorkersD1AT600(b *testing.B) { benchmark(b, 100, 5, globWorkersD1) }
func BenchmarkGlobWorkersD2AT600(b *testing.B) { benchmark(b, 100, 5, globWorkersD2) }
func BenchmarkGlobWorkersD4AT600(b *testing.B) { benchmark(b, 100, 5, globWorkersD4) }
func BenchmarkGlob110000(b *testing.B) { benchmarkGlob(b, 10000, 10) }
func BenchmarkWalk110000(b *testing.B) { benchmarkWalk(b, 10000, 10) }
func BenchmarkList110000(b *testing.B) { benchmarkList(b, 10000, 10) }
// Benchmarks on 2000 nodes with 10 revisions each, covering every finder
// variant: glob, walk, list with 2/10/20 workers, and GlobWorkers with a
// first-stage depth of 1, 2 and 4 path segments.
func BenchmarkGlobAT22000(b *testing.B) { benchmark(b, 2000, 10, glob) }
func BenchmarkWalkAT22000(b *testing.B) { benchmark(b, 2000, 10, walk) }
func BenchmarkList2AT22000(b *testing.B) { benchmark(b, 2000, 10, list2) }
func BenchmarkList10AT22000(b *testing.B) { benchmark(b, 2000, 10, list10) }
func BenchmarkList20AT22000(b *testing.B) { benchmark(b, 2000, 10, list20) }
func BenchmarkGlobWorkersD1AT22000(b *testing.B) { benchmark(b, 2000, 10, globWorkersD1) }
func BenchmarkGlobWorkersD2AT22000(b *testing.B) { benchmark(b, 2000, 10, globWorkersD2) }
func BenchmarkGlobWorkersD4AT22000(b *testing.B) { benchmark(b, 2000, 10, globWorkersD4) }
func benchmarkGlob(b *testing.B, numNodes int, numRevisions int) {
initialize(numNodes, numRevisions)
defer os.RemoveAll("test_temp")
// Benchmarks on the largest fixture: 10000 nodes with 10 revisions each.
func BenchmarkGlob110000(b *testing.B) { benchmark(b, 10000, 10, glob) }
func BenchmarkWalk110000(b *testing.B) { benchmark(b, 10000, 10, walk) }
func BenchmarkList110000(b *testing.B) { benchmark(b, 10000, 10, list2) }
func BenchmarkGlobWorkers110000(b *testing.B) { benchmark(b, 10000, 10, globWorkersD2) }
func benchmark(b *testing.B, numNodes int, numRevisions int, f func(string) <-chan string) {
base := initialize(numNodes, numRevisions)
defer os.RemoveAll(base)
b.ResetTimer()
for i := 0; i < b.N; i++ {
PurgeRevisionsGlob(_basePath+"*/*/*/*/*", nil, false, false)
ch := f(base)
PurgeRevisions(ch, nil, false, false)
}
b.StopTimer()
}
func benchmarkWalk(b *testing.B, numNodes int, numRevisions int) {
initialize(numNodes, numRevisions)
defer os.RemoveAll("test_temp")
func test(t *testing.T, numNodes int, numRevisions int, f func(string) <-chan string) {
base := initialize(numNodes, numRevisions)
defer os.RemoveAll(base)
for i := 0; i < b.N; i++ {
PurgeRevisionsWalk(_basePath, nil, false, false)
}
}
func benchmarkList(b *testing.B, numNodes int, numRevisions int) {
initialize(numNodes, numRevisions)
defer os.RemoveAll("test_temp")
for i := 0; i < b.N; i++ {
PurgeRevisionsList(_basePath, nil, false, false)
}
}
func testGlob(t *testing.T, numNodes int, numRevisions int) {
initialize(numNodes, numRevisions)
defer os.RemoveAll("test_temp")
_, _, revisions := PurgeRevisionsGlob(_basePath+"*/*/*/*/*", nil, false, false)
ch := f(base)
_, _, revisions := PurgeRevisions(ch, nil, false, false)
require.Equal(t, numNodes*numRevisions, revisions, "Deleted Revisions")
}
func testWalk(t *testing.T, numNodes int, numRevisions int) {
initialize(numNodes, numRevisions)
defer os.RemoveAll("test_temp")
_, _, revisions := PurgeRevisionsWalk(_basePath, nil, false, false)
require.Equal(t, numNodes*numRevisions, revisions, "Deleted Revisions")
func glob(base string) <-chan string {
return Glob(base + _basePath + "*/*/*/*/*")
}
func testList(t *testing.T, numNodes int, numRevisions int) {
initialize(numNodes, numRevisions)
defer os.RemoveAll("test_temp")
_, _, revisions := PurgeRevisionsList(_basePath, nil, false, false)
require.Equal(t, numNodes*numRevisions, revisions, "Deleted Revisions")
func walk(base string) <-chan string {
return Walk(base + _basePath)
}
func initialize(numNodes int, numRevisions int) {
// create base path
if err := os.MkdirAll(_basePath, fs.ModePerm); err != nil {
// list2 runs List on the fixture's nodes directory under base with 2 workers.
func list2(base string) <-chan string {
	const workers = 2
	return List(base+_basePath, workers)
}
// list10 runs List on the fixture's nodes directory under base with 10 workers.
func list10(base string) <-chan string {
	const workers = 10
	return List(base+_basePath, workers)
}
// list20 runs List on the fixture's nodes directory under base with 20 workers.
func list20(base string) <-chan string {
	const workers = 20
	return List(base+_basePath, workers)
}
// globWorkersD1 runs GlobWorkers with one path segment globbed in the first
// stage and the remaining four segments globbed by the workers.
func globWorkersD1(base string) <-chan string {
	const (
		depth     = "*"
		remainder = "/*/*/*/*"
	)
	return GlobWorkers(base+_basePath, depth, remainder)
}
// globWorkersD2 runs GlobWorkers with two path segments globbed in the first
// stage and the remaining three segments globbed by the workers.
func globWorkersD2(base string) <-chan string {
	const (
		depth     = "*/*"
		remainder = "/*/*/*"
	)
	return GlobWorkers(base+_basePath, depth, remainder)
}
// globWorkersD4 runs GlobWorkers with four path segments globbed in the first
// stage, leaving only the final segment to the workers.
func globWorkersD4(base string) <-chan string {
	const (
		depth     = "*/*/*/*"
		remainder = "/*"
	)
	return GlobWorkers(base+_basePath, depth, remainder)
}
func initialize(numNodes int, numRevisions int) string {
base := "test_temp_" + uuid.New().String()
if err := os.Mkdir(base, os.ModePerm); err != nil {
fmt.Println("Error creating test_temp directory", err)
os.RemoveAll(base)
os.Exit(1)
}
// create base path
if err := os.MkdirAll(base+_basePath, fs.ModePerm); err != nil {
fmt.Println("Error creating base path", err)
os.RemoveAll(base)
os.Exit(1)
}
for i := 0; i < numNodes; i++ {
path := lookup.Pathify(uuid.New().String(), 4, 2)
dir := filepath.Dir(path)
if err := os.MkdirAll(_basePath+dir, fs.ModePerm); err != nil {
if err := os.MkdirAll(base+_basePath+dir, fs.ModePerm); err != nil {
fmt.Println("Error creating test_temp directory", err)
os.RemoveAll(base)
os.Exit(1)
}
if _, err := os.Create(_basePath + path); err != nil {
if _, err := os.Create(base + _basePath + path); err != nil {
fmt.Println("Error creating file", err)
os.RemoveAll(base)
os.Exit(1)
}
for i := 0; i < numRevisions; i++ {
os.Create(_basePath + path + ".REV.2024-05-22T07:32:53.89969" + strconv.Itoa(i) + "Z")
os.Create(base + _basePath + path + ".REV.2024-05-22T07:32:53.89969" + strconv.Itoa(i) + "Z")
}
}
return base
}