mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-01-08 13:19:58 -06:00
Merge pull request #10133 from owncloud/gtw-selector
[full-ci] fix: always select next gateway client
This commit is contained in:
5
changelog/unreleased/fix-select-next-gateway-client.md
Normal file
5
changelog/unreleased/fix-select-next-gateway-client.md
Normal file
@@ -0,0 +1,5 @@
|
||||
Bugfix: Always select next gateway client
|
||||
|
||||
We now use the gateway selector to always select the next gateway client. This ensures that we can always connect to the gateway during up- and downscaling.
|
||||
|
||||
https://github.com/owncloud/ocis/pull/10133
|
||||
@@ -15,3 +15,10 @@ packages:
|
||||
dir: "mocks"
|
||||
interfaces:
|
||||
LockParser:
|
||||
github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool:
|
||||
config:
|
||||
dir: "mocks"
|
||||
interfaces:
|
||||
Selectable:
|
||||
config:
|
||||
filename: "gateway_selector.go"
|
||||
|
||||
104
services/collaboration/mocks/gateway_selector.go
Normal file
104
services/collaboration/mocks/gateway_selector.go
Normal file
@@ -0,0 +1,104 @@
|
||||
// Code generated by mockery v2.43.2. DO NOT EDIT.
|
||||
|
||||
package mocks
|
||||
|
||||
import (
|
||||
pool "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
// Selectable is an autogenerated mock type for the Selectable type
|
||||
type Selectable[T interface{}] struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
type Selectable_Expecter[T interface{}] struct {
|
||||
mock *mock.Mock
|
||||
}
|
||||
|
||||
func (_m *Selectable[T]) EXPECT() *Selectable_Expecter[T] {
|
||||
return &Selectable_Expecter[T]{mock: &_m.Mock}
|
||||
}
|
||||
|
||||
// Next provides a mock function with given fields: opts
|
||||
func (_m *Selectable[T]) Next(opts ...pool.Option) (T, error) {
|
||||
_va := make([]interface{}, len(opts))
|
||||
for _i := range opts {
|
||||
_va[_i] = opts[_i]
|
||||
}
|
||||
var _ca []interface{}
|
||||
_ca = append(_ca, _va...)
|
||||
ret := _m.Called(_ca...)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for Next")
|
||||
}
|
||||
|
||||
var r0 T
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(...pool.Option) (T, error)); ok {
|
||||
return rf(opts...)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(...pool.Option) T); ok {
|
||||
r0 = rf(opts...)
|
||||
} else {
|
||||
r0 = ret.Get(0).(T)
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(...pool.Option) error); ok {
|
||||
r1 = rf(opts...)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// Selectable_Next_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Next'
|
||||
type Selectable_Next_Call[T interface{}] struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// Next is a helper method to define mock.On call
|
||||
// - opts ...pool.Option
|
||||
func (_e *Selectable_Expecter[T]) Next(opts ...interface{}) *Selectable_Next_Call[T] {
|
||||
return &Selectable_Next_Call[T]{Call: _e.mock.On("Next",
|
||||
append([]interface{}{}, opts...)...)}
|
||||
}
|
||||
|
||||
func (_c *Selectable_Next_Call[T]) Run(run func(opts ...pool.Option)) *Selectable_Next_Call[T] {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
variadicArgs := make([]pool.Option, len(args)-0)
|
||||
for i, a := range args[0:] {
|
||||
if a != nil {
|
||||
variadicArgs[i] = a.(pool.Option)
|
||||
}
|
||||
}
|
||||
run(variadicArgs...)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Selectable_Next_Call[T]) Return(_a0 T, _a1 error) *Selectable_Next_Call[T] {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Selectable_Next_Call[T]) RunAndReturn(run func(...pool.Option) (T, error)) *Selectable_Next_Call[T] {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// NewSelectable creates a new instance of Selectable. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
// The first argument is typically a *testing.T value.
|
||||
func NewSelectable[T interface{}](t interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}) *Selectable[T] {
|
||||
mock := &Selectable[T]{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
@@ -5,8 +5,10 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
||||
"github.com/oklog/run"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
|
||||
registry "github.com/owncloud/ocis/v2/ocis-pkg/registry"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
|
||||
"github.com/owncloud/ocis/v2/services/collaboration/pkg/config"
|
||||
"github.com/owncloud/ocis/v2/services/collaboration/pkg/config/parser"
|
||||
@@ -44,7 +46,17 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
return err
|
||||
}
|
||||
|
||||
gwc, err := helpers.GetCS3apiClient(cfg, false)
|
||||
tm, err := pool.StringToTLSMode(cfg.CS3Api.GRPCClientTLS.Mode)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
gatewaySelector, err := pool.GatewaySelector(
|
||||
cfg.CS3Api.Gateway.Name,
|
||||
pool.WithTLSCACert(cfg.CS3Api.GRPCClientTLS.CACert),
|
||||
pool.WithTLSMode(tm),
|
||||
pool.WithRegistry(registry.GetRegistry()),
|
||||
pool.WithTracerProvider(traceProvider),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -54,7 +66,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := helpers.RegisterAppProvider(ctx, cfg, logger, gwc, appUrls); err != nil {
|
||||
if err := helpers.RegisterAppProvider(ctx, cfg, logger, gatewaySelector, appUrls); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -101,7 +113,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
|
||||
// start HTTP server
|
||||
httpServer, err := http.Server(
|
||||
http.Adapter(connector.NewHttpAdapter(gwc, cfg)),
|
||||
http.Adapter(connector.NewHttpAdapter(gatewaySelector, cfg)),
|
||||
http.Logger(logger),
|
||||
http.Config(cfg),
|
||||
http.Context(ctx),
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
package config
|
||||
|
||||
import "github.com/owncloud/ocis/v2/ocis-pkg/shared"
|
||||
|
||||
// CS3Api defines the available configuration in order to access to the CS3 gateway.
|
||||
type CS3Api struct {
|
||||
Gateway Gateway `yaml:"gateway"`
|
||||
DataGateway DataGateway `yaml:"datagateway"`
|
||||
Gateway Gateway `yaml:"gateway"`
|
||||
DataGateway DataGateway `yaml:"datagateway"`
|
||||
GRPCClientTLS *shared.GRPCClientTLS `yaml:"grpc_client_tls"`
|
||||
}
|
||||
|
||||
// Gateway defines the available configuration for the CS3 API gateway
|
||||
|
||||
@@ -2,6 +2,7 @@ package defaults
|
||||
|
||||
import (
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/shared"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/structs"
|
||||
"github.com/owncloud/ocis/v2/services/collaboration/pkg/config"
|
||||
)
|
||||
|
||||
@@ -93,6 +94,9 @@ func EnsureDefaults(cfg *config.Config) {
|
||||
} else if cfg.TokenManager == nil {
|
||||
cfg.TokenManager = &config.TokenManager{}
|
||||
}
|
||||
if cfg.CS3Api.GRPCClientTLS == nil && cfg.Commons != nil {
|
||||
cfg.CS3Api.GRPCClientTLS = structs.CopyOrZeroValue(cfg.Commons.GRPCClientTLS)
|
||||
}
|
||||
}
|
||||
|
||||
// Sanitize sanitized the configuration
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
|
||||
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
|
||||
revactx "github.com/cs3org/reva/v2/pkg/ctx"
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
|
||||
"github.com/owncloud/ocis/v2/services/collaboration/pkg/config"
|
||||
"github.com/owncloud/ocis/v2/services/collaboration/pkg/middleware"
|
||||
@@ -42,14 +43,14 @@ type ContentConnectorService interface {
|
||||
// uploads (PutFile)
|
||||
// Note that operations might return any kind of error, not just ConnectorError
|
||||
type ContentConnector struct {
|
||||
gwc gatewayv1beta1.GatewayAPIClient
|
||||
gws pool.Selectable[gatewayv1beta1.GatewayAPIClient]
|
||||
cfg *config.Config
|
||||
}
|
||||
|
||||
// NewContentConnector creates a new content connector
|
||||
func NewContentConnector(gwc gatewayv1beta1.GatewayAPIClient, cfg *config.Config) *ContentConnector {
|
||||
func NewContentConnector(gws pool.Selectable[gatewayv1beta1.GatewayAPIClient], cfg *config.Config) *ContentConnector {
|
||||
return &ContentConnector{
|
||||
gwc: gwc,
|
||||
gws: gws,
|
||||
cfg: cfg,
|
||||
}
|
||||
}
|
||||
@@ -76,7 +77,11 @@ func (c *ContentConnector) GetFile(ctx context.Context, w http.ResponseWriter) e
|
||||
Logger()
|
||||
logger.Debug().Msg("GetFile: start")
|
||||
|
||||
sResp, err := c.gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
gwc, err := c.gws.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sResp, err := gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
Ref: wopiContext.FileReference,
|
||||
})
|
||||
if err != nil {
|
||||
@@ -94,7 +99,11 @@ func (c *ContentConnector) GetFile(ctx context.Context, w http.ResponseWriter) e
|
||||
if wopiContext.ViewMode == appproviderv1beta1.ViewMode_VIEW_MODE_VIEW_ONLY && wopiContext.ViewOnlyToken != "" {
|
||||
ctx = revactx.ContextSetToken(ctx, wopiContext.ViewOnlyToken)
|
||||
}
|
||||
resp, err := c.gwc.InitiateFileDownload(ctx, req)
|
||||
gwc, err = c.gws.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := gwc.InitiateFileDownload(ctx, req)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("GetFile: InitiateFileDownload failed")
|
||||
return err
|
||||
@@ -227,9 +236,13 @@ func (c *ContentConnector) PutFile(ctx context.Context, stream io.Reader, stream
|
||||
Logger()
|
||||
logger.Debug().Msg("PutFile: start")
|
||||
|
||||
gwc, err := c.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// We need a stat call on the target file in order to get both the lock
|
||||
// (if any) and the current size of the file
|
||||
statRes, err := c.gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
statRes, err := gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
Ref: wopiContext.FileReference,
|
||||
})
|
||||
if err != nil {
|
||||
@@ -286,8 +299,12 @@ func (c *ContentConnector) PutFile(ctx context.Context, stream io.Reader, stream
|
||||
},
|
||||
}
|
||||
|
||||
gwc, err = c.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Initiate the upload request
|
||||
resp, err := c.gwc.InitiateFileUpload(ctx, req)
|
||||
resp, err := gwc.InitiateFileUpload(ctx, req)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("UploadHelper: InitiateFileUpload failed")
|
||||
return nil, err
|
||||
@@ -383,9 +400,13 @@ func (c *ContentConnector) PutFile(ctx context.Context, stream io.Reader, stream
|
||||
Msg("UploadHelper: Put request to the upload endpoint failed with unexpected status")
|
||||
return NewResponse(500), nil
|
||||
}
|
||||
gwc, err = c.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// We need a stat call on the target file after the upload to get the
|
||||
// new mtime
|
||||
statResAfter, err := c.gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
statResAfter, err := gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
Ref: wopiContext.FileReference,
|
||||
})
|
||||
if err != nil {
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/cs3org/reva/v2/pkg/utils"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
"github.com/owncloud/ocis/v2/services/collaboration/mocks"
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
||||
appproviderv1beta1 "github.com/cs3org/go-cs3apis/cs3/app/provider/v1beta1"
|
||||
@@ -27,10 +28,11 @@ import (
|
||||
|
||||
var _ = Describe("ContentConnector", func() {
|
||||
var (
|
||||
cc *connector.ContentConnector
|
||||
gatewayClient *cs3mocks.GatewayAPIClient
|
||||
cfg *config.Config
|
||||
wopiCtx middleware.WopiContext
|
||||
cc *connector.ContentConnector
|
||||
gatewayClient *cs3mocks.GatewayAPIClient
|
||||
gatewaySelector *mocks.Selectable[gateway.GatewayAPIClient]
|
||||
cfg *config.Config
|
||||
wopiCtx middleware.WopiContext
|
||||
|
||||
srv *httptest.Server
|
||||
srvReqHeader http.Header
|
||||
@@ -40,8 +42,11 @@ var _ = Describe("ContentConnector", func() {
|
||||
BeforeEach(func() {
|
||||
// contentConnector only uses "cfg.CS3Api.DataGateway.Insecure", which is irrelevant for the tests
|
||||
cfg = &config.Config{}
|
||||
gatewayClient = &cs3mocks.GatewayAPIClient{}
|
||||
cc = connector.NewContentConnector(gatewayClient, cfg)
|
||||
gatewayClient = cs3mocks.NewGatewayAPIClient(GinkgoT())
|
||||
|
||||
gatewaySelector = mocks.NewSelectable[gateway.GatewayAPIClient](GinkgoT())
|
||||
gatewaySelector.On("Next").Return(gatewayClient, nil)
|
||||
cc = connector.NewContentConnector(gatewaySelector, cfg)
|
||||
|
||||
wopiCtx = middleware.WopiContext{
|
||||
AccessToken: "abcdef123456",
|
||||
@@ -91,6 +96,8 @@ var _ = Describe("ContentConnector", func() {
|
||||
}, nil)
|
||||
})
|
||||
It("No valid context", func() {
|
||||
gatewaySelector.EXPECT().Next().Unset()
|
||||
gatewayClient.EXPECT().Stat(mock.Anything, mock.Anything).Unset()
|
||||
sb := httptest.NewRecorder()
|
||||
ctx := context.Background()
|
||||
err := cc.GetFile(ctx, sb)
|
||||
@@ -226,6 +233,7 @@ var _ = Describe("ContentConnector", func() {
|
||||
|
||||
Describe("PutFile", func() {
|
||||
It("No valid context", func() {
|
||||
gatewaySelector.EXPECT().Next().Unset()
|
||||
reader := strings.NewReader("Content to upload is here!")
|
||||
ctx := context.Background()
|
||||
response, err := cc.PutFile(ctx, reader, reader.Size(), "notARandomLockId")
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
|
||||
typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
|
||||
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
||||
"github.com/cs3org/reva/v2/pkg/storagespace"
|
||||
"github.com/cs3org/reva/v2/pkg/utils"
|
||||
"github.com/google/uuid"
|
||||
@@ -94,14 +95,14 @@ type FileConnectorService interface {
|
||||
// Currently, it handles file locks and getting the file info.
|
||||
// Note that operations might return any kind of error, not just ConnectorError
|
||||
type FileConnector struct {
|
||||
gwc gatewayv1beta1.GatewayAPIClient
|
||||
gws pool.Selectable[gatewayv1beta1.GatewayAPIClient]
|
||||
cfg *config.Config
|
||||
}
|
||||
|
||||
// NewFileConnector creates a new file connector
|
||||
func NewFileConnector(gwc gatewayv1beta1.GatewayAPIClient, cfg *config.Config) *FileConnector {
|
||||
func NewFileConnector(gws pool.Selectable[gatewayv1beta1.GatewayAPIClient], cfg *config.Config) *FileConnector {
|
||||
return &FileConnector{
|
||||
gwc: gwc,
|
||||
gws: gws,
|
||||
cfg: cfg,
|
||||
}
|
||||
}
|
||||
@@ -128,7 +129,11 @@ func (f *FileConnector) GetLock(ctx context.Context) (*ConnectorResponse, error)
|
||||
Ref: wopiContext.FileReference,
|
||||
}
|
||||
|
||||
resp, err := f.gwc.GetLock(ctx, req)
|
||||
gwc, err := f.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := gwc.GetLock(ctx, req)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("GetLock failed")
|
||||
return nil, err
|
||||
@@ -209,7 +214,11 @@ func (f *FileConnector) Lock(ctx context.Context, lockID, oldLockID string) (*Co
|
||||
},
|
||||
}
|
||||
|
||||
resp, err := f.gwc.SetLock(ctx, req)
|
||||
gwc, err := f.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := gwc.SetLock(ctx, req)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("SetLock failed")
|
||||
return nil, err
|
||||
@@ -232,7 +241,11 @@ func (f *FileConnector) Lock(ctx context.Context, lockID, oldLockID string) (*Co
|
||||
ExistingLockId: oldLockID,
|
||||
}
|
||||
|
||||
resp, err := f.gwc.RefreshLock(ctx, req)
|
||||
gwc, err := f.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := gwc.RefreshLock(ctx, req)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("UnlockAndRefresh failed")
|
||||
return nil, err
|
||||
@@ -240,7 +253,11 @@ func (f *FileConnector) Lock(ctx context.Context, lockID, oldLockID string) (*Co
|
||||
setOrRefreshStatus = resp.GetStatus()
|
||||
}
|
||||
|
||||
statResp, err := f.gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
gwc, err := f.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
statResp, err := gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
Ref: wopiContext.FileReference,
|
||||
})
|
||||
if err != nil {
|
||||
@@ -270,7 +287,11 @@ func (f *FileConnector) Lock(ctx context.Context, lockID, oldLockID string) (*Co
|
||||
Ref: wopiContext.FileReference,
|
||||
}
|
||||
|
||||
resp, err := f.gwc.GetLock(ctx, req)
|
||||
gwc, err = f.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := gwc.GetLock(ctx, req)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("SetLock failed, fallback to GetLock failed too")
|
||||
return nil, err
|
||||
@@ -362,13 +383,21 @@ func (f *FileConnector) RefreshLock(ctx context.Context, lockID string) (*Connec
|
||||
},
|
||||
}
|
||||
|
||||
resp, err := f.gwc.RefreshLock(ctx, req)
|
||||
gwc, err := f.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := gwc.RefreshLock(ctx, req)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("RefreshLock failed")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
statResp, err := f.gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
gwc, err = f.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
statResp, err := gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
Ref: wopiContext.FileReference,
|
||||
})
|
||||
if err != nil {
|
||||
@@ -409,7 +438,11 @@ func (f *FileConnector) RefreshLock(ctx context.Context, lockID string) (*Connec
|
||||
Ref: wopiContext.FileReference,
|
||||
}
|
||||
|
||||
resp, err := f.gwc.GetLock(ctx, req)
|
||||
gwc, err = f.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := gwc.GetLock(ctx, req)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("RefreshLock failed trying to get the current lock")
|
||||
return nil, err
|
||||
@@ -486,13 +519,21 @@ func (f *FileConnector) UnLock(ctx context.Context, lockID string) (*ConnectorRe
|
||||
},
|
||||
}
|
||||
|
||||
resp, err := f.gwc.Unlock(ctx, req)
|
||||
gwc, err := f.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := gwc.Unlock(ctx, req)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("Unlock failed")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
statResp, err := f.gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
gwc, err = f.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
statResp, err := gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
Ref: wopiContext.FileReference,
|
||||
})
|
||||
if err != nil {
|
||||
@@ -521,7 +562,11 @@ func (f *FileConnector) UnLock(ctx context.Context, lockID string) (*ConnectorRe
|
||||
Ref: wopiContext.FileReference,
|
||||
}
|
||||
|
||||
resp, err := f.gwc.GetLock(ctx, req)
|
||||
gwc, err = f.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := gwc.GetLock(ctx, req)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("Unlock failed trying to get the current lock")
|
||||
return nil, err
|
||||
@@ -598,8 +643,12 @@ func (f *FileConnector) PutRelativeFileSuggested(ctx context.Context, ccs Conten
|
||||
Str("PutTarget", target).
|
||||
Logger()
|
||||
|
||||
gwc, err := f.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// stat the current file in order to get the reference of the parent folder
|
||||
oldStatRes, err := f.gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
oldStatRes, err := gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
Ref: wopiContext.FileReference,
|
||||
})
|
||||
if err != nil {
|
||||
@@ -715,8 +764,12 @@ func (f *FileConnector) PutRelativeFileRelative(ctx context.Context, ccs Content
|
||||
Str("PutTarget", target).
|
||||
Logger()
|
||||
|
||||
gwc, err := f.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// stat the current file in order to get the reference of the parent folder
|
||||
oldStatRes, err := f.gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
oldStatRes, err := gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
Ref: wopiContext.FileReference,
|
||||
})
|
||||
if err != nil {
|
||||
@@ -845,7 +898,11 @@ func (f *FileConnector) DeleteFile(ctx context.Context, lockID string) (*Connect
|
||||
|
||||
// we'll retry the request after a while if we get a "TOO_EARLY" code
|
||||
for retries := 0; deleteRes == nil || deleteRes.GetStatus().GetCode() == rpcv1beta1.Code_CODE_TOO_EARLY; retries++ {
|
||||
deleteRes, err = f.gwc.Delete(ctx, deleteReq)
|
||||
gwc, err := f.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
deleteRes, err = gwc.Delete(ctx, deleteReq)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("DeleteFile: stat failed")
|
||||
return nil, err
|
||||
@@ -888,7 +945,11 @@ func (f *FileConnector) DeleteFile(ctx context.Context, lockID string) (*Connect
|
||||
Ref: wopiContext.FileReference,
|
||||
}
|
||||
|
||||
resp, err2 := f.gwc.GetLock(ctx, req)
|
||||
gwc, err := f.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err2 := gwc.GetLock(ctx, req)
|
||||
if err2 != nil {
|
||||
logger.Error().Err(err2).Msg("DeleteFile: GetLock failed")
|
||||
return nil, err2
|
||||
@@ -942,8 +1003,12 @@ func (f *FileConnector) RenameFile(ctx context.Context, lockID, target string) (
|
||||
Str("RenameTarget", target).
|
||||
Logger()
|
||||
|
||||
gwc, err := f.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// stat the current file in order to get the reference of the parent folder
|
||||
oldStatRes, err := f.gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
oldStatRes, err := gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
Ref: wopiContext.FileReference,
|
||||
})
|
||||
if err != nil {
|
||||
@@ -978,8 +1043,12 @@ func (f *FileConnector) RenameFile(ctx context.Context, lockID, target string) (
|
||||
// add the new file reference to the log context
|
||||
newLogger := logger.With().Str("NewFileReference", targetFileReference.String()).Logger()
|
||||
|
||||
gwc, err = f.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// try to put the file. It mustn't return a 400 or 409
|
||||
moveRes, err := f.gwc.Move(ctx, &providerv1beta1.MoveRequest{
|
||||
moveRes, err := gwc.Move(ctx, &providerv1beta1.MoveRequest{
|
||||
Source: wopiContext.FileReference,
|
||||
Destination: targetFileReference,
|
||||
LockId: lockID,
|
||||
@@ -1045,7 +1114,11 @@ func (f *FileConnector) CheckFileInfo(ctx context.Context) (*ConnectorResponse,
|
||||
|
||||
logger := zerolog.Ctx(ctx)
|
||||
|
||||
statRes, err := f.gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
gwc, err := f.gws.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
statRes, err := gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
Ref: wopiContext.FileReference,
|
||||
})
|
||||
if err != nil {
|
||||
@@ -1261,9 +1334,13 @@ func (f *FileConnector) generateWOPISrc(wopiContext middleware.WopiContext, logg
|
||||
}
|
||||
|
||||
func (f *FileConnector) adjustWopiReference(ctx context.Context, wopiContext *middleware.WopiContext, logger zerolog.Logger) error {
|
||||
gwc, err := f.gws.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// using resourceid + path won't do for WOPI, we need just the resource if of the new file
|
||||
// the wopicontext has resourceid + path, which is good enough for the stat request
|
||||
newStatRes, err := f.gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
newStatRes, err := gwc.Stat(ctx, &providerv1beta1.StatRequest{
|
||||
Ref: wopiContext.FileReference,
|
||||
})
|
||||
if err != nil {
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"strings"
|
||||
|
||||
appproviderv1beta1 "github.com/cs3org/go-cs3apis/cs3/app/provider/v1beta1"
|
||||
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
|
||||
userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
|
||||
providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
|
||||
typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
|
||||
@@ -23,16 +24,18 @@ import (
|
||||
"github.com/owncloud/ocis/v2/services/collaboration/pkg/connector"
|
||||
"github.com/owncloud/ocis/v2/services/collaboration/pkg/connector/fileinfo"
|
||||
"github.com/owncloud/ocis/v2/services/collaboration/pkg/middleware"
|
||||
"github.com/owncloud/ocis/v2/services/graph/mocks"
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
var _ = Describe("FileConnector", func() {
|
||||
var (
|
||||
fc *connector.FileConnector
|
||||
ccs *collabmocks.ContentConnectorService
|
||||
gatewayClient *cs3mocks.GatewayAPIClient
|
||||
cfg *config.Config
|
||||
wopiCtx middleware.WopiContext
|
||||
fc *connector.FileConnector
|
||||
ccs *collabmocks.ContentConnectorService
|
||||
gatewayClient *cs3mocks.GatewayAPIClient
|
||||
gatewaySelector *mocks.Selectable[gateway.GatewayAPIClient]
|
||||
cfg *config.Config
|
||||
wopiCtx middleware.WopiContext
|
||||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
@@ -49,8 +52,12 @@ var _ = Describe("FileConnector", func() {
|
||||
},
|
||||
}
|
||||
ccs = &collabmocks.ContentConnectorService{}
|
||||
gatewayClient = &cs3mocks.GatewayAPIClient{}
|
||||
fc = connector.NewFileConnector(gatewayClient, cfg)
|
||||
|
||||
gatewayClient = cs3mocks.NewGatewayAPIClient(GinkgoT())
|
||||
|
||||
gatewaySelector = mocks.NewSelectable[gateway.GatewayAPIClient](GinkgoT())
|
||||
gatewaySelector.On("Next").Return(gatewayClient, nil)
|
||||
fc = connector.NewFileConnector(gatewaySelector, cfg)
|
||||
|
||||
wopiCtx = middleware.WopiContext{
|
||||
// a real token is needed for the PutRelativeFileSuggested tests
|
||||
@@ -70,6 +77,7 @@ var _ = Describe("FileConnector", func() {
|
||||
|
||||
Describe("GetLock", func() {
|
||||
It("No valid context", func() {
|
||||
gatewaySelector.EXPECT().Next().Unset()
|
||||
ctx := context.Background()
|
||||
response, err := fc.GetLock(ctx)
|
||||
Expect(err).To(HaveOccurred())
|
||||
@@ -125,6 +133,7 @@ var _ = Describe("FileConnector", func() {
|
||||
Describe("Lock", func() {
|
||||
Describe("Lock", func() {
|
||||
It("No valid context", func() {
|
||||
gatewaySelector.EXPECT().Next().Unset()
|
||||
ctx := context.Background()
|
||||
response, err := fc.Lock(ctx, "newLock", "")
|
||||
Expect(err).To(HaveOccurred())
|
||||
@@ -132,6 +141,7 @@ var _ = Describe("FileConnector", func() {
|
||||
})
|
||||
|
||||
It("Empty lockId", func() {
|
||||
gatewaySelector.EXPECT().Next().Unset()
|
||||
ctx := middleware.WopiContextToCtx(context.Background(), wopiCtx)
|
||||
|
||||
response, err := fc.Lock(ctx, "", "")
|
||||
@@ -314,6 +324,7 @@ var _ = Describe("FileConnector", func() {
|
||||
|
||||
Describe("Unlock and relock", func() {
|
||||
It("No valid context", func() {
|
||||
gatewaySelector.EXPECT().Next().Unset()
|
||||
ctx := context.Background()
|
||||
response, err := fc.Lock(ctx, "newLock", "oldLock")
|
||||
Expect(err).To(HaveOccurred())
|
||||
@@ -321,6 +332,7 @@ var _ = Describe("FileConnector", func() {
|
||||
})
|
||||
|
||||
It("Empty lockId", func() {
|
||||
gatewaySelector.EXPECT().Next().Unset()
|
||||
ctx := middleware.WopiContextToCtx(context.Background(), wopiCtx)
|
||||
|
||||
response, err := fc.Lock(ctx, "", "oldLock")
|
||||
@@ -496,6 +508,7 @@ var _ = Describe("FileConnector", func() {
|
||||
|
||||
Describe("RefreshLock", func() {
|
||||
It("No valid context", func() {
|
||||
gatewaySelector.EXPECT().Next().Unset()
|
||||
ctx := context.Background()
|
||||
|
||||
response, err := fc.RefreshLock(ctx, "")
|
||||
@@ -504,6 +517,7 @@ var _ = Describe("FileConnector", func() {
|
||||
})
|
||||
|
||||
It("Empty lockId", func() {
|
||||
gatewaySelector.EXPECT().Next().Unset()
|
||||
ctx := middleware.WopiContextToCtx(context.Background(), wopiCtx)
|
||||
|
||||
response, err := fc.RefreshLock(ctx, "")
|
||||
@@ -668,6 +682,7 @@ var _ = Describe("FileConnector", func() {
|
||||
|
||||
Describe("Unlock", func() {
|
||||
It("No valid context", func() {
|
||||
gatewaySelector.EXPECT().Next().Unset()
|
||||
ctx := context.Background()
|
||||
|
||||
response, err := fc.UnLock(ctx, "")
|
||||
@@ -676,6 +691,7 @@ var _ = Describe("FileConnector", func() {
|
||||
})
|
||||
|
||||
It("Empty lockId", func() {
|
||||
gatewaySelector.EXPECT().Next().Unset()
|
||||
ctx := middleware.WopiContextToCtx(context.Background(), wopiCtx)
|
||||
|
||||
response, err := fc.UnLock(ctx, "")
|
||||
@@ -845,6 +861,7 @@ var _ = Describe("FileConnector", func() {
|
||||
|
||||
Describe("PutRelativeFileSuggested", func() {
|
||||
It("No valid context", func() {
|
||||
gatewaySelector.EXPECT().Next().Unset()
|
||||
ctx := context.Background()
|
||||
stream := strings.NewReader("This is the content of a file")
|
||||
response, err := fc.PutRelativeFileSuggested(ctx, ccs, stream, int64(stream.Len()), "newFile.txt")
|
||||
@@ -1098,6 +1115,7 @@ var _ = Describe("FileConnector", func() {
|
||||
|
||||
Describe("PutRelativeFileRelative", func() {
|
||||
It("No valid context", func() {
|
||||
gatewaySelector.EXPECT().Next().Unset()
|
||||
ctx := context.Background()
|
||||
stream := strings.NewReader("This is the content of a file")
|
||||
response, err := fc.PutRelativeFileRelative(ctx, ccs, stream, int64(stream.Len()), "newFile.txt")
|
||||
@@ -1285,6 +1303,7 @@ var _ = Describe("FileConnector", func() {
|
||||
|
||||
Describe("DeleteFile", func() {
|
||||
It("No valid context", func() {
|
||||
gatewaySelector.EXPECT().Next().Unset()
|
||||
ctx := context.Background()
|
||||
response, err := fc.DeleteFile(ctx, "lock")
|
||||
Expect(err).To(HaveOccurred())
|
||||
@@ -1299,9 +1318,7 @@ var _ = Describe("FileConnector", func() {
|
||||
Status: status.NewInternal(ctx, "something failed"),
|
||||
}, targetErr)
|
||||
|
||||
gatewayClient.On("Stat", mock.Anything, mock.Anything).Times(1).Return(&providerv1beta1.StatResponse{
|
||||
Status: status.NewInternal(ctx, "something failed"),
|
||||
}, targetErr)
|
||||
gatewayClient.EXPECT().Stat(mock.Anything, mock.Anything).Unset()
|
||||
|
||||
response, err := fc.DeleteFile(ctx, "newlock")
|
||||
Expect(err).To(HaveOccurred())
|
||||
@@ -1312,11 +1329,11 @@ var _ = Describe("FileConnector", func() {
|
||||
It("Delete fails status not ok, get lock fails", func() {
|
||||
ctx := middleware.WopiContextToCtx(context.Background(), wopiCtx)
|
||||
|
||||
targetErr := errors.New("Something went wrong")
|
||||
gatewayClient.On("Delete", mock.Anything, mock.Anything).Times(1).Return(&providerv1beta1.DeleteResponse{
|
||||
Status: status.NewInternal(ctx, "something failed"),
|
||||
}, nil)
|
||||
|
||||
targetErr := errors.New("Something went wrong")
|
||||
gatewayClient.On("GetLock", mock.Anything, mock.Anything).Times(1).Return(&providerv1beta1.GetLockResponse{
|
||||
Status: status.NewInternal(ctx, "something failed"),
|
||||
}, targetErr)
|
||||
@@ -1330,15 +1347,10 @@ var _ = Describe("FileConnector", func() {
|
||||
It("Delete fails file missing", func() {
|
||||
ctx := middleware.WopiContextToCtx(context.Background(), wopiCtx)
|
||||
|
||||
targetErr := errors.New("Something went wrong")
|
||||
gatewayClient.On("Delete", mock.Anything, mock.Anything).Times(1).Return(&providerv1beta1.DeleteResponse{
|
||||
Status: status.NewNotFound(ctx, "something failed"),
|
||||
}, nil)
|
||||
|
||||
gatewayClient.On("GetLock", mock.Anything, mock.Anything).Times(1).Return(&providerv1beta1.GetLockResponse{
|
||||
Status: status.NewInternal(ctx, "something failed"),
|
||||
}, targetErr)
|
||||
|
||||
response, err := fc.DeleteFile(ctx, "newlock")
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(response.Status).To(Equal(404))
|
||||
@@ -1416,6 +1428,7 @@ var _ = Describe("FileConnector", func() {
|
||||
|
||||
Describe("RenameFile", func() {
|
||||
It("No valid context", func() {
|
||||
gatewaySelector.EXPECT().Next().Unset()
|
||||
ctx := context.Background()
|
||||
response, err := fc.RenameFile(ctx, "lockid", "newFile.doc")
|
||||
Expect(err).To(HaveOccurred())
|
||||
@@ -1601,6 +1614,7 @@ var _ = Describe("FileConnector", func() {
|
||||
|
||||
Describe("CheckFileInfo", func() {
|
||||
It("No valid context", func() {
|
||||
gatewaySelector.EXPECT().Next().Unset()
|
||||
ctx := context.Background()
|
||||
response, err := fc.CheckFileInfo(ctx)
|
||||
Expect(err).To(HaveOccurred())
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"strconv"
|
||||
|
||||
gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
||||
"github.com/owncloud/ocis/v2/services/collaboration/pkg/config"
|
||||
"github.com/owncloud/ocis/v2/services/collaboration/pkg/connector/utf7"
|
||||
"github.com/owncloud/ocis/v2/services/collaboration/pkg/locks"
|
||||
@@ -43,11 +44,11 @@ type HttpAdapter struct {
|
||||
|
||||
// NewHttpAdapter will create a new HTTP adapter. A new connector using the
|
||||
// provided gateway API client and configuration will be used in the adapter
|
||||
func NewHttpAdapter(gwc gatewayv1beta1.GatewayAPIClient, cfg *config.Config) *HttpAdapter {
|
||||
func NewHttpAdapter(gws pool.Selectable[gatewayv1beta1.GatewayAPIClient], cfg *config.Config) *HttpAdapter {
|
||||
httpAdapter := &HttpAdapter{
|
||||
con: NewConnector(
|
||||
NewFileConnector(gwc, cfg),
|
||||
NewContentConnector(gwc, cfg),
|
||||
NewFileConnector(gws, cfg),
|
||||
NewContentConnector(gws, cfg),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
@@ -4,33 +4,10 @@ import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
|
||||
gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
|
||||
providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
||||
"github.com/cs3org/reva/v2/pkg/storagespace"
|
||||
"github.com/owncloud/ocis/v2/services/collaboration/pkg/config"
|
||||
)
|
||||
|
||||
var commonCS3ApiClient gatewayv1beta1.GatewayAPIClient
|
||||
|
||||
// GatewayAPIClient gets an instance based on the provided configuration.
|
||||
// The instance will be cached and returned if possible, unless the "forceNew"
|
||||
// parameter is set to true. In this case, the old instance will be replaced
|
||||
// with the new one if there is no error.
|
||||
func GetCS3apiClient(cfg *config.Config, forceNew bool) (gatewayv1beta1.GatewayAPIClient, error) {
|
||||
// establish a connection to the cs3 api endpoint
|
||||
// in this case a REVA gateway, started by oCIS
|
||||
if commonCS3ApiClient != nil && !forceNew {
|
||||
return commonCS3ApiClient, nil
|
||||
}
|
||||
|
||||
client, err := pool.GetGatewayServiceClient(cfg.CS3Api.Gateway.Name)
|
||||
if err == nil {
|
||||
commonCS3ApiClient = client
|
||||
}
|
||||
return client, err
|
||||
}
|
||||
|
||||
// HashResourceId builds a urlsafe and stable file reference that can be used for proxy routing,
|
||||
// so that all sessions on one file end on the same office server
|
||||
func HashResourceId(resourceId *providerv1beta1.ResourceId) string {
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
|
||||
rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
|
||||
"github.com/cs3org/reva/v2/pkg/mime"
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/version"
|
||||
@@ -32,7 +33,7 @@ func RegisterAppProvider(
|
||||
ctx context.Context,
|
||||
cfg *config.Config,
|
||||
logger log.Logger,
|
||||
gwc gatewayv1beta1.GatewayAPIClient,
|
||||
gws pool.Selectable[gatewayv1beta1.GatewayAPIClient],
|
||||
appUrls map[string]map[string]string,
|
||||
) error {
|
||||
mimeTypesMap := make(map[string]bool)
|
||||
@@ -65,7 +66,10 @@ func RegisterAppProvider(
|
||||
MimeTypes: mimeTypes,
|
||||
},
|
||||
}
|
||||
|
||||
gwc, err := gws.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := gwc.AddAppProvider(ctx, req)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("AddAppProvider failed")
|
||||
|
||||
Reference in New Issue
Block a user