WIP: make the accounts service use the upload workflow

This commit is contained in:
Willy Kloucek
2021-12-02 18:52:49 +01:00
parent c067da5496
commit 3fd070061d
12 changed files with 307 additions and 342 deletions
-4
View File
@@ -89,8 +89,6 @@ type Disk struct {
// CS3 is the cs3 implementation of the storage.
type CS3 struct {
ProviderAddr string `ocisConfig:"provider_addr"`
DataURL string `ocisConfig:"data_url"`
DataPrefix string `ocisConfig:"data_prefix"`
JWTSecret string `ocisConfig:"jwt_secret"`
}
@@ -185,8 +183,6 @@ func DefaultConfig() *Config {
},
CS3: CS3{
ProviderAddr: "localhost:9215",
DataURL: "http://localhost:9216",
DataPrefix: "data",
JWTSecret: "Pive-Fumkiu4",
},
},
-8
View File
@@ -100,14 +100,6 @@ func structMappings(cfg *Config) []shared.EnvBinding {
EnvVars: []string{"ACCOUNTS_STORAGE_CS3_PROVIDER_ADDR"},
Destination: &cfg.Repo.CS3.ProviderAddr,
},
{
EnvVars: []string{"ACCOUNTS_STORAGE_CS3_DATA_URL"},
Destination: &cfg.Repo.CS3.DataURL,
},
{
EnvVars: []string{"ACCOUNTS_STORAGE_CS3_DATA_PREFIX"},
Destination: &cfg.Repo.CS3.DataPrefix,
},
{
EnvVars: []string{"OCIS_JWT_SECRET", "ACCOUNTS_STORAGE_CS3_JWT_SECRET"},
Destination: &cfg.Repo.CS3.JWTSecret,
-2
View File
@@ -138,8 +138,6 @@ func configFromSvc(cfg *config.Config) (*idxcfg.Config, error) {
Backend: cfg.Repo.Backend,
CS3: idxcfg.CS3{
ProviderAddr: cfg.Repo.CS3.ProviderAddr,
DataURL: cfg.Repo.CS3.DataURL,
DataPrefix: cfg.Repo.CS3.DataPrefix,
JWTSecret: cfg.Repo.CS3.JWTSecret,
},
}
+114 -92
View File
@@ -4,12 +4,11 @@ import (
"bytes"
"context"
"encoding/json"
"io"
"errors"
"io/ioutil"
"net/http"
"path"
"path/filepath"
"strings"
"github.com/cs3org/reva/pkg/auth/scope"
@@ -28,10 +27,10 @@ import (
// CS3Repo provides a cs3 implementation of the Repo interface
type CS3Repo struct {
cfg *config.Config
tm token.Manager
storageProvider provider.ProviderAPIClient
dataProvider dataProviderClient // Used to create and download data via http, bypassing reva upload protocol
cfg *config.Config
tm token.Manager
storageProvider provider.ProviderAPIClient
dataGatewayClient *http.Client
}
// NewCS3Repo creates a new cs3 repo
@@ -50,14 +49,10 @@ func NewCS3Repo(cfg *config.Config) (Repo, error) {
}
return CS3Repo{
cfg: cfg,
tm: tokenManager,
storageProvider: client,
dataProvider: dataProviderClient{
client: http.Client{
Transport: http.DefaultTransport,
},
},
cfg: cfg,
tm: tokenManager,
storageProvider: client,
dataGatewayClient: http.DefaultClient,
}, nil
}
@@ -78,14 +73,9 @@ func (r CS3Repo) WriteAccount(ctx context.Context, a *proto.Account) (err error)
return err
}
resp, err := r.dataProvider.put(r.accountURL(a.Id), bytes.NewReader(by), t)
if err != nil {
return err
}
if err = resp.Body.Close(); err != nil {
return err
}
return nil
err = r.uploadHelper(ctx, r.accountURL(a.Id), by)
return err
}
// LoadAccount loads an account via cs3 by id and writes it to the provided account
@@ -94,8 +84,9 @@ func (r CS3Repo) LoadAccount(ctx context.Context, id string, a *proto.Account) (
if err != nil {
return err
}
ctx = metadata.AppendToOutgoingContext(ctx, revactx.TokenHeader, t)
return r.loadAccount(id, t, a)
return r.loadAccount(ctx, id, a)
}
// LoadAccounts loads all the accounts from the cs3 api
@@ -118,7 +109,7 @@ func (r CS3Repo) LoadAccounts(ctx context.Context, a *[]*proto.Account) (err err
log := olog.NewLogger(olog.Pretty(r.cfg.Log.Pretty), olog.Color(r.cfg.Log.Color), olog.Level(r.cfg.Log.Level))
for i := range res.Infos {
acc := &proto.Account{}
err := r.loadAccount(filepath.Base(res.Infos[i].Path), t, acc)
err := r.loadAccount(ctx, filepath.Base(res.Infos[i].Path), acc)
if err != nil {
log.Err(err).Msg("could not load account")
continue
@@ -128,24 +119,16 @@ func (r CS3Repo) LoadAccounts(ctx context.Context, a *[]*proto.Account) (err err
return nil
}
func (r CS3Repo) loadAccount(id string, t string, a *proto.Account) error {
resp, err := r.dataProvider.get(r.accountURL(id), t)
func (r CS3Repo) loadAccount(ctx context.Context, id string, a *proto.Account) error {
account, err := r.downloadHelper(ctx, r.accountURL(id))
if err != nil {
switch err.(type) {
case notFoundErr:
return notFoundErr{"account", id}
}
return err
}
if resp.StatusCode != http.StatusOK {
return &notFoundErr{"account", id}
}
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if err = resp.Body.Close(); err != nil {
return err
}
return json.Unmarshal(b, &a)
return json.Unmarshal(account, &a)
}
// DeleteAccount deletes an account via cs3 by id
@@ -192,14 +175,8 @@ func (r CS3Repo) WriteGroup(ctx context.Context, g *proto.Group) (err error) {
return err
}
resp, err := r.dataProvider.put(r.groupURL(g.Id), bytes.NewReader(by), t)
if err != nil {
return err
}
if err = resp.Body.Close(); err != nil {
return err
}
return nil
err = r.uploadHelper(ctx, r.groupURL(g.Id), by)
return err
}
// LoadGroup loads a group via cs3 by id and writes it to the provided group
@@ -208,8 +185,9 @@ func (r CS3Repo) LoadGroup(ctx context.Context, id string, g *proto.Group) (err
if err != nil {
return err
}
ctx = metadata.AppendToOutgoingContext(ctx, revactx.TokenHeader, t)
return r.loadGroup(id, t, g)
return r.loadGroup(ctx, id, g)
}
// LoadGroups loads all the groups from the cs3 api
@@ -232,7 +210,7 @@ func (r CS3Repo) LoadGroups(ctx context.Context, g *[]*proto.Group) (err error)
log := olog.NewLogger(olog.Pretty(r.cfg.Log.Pretty), olog.Color(r.cfg.Log.Color), olog.Level(r.cfg.Log.Level))
for i := range res.Infos {
grp := &proto.Group{}
err := r.loadGroup(filepath.Base(res.Infos[i].Path), t, grp)
err := r.loadGroup(ctx, filepath.Base(res.Infos[i].Path), grp)
if err != nil {
log.Err(err).Msg("could not load account")
continue
@@ -242,24 +220,16 @@ func (r CS3Repo) LoadGroups(ctx context.Context, g *[]*proto.Group) (err error)
return nil
}
func (r CS3Repo) loadGroup(id string, t string, g *proto.Group) error {
resp, err := r.dataProvider.get(r.groupURL(id), t)
func (r CS3Repo) loadGroup(ctx context.Context, id string, g *proto.Group) error {
group, err := r.downloadHelper(ctx, r.groupURL(id))
if err != nil {
switch err.(type) {
case notFoundErr:
return notFoundErr{"group", id}
}
return err
}
if resp.StatusCode == http.StatusNotFound {
return &notFoundErr{"group", id}
}
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if err = resp.Body.Close(); err != nil {
return err
}
return json.Unmarshal(b, &g)
return json.Unmarshal(group, &g)
}
// DeleteGroup deletes a group via cs3 by id
@@ -311,11 +281,11 @@ func AuthenticateCS3(ctx context.Context, su config.ServiceUser, tm token.Manage
}
func (r CS3Repo) accountURL(id string) string {
return singleJoiningSlash(r.cfg.Repo.CS3.DataURL, path.Join(r.cfg.Repo.CS3.DataPrefix, accountsFolder, id))
return path.Join(accountsFolder, id)
}
func (r CS3Repo) groupURL(id string) string {
return singleJoiningSlash(r.cfg.Repo.CS3.DataURL, path.Join(r.cfg.Repo.CS3.DataPrefix, groupsFolder, id))
return path.Join(groupsFolder, id)
}
func (r CS3Repo) makeRootDirIfNotExist(ctx context.Context, folder string) error {
@@ -349,39 +319,91 @@ func MakeDirIfNotExist(ctx context.Context, sp provider.ProviderAPIClient, folde
return nil
}
// TODO: this is copied from proxy. Find a better solution or move it to ocis-pkg
func singleJoiningSlash(a, b string) string {
aslash := strings.HasSuffix(a, "/")
bslash := strings.HasPrefix(b, "/")
switch {
case aslash && bslash:
return a + b[1:]
case !aslash && !bslash:
return a + "/" + b
func (r CS3Repo) uploadHelper(ctx context.Context, path string, content []byte) error {
ref := provider.InitiateFileUploadRequest{
Ref: &provider.Reference{
Path: path,
},
}
return a + b
}
type dataProviderClient struct {
client http.Client
}
func (d dataProviderClient) put(url string, body io.Reader, token string) (*http.Response, error) {
req, err := http.NewRequest(http.MethodPut, url, body)
res, err := r.storageProvider.InitiateFileUpload(ctx, &ref)
if err != nil {
return nil, err
return err
}
req.Header.Add(revactx.TokenHeader, token)
return d.client.Do(req)
}
var endpoint string
func (d dataProviderClient) get(url string, token string) (*http.Response, error) {
req, err := http.NewRequest(http.MethodGet, url, nil)
for _, proto := range res.GetProtocols() {
if proto.Protocol == "simple" {
endpoint = proto.GetUploadEndpoint()
break
}
}
if endpoint == "" {
return errors.New("metadata storage doesn't support the simple upload protocol")
}
req, err := http.NewRequest(http.MethodPut, endpoint, bytes.NewReader(content))
if err != nil {
return nil, err
return err
}
resp, err := r.dataGatewayClient.Do(req)
if err != nil {
return err
}
if err = resp.Body.Close(); err != nil {
return err
}
return nil
}
func (r CS3Repo) downloadHelper(ctx context.Context, path string) (content []byte, err error) {
ref := provider.InitiateFileDownloadRequest{
Ref: &provider.Reference{
Path: path,
},
}
req.Header.Add(revactx.TokenHeader, token)
return d.client.Do(req)
res, err := r.storageProvider.InitiateFileDownload(ctx, &ref)
if err != nil {
return []byte{}, err
}
var endpoint string
for _, proto := range res.GetProtocols() {
if proto.Protocol == "simple" {
endpoint = proto.GetDownloadEndpoint()
break
}
}
if endpoint == "" {
return []byte{}, errors.New("metadata storage doesn't support the simple download protocol")
}
req, err := http.NewRequest(http.MethodGet, endpoint, nil)
if err != nil {
return []byte{}, err
}
resp, err := r.dataGatewayClient.Do(req)
if err != nil {
return []byte{}, err
}
if resp.StatusCode != http.StatusOK {
return []byte{}, &notFoundErr{}
}
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return []byte{}, err
}
if err = resp.Body.Close(); err != nil {
return []byte{}, err
}
return b, nil
}
-2
View File
@@ -17,8 +17,6 @@ package storage
// Repo: config.Repo{
// CS3: config.CS3{
// ProviderAddr: "0.0.0.0:9215",
// DataURL: "http://localhost:9216",
// DataPrefix: "data",
// },
// },
//}
+1 -1
View File
@@ -8,7 +8,7 @@ type notFoundErr struct {
typ, id string
}
func (e *notFoundErr) Error() string {
func (e notFoundErr) Error() string {
return fmt.Sprintf("%s with id %s not found", e.typ, e.id)
}
+24 -54
View File
@@ -2,8 +2,6 @@ package cs3
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"os"
"path"
@@ -20,7 +18,6 @@ import (
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
revactx "github.com/cs3org/reva/pkg/ctx"
"github.com/cs3org/reva/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/pkg/token"
"github.com/cs3org/reva/pkg/token/manager/jwt"
"google.golang.org/grpc/metadata"
@@ -37,9 +34,7 @@ type Autoincrement struct {
indexBaseDir string
indexRootDir string
tokenManager token.Manager
storageProvider provider.ProviderAPIClient
dataProvider dataProviderClient // Used to create and download data via http, bypassing reva upload protocol
metadataStorage *metadataStorage
cs3conf *Config
bound *option.Bound
@@ -65,17 +60,9 @@ func NewAutoincrementIndex(o ...option.Option) index.Index {
indexRootDir: path.Join(path.Join(opts.DataDir, "index.cs3"), strings.Join([]string{"autoincrement", opts.TypeName, opts.IndexBy}, ".")),
cs3conf: &Config{
ProviderAddr: opts.ProviderAddr,
DataURL: opts.DataURL,
DataPrefix: opts.DataPrefix,
JWTSecret: opts.JWTSecret,
ServiceUser: opts.ServiceUser,
},
dataProvider: dataProviderClient{
baseURL: singleJoiningSlash(opts.DataURL, opts.DataPrefix),
client: http.Client{
Transport: http.DefaultTransport,
},
},
}
return u
@@ -91,25 +78,22 @@ func (idx *Autoincrement) Init() error {
return err
}
idx.tokenManager = tokenManager
client, err := pool.GetStorageProviderServiceClient(idx.cs3conf.ProviderAddr)
if err != nil {
return err
}
idx.storageProvider = client
idx.metadataStorage = &metadataStorage{
tokenManager: tokenManager,
storageProvider: client,
dataGatewayClient: http.DefaultClient,
}
ctx, err := idx.getAuthenticatedContext(context.Background())
if err != nil {
if err := idx.makeDirIfNotExists(idx.indexBaseDir); err != nil {
return err
}
if err := idx.makeDirIfNotExists(ctx, idx.indexBaseDir); err != nil {
return err
}
if err := idx.makeDirIfNotExists(ctx, idx.indexRootDir); err != nil {
if err := idx.makeDirIfNotExists(idx.indexRootDir); err != nil {
return err
}
@@ -175,7 +159,7 @@ func (idx *Autoincrement) Remove(id string, v string) error {
}
deletePath := path.Join("/meta", idx.indexRootDir, v)
resp, err := idx.storageProvider.Delete(ctx, &provider.DeleteRequest{
resp, err := idx.metadataStorage.storageProvider.Delete(ctx, &provider.DeleteRequest{
Ref: &provider.Reference{
Path: deletePath,
},
@@ -213,7 +197,7 @@ func (idx *Autoincrement) Search(pattern string) ([]string, error) {
return nil, err
}
res, err := idx.storageProvider.ListContainer(ctx, &provider.ListContainerRequest{
res, err := idx.metadataStorage.storageProvider.ListContainer(ctx, &provider.ListContainerRequest{
Ref: &provider.Reference{
Path: path.Join("/meta", idx.indexRootDir),
},
@@ -263,7 +247,7 @@ func (idx *Autoincrement) FilesDir() string {
}
func (idx *Autoincrement) createSymlink(oldname, newname string) error {
t, err := idx.authenticate(context.TODO())
ctx, err := idx.getAuthenticatedContext(context.Background())
if err != nil {
return err
}
@@ -272,52 +256,38 @@ func (idx *Autoincrement) createSymlink(oldname, newname string) error {
return os.ErrExist
}
resp, err := idx.dataProvider.put(newname, strings.NewReader(oldname), t)
err = idx.metadataStorage.uploadHelper(ctx, newname, []byte(oldname))
if err != nil {
return err
}
if err = resp.Body.Close(); err != nil {
return err
}
return nil
}
func (idx *Autoincrement) resolveSymlink(name string) (string, error) {
t, err := idx.authenticate(context.TODO())
ctx, err := idx.getAuthenticatedContext(context.Background())
if err != nil {
return "", err
}
resp, err := idx.dataProvider.get(name, t)
b, err := idx.metadataStorage.downloadHelper(ctx, name)
if err != nil {
return "", err
}
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
return "", os.ErrNotExist
}
return "", fmt.Errorf("could not resolve symlink %s, got status %v", name, resp.StatusCode)
}
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
if err = resp.Body.Close(); err != nil {
return "", err
}
return string(b), err
}
func (idx *Autoincrement) makeDirIfNotExists(ctx context.Context, folder string) error {
return storage.MakeDirIfNotExist(ctx, idx.storageProvider, folder)
func (idx *Autoincrement) makeDirIfNotExists(folder string) error {
ctx, err := idx.getAuthenticatedContext(context.Background())
if err != nil {
return err
}
return storage.MakeDirIfNotExist(ctx, idx.metadataStorage.storageProvider, folder)
}
func (idx *Autoincrement) authenticate(ctx context.Context) (token string, err error) {
return storage.AuthenticateCS3(ctx, idx.cs3conf.ServiceUser, idx.tokenManager)
return storage.AuthenticateCS3(ctx, idx.cs3conf.ServiceUser, idx.metadataStorage.tokenManager)
}
func (idx *Autoincrement) next() (int, error) {
@@ -326,7 +296,7 @@ func (idx *Autoincrement) next() (int, error) {
return -1, err
}
res, err := idx.storageProvider.ListContainer(ctx, &provider.ListContainerRequest{
res, err := idx.metadataStorage.storageProvider.ListContainer(ctx, &provider.ListContainerRequest{
Ref: &provider.Reference{
Path: path.Join("/meta", idx.indexRootDir),
},
@@ -375,5 +345,5 @@ func (idx *Autoincrement) Delete() error {
return err
}
return deleteIndexRoot(ctx, idx.storageProvider, idx.indexRootDir)
return deleteIndexRoot(ctx, idx.metadataStorage.storageProvider, idx.indexRootDir)
}
+12
View File
@@ -0,0 +1,12 @@
package cs3
import (
acccfg "github.com/owncloud/ocis/accounts/pkg/config"
)
// Config represents cs3conf. Should be deprecated in favor of config.Config.
type Config struct {
ProviderAddr string
JWTSecret string
ServiceUser acccfg.ServiceUser
}
@@ -1,45 +0,0 @@
package cs3
import (
"io"
"net/http"
"strings"
)
type dataProviderClient struct {
client http.Client
baseURL string
}
func (d dataProviderClient) put(url string, body io.Reader, token string) (*http.Response, error) {
req, err := http.NewRequest(http.MethodPut, singleJoiningSlash(d.baseURL, url), body)
if err != nil {
return nil, err
}
req.Header.Add("x-access-token", token)
return d.client.Do(req)
}
func (d dataProviderClient) get(url string, token string) (*http.Response, error) {
req, err := http.NewRequest(http.MethodGet, singleJoiningSlash(d.baseURL, url), nil)
if err != nil {
return nil, err
}
req.Header.Add("x-access-token", token)
return d.client.Do(req)
}
// TODO: this is copied from proxy. Find a better solution or move it to ocis-pkg
func singleJoiningSlash(a, b string) string {
aslash := strings.HasSuffix(a, "/")
bslash := strings.HasPrefix(b, "/")
switch {
case aslash && bslash:
return a + b[1:]
case !aslash && !bslash:
return a + "/" + b
}
return a + b
}
+26 -63
View File
@@ -2,8 +2,6 @@ package cs3
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"os"
"path"
@@ -16,7 +14,6 @@ import (
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
revactx "github.com/cs3org/reva/pkg/ctx"
"github.com/cs3org/reva/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/pkg/token"
"github.com/cs3org/reva/pkg/token/manager/jwt"
idxerrs "github.com/owncloud/ocis/ocis-pkg/indexer/errors"
"github.com/owncloud/ocis/ocis-pkg/indexer/index"
@@ -38,9 +35,7 @@ type NonUnique struct {
indexBaseDir string
indexRootDir string
tokenManager token.Manager
storageProvider provider.ProviderAPIClient
dataProvider dataProviderClient // Used to create and download data via http, bypassing reva upload protocol
metadataStorage *metadataStorage
cs3conf *Config
}
@@ -69,17 +64,9 @@ func NewNonUniqueIndexWithOptions(o ...option.Option) index.Index {
indexRootDir: path.Join(path.Join(opts.DataDir, "index.cs3"), strings.Join([]string{"non_unique", opts.TypeName, opts.IndexBy}, ".")),
cs3conf: &Config{
ProviderAddr: opts.ProviderAddr,
DataURL: opts.DataURL,
DataPrefix: opts.DataPrefix,
JWTSecret: opts.JWTSecret,
ServiceUser: opts.ServiceUser,
},
dataProvider: dataProviderClient{
baseURL: singleJoiningSlash(opts.DataURL, opts.DataPrefix),
client: http.Client{
Transport: http.DefaultTransport,
},
},
}
}
@@ -93,27 +80,22 @@ func (idx *NonUnique) Init() error {
return err
}
idx.tokenManager = tokenManager
client, err := pool.GetStorageProviderServiceClient(idx.cs3conf.ProviderAddr)
if err != nil {
return err
}
idx.storageProvider = client
ctx := context.Background()
tk, err := idx.authenticate(ctx)
if err != nil {
return err
idx.metadataStorage = &metadataStorage{
tokenManager: tokenManager,
storageProvider: client,
dataGatewayClient: http.DefaultClient,
}
ctx = metadata.AppendToOutgoingContext(ctx, revactx.TokenHeader, tk)
if err := idx.makeDirIfNotExists(ctx, idx.indexBaseDir); err != nil {
if err := idx.makeDirIfNotExists(idx.indexBaseDir); err != nil {
return err
}
if err := idx.makeDirIfNotExists(ctx, idx.indexRootDir); err != nil {
if err := idx.makeDirIfNotExists(idx.indexRootDir); err != nil {
return err
}
@@ -131,7 +113,7 @@ func (idx *NonUnique) Lookup(v string) ([]string, error) {
return nil, err
}
res, err := idx.storageProvider.ListContainer(ctx, &provider.ListContainerRequest{
res, err := idx.metadataStorage.storageProvider.ListContainer(ctx, &provider.ListContainerRequest{
Ref: &provider.Reference{
Path: path.Join("/meta", idx.indexRootDir, v),
},
@@ -156,13 +138,9 @@ func (idx *NonUnique) Add(id, v string) (string, error) {
if idx.caseInsensitive {
v = strings.ToLower(v)
}
ctx, err := idx.getAuthenticatedContext(context.Background())
if err != nil {
return "", err
}
newName := path.Join(idx.indexRootDir, v)
if err := idx.makeDirIfNotExists(ctx, newName); err != nil {
if err := idx.makeDirIfNotExists(newName); err != nil {
return "", err
}
@@ -191,7 +169,7 @@ func (idx *NonUnique) Remove(id string, v string) error {
}
deletePath := path.Join("/meta", idx.indexRootDir, v, id)
resp, err := idx.storageProvider.Delete(ctx, &provider.DeleteRequest{
resp, err := idx.metadataStorage.storageProvider.Delete(ctx, &provider.DeleteRequest{
Ref: &provider.Reference{
Path: deletePath,
},
@@ -206,7 +184,7 @@ func (idx *NonUnique) Remove(id string, v string) error {
}
toStat := path.Join("/meta", idx.indexRootDir, v)
lcResp, err := idx.storageProvider.ListContainer(ctx, &provider.ListContainerRequest{
lcResp, err := idx.metadataStorage.storageProvider.ListContainer(ctx, &provider.ListContainerRequest{
Ref: &provider.Reference{
Path: toStat,
},
@@ -217,7 +195,7 @@ func (idx *NonUnique) Remove(id string, v string) error {
if len(lcResp.Infos) == 0 {
deletePath = path.Join("/meta", idx.indexRootDir, v)
_, err := idx.storageProvider.Delete(ctx, &provider.DeleteRequest{
_, err := idx.metadataStorage.storageProvider.Delete(ctx, &provider.DeleteRequest{
Ref: &provider.Reference{
Path: deletePath,
},
@@ -261,7 +239,7 @@ func (idx *NonUnique) Search(pattern string) ([]string, error) {
foldersMatched := make([]string, 0)
matches := make([]string, 0)
res, err := idx.storageProvider.ListContainer(ctx, &provider.ListContainerRequest{
res, err := idx.metadataStorage.storageProvider.ListContainer(ctx, &provider.ListContainerRequest{
Ref: &provider.Reference{
Path: path.Join("/meta", idx.indexRootDir),
},
@@ -282,7 +260,7 @@ func (idx *NonUnique) Search(pattern string) ([]string, error) {
}
for i := range foldersMatched {
res, _ := idx.storageProvider.ListContainer(ctx, &provider.ListContainerRequest{
res, _ := idx.metadataStorage.storageProvider.ListContainer(ctx, &provider.ListContainerRequest{
Ref: &provider.Reference{
Path: foldersMatched[i],
},
@@ -316,12 +294,16 @@ func (idx *NonUnique) FilesDir() string {
return idx.filesDir
}
func (idx *NonUnique) makeDirIfNotExists(ctx context.Context, folder string) error {
return storage.MakeDirIfNotExist(ctx, idx.storageProvider, folder)
func (idx *NonUnique) makeDirIfNotExists(folder string) error {
ctx, err := idx.getAuthenticatedContext(context.Background())
if err != nil {
return err
}
return storage.MakeDirIfNotExist(ctx, idx.metadataStorage.storageProvider, folder)
}
func (idx *NonUnique) createSymlink(oldname, newname string) error {
t, err := idx.authenticate(context.TODO())
ctx, err := idx.getAuthenticatedContext(context.Background())
if err != nil {
return err
}
@@ -330,43 +312,24 @@ func (idx *NonUnique) createSymlink(oldname, newname string) error {
return os.ErrExist
}
resp, err := idx.dataProvider.put(newname, strings.NewReader(oldname), t)
err = idx.metadataStorage.uploadHelper(ctx, newname, []byte(oldname))
if err != nil {
return err
}
if err = resp.Body.Close(); err != nil {
return err
}
return nil
}
func (idx *NonUnique) resolveSymlink(name string) (string, error) {
t, err := idx.authenticate(context.TODO())
ctx, err := idx.getAuthenticatedContext(context.Background())
if err != nil {
return "", err
}
resp, err := idx.dataProvider.get(name, t)
b, err := idx.metadataStorage.downloadHelper(ctx, name)
if err != nil {
return "", err
}
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
return "", os.ErrNotExist
}
return "", fmt.Errorf("could not resolve symlink %s, got status %v", name, resp.StatusCode)
}
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
if err = resp.Body.Close(); err != nil {
return "", err
}
return string(b), err
}
@@ -386,9 +349,9 @@ func (idx *NonUnique) Delete() error {
return err
}
return deleteIndexRoot(ctx, idx.storageProvider, idx.indexRootDir)
return deleteIndexRoot(ctx, idx.metadataStorage.storageProvider, idx.indexRootDir)
}
func (idx *NonUnique) authenticate(ctx context.Context) (token string, err error) {
return storage.AuthenticateCS3(ctx, idx.cs3conf.ServiceUser, idx.tokenManager)
return storage.AuthenticateCS3(ctx, idx.cs3conf.ServiceUser, idx.metadataStorage.tokenManager)
}
+107
View File
@@ -0,0 +1,107 @@
package cs3
import (
"bytes"
"context"
"errors"
"io/ioutil"
"net/http"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/token"
)
type metadataStorage struct {
tokenManager token.Manager
storageProvider provider.ProviderAPIClient
dataGatewayClient *http.Client
}
func (r metadataStorage) uploadHelper(ctx context.Context, path string, content []byte) error {
ref := provider.InitiateFileUploadRequest{
Ref: &provider.Reference{
Path: path,
},
}
res, err := r.storageProvider.InitiateFileUpload(ctx, &ref)
if err != nil {
return err
}
var endpoint string
for _, proto := range res.GetProtocols() {
if proto.Protocol == "simple" {
endpoint = proto.GetUploadEndpoint()
break
}
}
if endpoint == "" {
return errors.New("metadata storage doesn't support the simple upload protocol")
}
req, err := http.NewRequest(http.MethodPut, endpoint, bytes.NewReader(content))
if err != nil {
return err
}
resp, err := r.dataGatewayClient.Do(req)
if err != nil {
return err
}
if err = resp.Body.Close(); err != nil {
return err
}
return nil
}
func (r metadataStorage) downloadHelper(ctx context.Context, path string) (content []byte, err error) {
ref := provider.InitiateFileDownloadRequest{
Ref: &provider.Reference{
Path: path,
},
}
res, err := r.storageProvider.InitiateFileDownload(ctx, &ref)
if err != nil {
return []byte{}, err
}
var endpoint string
for _, proto := range res.GetProtocols() {
if proto.Protocol == "simple" {
endpoint = proto.GetDownloadEndpoint()
break
}
}
if endpoint == "" {
return []byte{}, errors.New("metadata storage doesn't support the simple download protocol")
}
req, err := http.NewRequest(http.MethodGet, endpoint, nil)
if err != nil {
return []byte{}, err
}
resp, err := r.dataGatewayClient.Do(req)
if err != nil {
return []byte{}, err
}
//if resp.StatusCode != http.StatusOK {
// return []byte{}, &notFoundErr{}
//}
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return []byte{}, err
}
if err = resp.Body.Close(); err != nil {
return []byte{}, err
}
return b, nil
}
+23 -71
View File
@@ -2,8 +2,6 @@ package cs3
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"os"
"path"
@@ -12,13 +10,10 @@ import (
"github.com/owncloud/ocis/accounts/pkg/storage"
acccfg "github.com/owncloud/ocis/accounts/pkg/config"
v1beta11 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
revactx "github.com/cs3org/reva/pkg/ctx"
"github.com/cs3org/reva/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/pkg/token"
"github.com/cs3org/reva/pkg/token/manager/jwt"
idxerrs "github.com/owncloud/ocis/ocis-pkg/indexer/errors"
"github.com/owncloud/ocis/ocis-pkg/indexer/index"
@@ -36,22 +31,11 @@ type Unique struct {
indexBaseDir string
indexRootDir string
tokenManager token.Manager
storageProvider provider.ProviderAPIClient
dataProvider dataProviderClient // Used to create and download data via http, bypassing reva upload protocol
metadataStorage *metadataStorage
cs3conf *Config
}
// Config represents cs3conf. Should be deprecated in favor of config.Config.
type Config struct {
ProviderAddr string
DataURL string
DataPrefix string
JWTSecret string
ServiceUser acccfg.ServiceUser
}
func init() {
registry.IndexConstructorRegistry["cs3"]["unique"] = NewUniqueIndexWithOptions
}
@@ -73,17 +57,9 @@ func NewUniqueIndexWithOptions(o ...option.Option) index.Index {
indexRootDir: path.Join(path.Join(opts.DataDir, "index.cs3"), strings.Join([]string{"unique", opts.TypeName, opts.IndexBy}, ".")),
cs3conf: &Config{
ProviderAddr: opts.ProviderAddr,
DataURL: opts.DataURL,
DataPrefix: opts.DataPrefix,
JWTSecret: opts.JWTSecret,
ServiceUser: opts.ServiceUser,
},
dataProvider: dataProviderClient{
baseURL: singleJoiningSlash(opts.DataURL, opts.DataPrefix),
client: http.Client{
Transport: http.DefaultTransport,
},
},
}
return u
@@ -94,32 +70,26 @@ func (idx *Unique) Init() error {
tokenManager, err := jwt.New(map[string]interface{}{
"secret": idx.cs3conf.JWTSecret,
})
if err != nil {
return err
}
idx.tokenManager = tokenManager
client, err := pool.GetStorageProviderServiceClient(idx.cs3conf.ProviderAddr)
if err != nil {
return err
}
idx.storageProvider = client
ctx := context.Background()
tk, err := idx.authenticate(ctx)
if err != nil {
return err
idx.metadataStorage = &metadataStorage{
tokenManager: tokenManager,
storageProvider: client,
dataGatewayClient: http.DefaultClient,
}
ctx = metadata.AppendToOutgoingContext(ctx, revactx.TokenHeader, tk)
if err := idx.makeDirIfNotExists(ctx, idx.indexBaseDir); err != nil {
if err := idx.makeDirIfNotExists(idx.indexBaseDir); err != nil {
return err
}
if err := idx.makeDirIfNotExists(ctx, idx.indexRootDir); err != nil {
if err := idx.makeDirIfNotExists(idx.indexRootDir); err != nil {
return err
}
@@ -182,15 +152,13 @@ func (idx *Unique) Remove(id string, v string) error {
return err
}
ctx := context.Background()
t, err := idx.authenticate(ctx)
ctx, err := idx.getAuthenticatedContext(context.Background())
if err != nil {
return err
}
deletePath := path.Join("/meta", idx.indexRootDir, v)
ctx = metadata.AppendToOutgoingContext(ctx, revactx.TokenHeader, t)
resp, err := idx.storageProvider.Delete(ctx, &provider.DeleteRequest{
resp, err := idx.metadataStorage.storageProvider.Delete(ctx, &provider.DeleteRequest{
Ref: &provider.Reference{
Path: deletePath,
},
@@ -232,14 +200,12 @@ func (idx *Unique) Search(pattern string) ([]string, error) {
pattern = strings.ToLower(pattern)
}
ctx := context.Background()
t, err := idx.authenticate(ctx)
ctx, err := idx.getAuthenticatedContext(context.Background())
if err != nil {
return nil, err
}
ctx = metadata.AppendToOutgoingContext(ctx, revactx.TokenHeader, t)
res, err := idx.storageProvider.ListContainer(ctx, &provider.ListContainerRequest{
res, err := idx.metadataStorage.storageProvider.ListContainer(ctx, &provider.ListContainerRequest{
Ref: &provider.Reference{
Path: path.Join("/meta", idx.indexRootDir),
},
@@ -289,7 +255,7 @@ func (idx *Unique) FilesDir() string {
}
func (idx *Unique) createSymlink(oldname, newname string) error {
t, err := idx.authenticate(context.TODO())
ctx, err := idx.getAuthenticatedContext(context.Background())
if err != nil {
return err
}
@@ -298,52 +264,38 @@ func (idx *Unique) createSymlink(oldname, newname string) error {
return os.ErrExist
}
resp, err := idx.dataProvider.put(newname, strings.NewReader(oldname), t)
err = idx.metadataStorage.uploadHelper(ctx, newname, []byte(oldname))
if err != nil {
return err
}
if err = resp.Body.Close(); err != nil {
return err
}
return nil
}
func (idx *Unique) resolveSymlink(name string) (string, error) {
t, err := idx.authenticate(context.TODO())
ctx, err := idx.getAuthenticatedContext(context.Background())
if err != nil {
return "", err
}
resp, err := idx.dataProvider.get(name, t)
b, err := idx.metadataStorage.downloadHelper(ctx, name)
if err != nil {
return "", err
}
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
return "", os.ErrNotExist
}
return "", fmt.Errorf("could not resolve symlink %s, got status %v", name, resp.StatusCode)
}
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
if err = resp.Body.Close(); err != nil {
return "", err
}
return string(b), err
}
func (idx *Unique) makeDirIfNotExists(ctx context.Context, folder string) error {
return storage.MakeDirIfNotExist(ctx, idx.storageProvider, folder)
func (idx *Unique) makeDirIfNotExists(folder string) error {
ctx, err := idx.getAuthenticatedContext(context.Background())
if err != nil {
return err
}
return storage.MakeDirIfNotExist(ctx, idx.metadataStorage.storageProvider, folder)
}
func (idx *Unique) authenticate(ctx context.Context) (token string, err error) {
return storage.AuthenticateCS3(ctx, idx.cs3conf.ServiceUser, idx.tokenManager)
return storage.AuthenticateCS3(ctx, idx.cs3conf.ServiceUser, idx.metadataStorage.tokenManager)
}
func (idx *Unique) getAuthenticatedContext(ctx context.Context) (context.Context, error) {
@@ -362,5 +314,5 @@ func (idx *Unique) Delete() error {
return err
}
return deleteIndexRoot(ctx, idx.storageProvider, idx.indexRootDir)
return deleteIndexRoot(ctx, idx.metadataStorage.storageProvider, idx.indexRootDir)
}