Merge pull request #6971 from aduffeck/rebuild-jsoncs3-index

Rebuild jsoncs3 index
This commit is contained in:
Andre Duffeck
2023-08-07 13:13:19 +02:00
committed by GitHub
2 changed files with 155 additions and 0 deletions

View File

@@ -0,0 +1,5 @@
Enhancement: Add command for rebuilding the jsoncs3 share manager indexes
We added a command for rebuilding the jsoncs3 share manager indexes.
https://github.com/owncloud/ocis/pull/6971

View File

@@ -2,25 +2,38 @@ package command
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"sync"
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/publicshare"
publicregistry "github.com/cs3org/reva/v2/pkg/publicshare/manager/registry"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/share"
"github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3"
"github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/providercache"
"github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/shareid"
"github.com/cs3org/reva/v2/pkg/share/manager/registry"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/migrator"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
"github.com/mitchellh/mapstructure"
tw "github.com/olekukonko/tablewriter"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/owncloud/ocis/v2/ocis-pkg/config"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/ocis-pkg/config/parser"
oclog "github.com/owncloud/ocis/v2/ocis-pkg/log"
mregistry "github.com/owncloud/ocis/v2/ocis-pkg/registry"
"github.com/owncloud/ocis/v2/ocis/pkg/register"
sharing "github.com/owncloud/ocis/v2/services/sharing/pkg/config"
sharingparser "github.com/owncloud/ocis/v2/services/sharing/pkg/config/parser"
@@ -37,6 +50,7 @@ func Migrate(cfg *config.Config) *cli.Command {
MigrateDecomposedfs(cfg),
MigrateShares(cfg),
MigratePublicShares(cfg),
RebuildJSONCS3Indexes(cfg),
},
}
}
@@ -45,6 +59,142 @@ func init() {
register.AddCommand(Migrate)
}
// RebuildJSONCS3Indexes rebuilds the share indexes from the shares json
func RebuildJSONCS3Indexes(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "rebuild-jsoncs3-indexes",
Usage: "rebuild the share indexes from the shares json",
Subcommands: []*cli.Command{
ListDecomposedfsMigrations(cfg),
},
Flags: []cli.Flag{},
Before: func(c *cli.Context) error {
// Parse base config
if err := parser.ParseConfig(cfg, true); err != nil {
return configlog.ReturnError(err)
}
// Parse sharing config
cfg.Sharing.Commons = cfg.Commons
return configlog.ReturnError(sharingparser.ParseConfig(cfg.Sharing))
},
Action: func(c *cli.Context) error {
log := logger()
ctx := log.WithContext(context.Background())
rcfg := revaShareConfig(cfg.Sharing)
// Initialize registry to make service lookup work
_ = mregistry.GetRegistry()
// Get a jsoncs3 manager to operate its caches
type config struct {
GatewayAddr string `mapstructure:"gateway_addr"`
MaxConcurrency int `mapstructure:"max_concurrency"`
ProviderAddr string `mapstructure:"provider_addr"`
ServiceUserID string `mapstructure:"service_user_id"`
ServiceUserIdp string `mapstructure:"service_user_idp"`
MachineAuthAPIKey string `mapstructure:"machine_auth_apikey"`
}
conf := &config{}
if err := mapstructure.Decode(rcfg["jsoncs3"], conf); err != nil {
err = errors.Wrap(err, "error creating a new manager")
return err
}
s, err := metadata.NewCS3Storage(conf.GatewayAddr, conf.ProviderAddr, conf.ServiceUserID, conf.ServiceUserIdp, conf.MachineAuthAPIKey)
if err != nil {
return err
}
err = s.Init(ctx, "jsoncs3-share-manager-metadata")
if err != nil {
return err
}
gc, err := pool.GetGatewayServiceClient(conf.GatewayAddr)
if err != nil {
return err
}
mgr, err := jsoncs3.New(s, gc, 0, nil, 1)
if err != nil {
return err
}
// Rebuild indexes
errorsOccured := false
storages, err := s.ReadDir(ctx, "storages")
if err != nil {
return err
}
for iStorage, storage := range storages {
fmt.Printf("Scanning storage %s (%d/%d)\n", storage, iStorage+1, len(storages))
spaces, err := s.ReadDir(ctx, filepath.Join("storages", storage))
if err != nil {
fmt.Printf("failed! (%s)\n", err.Error())
errorsOccured = true
continue
}
for iSpace, space := range spaces {
fmt.Printf(" Rebuilding space '%s' %d/%d...", strings.TrimSuffix(space, ".json"), iSpace+1, len(spaces))
spaceBlob, err := s.SimpleDownload(ctx, filepath.Join("storages", storage, space))
if err != nil {
fmt.Printf(" failed! (%s)\n", err.Error())
errorsOccured = true
continue
}
shares := &providercache.Shares{}
err = json.Unmarshal(spaceBlob, shares)
if err != nil {
fmt.Printf(" failed! (%s)\n", err.Error())
errorsOccured = true
continue
}
for _, share := range shares.Shares {
err = mgr.Cache.Add(ctx, share.ResourceId.StorageId, share.ResourceId.SpaceId, share.Id.OpaqueId, share)
if err != nil {
fmt.Printf(" adding share '%s' to the cache failed! (%s)\n", share.Id.OpaqueId, err.Error())
errorsOccured = true
}
err = mgr.CreatedCache.Add(ctx, share.Creator.OpaqueId, share.Id.OpaqueId)
if err != nil {
fmt.Printf(" adding share '%s' to the created cache failed! (%s)\n", share.Id.OpaqueId, err.Error())
errorsOccured = true
}
spaceId := share.ResourceId.StorageId + shareid.IDDelimiter + share.ResourceId.SpaceId
switch share.Grantee.Type {
case provider.GranteeType_GRANTEE_TYPE_USER:
userid := share.Grantee.GetUserId().GetOpaqueId()
rs := &collaboration.ReceivedShare{
Share: share,
State: collaboration.ShareState_SHARE_STATE_PENDING,
}
err := mgr.UserReceivedStates.Add(ctx, userid, spaceId, rs)
if err != nil {
fmt.Printf(" adding share '%s' to the user cache failed! (%s)\n", share.Id.OpaqueId, err.Error())
errorsOccured = true
}
case provider.GranteeType_GRANTEE_TYPE_GROUP:
groupid := share.Grantee.GetGroupId().GetOpaqueId()
err := mgr.GroupReceivedCache.Add(ctx, groupid, spaceId)
if err != nil {
fmt.Printf(" adding share '%s' to the group cache failed! (%s)\n", share.Id.OpaqueId, err.Error())
errorsOccured = true
}
}
}
fmt.Printf(" done\n")
}
fmt.Printf("done\n")
}
if errorsOccured {
fmt.Printf("There were errors. Please review the logs.")
}
return nil
},
}
}
// MigrateDecomposedfs is the entrypoint for the decomposedfs migrate command
func MigrateDecomposedfs(cfg *config.Config) *cli.Command {
return &cli.Command{