feat(ocis): concurrency for consistency command
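
Split the consistency check into two parts: a DataProvider that crawls the space tree, the trash links and the blobstore from three concurrent goroutines, and a Consistency collector that receives the produced data over channels and evaluates it. ListBlobstore and the metadata parsing move to the new ocis/pkg/backup/provider.go.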

Signed-off-by: jkoberg <jkoberg@owncloud.com>
jkoberg
2024-05-31 16:35:18 +02:00
parent 969dabae78
commit e96921708e
2 changed files with 245 additions and 146 deletions


@@ -2,16 +2,11 @@
package backup
import (
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"regexp"
"strings"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/shamaton/msgpack/v2"
)
// Inconsistency describes the type of inconsistency
@@ -40,13 +35,7 @@ var (
_trashRegex = regexp.MustCompile(`\.T\.[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]+Z$`)
)
// ListBlobstore required to check blob consistency
type ListBlobstore interface {
List() ([]*node.Node, error)
Path(node *node.Node) string
}
// Consistency holds the node and blob data of a space
// Consistency holds the node and blob data of a storage provider
type Consistency struct {
// Storing the data like this might take a lot of memory
// we might need to optimize this if we run into memory issues
@@ -57,14 +46,10 @@ type Consistency struct {
nodeToLink map[string]string
blobToNode map[string]string
fsys fs.FS
discpath string
lbs ListBlobstore
}
// New creates a new Consistency object
func New(fsys fs.FS, discpath string, lbs ListBlobstore) *Consistency {
// NewConsistency creates a new Consistency object
func NewConsistency() *Consistency {
return &Consistency{
Nodes: make(map[string][]Inconsistency),
LinkedNodes: make(map[string][]Inconsistency),
@@ -73,10 +58,6 @@ func New(fsys fs.FS, discpath string, lbs ListBlobstore) *Consistency {
nodeToLink: make(map[string]string),
blobToNode: make(map[string]string),
fsys: fsys,
discpath: discpath,
lbs: lbs,
}
}
@@ -84,128 +65,86 @@ func New(fsys fs.FS, discpath string, lbs ListBlobstore) *Consistency {
func CheckProviderConsistency(storagepath string, lbs ListBlobstore) error {
fsys := os.DirFS(storagepath)
c := New(fsys, storagepath, lbs)
if err := c.Initialize(); err != nil {
return err
}
if err := c.Evaluate(); err != nil {
return err
}
return c.PrintResults()
}
// Initialize initializes the Consistency object
func (c *Consistency) Initialize() error {
dirs, err := fs.Glob(c.fsys, "spaces/*/*/nodes/*/*/*/*")
nodes, links, blobs, quit, err := NewProvider(fsys, storagepath, lbs).ProduceData()
if err != nil {
return err
}
if len(dirs) == 0 {
return errors.New("no backup found. Double check storage path")
}
c := NewConsistency()
c.GatherData(nodes, links, blobs, quit)
for _, d := range dirs {
entries, err := fs.ReadDir(c.fsys, d)
if err != nil {
return err
}
if len(entries) == 0 {
fmt.Println("empty dir", filepath.Join(c.discpath, d))
continue
}
for _, e := range entries {
switch {
case e.IsDir():
ls, err := fs.ReadDir(c.fsys, filepath.Join(d, e.Name()))
if err != nil {
fmt.Println("error reading dir", err)
continue
}
for _, l := range ls {
linkpath := filepath.Join(c.discpath, d, e.Name(), l.Name())
r, _ := os.Readlink(linkpath)
nodePath := filepath.Join(c.discpath, d, e.Name(), r)
c.LinkedNodes[nodePath] = []Inconsistency{}
c.nodeToLink[nodePath] = linkpath
}
fallthrough
case filepath.Ext(e.Name()) == "" || _versionRegex.MatchString(e.Name()) || _trashRegex.MatchString(e.Name()):
if !c.filesExist(filepath.Join(d, e.Name())) {
dp := filepath.Join(c.discpath, d, e.Name())
c.Nodes[dp] = append(c.Nodes[dp], InconsistencyFilesMissing)
}
inc := c.checkNode(filepath.Join(d, e.Name()))
dp := filepath.Join(c.discpath, d, e.Name())
if inc != "" {
c.Nodes[dp] = append(c.Nodes[dp], inc)
} else if len(c.Nodes[dp]) == 0 {
c.Nodes[dp] = []Inconsistency{}
}
}
}
}
links, err := fs.Glob(c.fsys, "spaces/*/*/trash/*/*/*/*/*")
if err != nil {
return err
}
for _, l := range links {
linkpath := filepath.Join(c.discpath, l)
r, _ := os.Readlink(linkpath)
p := filepath.Join(c.discpath, l, "..", r)
c.LinkedNodes[p] = []Inconsistency{}
c.nodeToLink[p] = linkpath
}
return nil
return c.PrintResults(storagepath)
}
// Evaluate evaluates inconsistencies
func (c *Consistency) Evaluate() error {
// GatherData gathers and evaluates data produced by the DataProvider
func (c *Consistency) GatherData(nodes chan NodeData, links chan LinkData, blobs chan BlobData, quit chan struct{}) {
c.gatherData(nodes, links, blobs, quit)
for n := range c.Nodes {
if _, ok := c.LinkedNodes[n]; !ok && c.requiresSymlink(n) {
if len(c.Nodes[n]) == 0 {
c.Nodes[n] = append(c.Nodes[n], InconsistencySymlinkMissing)
continue
}
deleteInconsistency(c.LinkedNodes, n)
deleteInconsistency(c.Nodes, n)
}
// LinkedNodes should be empty now
for l := range c.LinkedNodes {
c.LinkedNodes[l] = append(c.LinkedNodes[l], InconsistencyNodeMissing)
}
blobs, err := c.lbs.List()
if err != nil {
return err
for b := range c.Blobs {
c.Blobs[b] = append(c.Blobs[b], InconsistencyBlobOrphaned)
}
for _, bn := range blobs {
p := c.lbs.Path(bn)
if _, ok := c.BlobReferences[p]; !ok {
c.Blobs[p] = append(c.Blobs[p], InconsistencyBlobOrphaned)
continue
}
deleteInconsistency(c.BlobReferences, p)
}
// BlobReferences should be empty now
for b := range c.BlobReferences {
c.BlobReferences[b] = append(c.BlobReferences[b], InconsistencyBlobMissing)
}
}
func (c *Consistency) gatherData(nodes chan NodeData, links chan LinkData, blobs chan BlobData, quit chan struct{}) {
for {
select {
case n := <-nodes:
// does it have inconsistencies?
if len(n.Inconsistencies) != 0 {
c.Nodes[n.NodePath] = append(c.Nodes[n.NodePath], n.Inconsistencies...)
}
// is it linked?
if _, ok := c.LinkedNodes[n.NodePath]; ok {
deleteInconsistency(c.LinkedNodes, n.NodePath)
deleteInconsistency(c.Nodes, n.NodePath)
} else if requiresSymlink(n.NodePath) {
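// deliberate self-assignment: it creates the map entry (with a nil slice), so a node whose symlink is missing is still tracked and flagged during evaluation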
c.Nodes[n.NodePath] = c.Nodes[n.NodePath]
}
// does it have a blob?
if n.BlobPath != "" {
if _, ok := c.Blobs[n.BlobPath]; ok {
deleteInconsistency(c.Blobs, n.BlobPath)
} else {
c.BlobReferences[n.BlobPath] = []Inconsistency{}
c.blobToNode[n.BlobPath] = n.NodePath
}
}
case l := <-links:
// does it have a node?
if _, ok := c.Nodes[l.NodePath]; ok {
deleteInconsistency(c.Nodes, l.NodePath)
} else {
c.LinkedNodes[l.NodePath] = []Inconsistency{}
c.nodeToLink[l.NodePath] = l.LinkPath
}
case b := <-blobs:
// does it have a reference?
if _, ok := c.BlobReferences[b.BlobPath]; ok {
deleteInconsistency(c.BlobReferences, b.BlobPath)
} else {
c.Blobs[b.BlobPath] = []Inconsistency{}
}
case <-quit:
return
}
}
}
// PrintResults prints the results of the evaluation
func (c *Consistency) PrintResults() error {
func (c *Consistency) PrintResults(discpath string) error {
if len(c.Nodes) != 0 {
fmt.Println("\n🚨 Inconsistent Nodes:")
}
@@ -231,34 +170,13 @@ func (c *Consistency) PrintResults() error {
fmt.Printf("\t👉 %v\tblob: %s\n\t\t\t\treferencing node:%s\n", c.BlobReferences[b], b, c.blobToNode[b])
}
if len(c.Nodes) == 0 && len(c.LinkedNodes) == 0 && len(c.Blobs) == 0 && len(c.BlobReferences) == 0 {
fmt.Printf("💚 No inconsistency found. The backup in '%s' seems to be valid.\n", c.discpath)
fmt.Printf("💚 No inconsistency found. The backup in '%s' seems to be valid.\n", discpath)
}
return nil
}
func (c *Consistency) checkNode(path string) Inconsistency {
b, err := fs.ReadFile(c.fsys, path+".mpk")
if err != nil {
return InconsistencyFilesMissing
}
m := map[string][]byte{}
if err := msgpack.Unmarshal(b, &m); err != nil {
return InconsistencyMalformedFile
}
if bid := m["user.ocis.blobid"]; string(bid) != "" {
spaceID, _ := getIDsFromPath(filepath.Join(c.discpath, path))
p := c.lbs.Path(&node.Node{BlobID: string(bid), SpaceID: spaceID})
c.BlobReferences[p] = []Inconsistency{}
c.blobToNode[p] = filepath.Join(c.discpath, path)
}
return ""
}
func (c *Consistency) requiresSymlink(path string) bool {
func requiresSymlink(path string) bool {
spaceID, nodeID := getIDsFromPath(path)
if nodeID != "" && spaceID != "" && (spaceID == nodeID || _versionRegex.MatchString(nodeID)) {
return false
@@ -267,7 +185,7 @@ func (c *Consistency) requiresSymlink(path string) bool {
return true
}
func (c *Consistency) filesExist(path string) bool {
func (c *DataProvider) filesExist(path string) bool {
check := func(p string) bool {
_, err := fs.Stat(c.fsys, p)
return err == nil

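The heart of the change is this producer/consumer split: crawler goroutines feed unbuffered channels, and a single consumer fans them in with a select loop until a quit signal arrives. A minimal, self-contained sketch of the wiring (toy int items standing in for the ocis node/link/blob types):

package main

import (
	"fmt"
	"sync"
)

func main() {
	items := make(chan int)
	quit := make(chan struct{})
	wg := sync.WaitGroup{}

	// producers: each crawler writes into the shared channel
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(base int) {
			defer wg.Done()
			for j := 0; j < 2; j++ {
				items <- base*10 + j
			}
		}(i)
	}

	// shutdown: when every producer is done, hand the consumer the quit signal
	go func() {
		wg.Wait()
		quit <- struct{}{} // unbuffered: completes only once the consumer receives it
		close(items)
		close(quit)
	}()

	// consumer: fan-in until quit
	for {
		select {
		case it := <-items:
			fmt.Println("got", it)
		case <-quit:
			return
		}
	}
}

The unbuffered quit send is the same handshake ProduceData uses below: it only completes once the consumer has left its loop, so the close calls that follow cannot race with a pending receive.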
ocis/pkg/backup/provider.go Normal file

@@ -0,0 +1,181 @@
package backup
import (
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"sync"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/shamaton/msgpack/v2"
)
// ListBlobstore required to check blob consistency
type ListBlobstore interface {
List() ([]*node.Node, error)
Path(node *node.Node) string
}
// DataProvider provides data for the consistency check
type DataProvider struct {
fsys fs.FS
discpath string
lbs ListBlobstore
}
// NodeData holds data about the nodes
type NodeData struct {
NodePath string
BlobPath string
Inconsistencies []Inconsistency
}
// LinkData holds data about the symlinks
type LinkData struct {
LinkPath string
NodePath string
}
// BlobData holds data about the blobs in the blobstore
type BlobData struct {
BlobPath string
}
// NewProvider creates a new DataProvider object
func NewProvider(fsys fs.FS, discpath string, lbs ListBlobstore) *DataProvider {
return &DataProvider{
fsys: fsys,
discpath: discpath,
lbs: lbs,
}
}
// ProduceData produces data for the consistency check
func (c *DataProvider) ProduceData() (chan NodeData, chan LinkData, chan BlobData, chan struct{}, error) {
dirs, err := fs.Glob(c.fsys, "spaces/*/*/nodes/*/*/*/*")
if err != nil {
return nil, nil, nil, nil, err
}
if len(dirs) == 0 {
return nil, nil, nil, nil, errors.New("no backup found. Double check storage path")
}
nodes := make(chan NodeData)
links := make(chan LinkData)
blobs := make(chan BlobData)
quit := make(chan struct{})
wg := sync.WaitGroup{}
// crawl spaces
wg.Add(1)
go func() {
for _, d := range dirs {
entries, err := fs.ReadDir(c.fsys, d)
if err != nil {
fmt.Println("error reading dir", err)
continue
}
if len(entries) == 0 {
fmt.Println("empty dir", filepath.Join(c.discpath, d))
continue
}
for _, e := range entries {
switch {
case e.IsDir():
ls, err := fs.ReadDir(c.fsys, filepath.Join(d, e.Name()))
if err != nil {
fmt.Println("error reading dir", err)
continue
}
for _, l := range ls {
linkpath := filepath.Join(c.discpath, d, e.Name(), l.Name())
r, _ := os.Readlink(linkpath)
nodePath := filepath.Join(c.discpath, d, e.Name(), r)
links <- LinkData{LinkPath: linkpath, NodePath: nodePath}
}
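// a node directory also carries its own metadata; fall through to the node checks below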
fallthrough
case filepath.Ext(e.Name()) == "" || _versionRegex.MatchString(e.Name()) || _trashRegex.MatchString(e.Name()):
np := filepath.Join(c.discpath, d, e.Name())
var inc []Inconsistency
if !c.filesExist(filepath.Join(d, e.Name())) {
inc = append(inc, InconsistencyFilesMissing)
}
bp, i := c.getBlobPath(filepath.Join(d, e.Name()))
if i != "" {
inc = append(inc, i)
}
nodes <- NodeData{NodePath: np, BlobPath: bp, Inconsistencies: inc}
}
}
}
wg.Done()
}()
// crawl trash
wg.Add(1)
go func() {
linkpaths, err := fs.Glob(c.fsys, "spaces/*/*/trash/*/*/*/*/*")
if err != nil {
fmt.Println("error reading trash", err)
}
for _, l := range linkpaths {
linkpath := filepath.Join(c.discpath, l)
r, _ := os.Readlink(linkpath)
p := filepath.Join(c.discpath, l, "..", r)
links <- LinkData{LinkPath: linkpath, NodePath: p}
}
wg.Done()
}()
// crawl blobstore
wg.Add(1)
go func() {
bs, err := c.lbs.List()
if err != nil {
fmt.Println("error listing blobs", err)
}
for _, bn := range bs {
blobs <- BlobData{BlobPath: c.lbs.Path(bn)}
}
wg.Done()
}()
// wait for all crawlers to finish
go func() {
wg.Wait()
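// unbuffered send: blocks until the consumer has received quit and stopped reading, so the close calls below cannot race with the select loop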
quit <- struct{}{}
close(nodes)
close(links)
close(blobs)
close(quit)
}()
return nodes, links, blobs, quit, nil
}
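// getBlobPath reads the node's .mpk metadata and returns the path of the blob it references, if any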
func (c *DataProvider) getBlobPath(path string) (string, Inconsistency) {
b, err := fs.ReadFile(c.fsys, path+".mpk")
if err != nil {
return "", InconsistencyFilesMissing
}
m := map[string][]byte{}
if err := msgpack.Unmarshal(b, &m); err != nil {
return "", InconsistencyMalformedFile
}
if bid := m["user.ocis.blobid"]; string(bid) != "" {
spaceID, _ := getIDsFromPath(filepath.Join(c.discpath, path))
return c.lbs.Path(&node.Node{BlobID: string(bid), SpaceID: spaceID}), ""
}
return "", ""
}
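
getBlobPath reads a node's .mpk metadata file and extracts the user.ocis.blobid attribute. A standalone sketch of just that decode step with github.com/shamaton/msgpack/v2 (the map content is invented for the example):

package main

import (
	"fmt"

	"github.com/shamaton/msgpack/v2"
)

func main() {
	// fake .mpk content: the attribute key matches the one getBlobPath reads,
	// the blob id value is made up
	meta := map[string][]byte{"user.ocis.blobid": []byte("blob-123")}
	b, err := msgpack.Marshal(meta)
	if err != nil {
		panic(err)
	}

	m := map[string][]byte{}
	if err := msgpack.Unmarshal(b, &m); err != nil {
		fmt.Println("malformed file:", err) // getBlobPath maps this to InconsistencyMalformedFile
		return
	}
	if bid := m["user.ocis.blobid"]; len(bid) != 0 {
		fmt.Println("blob id:", string(bid))
	}
}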