diff --git a/changelog/unreleased/add-rebuild-jsoncs3-index-command.md b/changelog/unreleased/add-rebuild-jsoncs3-index-command.md new file mode 100644 index 000000000..6998c9a1d --- /dev/null +++ b/changelog/unreleased/add-rebuild-jsoncs3-index-command.md @@ -0,0 +1,5 @@ +Enhancement: Add command for rebuilding the jsoncs3 share manager indexes + +We added a command for rebuilding the jsoncs3 share manager indexes. + +https://github.com/owncloud/ocis/pull/6971 diff --git a/ocis/pkg/command/migrate.go b/ocis/pkg/command/migrate.go index 379f52559..f978381a2 100644 --- a/ocis/pkg/command/migrate.go +++ b/ocis/pkg/command/migrate.go @@ -2,25 +2,38 @@ package command import ( "context" + "encoding/json" + "fmt" "os" + "path/filepath" "sort" + "strings" "sync" collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/publicshare" publicregistry "github.com/cs3org/reva/v2/pkg/publicshare/manager/registry" + "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/cs3org/reva/v2/pkg/share" + "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3" + "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/providercache" + "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/shareid" "github.com/cs3org/reva/v2/pkg/share/manager/registry" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/migrator" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" + "github.com/cs3org/reva/v2/pkg/storage/utils/metadata" + "github.com/mitchellh/mapstructure" tw "github.com/olekukonko/tablewriter" + "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/owncloud/ocis/v2/ocis-pkg/config" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" "github.com/owncloud/ocis/v2/ocis-pkg/config/parser" oclog "github.com/owncloud/ocis/v2/ocis-pkg/log" + mregistry "github.com/owncloud/ocis/v2/ocis-pkg/registry" "github.com/owncloud/ocis/v2/ocis/pkg/register" sharing "github.com/owncloud/ocis/v2/services/sharing/pkg/config" sharingparser "github.com/owncloud/ocis/v2/services/sharing/pkg/config/parser" @@ -37,6 +50,7 @@ func Migrate(cfg *config.Config) *cli.Command { MigrateDecomposedfs(cfg), MigrateShares(cfg), MigratePublicShares(cfg), + RebuildJSONCS3Indexes(cfg), }, } } @@ -45,6 +59,142 @@ func init() { register.AddCommand(Migrate) } +// RebuildJSONCS3Indexes rebuilds the share indexes from the shares json +func RebuildJSONCS3Indexes(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "rebuild-jsoncs3-indexes", + Usage: "rebuild the share indexes from the shares json", + Subcommands: []*cli.Command{ + ListDecomposedfsMigrations(cfg), + }, + Flags: []cli.Flag{}, + Before: func(c *cli.Context) error { + // Parse base config + if err := parser.ParseConfig(cfg, true); err != nil { + return configlog.ReturnError(err) + } + + // Parse sharing config + cfg.Sharing.Commons = cfg.Commons + return configlog.ReturnError(sharingparser.ParseConfig(cfg.Sharing)) + }, + Action: func(c *cli.Context) error { + log := logger() + ctx := log.WithContext(context.Background()) + rcfg := revaShareConfig(cfg.Sharing) + + // Initialize registry to make service lookup work + _ = mregistry.GetRegistry() + + // Get a jsoncs3 manager to operate its caches + type config struct { + GatewayAddr string `mapstructure:"gateway_addr"` + MaxConcurrency int `mapstructure:"max_concurrency"` + ProviderAddr string `mapstructure:"provider_addr"` + ServiceUserID string `mapstructure:"service_user_id"` + ServiceUserIdp string `mapstructure:"service_user_idp"` + MachineAuthAPIKey string `mapstructure:"machine_auth_apikey"` + } + conf := &config{} + if err := mapstructure.Decode(rcfg["jsoncs3"], conf); err != nil { + err = errors.Wrap(err, "error creating a new manager") + return err + } + s, err := metadata.NewCS3Storage(conf.GatewayAddr, conf.ProviderAddr, conf.ServiceUserID, conf.ServiceUserIdp, conf.MachineAuthAPIKey) + if err != nil { + return err + } + err = s.Init(ctx, "jsoncs3-share-manager-metadata") + if err != nil { + return err + } + gc, err := pool.GetGatewayServiceClient(conf.GatewayAddr) + if err != nil { + return err + } + mgr, err := jsoncs3.New(s, gc, 0, nil, 1) + if err != nil { + return err + } + + // Rebuild indexes + errorsOccured := false + storages, err := s.ReadDir(ctx, "storages") + if err != nil { + return err + } + for iStorage, storage := range storages { + fmt.Printf("Scanning storage %s (%d/%d)\n", storage, iStorage+1, len(storages)) + spaces, err := s.ReadDir(ctx, filepath.Join("storages", storage)) + if err != nil { + fmt.Printf("failed! (%s)\n", err.Error()) + errorsOccured = true + continue + } + + for iSpace, space := range spaces { + fmt.Printf(" Rebuilding space '%s' %d/%d...", strings.TrimSuffix(space, ".json"), iSpace+1, len(spaces)) + + spaceBlob, err := s.SimpleDownload(ctx, filepath.Join("storages", storage, space)) + if err != nil { + fmt.Printf(" failed! (%s)\n", err.Error()) + errorsOccured = true + continue + } + shares := &providercache.Shares{} + err = json.Unmarshal(spaceBlob, shares) + if err != nil { + fmt.Printf(" failed! (%s)\n", err.Error()) + errorsOccured = true + continue + } + for _, share := range shares.Shares { + err = mgr.Cache.Add(ctx, share.ResourceId.StorageId, share.ResourceId.SpaceId, share.Id.OpaqueId, share) + if err != nil { + fmt.Printf(" adding share '%s' to the cache failed! (%s)\n", share.Id.OpaqueId, err.Error()) + errorsOccured = true + } + err = mgr.CreatedCache.Add(ctx, share.Creator.OpaqueId, share.Id.OpaqueId) + if err != nil { + fmt.Printf(" adding share '%s' to the created cache failed! (%s)\n", share.Id.OpaqueId, err.Error()) + errorsOccured = true + } + + spaceId := share.ResourceId.StorageId + shareid.IDDelimiter + share.ResourceId.SpaceId + switch share.Grantee.Type { + case provider.GranteeType_GRANTEE_TYPE_USER: + userid := share.Grantee.GetUserId().GetOpaqueId() + rs := &collaboration.ReceivedShare{ + Share: share, + State: collaboration.ShareState_SHARE_STATE_PENDING, + } + err := mgr.UserReceivedStates.Add(ctx, userid, spaceId, rs) + if err != nil { + fmt.Printf(" adding share '%s' to the user cache failed! (%s)\n", share.Id.OpaqueId, err.Error()) + errorsOccured = true + } + case provider.GranteeType_GRANTEE_TYPE_GROUP: + groupid := share.Grantee.GetGroupId().GetOpaqueId() + err := mgr.GroupReceivedCache.Add(ctx, groupid, spaceId) + if err != nil { + fmt.Printf(" adding share '%s' to the group cache failed! (%s)\n", share.Id.OpaqueId, err.Error()) + errorsOccured = true + } + } + } + fmt.Printf(" done\n") + } + fmt.Printf("done\n") + } + if errorsOccured { + fmt.Printf("There were errors. Please review the logs.") + } + + return nil + }, + } +} + // MigrateDecomposedfs is the entrypoint for the decomposedfs migrate command func MigrateDecomposedfs(cfg *config.Config) *cli.Command { return &cli.Command{