Merge pull request #749 from owncloud/rebuild-index

Rebuild index
This commit is contained in:
Jörn Friedrich Dreyer
2020-10-24 09:07:13 +02:00
committed by GitHub
27 changed files with 2812 additions and 1176 deletions

View File

@@ -0,0 +1,34 @@
package command
import (
"context"
"fmt"
"github.com/micro/cli/v2"
"github.com/micro/go-micro/v2/client/grpc"
merrors "github.com/micro/go-micro/v2/errors"
"github.com/owncloud/ocis/accounts/pkg/config"
index "github.com/owncloud/ocis/accounts/pkg/proto/v0"
)
// RebuildIndex rebuilds the entire configured index.
func RebuildIndex(cdf *config.Config) *cli.Command {
return &cli.Command{
Name: "rebuildIndex",
Usage: "Rebuilds the service's index, i.e. deleting and then re-adding all existing documents",
Aliases: []string{"rebuild", "ri"},
Action: func(ctx *cli.Context) error {
idxSvcID := "com.owncloud.api.accounts"
idxSvc := index.NewIndexService(idxSvcID, grpc.NewClient())
_, err := idxSvc.RebuildIndex(context.Background(), &index.RebuildIndexRequest{})
if err != nil {
fmt.Println(merrors.FromError(err).Detail)
return err
}
fmt.Println("index rebuilt successfully")
return nil
},
}
}

View File

@@ -49,6 +49,7 @@ func Execute() error {
InspectAccount(cfg),
RemoveAccount(cfg),
PrintVersion(cfg),
RebuildIndex(cfg),
},
}

View File

@@ -99,12 +99,10 @@ func (idx *Autoincrement) Init() error {
idx.storageProvider = client
ctx := context.Background()
tk, err := idx.authenticate(ctx)
ctx, err := idx.getAuthenticatedContext(context.Background())
if err != nil {
return err
}
ctx = metadata.AppendToOutgoingContext(ctx, token.TokenHeader, tk)
if err := idx.makeDirIfNotExists(ctx, idx.indexBaseDir); err != nil {
return err
@@ -170,14 +168,12 @@ func (idx *Autoincrement) Remove(id string, v string) error {
return err
}
ctx := context.Background()
t, err := idx.authenticate(ctx)
ctx, err := idx.getAuthenticatedContext(context.Background())
if err != nil {
return err
}
deletePath := path.Join("/meta", idx.indexRootDir, v)
ctx = metadata.AppendToOutgoingContext(ctx, token.TokenHeader, t)
resp, err := idx.storageProvider.Delete(ctx, &provider.DeleteRequest{
Ref: &provider.Reference{
Spec: &provider.Reference_Path{Path: deletePath},
@@ -211,13 +207,11 @@ func (idx *Autoincrement) Update(id, oldV, newV string) error {
// Search allows for glob search on the index.
func (idx *Autoincrement) Search(pattern string) ([]string, error) {
ctx := context.Background()
t, err := idx.authenticate(ctx)
ctx, err := idx.getAuthenticatedContext(context.Background())
if err != nil {
return nil, err
}
ctx = metadata.AppendToOutgoingContext(ctx, token.TokenHeader, t)
res, err := idx.storageProvider.ListContainer(ctx, &provider.ListContainerRequest{
Ref: &provider.Reference{
Spec: &provider.Reference_Path{Path: path.Join("/meta", idx.indexRootDir)},
@@ -326,13 +320,11 @@ func (idx *Autoincrement) authenticate(ctx context.Context) (token string, err e
}
func (idx *Autoincrement) next() (int, error) {
ctx := context.Background()
t, err := idx.authenticate(ctx)
ctx, err := idx.getAuthenticatedContext(context.Background())
if err != nil {
return -1, err
}
ctx = metadata.AppendToOutgoingContext(ctx, token.TokenHeader, t)
res, err := idx.storageProvider.ListContainer(ctx, &provider.ListContainerRequest{
Ref: &provider.Reference{
Spec: &provider.Reference_Path{Path: path.Join("/meta", idx.indexRootDir)},
@@ -365,3 +357,22 @@ func (idx *Autoincrement) next() (int, error) {
return latest + 1, nil
}
func (idx *Autoincrement) getAuthenticatedContext(ctx context.Context) (context.Context, error) {
t, err := idx.authenticate(ctx)
if err != nil {
return nil, err
}
ctx = metadata.AppendToOutgoingContext(ctx, token.TokenHeader, t)
return ctx, nil
}
// Delete deletes the index folder from its storage.
func (idx *Autoincrement) Delete() error {
ctx, err := idx.getAuthenticatedContext(context.Background())
if err != nil {
return err
}
return deleteIndexRoot(ctx, idx.storageProvider, idx.indexRootDir)
}

View File

@@ -0,0 +1,26 @@
package cs3
import (
"context"
"fmt"
"path"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
)
func deleteIndexRoot(ctx context.Context, storageProvider provider.ProviderAPIClient, indexRootDir string) error {
res, err := storageProvider.Delete(ctx, &provider.DeleteRequest{
Ref: &provider.Reference{
Spec: &provider.Reference_Path{Path: path.Join("/meta", indexRootDir)},
},
})
if err != nil {
return err
}
if res.Status.Code != rpc.Code_CODE_OK {
return fmt.Errorf("error deleting index root dir: %v", indexRootDir)
}
return nil
}

View File

@@ -378,6 +378,16 @@ func (idx *NonUnique) getAuthenticatedContext(ctx context.Context) (context.Cont
return ctx, nil
}
// Delete deletes the index folder from its storage.
func (idx *NonUnique) Delete() error {
ctx, err := idx.getAuthenticatedContext(context.Background())
if err != nil {
return err
}
return deleteIndexRoot(ctx, idx.storageProvider, idx.indexRootDir)
}
func (idx *NonUnique) authenticate(ctx context.Context) (token string, err error) {
return storage.AuthenticateCS3(ctx, idx.cs3conf.ServiceUser, idx.tokenManager)
}

View File

@@ -344,3 +344,22 @@ func (idx *Unique) makeDirIfNotExists(ctx context.Context, folder string) error
func (idx *Unique) authenticate(ctx context.Context) (token string, err error) {
return storage.AuthenticateCS3(ctx, idx.cs3conf.ServiceUser, idx.tokenManager)
}
func (idx *Unique) getAuthenticatedContext(ctx context.Context) (context.Context, error) {
t, err := idx.authenticate(ctx)
if err != nil {
return nil, err
}
ctx = metadata.AppendToOutgoingContext(ctx, token.TokenHeader, t)
return ctx, nil
}
// Delete deletes the index folder from its storage.
func (idx *Unique) Delete() error {
ctx, err := idx.getAuthenticatedContext(context.Background())
if err != nil {
return err
}
return deleteIndexRoot(ctx, idx.storageProvider, idx.indexRootDir)
}

View File

@@ -5,8 +5,6 @@ import (
"os"
"path"
"path/filepath"
"reflect"
"sort"
"strconv"
"strings"
@@ -19,31 +17,26 @@ import (
// Autoincrement are fields for an index of type autoincrement.
type Autoincrement struct {
indexBy string
typeName string
filesDir string
indexBaseDir string
indexRootDir string
indexBy string
typeName string
filesDir string
indexBaseDir string
indexRootDir string
bound *option.Bound
}
// - Creating an autoincrement index has to be thread safe.
// - Validation: autoincrement indexes should only work on integers.
func init() {
registry.IndexConstructorRegistry["disk"]["autoincrement"] = NewAutoincrementIndex
}
// NewAutoincrementIndex instantiates a new AutoincrementIndex instance. Init() should be
// called afterward to ensure correct on-disk structure.
// NewAutoincrementIndex instantiates a new AutoincrementIndex instance. Init() MUST be called upon instantiation.
func NewAutoincrementIndex(o ...option.Option) index.Index {
opts := &option.Options{}
for _, opt := range o {
opt(opts)
}
// validate the field
if opts.Entity == nil {
panic("invalid autoincrement index: configured without entity")
}
@@ -54,25 +47,15 @@ func NewAutoincrementIndex(o ...option.Option) index.Index {
}
return &Autoincrement{
indexBy: opts.IndexBy,
typeName: opts.TypeName,
filesDir: opts.FilesDir,
bound: opts.Bound,
indexBaseDir: path.Join(opts.DataDir, "index.disk"),
indexRootDir: path.Join(path.Join(opts.DataDir, "index.disk"), strings.Join([]string{"autoincrement", opts.TypeName, opts.IndexBy}, ".")),
indexBy: opts.IndexBy,
typeName: opts.TypeName,
filesDir: opts.FilesDir,
bound: opts.Bound,
indexBaseDir: path.Join(opts.DataDir, "index.disk"),
indexRootDir: path.Join(path.Join(opts.DataDir, "index.disk"), strings.Join([]string{"autoincrement", opts.TypeName, opts.IndexBy}, ".")),
}
}
var (
validKinds = []reflect.Kind{
reflect.Int,
reflect.Int8,
reflect.Int16,
reflect.Int32,
reflect.Int64,
}
)
// Init initializes an autoincrement index.
func (idx *Autoincrement) Init() error {
if _, err := os.Stat(idx.filesDir); err != nil {
@@ -207,38 +190,6 @@ func (idx *Autoincrement) FilesDir() string {
return idx.filesDir
}
func isValidKind(k reflect.Kind) bool {
for _, v := range validKinds {
if k == v {
return true
}
}
return false
}
func getKind(i interface{}, field string) (reflect.Kind, error) {
r := reflect.ValueOf(i)
return reflect.Indirect(r).FieldByName(field).Kind(), nil
}
func readDir(dirname string) ([]os.FileInfo, error) {
f, err := os.Open(dirname)
if err != nil {
return nil, err
}
list, err := f.Readdir(-1)
f.Close()
if err != nil {
return nil, err
}
sort.Slice(list, func(i, j int) bool {
a, _ := strconv.Atoi(list[i].Name())
b, _ := strconv.Atoi(list[j].Name())
return a < b
})
return list, nil
}
func (idx *Autoincrement) next() (int, error) {
files, err := readDir(idx.indexRootDir)
if err != nil {
@@ -249,7 +200,7 @@ func (idx *Autoincrement) next() (int, error) {
return int(idx.bound.Lower), nil
}
latest, err := strconv.Atoi(path.Base(files[len(files)-1].Name()))
latest, err := lastValueFromTree(files)
if err != nil {
return -1, err
}
@@ -260,3 +211,16 @@ func (idx *Autoincrement) next() (int, error) {
return latest + 1, nil
}
// Delete deletes the index root folder from the configured storage.
func (idx *Autoincrement) Delete() error {
return os.RemoveAll(idx.indexRootDir)
}
func lastValueFromTree(files []os.FileInfo) (int, error) {
latest, err := strconv.Atoi(path.Base(files[len(files)-1].Name()))
if err != nil {
return -1, err
}
return latest, nil
}

View File

@@ -0,0 +1,52 @@
package disk
import (
"os"
"reflect"
"sort"
"strconv"
)
var (
validKinds = []reflect.Kind{
reflect.Int,
reflect.Int8,
reflect.Int16,
reflect.Int32,
reflect.Int64,
}
)
// verifies an autoincrement field kind on the target struct.
func isValidKind(k reflect.Kind) bool {
for _, v := range validKinds {
if k == v {
return true
}
}
return false
}
func getKind(i interface{}, field string) (reflect.Kind, error) {
r := reflect.ValueOf(i)
return reflect.Indirect(r).FieldByName(field).Kind(), nil
}
// readDir is an implementation of os.ReadDir but with different sorting.
func readDir(dirname string) ([]os.FileInfo, error) {
f, err := os.Open(dirname)
if err != nil {
return nil, err
}
list, err := f.Readdir(-1)
f.Close()
if err != nil {
return nil, err
}
sort.Slice(list, func(i, j int) bool {
a, _ := strconv.Atoi(list[i].Name())
b, _ := strconv.Atoi(list[j].Name())
return a < b
})
return list, nil
}

View File

@@ -233,3 +233,8 @@ func (idx *NonUnique) TypeName() string {
func (idx *NonUnique) FilesDir() string {
return idx.filesDir
}
// Delete deletes the index folder from its storage.
func (idx *NonUnique) Delete() error {
return os.RemoveAll(idx.indexRootDir)
}

View File

@@ -220,3 +220,8 @@ func isValidSymlink(path string) (err error) {
return
}
// Delete deletes the index folder from its storage.
func (idx *Unique) Delete() error {
return os.RemoveAll(idx.indexRootDir)
}

View File

@@ -13,4 +13,5 @@ type Index interface {
IndexBy() string
TypeName() string
FilesDir() string
Delete() error // Delete deletes the index folder from its storage.
}

View File

@@ -42,6 +42,23 @@ func getRegistryStrategy(cfg *config.Config) string {
return "cs3"
}
// Reset takes care of deleting all indices from storage and from the internal map of indices
func (i Indexer) Reset() error {
for j := range i.indices {
for _, indices := range i.indices[j].IndicesByField {
for _, idx := range indices {
err := idx.Delete()
if err != nil {
return err
}
}
}
delete(i.indices, j)
}
return nil
}
// AddIndex adds a new index to the indexer receiver.
func (i Indexer) AddIndex(t interface{}, indexBy, pkName, entityDirName, indexType string, bound *option.Bound, caseInsensitive bool) error {
strategy := getRegistryStrategy(i.config)

View File

@@ -14,37 +14,37 @@ import (
"github.com/stretchr/testify/assert"
)
const cs3RootFolder = "/var/tmp/ocis/storage/users/data"
func TestIndexer_CS3_AddWithUniqueIndex(t *testing.T) {
dataDir, err := WriteIndexTestData(Data, "ID", cs3RootFolder)
assert.NoError(t, err)
indexer := createCs3Indexer()
err = indexer.AddIndex(&User{}, "UserName", "ID", "users", "unique", nil, false)
assert.NoError(t, err)
u := &User{ID: "abcdefg-123", UserName: "mikey", Email: "mikey@example.com"}
_, err = indexer.Add(u)
assert.NoError(t, err)
_ = os.RemoveAll(dataDir)
}
func TestIndexer_CS3_AddWithNonUniqueIndex(t *testing.T) {
dataDir, err := WriteIndexTestData(Data, "ID", cs3RootFolder)
assert.NoError(t, err)
indexer := createCs3Indexer()
err = indexer.AddIndex(&User{}, "UserName", "ID", "users", "non_unique", nil, false)
assert.NoError(t, err)
u := &User{ID: "abcdefg-123", UserName: "mikey", Email: "mikey@example.com"}
_, err = indexer.Add(u)
assert.NoError(t, err)
_ = os.RemoveAll(dataDir)
}
//const cs3RootFolder = "/var/tmp/ocis/storage/users/data"
//
//func TestIndexer_CS3_AddWithUniqueIndex(t *testing.T) {
// dataDir, err := WriteIndexTestData(Data, "ID", cs3RootFolder)
// assert.NoError(t, err)
// indexer := createCs3Indexer()
//
// err = indexer.AddIndex(&User{}, "UserName", "ID", "users", "unique", nil, false)
// assert.NoError(t, err)
//
// u := &User{ID: "abcdefg-123", UserName: "mikey", Email: "mikey@example.com"}
// _, err = indexer.Add(u)
// assert.NoError(t, err)
//
// _ = os.RemoveAll(dataDir)
//}
//
//func TestIndexer_CS3_AddWithNonUniqueIndex(t *testing.T) {
// dataDir, err := WriteIndexTestData(Data, "ID", cs3RootFolder)
// assert.NoError(t, err)
// indexer := createCs3Indexer()
//
// err = indexer.AddIndex(&User{}, "UserName", "ID", "users", "non_unique", nil, false)
// assert.NoError(t, err)
//
// u := &User{ID: "abcdefg-123", UserName: "mikey", Email: "mikey@example.com"}
// _, err = indexer.Add(u)
// assert.NoError(t, err)
//
// _ = os.RemoveAll(dataDir)
//}
func TestIndexer_Disk_FindByWithUniqueIndex(t *testing.T) {
dataDir, err := WriteIndexTestData(Data, "ID", "")
@@ -251,18 +251,18 @@ func TestIndexer_Disk_UpdateWithNonUniqueIndex(t *testing.T) {
_ = os.RemoveAll(dataDir)
}
func createCs3Indexer() *Indexer {
return CreateIndexer(&config.Config{
Repo: config.Repo{
CS3: config.CS3{
ProviderAddr: "0.0.0.0:9215",
DataURL: "http://localhost:9216",
DataPrefix: "data",
JWTSecret: "Pive-Fumkiu4",
},
},
})
}
//func createCs3Indexer() *Indexer {
// return CreateIndexer(&config.Config{
// Repo: config.Repo{
// CS3: config.CS3{
// ProviderAddr: "0.0.0.0:9215",
// DataURL: "http://localhost:9216",
// DataPrefix: "data",
// JWTSecret: "Pive-Fumkiu4",
// },
// },
// })
//}
func createDiskIndexer(dataDir string) *Indexer {
return CreateIndexer(&config.Config{

File diff suppressed because it is too large Load Diff

View File

@@ -551,3 +551,77 @@ func (h *groupsServiceHandler) RemoveMember(ctx context.Context, in *RemoveMembe
func (h *groupsServiceHandler) ListMembers(ctx context.Context, in *ListMembersRequest, out *ListMembersResponse) error {
return h.GroupsServiceHandler.ListMembers(ctx, in, out)
}
// Api Endpoints for IndexService service
func NewIndexServiceEndpoints() []*api.Endpoint {
return []*api.Endpoint{
&api.Endpoint{
Name: "IndexService.RebuildIndex",
Path: []string{"/api/v0/index/rebuild"},
Method: []string{"POST"},
Body: "*",
Handler: "rpc",
},
}
}
// Client API for IndexService service
type IndexService interface {
RebuildIndex(ctx context.Context, in *RebuildIndexRequest, opts ...client.CallOption) (*RebuildIndexResponse, error)
}
type indexService struct {
c client.Client
name string
}
func NewIndexService(name string, c client.Client) IndexService {
return &indexService{
c: c,
name: name,
}
}
func (c *indexService) RebuildIndex(ctx context.Context, in *RebuildIndexRequest, opts ...client.CallOption) (*RebuildIndexResponse, error) {
req := c.c.NewRequest(c.name, "IndexService.RebuildIndex", in)
out := new(RebuildIndexResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for IndexService service
type IndexServiceHandler interface {
RebuildIndex(context.Context, *RebuildIndexRequest, *RebuildIndexResponse) error
}
func RegisterIndexServiceHandler(s server.Server, hdlr IndexServiceHandler, opts ...server.HandlerOption) error {
type indexService interface {
RebuildIndex(ctx context.Context, in *RebuildIndexRequest, out *RebuildIndexResponse) error
}
type IndexService struct {
indexService
}
h := &indexServiceHandler{hdlr}
opts = append(opts, api.WithEndpoint(&api.Endpoint{
Name: "IndexService.RebuildIndex",
Path: []string{"/api/v0/index/rebuild"},
Method: []string{"POST"},
Body: "*",
Handler: "rpc",
}))
return s.Handle(s.NewHandler(&IndexService{h}, opts...))
}
type indexServiceHandler struct {
IndexServiceHandler
}
func (h *indexServiceHandler) RebuildIndex(ctx context.Context, in *RebuildIndexRequest, out *RebuildIndexResponse) error {
return h.IndexServiceHandler.RebuildIndex(ctx, in, out)
}

View File

@@ -1208,19 +1208,23 @@ func TestListMembers(t *testing.T) {
}
func TestListMembersEmptyGroup(t *testing.T) {
group := &proto.Group{Id: "5d58e5ec-842e-498b-8800-61f2ec6f911c", GidNumber: 30002, OnPremisesSamAccountName: "quantum-group", DisplayName: "Quantum Group", Members: []*proto.Account{}}
createGroup(t, group)
group := &proto.Group{Id: "5d58e5ec-842e-498b-8800-61f2ec6f911c", GidNumber: 60000, OnPremisesSamAccountName: "quantum-group", DisplayName: "Quantum Group", Members: []*proto.Account{}}
client := service.Client()
cl := proto.NewGroupsService("com.owncloud.api.accounts", client)
request := &proto.CreateGroupRequest{Group: group}
_, err := cl.CreateGroup(context.Background(), request)
if err == nil {
newCreatedGroups = append(newCreatedGroups, group.Id)
}
req := &proto.ListMembersRequest{Id: group.Id}
res, err := cl.ListMembers(context.Background(), req)
listRes, err := cl.ListMembers(context.Background(), req)
assert.NoError(t, err)
assert.Empty(t, res.Members)
assert.Empty(t, listRes.Members)
cleanUp(t)
}

View File

@@ -372,6 +372,120 @@ func RegisterGroupsServiceWeb(r chi.Router, i GroupsServiceHandler, middlewares
r.MethodFunc("POST", "/api/v0/groups/{id=*}/members/$ref", handler.ListMembers)
}
type webIndexServiceHandler struct {
r chi.Router
h IndexServiceHandler
}
func (h *webIndexServiceHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.r.ServeHTTP(w, r)
}
func (h *webIndexServiceHandler) RebuildIndex(w http.ResponseWriter, r *http.Request) {
req := &RebuildIndexRequest{}
resp := &RebuildIndexResponse{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusPreconditionFailed)
return
}
if err := h.h.RebuildIndex(
r.Context(),
req,
resp,
); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
render.Status(r, http.StatusCreated)
render.JSON(w, r, resp)
}
func RegisterIndexServiceWeb(r chi.Router, i IndexServiceHandler, middlewares ...func(http.Handler) http.Handler) {
handler := &webIndexServiceHandler{
r: r,
h: i,
}
r.MethodFunc("POST", "/api/v0/index/rebuild", handler.RebuildIndex)
}
// RebuildIndexRequestJSONMarshaler describes the default jsonpb.Marshaler used by all
// instances of RebuildIndexRequest. This struct is safe to replace or modify but
// should not be done so concurrently.
var RebuildIndexRequestJSONMarshaler = new(jsonpb.Marshaler)
// MarshalJSON satisfies the encoding/json Marshaler interface. This method
// uses the more correct jsonpb package to correctly marshal the message.
func (m *RebuildIndexRequest) MarshalJSON() ([]byte, error) {
if m == nil {
return json.Marshal(nil)
}
buf := &bytes.Buffer{}
if err := RebuildIndexRequestJSONMarshaler.Marshal(buf, m); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
var _ json.Marshaler = (*RebuildIndexRequest)(nil)
// RebuildIndexRequestJSONUnmarshaler describes the default jsonpb.Unmarshaler used by all
// instances of RebuildIndexRequest. This struct is safe to replace or modify but
// should not be done so concurrently.
var RebuildIndexRequestJSONUnmarshaler = new(jsonpb.Unmarshaler)
// UnmarshalJSON satisfies the encoding/json Unmarshaler interface. This method
// uses the more correct jsonpb package to correctly unmarshal the message.
func (m *RebuildIndexRequest) UnmarshalJSON(b []byte) error {
return RebuildIndexRequestJSONUnmarshaler.Unmarshal(bytes.NewReader(b), m)
}
var _ json.Unmarshaler = (*RebuildIndexRequest)(nil)
// RebuildIndexResponseJSONMarshaler describes the default jsonpb.Marshaler used by all
// instances of RebuildIndexResponse. This struct is safe to replace or modify but
// should not be done so concurrently.
var RebuildIndexResponseJSONMarshaler = new(jsonpb.Marshaler)
// MarshalJSON satisfies the encoding/json Marshaler interface. This method
// uses the more correct jsonpb package to correctly marshal the message.
func (m *RebuildIndexResponse) MarshalJSON() ([]byte, error) {
if m == nil {
return json.Marshal(nil)
}
buf := &bytes.Buffer{}
if err := RebuildIndexResponseJSONMarshaler.Marshal(buf, m); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
var _ json.Marshaler = (*RebuildIndexResponse)(nil)
// RebuildIndexResponseJSONUnmarshaler describes the default jsonpb.Unmarshaler used by all
// instances of RebuildIndexResponse. This struct is safe to replace or modify but
// should not be done so concurrently.
var RebuildIndexResponseJSONUnmarshaler = new(jsonpb.Unmarshaler)
// UnmarshalJSON satisfies the encoding/json Unmarshaler interface. This method
// uses the more correct jsonpb package to correctly unmarshal the message.
func (m *RebuildIndexResponse) UnmarshalJSON(b []byte) error {
return RebuildIndexResponseJSONUnmarshaler.Unmarshal(bytes.NewReader(b), m)
}
var _ json.Unmarshaler = (*RebuildIndexResponse)(nil)
// ListAccountsRequestJSONMarshaler describes the default jsonpb.Marshaler used by all
// instances of ListAccountsRequest. This struct is safe to replace or modify but
// should not be done so concurrently.

View File

@@ -136,6 +136,23 @@ service GroupsService {
}
service IndexService {
rpc RebuildIndex(RebuildIndexRequest) returns (RebuildIndexResponse) {
// All request parameters go into body.
option (google.api.http) = {
// URLs are broken
post: "/api/v0/index/rebuild"
body: "*"
};
}
}
message RebuildIndexRequest {
}
message RebuildIndexResponse {
}
message ListAccountsRequest {
// Optional. The maximum number of accounts to return in the response
int32 page_size = 1 [(google.api.field_behavior) = OPTIONAL];

File diff suppressed because one or more lines are too long

View File

@@ -26,6 +26,9 @@ func Server(opts ...Option) grpc.Service {
if err := proto.RegisterGroupsServiceHandler(service.Server(), handler); err != nil {
options.Logger.Fatal().Err(err).Msg("could not register groups handler")
}
if err := proto.RegisterIndexServiceHandler(service.Server(), handler); err != nil {
options.Logger.Fatal().Err(err).Msg("could not register index handler")
}
service.Init()
return service

View File

@@ -146,6 +146,26 @@ func (s Service) ListAccounts(ctx context.Context, in *proto.ListAccountsRequest
return nil
}
if in.Query == "" {
err = s.repo.LoadAccounts(ctx, &out.Accounts)
if err != nil {
s.log.Err(err).Msg("failed to load all accounts from storage")
return merrors.InternalServerError(s.id, "failed to load all accounts")
}
for i := range out.Accounts {
a := out.Accounts[i]
// TODO add groups only if requested
// if in.FieldMask ...
s.expandMemberOf(a)
if a.PasswordProfile != nil {
a.PasswordProfile.Password = ""
}
}
return nil
}
searchResults, err := s.findAccountsByQuery(ctx, in.Query)
out.Accounts = make([]*proto.Account, 0, len(searchResults))
@@ -180,10 +200,6 @@ func (s Service) findAccountsByQuery(ctx context.Context, query string) ([]strin
var searchResults []string
var err error
if query == "" {
return s.index.FindByPartial(&proto.Account{}, "Mail", "*")
}
// TODO: more explicit queries have to be on top
var onPremOrMailQuery = regexp.MustCompile(`^on_premises_sam_account_name eq '(.*)' or mail eq '(.*)'$`)
match := onPremOrMailQuery.FindStringSubmatch(query)

View File

@@ -0,0 +1,100 @@
package service
import (
"context"
"fmt"
"github.com/owncloud/ocis/accounts/pkg/storage"
"github.com/owncloud/ocis/accounts/pkg/config"
"github.com/owncloud/ocis/accounts/pkg/indexer"
"github.com/owncloud/ocis/accounts/pkg/indexer/option"
"github.com/owncloud/ocis/accounts/pkg/proto/v0"
)
// RebuildIndex deletes all indices (in memory and on storage) and rebuilds them from scratch.
func (s Service) RebuildIndex(ctx context.Context, request *proto.RebuildIndexRequest, response *proto.RebuildIndexResponse) error {
if err := s.index.Reset(); err != nil {
return fmt.Errorf("failed to delete index containers: %w", err)
}
if err := recreateContainers(s.index, s.Config); err != nil {
return fmt.Errorf("failed to recreate index containers: %w", err)
}
if err := reindexDocuments(ctx, s.repo, s.index); err != nil {
return fmt.Errorf("failed to reindex documents: %w", err)
}
return nil
}
// recreateContainers adds all indices to the indexer that we have for this service.
func recreateContainers(idx *indexer.Indexer, cfg *config.Config) error {
// Accounts
if err := idx.AddIndex(&proto.Account{}, "DisplayName", "Id", "accounts", "non_unique", nil, true); err != nil {
return err
}
if err := idx.AddIndex(&proto.Account{}, "Mail", "Id", "accounts", "unique", nil, true); err != nil {
return err
}
if err := idx.AddIndex(&proto.Account{}, "OnPremisesSamAccountName", "Id", "accounts", "unique", nil, true); err != nil {
return err
}
if err := idx.AddIndex(&proto.Account{}, "PreferredName", "Id", "accounts", "unique", nil, true); err != nil {
return err
}
if err := idx.AddIndex(&proto.Account{}, "UidNumber", "Id", "accounts", "autoincrement", &option.Bound{
Lower: cfg.Index.UID.Lower,
Upper: cfg.Index.UID.Upper,
}, false); err != nil {
return err
}
// Groups
if err := idx.AddIndex(&proto.Group{}, "OnPremisesSamAccountName", "Id", "groups", "unique", nil, true); err != nil {
return err
}
if err := idx.AddIndex(&proto.Group{}, "DisplayName", "Id", "groups", "non_unique", nil, true); err != nil {
return err
}
if err := idx.AddIndex(&proto.Group{}, "GidNumber", "Id", "groups", "autoincrement", &option.Bound{
Lower: cfg.Index.GID.Lower,
Upper: cfg.Index.GID.Upper,
}, false); err != nil {
return err
}
return nil
}
// reindexDocuments loads all existing documents and adds them to the index.
func reindexDocuments(ctx context.Context, repo storage.Repo, index *indexer.Indexer) error {
accounts := make([]*proto.Account, 0)
if err := repo.LoadAccounts(ctx, &accounts); err != nil {
return err
}
for i := range accounts {
_, err := index.Add(accounts[i])
if err != nil {
return err
}
}
groups := make([]*proto.Group, 0)
if err := repo.LoadGroups(ctx, &groups); err != nil {
return err
}
for i := range groups {
_, err := index.Add(groups[i])
if err != nil {
return err
}
}
return nil
}

View File

@@ -9,8 +9,6 @@ import (
"strings"
"time"
"github.com/owncloud/ocis/accounts/pkg/indexer/option"
"github.com/owncloud/ocis/accounts/pkg/indexer"
"github.com/owncloud/ocis/accounts/pkg/storage"
@@ -77,45 +75,9 @@ func New(opts ...Option) (s *Service, err error) {
func (s Service) buildIndex() (*indexer.Indexer, error) {
idx := indexer.CreateIndexer(s.Config)
// Accounts
if err := idx.AddIndex(&proto.Account{}, "DisplayName", "Id", "accounts", "non_unique", nil, true); err != nil {
if err := recreateContainers(idx, s.Config); err != nil {
return nil, err
}
if err := idx.AddIndex(&proto.Account{}, "Mail", "Id", "accounts", "unique", nil, true); err != nil {
return nil, err
}
if err := idx.AddIndex(&proto.Account{}, "OnPremisesSamAccountName", "Id", "accounts", "unique", nil, true); err != nil {
return nil, err
}
if err := idx.AddIndex(&proto.Account{}, "PreferredName", "Id", "accounts", "unique", nil, true); err != nil {
return nil, err
}
if err := idx.AddIndex(&proto.Account{}, "UidNumber", "Id", "accounts", "autoincrement", &option.Bound{
Lower: s.Config.Index.UID.Lower,
Upper: s.Config.Index.UID.Upper,
}, false); err != nil {
return nil, err
}
// Groups
if err := idx.AddIndex(&proto.Group{}, "OnPremisesSamAccountName", "Id", "groups", "unique", nil, true); err != nil {
return nil, err
}
if err := idx.AddIndex(&proto.Group{}, "DisplayName", "Id", "groups", "non_unique", nil, true); err != nil {
return nil, err
}
if err := idx.AddIndex(&proto.Group{}, "GidNumber", "Id", "groups", "autoincrement", &option.Bound{
Lower: s.Config.Index.GID.Lower,
Upper: s.Config.Index.GID.Upper,
}, false); err != nil {
return nil, err
}
return idx, nil
}

View File

@@ -8,6 +8,7 @@ import (
"io/ioutil"
"net/http"
"path"
"path/filepath"
"strconv"
"strings"
@@ -20,6 +21,7 @@ import (
"github.com/cs3org/reva/pkg/token/manager/jwt"
"github.com/owncloud/ocis/accounts/pkg/config"
"github.com/owncloud/ocis/accounts/pkg/proto/v0"
olog "github.com/owncloud/ocis/ocis-pkg/log"
"google.golang.org/grpc/metadata"
)
@@ -92,6 +94,40 @@ func (r CS3Repo) LoadAccount(ctx context.Context, id string, a *proto.Account) (
return err
}
return r.loadAccount(id, t, a)
}
// LoadAccounts loads all the accounts from the cs3 api
func (r CS3Repo) LoadAccounts(ctx context.Context, a *[]*proto.Account) (err error) {
t, err := r.authenticate(ctx)
if err != nil {
return err
}
ctx = metadata.AppendToOutgoingContext(ctx, token.TokenHeader, t)
res, err := r.storageProvider.ListContainer(ctx, &provider.ListContainerRequest{
Ref: &provider.Reference{
Spec: &provider.Reference_Path{Path: path.Join("/meta", accountsFolder)},
},
})
if err != nil {
return err
}
log := olog.NewLogger(olog.Pretty(r.cfg.Log.Pretty), olog.Color(r.cfg.Log.Color), olog.Level(r.cfg.Log.Level))
for i := range res.Infos {
acc := &proto.Account{}
err := r.loadAccount(filepath.Base(res.Infos[i].Path), t, acc)
if err != nil {
log.Err(err).Msg("could not load account")
continue
}
*a = append(*a, acc)
}
return nil
}
func (r CS3Repo) loadAccount(id string, t string, a *proto.Account) error {
resp, err := r.dataProvider.get(r.accountURL(id), t)
if err != nil {
return err
@@ -172,6 +208,40 @@ func (r CS3Repo) LoadGroup(ctx context.Context, id string, g *proto.Group) (err
return err
}
return r.loadGroup(id, t, g)
}
// LoadGroups loads all the groups from the cs3 api
func (r CS3Repo) LoadGroups(ctx context.Context, g *[]*proto.Group) (err error) {
t, err := r.authenticate(ctx)
if err != nil {
return err
}
ctx = metadata.AppendToOutgoingContext(ctx, token.TokenHeader, t)
res, err := r.storageProvider.ListContainer(ctx, &provider.ListContainerRequest{
Ref: &provider.Reference{
Spec: &provider.Reference_Path{Path: path.Join("/meta", groupsFolder)},
},
})
if err != nil {
return err
}
log := olog.NewLogger(olog.Pretty(r.cfg.Log.Pretty), olog.Color(r.cfg.Log.Color), olog.Level(r.cfg.Log.Level))
for i := range res.Infos {
grp := &proto.Group{}
err := r.loadGroup(filepath.Base(res.Infos[i].Path), t, grp)
if err != nil {
log.Err(err).Msg("could not load account")
continue
}
*g = append(*g, grp)
}
return nil
}
func (r CS3Repo) loadGroup(id string, t string, g *proto.Group) error {
resp, err := r.dataProvider.get(r.groupURL(id), t)
if err != nil {
return err

View File

@@ -17,8 +17,8 @@ var groupLock sync.Mutex
// DiskRepo provides a local filesystem implementation of the Repo interface
type DiskRepo struct {
cfg *config.Config
log olog.Logger
cfg *config.Config
log olog.Logger
}
// NewDiskRepo creates a new disk repo
@@ -37,8 +37,8 @@ func NewDiskRepo(cfg *config.Config, log olog.Logger) DiskRepo {
}
}
return DiskRepo{
cfg: cfg,
log: log,
cfg: cfg,
log: log,
}
}
@@ -70,6 +70,20 @@ func (r DiskRepo) LoadAccount(ctx context.Context, id string, a *proto.Account)
return json.Unmarshal(data, a)
}
// LoadAccounts loads all the accounts from the local filesystem
func (r DiskRepo) LoadAccounts(ctx context.Context, a *[]*proto.Account) (err error) {
root := filepath.Join(r.cfg.Repo.Disk.Path, accountsFolder)
return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
acc := &proto.Account{}
if e := r.LoadAccount(ctx, filepath.Base(path), acc); e != nil {
r.log.Err(e).Msg("could not load account")
return nil
}
*a = append(*a, acc)
return nil
})
}
// DeleteAccount from the local filesystem
func (r DiskRepo) DeleteAccount(ctx context.Context, id string) (err error) {
path := filepath.Join(r.cfg.Repo.Disk.Path, accountsFolder, id)
@@ -118,6 +132,20 @@ func (r DiskRepo) LoadGroup(ctx context.Context, id string, g *proto.Group) (err
return json.Unmarshal(data, g)
}
// LoadGroups loads all the groups from the local filesystem
func (r DiskRepo) LoadGroups(ctx context.Context, g *[]*proto.Group) (err error) {
root := filepath.Join(r.cfg.Repo.Disk.Path, groupsFolder)
return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
grp := &proto.Group{}
if e := r.LoadGroup(ctx, filepath.Base(path), grp); e != nil {
r.log.Err(e).Msg("could not load group")
return nil
}
*g = append(*g, grp)
return nil
})
}
// DeleteGroup from the local filesystem
func (r DiskRepo) DeleteGroup(ctx context.Context, id string) (err error) {
path := filepath.Join(r.cfg.Repo.Disk.Path, groupsFolder, id)
@@ -128,9 +156,6 @@ func (r DiskRepo) DeleteGroup(ctx context.Context, id string) (err error) {
}
return
//r.log.Error().Err(err).Str("id", id).Str("path", path).Msg("could not remove group")
//return merrors.InternalServerError(r.serviceID, "could not remove group: %v", err.Error())
}
// deflateMemberOf replaces the groups of a user with an instance that only contains the id

View File

@@ -2,20 +2,23 @@ package storage
import (
"context"
"github.com/owncloud/ocis/accounts/pkg/proto/v0"
)
const (
accountsFolder = "accounts"
groupsFolder = "groups"
groupsFolder = "groups"
)
// Repo defines the storage operations
type Repo interface {
WriteAccount(ctx context.Context, a *proto.Account) (err error)
LoadAccount(ctx context.Context, id string, a *proto.Account) (err error)
LoadAccounts(ctx context.Context, a *[]*proto.Account) (err error)
DeleteAccount(ctx context.Context, id string) (err error)
WriteGroup(ctx context.Context, g *proto.Group) (err error)
LoadGroup(ctx context.Context, id string, g *proto.Group) (err error)
LoadGroups(ctx context.Context, g *[]*proto.Group) (err error)
DeleteGroup(ctx context.Context, id string) (err error)
}

View File

@@ -0,0 +1,9 @@
Change: Rebuild index command for accounts
Tags: accounts
The index for the accounts service can now be rebuilt by running the cli command `./bin/ocis accounts rebuild`.
It deletes all configured indices and rebuilds them from the documents found on storage. For this we also introduced
a `LoadAccounts` and `LoadGroups` function on storage for loading all existing documents.
https://github.com/owncloud/ocis/pull/748