diff --git a/server/internal/command/admin/storage/cmd.go b/server/internal/command/admin/storage/cmd.go index 88711194..bf8e3837 100644 --- a/server/internal/command/admin/storage/cmd.go +++ b/server/internal/command/admin/storage/cmd.go @@ -52,7 +52,7 @@ func setupCreateCommand() *cobra.Command { params[paramName] = val } - if err := storage.Get().CreateBackend(db.Get(context.Background()), name, driver, params); err != nil { + if err := storage.InsertBackend(db.Get(context.Background()), name, driver, params); err != nil { logrus.Fatal(err) } @@ -67,7 +67,7 @@ func setupListCommand() *cobra.Command { Short: "List all storage backends", Run: func(cmd *cobra.Command, args []string) { logrus.Info("Available storage backends:") - for k, v := range storage.Get().ListBackends() { + for k, v := range storage.ListBackends() { logrus.Info(fmt.Sprintf("%s: %s", k, v)) } }, diff --git a/server/internal/command/command.go b/server/internal/command/command.go index c47645ba..38ddce61 100644 --- a/server/internal/command/command.go +++ b/server/internal/command/command.go @@ -1,6 +1,7 @@ package command import ( + "context" "embed" "errors" "os" @@ -110,6 +111,8 @@ func SetupCommand() { mail.Cfg = cfg.Mail user.Cfg = cfg.User crypt.Cfg = cfg.Crypt + + storage.Initialize(db.Get(context.Background())) } defer func() { diff --git a/server/internal/command/config.defaults.yml b/server/internal/command/config.defaults.yml index 79a9096b..7c3eee3a 100644 --- a/server/internal/command/config.defaults.yml +++ b/server/internal/command/config.defaults.yml @@ -7,8 +7,8 @@ db: nomigrate: false # Do not auto-migrate schema (dev) trace: false # Trace queries (dev) -storage: - location: storage +fs: + storage: storage user: password: diff --git a/server/internal/core/fs/copy_move.go b/server/internal/core/fs/copy_move.go index 8dc29b14..ebd226a6 100644 --- a/server/internal/core/fs/copy_move.go +++ b/server/internal/core/fs/copy_move.go @@ -7,7 +7,6 @@ import ( "github.com/doug-martin/goqu/v9" "github.com/google/uuid" "github.com/jackc/pgx/v5/pgtype" - "github.com/shroff/phylum/server/internal/core/storage" "github.com/sirupsen/logrus" ) @@ -147,7 +146,7 @@ func (r Resource) Copy(target string, id uuid.UUID, recursive bool, conflictReso } } children := make([]CreateResourcesParams, 0, len(tree)) - var contents []storage.ContentsCopy + var contents []copyParams newIDs := make(map[uuid.UUID]uuid.UUID) var targetRoot Resource @@ -177,7 +176,7 @@ func (r Resource) Copy(target string, id uuid.UUID, recursive bool, conflictReso if version, err := r.LatestVersion(); err != nil { logrus.Warn("Unable to copy " + r.id.String() + ": " + err.Error()) } else { - contents = append(contents, storage.ContentsCopy{ + contents = append(contents, copyParams{ SrcVersion: version.ID, DestResource: id, Size: version.Size, @@ -204,7 +203,7 @@ func (r Resource) Copy(target string, id uuid.UUID, recursive bool, conflictReso if version, err := src.LatestVersion(); err != nil { logrus.Warn("Unable to copy " + src.id.String() + ": " + err.Error()) } else { - contents = append(contents, storage.ContentsCopy{ + contents = append(contents, copyParams{ SrcVersion: version.ID, DestResource: id, Size: version.Size, @@ -230,10 +229,8 @@ func (r Resource) Copy(target string, id uuid.UUID, recursive bool, conflictReso if err == nil { func() { for _, c := range contents { - if id, err := r.f.cs.CopyContents(c); err != nil { + if err := r.f.CopyContents(c); err != nil { logrus.Warn("unable to copy " + c.SrcVersion.String() + " to " + c.DestResource.String() + ": " + err.Error()) - } else if err := r.f.createResourceVersion(c.DestResource, id, c.Size, c.MimeType, c.SHA256); err != nil { - logrus.Warn("unable to create version for " + c.DestResource.String() + ": " + err.Error()) } } @@ -250,3 +247,38 @@ func (r Resource) Copy(target string, id uuid.UUID, recursive bool, conflictReso targetRoot.userPermission = destParent.userPermission return targetRoot, deleted, err } + +type copyParams struct { + SrcVersion uuid.UUID + DestResource uuid.UUID + Size int64 + MimeType string + SHA256 string +} + +func (f filesystem) CopyContents(params copyParams) error { + versionID, _ := uuid.NewV7() + + if err := f.createResourceVersion(params.DestResource, versionID, params.Size, params.MimeType, params.SHA256); err != nil { + return errors.New("failed to create version for " + params.DestResource.String() + ": " + err.Error()) + } + + // TODO: #implement CopyContents + return errors.New("CopyContents not implemented") + + // in, err := f.OpenRead(params.SrcVersion, 0, -1) + // if err != nil { + // return uuid.Nil, errors.New("Unable to open " + params.SrcVersion.String() + ": " + err.Error()) + // } + // defer in.Close() + // id, _ := uuid.NewV7() + + // out, err := s.OpenWrite(id, nil, nil) + // if err != nil { + // return uuid.Nil, errors.New("Unable to open " + id.String() + ": " + err.Error()) + // } + // defer out.Close() + + // _, err = io.Copy(out, in) + // return id, err +} diff --git a/server/internal/core/fs/filesystem.go b/server/internal/core/fs/filesystem.go index 52dec3ce..f64671ac 100644 --- a/server/internal/core/fs/filesystem.go +++ b/server/internal/core/fs/filesystem.go @@ -3,12 +3,10 @@ package fs import ( "github.com/jackc/pgx/v5/pgtype" "github.com/shroff/phylum/server/internal/core/db" - "github.com/shroff/phylum/server/internal/core/storage" ) type filesystem struct { db db.Handler - cs storage.Storage pathRoot pgtype.UUID userID int32 fullAccess bool @@ -21,7 +19,6 @@ func (f filesystem) WithDb(db db.Handler) FileSystem { func (f filesystem) withDb(db db.Handler) filesystem { return filesystem{ db: db, - cs: f.cs, pathRoot: f.pathRoot, userID: f.userID, fullAccess: f.fullAccess, @@ -35,7 +32,6 @@ func (f filesystem) WithPathRoot(pathRoot pgtype.UUID) FileSystem { func (f filesystem) withPathRoot(pathRoot pgtype.UUID) filesystem { return filesystem{ db: f.db, - cs: f.cs, pathRoot: pathRoot, userID: f.userID, fullAccess: f.fullAccess, diff --git a/server/internal/core/fs/fs.go b/server/internal/core/fs/fs.go index 97d0e368..4f113091 100644 --- a/server/internal/core/fs/fs.go +++ b/server/internal/core/fs/fs.go @@ -12,7 +12,6 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/shroff/phylum/server/internal/core/db" - "github.com/shroff/phylum/server/internal/core/storage" "github.com/shroff/phylum/server/internal/core/util/crypt" "github.com/sirupsen/logrus" ) @@ -65,7 +64,6 @@ type FileSystem interface { func Open(ctx context.Context, userID int32, root pgtype.UUID, fullAccess bool) FileSystem { return filesystem{ db: db.Get(ctx), - cs: storage.Get(), userID: userID, pathRoot: root, fullAccess: fullAccess, @@ -75,7 +73,6 @@ func Open(ctx context.Context, userID int32, root pgtype.UUID, fullAccess bool) func OpenOmniscient(db db.Handler) FileSystem { return filesystem{ db: db, - cs: storage.Get(), userID: -1, pathRoot: pgtype.UUID{Bytes: rootID(), Valid: true}, fullAccess: true, @@ -147,7 +144,6 @@ func OpenFromPublink(ctx context.Context, id string, password string, path strin f := filesystem{ db: d, - cs: storage.Get(), pathRoot: pgtype.UUID{Bytes: link.Root, Valid: true}, fullAccess: true, // TODO: #permissions Replace with permissions int } diff --git a/server/internal/core/fs/open.go b/server/internal/core/fs/open.go index 80f8138f..5d7b0529 100644 --- a/server/internal/core/fs/open.go +++ b/server/internal/core/fs/open.go @@ -1,22 +1,35 @@ package fs import ( - "crypto/sha256" + "encoding/hex" + "errors" + "hash" "io" "github.com/doug-martin/goqu/v9" "github.com/google/uuid" "github.com/jackc/pgx/v5" + "github.com/shroff/phylum/server/internal/core/storage" ) func (r Resource) OpenRead(start, length int, versionID uuid.UUID) (io.ReadCloser, error) { if versionID != uuid.Nil { - return r.f.cs.OpenRead(versionID, start, length) + // TODO: #backend + return r.f.openRead("", versionID, start, length) } if version, err := r.LatestVersion(); err != nil { return nil, err } else { - return r.f.cs.OpenRead(version.ID, start, length) + // TODO: #backend + return r.f.openRead("", version.ID, start, length) + } +} + +func (f filesystem) openRead(backend string, versionID uuid.UUID, start, length int) (io.ReadCloser, error) { + if b := storage.GetBackend(backend); b == nil { + return nil, errors.New("storage backend not found: " + backend) + } else { + return b.OpenRead(versionID, start, length) } } @@ -30,14 +43,19 @@ func (r Resource) OpenWrite(versionID uuid.UUID) (io.WriteCloser, error) { if versionID == uuid.Nil { versionID, _ = uuid.NewV7() } - return r.f.cs.OpenWrite(versionID, sha256.New, func(len int, sum, mime string) error { - return r.f.runInTx(func(f filesystem) error { - if err := f.createResourceVersion(r.id, versionID, int64(len), mime, sum); err != nil { - return err - } - return f.updateResourceModified(r.id) - }) - }) + if dest, err := storage.DefaultBackend().OpenWrite(versionID); err != nil { + return nil, err + } else { + return computeProps(dest, func(len int, hash hash.Hash, mimeType string) error { + sum := hex.EncodeToString(hash.Sum(nil)) + return r.f.runInTx(func(f filesystem) error { + if err := f.createResourceVersion(r.id, versionID, int64(len), mimeType, sum); err != nil { + return err + } + return f.updateResourceModified(r.id) + }) + }), nil + } } func (r Resource) ReadDir(recursive bool) ([]Resource, error) { diff --git a/server/internal/core/fs/resource_delete.go b/server/internal/core/fs/resource_delete.go index b555fbc2..7000e358 100644 --- a/server/internal/core/fs/resource_delete.go +++ b/server/internal/core/fs/resource_delete.go @@ -10,7 +10,6 @@ import ( "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" - "github.com/sirupsen/logrus" ) func (r Resource) DeleteChildRecursive(name string) (Resource, error) { @@ -176,10 +175,7 @@ func (f filesystem) deleteRecursive(id, parent uuid.UUID, softDelete, preserveRo }) if err == nil && !softDelete { - errors := f.cs.DeleteAll(ids) - for _, err := range errors { - logrus.Warn(err) - } + deleteResourcesContents(ids) } return err @@ -240,3 +236,14 @@ func (f filesystem) markNotDeleted(id uuid.UUID) ([]int64, error) { } return items, nil } + +func deleteResourcesContents(id uuid.UUIDs) { + // TODO: #implement +} + +func deleteResourceContents(id uuid.UUID) { + // TODO: #implement +} +func deleteVersionContents(id uuid.UUID) { + // TODO: #implement +} diff --git a/server/internal/core/fs/trash_compact.go b/server/internal/core/fs/trash_compact.go index b6091009..a14c6420 100644 --- a/server/internal/core/fs/trash_compact.go +++ b/server/internal/core/fs/trash_compact.go @@ -7,7 +7,6 @@ import ( "github.com/doug-martin/goqu/v9" "github.com/shroff/phylum/server/internal/core/db" - "github.com/shroff/phylum/server/internal/core/storage" "github.com/sirupsen/logrus" ) @@ -31,9 +30,6 @@ func hardDeleteOldResources(ctx context.Context, t time.Time) (int, error) { if err != nil { return 0, err } - errors := storage.Get().DeleteAll(ids) - for _, err := range errors { - logrus.Warn(err) - } + deleteResourcesContents(ids) return count, err } diff --git a/server/internal/core/fs/trash_summary_empty.go b/server/internal/core/fs/trash_summary_empty.go index ff10fd36..60886dfa 100644 --- a/server/internal/core/fs/trash_summary_empty.go +++ b/server/internal/core/fs/trash_summary_empty.go @@ -2,7 +2,6 @@ package fs import ( "github.com/doug-martin/goqu/v9" - "github.com/shroff/phylum/server/internal/core/storage" ) func (f filesystem) selectDeletedResources() *goqu.SelectDataset { @@ -35,6 +34,6 @@ func (f filesystem) TrashEmpty() (int, error) { Returning("id", "dir") query, args, _ := q.ToSQL() count, ids, err := collectNonDirResourceIDs(f.db.Query(query, args...)) - storage.Get().DeleteAll(ids) + deleteResourcesContents(ids) return count, err } diff --git a/server/internal/core/fs/util.go b/server/internal/core/fs/util.go new file mode 100644 index 00000000..85171f1c --- /dev/null +++ b/server/internal/core/fs/util.go @@ -0,0 +1,54 @@ +package fs + +import ( + "crypto/sha256" + "hash" + "io" + + "github.com/gabriel-vasile/mimetype" +) + +type contentPropsComputer struct { + dest io.WriteCloser + len int + sum hash.Hash + contents []byte + successCallback func(int, hash.Hash, string) error +} + +func computeProps(dest io.WriteCloser, successCallback func(int, hash.Hash, string) error) *contentPropsComputer { + return &contentPropsComputer{ + dest: dest, + sum: sha256.New(), + contents: make([]byte, 0, 3072), + successCallback: successCallback, + } +} + +func (c *contentPropsComputer) Write(p []byte) (n int, err error) { + n, err = c.dest.Write(p) + c.len += n + if c.sum != nil { + c.sum.Write(p) + } + remain := min(cap(c.contents)-len(c.contents), len(p)) + if remain > 0 { + c.contents = append(c.contents, p[:remain]...) + } + return +} + +func min(a, b int) int { + if b < a { + return b + } + return a + +} + +func (c *contentPropsComputer) Close() error { + if err := c.dest.Close(); err != nil { + return err + } + return c.successCallback(c.len, c.sum, mimetype.Detect(c.contents).String()) +} diff --git a/server/internal/core/storage/drivers.go b/server/internal/core/storage/drivers.go index 16a8097c..f132455d 100644 --- a/server/internal/core/storage/drivers.go +++ b/server/internal/core/storage/drivers.go @@ -5,7 +5,9 @@ import "errors" var drivers = map[string]Driver{ "local": { Params: []string{"root"}, - Create: createLocalBackend, + Create: func(params map[string]string) (Backend, error) { + return createLocalBackend(params) + }, }, "minio": { Params: []string{"endpoint", "key_id", "access_key", "tmp", "bucket_name", "prefix"}, @@ -15,7 +17,7 @@ var drivers = map[string]Driver{ type Driver struct { Params []string - Create func(params map[string]string, validate bool) (Backend, error) + Create func(params map[string]string) (Backend, error) } func FindDriver(driverName string) (Driver, error) { diff --git a/server/internal/core/storage/io.go b/server/internal/core/storage/io.go index 67560d11..791b5130 100644 --- a/server/internal/core/storage/io.go +++ b/server/internal/core/storage/io.go @@ -1,63 +1,20 @@ package storage import ( - "hash" "io" - - "github.com/gabriel-vasile/mimetype" ) -type hasher struct { - dest io.WriteCloser - len int - sum hash.Hash - contents []byte - closeCallback func(int, hash.Hash, string) error -} - -func (c *hasher) Write(p []byte) (n int, err error) { - n, err = c.dest.Write(p) - c.len += n - if c.sum != nil { - c.sum.Write(p) - } - remain := min(cap(c.contents)-len(c.contents), len(p)) - if remain > 0 { - c.contents = append(c.contents, p[:remain]...) - } - return -} - -func min(a, b int) int { - if b < a { - return b - } - return a - -} - -func (c *hasher) Close() error { - if err := c.dest.Close(); err != nil { - return err - } - return c.closeCallback(c.len, c.sum, mimetype.Detect(c.contents).String()) -} - type callbackWriteCloser struct { dest io.WriteCloser - len int - closeCallback func(int) error + closeCallback func(error) error } func (c *callbackWriteCloser) Write(p []byte) (n int, err error) { n, err = c.dest.Write(p) - c.len += n return } func (c *callbackWriteCloser) Close() error { - if err := c.dest.Close(); err != nil { - return err - } - return c.closeCallback(c.len) + err := c.dest.Close() + return c.closeCallback(err) } diff --git a/server/internal/core/storage/local_storage.go b/server/internal/core/storage/local_storage.go index c4e117fa..da663746 100644 --- a/server/internal/core/storage/local_storage.go +++ b/server/internal/core/storage/local_storage.go @@ -12,19 +12,27 @@ import ( type localStorage struct { root string + tmp string } -func createLocalBackend(params map[string]string, validateParams bool) (Backend, error) { - if validateParams { - if params["root"] == "" { - return nil, errors.New("local storage root not provided") - } - err := os.MkdirAll(params["root"], 0750) - if err != nil { - return nil, err - } +func createLocalBackend(params map[string]string) (LocalBackend, error) { + if params["root"] == "" { + return nil, errors.New("local storage root not provided") } - return localStorage{root: params["root"]}, nil + + l := localStorage{ + root: params["root"], + tmp: filepath.Join(params["root"], "tmp"), + } + + if err := os.RemoveAll(l.tmp); err != nil { + return nil, errors.New("failed to remove tmp directory: " + err.Error()) + } + if err := os.MkdirAll(l.tmp, 0750); err != nil { + return nil, errors.New("failed to create tmp directory: " + err.Error()) + } + + return l, nil } func (l localStorage) OpenRead(id uuid.UUID, start, length int) (io.ReadCloser, error) { @@ -40,7 +48,18 @@ func (l localStorage) OpenRead(id uuid.UUID, start, length int) (io.ReadCloser, } func (l localStorage) OpenWrite(id uuid.UUID) (io.WriteCloser, error) { - return os.OpenFile(l.path(id), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0640) + f, err := os.CreateTemp(l.tmp, "upload-*") + tmpFilePath := filepath.Join(l.tmp, f.Name()) + if err != nil { + return nil, err + } + return &callbackWriteCloser{dest: f, closeCallback: func(err error) error { + if err != nil { + os.Remove(tmpFilePath) + return err + } + return os.Rename(tmpFilePath, l.path(id)) + }}, nil } func (l localStorage) Delete(id uuid.UUID) error { diff --git a/server/internal/core/storage/minio_storage.go b/server/internal/core/storage/minio_storage.go index d09c1348..a669debd 100644 --- a/server/internal/core/storage/minio_storage.go +++ b/server/internal/core/storage/minio_storage.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "os" - "path/filepath" "github.com/google/uuid" "github.com/minio/minio-go/v7" @@ -22,28 +21,27 @@ type minioStorage struct { prefix string } -func createMinioBackend(params map[string]string, validateParams bool) (Backend, error) { - if validateParams { - if params["endpoint"] == "" { - return nil, errors.New("minio endpoint not provided") - } - if params["key_id"] == "" { - return nil, errors.New("minio key id not provided") - } - if params["access_key"] == "" { - return nil, errors.New("minio access key not provided") - } - if params["tmp"] == "" { - return nil, errors.New("minio tmp not provided") - } - if params["bucket_name"] == "" { - return nil, errors.New("minio bucket name not provided") - } - err := os.MkdirAll(params["tmp"], 0750) - if err != nil { - return nil, err - } +func createMinioBackend(params map[string]string) (Backend, error) { + if params["endpoint"] == "" { + return nil, errors.New("minio endpoint not provided") } + if params["key_id"] == "" { + return nil, errors.New("minio key id not provided") + } + if params["access_key"] == "" { + return nil, errors.New("minio access key not provided") + } + if params["tmp"] == "" { + return nil, errors.New("minio tmp not provided") + } + if params["bucket_name"] == "" { + return nil, errors.New("minio bucket name not provided") + } + err := os.MkdirAll(params["tmp"], 0750) + if err != nil { + return nil, err + } + client, err := minio.New(params["endpoint"], &minio.Options{ Creds: credentials.NewStaticV4(params["key_id"], params["access_key"], ""), Secure: true, @@ -63,19 +61,23 @@ func (s minioStorage) OpenRead(id uuid.UUID, start, length int) (io.ReadCloser, } } -func (s minioStorage) OpenWrite(id uuid.UUID) (io.WriteCloser, error) { - path := filepath.Join(s.tmp, id.String()) - file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0640) - if err != nil { - return nil, err - } - return &callbackWriteCloser{dest: file, closeCallback: func(len int) error { - _, err = s.client.FPutObject(context.Background(), s.bucketName, s.prefix+id.String(), path, minio.PutObjectOptions{Progress: &progressReader{len: len, callback: func() { - os.Remove(path) - }}}) - return err - }}, nil -} +// func (s minioStorage) OpenWrite(id uuid.UUID) (io.WriteCloser, error) { +// path := filepath.Join(s.tmp, id.String()) +// file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0640) +// if err != nil { +// return nil, err +// } +// return &callbackWriteCloser{dest: file, closeCallback: func(err error, len int) error { +// if err != nil { +// os.Remove(path) +// return err +// } +// _, err = s.client.FPutObject(context.Background(), s.bucketName, s.prefix+id.String(), path, minio.PutObjectOptions{Progress: &progressReader{len: len, callback: func() { +// os.Remove(path) +// }}}) +// return err +// }}, nil +// } func (s minioStorage) Delete(id uuid.UUID) error { return s.client.RemoveObject(context.Background(), s.bucketName, s.prefix+id.String(), minio.RemoveObjectOptions{}) @@ -103,20 +105,20 @@ func (s minioStorage) String() string { return fmt.Sprintf("minio (endpoint: %s, bucket: %s, prefix: %s, tmp: %s)", s.client.EndpointURL(), s.bucketName, s.prefix, s.tmp) } -type progressReader struct { - cur, len int - callback func() -} +// type progressReader struct { +// cur, len int +// callback func() +// } -func (r *progressReader) Read(p []byte) (n int, err error) { - max := r.len - r.cur - size := len(p) - if size > max { - size = max - } - r.cur += size - if r.cur == r.len && size == 0 { - r.callback() - } - return size, nil -} +// func (r *progressReader) Read(p []byte) (n int, err error) { +// max := r.len - r.cur +// size := len(p) +// if size > max { +// size = max +// } +// r.cur += size +// if r.cur == r.len && size == 0 { +// r.callback() +// } +// return size, nil +// } diff --git a/server/internal/core/storage/storage.go b/server/internal/core/storage/storage.go index df2b5e0b..f1a0f00d 100644 --- a/server/internal/core/storage/storage.go +++ b/server/internal/core/storage/storage.go @@ -2,54 +2,71 @@ package storage import ( "context" - "encoding/hex" "encoding/json" "errors" - "hash" - "io" "os" "path" - "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/shroff/phylum/server/internal/core/db" "github.com/sirupsen/logrus" ) -type Storage struct { - backends map[string]Backend - defaultBackend Backend -} - -var s Storage var Cfg Config +var defaultBackend LocalBackend +var backends map[string]Backend -func Get() Storage { - if s.defaultBackend == nil { - if storage, err := create(Cfg.Location); err != nil { - logrus.Fatal(err) - } else { - s = storage - } - go processBackendUpdates() - } - return s +type BackendConfig struct { + Name string `json:"name"` + Driver string `json:"driver"` + Params map[string]string `json:"params"` } -func create(storageLocation string) (Storage, error) { - if err := os.Chmod(storageLocation, 0700); err != nil { - return Storage{}, errors.New("failed to set permissions: " + err.Error()) +func Initialize(db db.Handler) error { + if err := os.Chmod(Cfg.Location, 0700); err != nil { + return errors.New("failed to set permissions: " + err.Error()) } - if backends, err := restoreBackends(); err != nil { - return Storage{}, errors.New("failed to restore backends: " + err.Error()) - } else if defaultBackend, err := createLocalBackend(map[string]string{"root": path.Join(storageLocation, "default")}, true); err != nil { - return Storage{}, errors.New("failed to create default backend: " + err.Error()) + if restoredBackends, err := restoreBackends(); err != nil { + return errors.New("failed to restore backends: " + err.Error()) + } else if b, err := createLocalBackend(map[string]string{"root": path.Join(Cfg.Location, "default")}); err != nil { + return errors.New("failed to create default backend: " + err.Error()) } else { - return Storage{ - backends: backends, - defaultBackend: defaultBackend, - }, nil + restoredBackends[""] = b + backends = restoredBackends + defaultBackend = b } + + go processBackendUpdates(db) + return nil +} + +func DefaultBackend() LocalBackend { + return defaultBackend +} + +func GetBackend(name string) Backend { + return backends[name] +} + +func ListBackends() map[string]Backend { + return backends +} + +func InsertBackend(db db.Handler, name string, driver Driver, params map[string]string) error { + backend, err := driver.Create(params) + if err != nil { + return nil + } + const q = "INSERT INTO storage_backends(name, driver, params) VALUES ($1::TEXT, $2::TEXT, $3::JSONB)" + p, err := json.Marshal(params) + if err != nil { + return err + } + if _, err := db.Exec(q, name, driver, p); err != nil { + return err + } + backends[name] = backend + return nil } func restoreBackends() (map[string]Backend, error) { @@ -83,8 +100,8 @@ func restoreBackends() (map[string]Backend, error) { } } -func processBackendUpdates() { - sub := db.Get(context.Background()).Notifier().Listen("backend_updates") +func processBackendUpdates(db db.Handler) { + sub := db.Notifier().Listen("backend_updates") for { p := <-sub.NotificationC() var c BackendConfig @@ -93,120 +110,15 @@ func processBackendUpdates() { } else if b, err := c.open(); err != nil { logrus.Warn("failed to open backend: " + err.Error()) } else { - s.backends[c.Name] = b + backends[c.Name] = b } } } -func (s Storage) OpenRead(id uuid.UUID, start, length int) (io.ReadCloser, error) { - if backend, err := s.findStorageBackend(id); err != nil { - return nil, err - } else { - return backend.OpenRead(id, start, length) - } -} - -func (s Storage) OpenWrite(id uuid.UUID, h func() hash.Hash, callback func(int, string, string) error) (io.WriteCloser, error) { - if backend, err := s.findStorageBackend(id); err != nil { - return nil, err - } else if out, err := backend.OpenWrite(id); err != nil { - return nil, err - } else if callback == nil { - return out, nil - } else { - return &hasher{dest: out, sum: h(), contents: make([]byte, 0, 10*1024), closeCallback: func(len int, sum hash.Hash, mime string) error { - etag := hex.EncodeToString(sum.Sum(nil)) - return callback(len, etag, mime) - }}, nil - } -} - -type ContentsCopy struct { - SrcVersion uuid.UUID - DestResource uuid.UUID - Size int64 - MimeType string - SHA256 string -} - -func (s Storage) CopyContents(params ContentsCopy) (uuid.UUID, error) { - in, err := s.OpenRead(params.SrcVersion, 0, -1) - if err != nil { - return uuid.Nil, errors.New("Unable to open " + params.SrcVersion.String() + ": " + err.Error()) - } - defer in.Close() - id, _ := uuid.NewV7() - - out, err := s.OpenWrite(id, nil, nil) - if err != nil { - return uuid.Nil, errors.New("Unable to open " + id.String() + ": " + err.Error()) - } - defer out.Close() - - _, err = io.Copy(out, in) - return id, err - -} - -func (s Storage) Delete(id uuid.UUID) error { - if backend, err := s.findStorageBackend(id); err != nil { - return err - } else { - return backend.Delete(id) - } -} - -func (s Storage) DeleteAll(ids []uuid.UUID) []error { - // TODO: Batch delete - var errs []error - for _, id := range ids { - err := s.Delete(id) - if err != nil { - errs = append(errs, err) - } - } - return errs -} - -func (s Storage) findStorageBackend(id uuid.UUID) (Backend, error) { - // TODO: implement - return s.defaultBackend, nil -} - -func (s Storage) CreateBackend(db db.Handler, name string, driver Driver, params map[string]string) error { - backend, err := driver.Create(params, true) - if err != nil { - return nil - } - const q = "INSERT INTO storage_backends(name, driver, params) VALUES ($1::TEXT, $2::TEXT, $3::JSONB)" - p, err := json.Marshal(params) - if err != nil { - return err - } - if _, err := db.Exec(q, name, driver, p); err != nil { - return err - } - s.backends[name] = backend - return nil -} - -func (s Storage) ListBackends() map[string]Backend { - return s.backends -} - -type BackendConfig struct { - Name string `json:"name"` - Driver string `json:"driver"` - Params map[string]string `json:"params"` -} - func (c BackendConfig) open() (Backend, error) { if driver, err := FindDriver(c.Driver); err != nil { return nil, err - } else if backend, err := driver.Create(c.Params, false); err != nil { - return nil, err } else { - return backend, nil + return driver.Create(c.Params) } - } diff --git a/server/internal/core/storage/storage_backend.go b/server/internal/core/storage/storage_backend.go index c2117d2a..277f0528 100644 --- a/server/internal/core/storage/storage_backend.go +++ b/server/internal/core/storage/storage_backend.go @@ -8,8 +8,12 @@ import ( type Backend interface { OpenRead(id uuid.UUID, start, length int) (io.ReadCloser, error) - OpenWrite(id uuid.UUID) (io.WriteCloser, error) Delete(id uuid.UUID) error DeleteAll(ids uuid.UUIDs) []error String() string } + +type LocalBackend interface { + Backend + OpenWrite(id uuid.UUID) (io.WriteCloser, error) +}