mirror of
https://codeberg.org/shroff/phylum.git
synced 2026-01-05 19:21:23 -06:00
[server] Always write to default storage backend first
This commit is contained in:
@@ -52,7 +52,7 @@ func setupCreateCommand() *cobra.Command {
|
||||
params[paramName] = val
|
||||
}
|
||||
|
||||
if err := storage.Get().CreateBackend(db.Get(context.Background()), name, driver, params); err != nil {
|
||||
if err := storage.InsertBackend(db.Get(context.Background()), name, driver, params); err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -67,7 +67,7 @@ func setupListCommand() *cobra.Command {
|
||||
Short: "List all storage backends",
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
logrus.Info("Available storage backends:")
|
||||
for k, v := range storage.Get().ListBackends() {
|
||||
for k, v := range storage.ListBackends() {
|
||||
logrus.Info(fmt.Sprintf("%s: %s", k, v))
|
||||
}
|
||||
},
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"context"
|
||||
"embed"
|
||||
"errors"
|
||||
"os"
|
||||
@@ -110,6 +111,8 @@ func SetupCommand() {
|
||||
mail.Cfg = cfg.Mail
|
||||
user.Cfg = cfg.User
|
||||
crypt.Cfg = cfg.Crypt
|
||||
|
||||
storage.Initialize(db.Get(context.Background()))
|
||||
}
|
||||
|
||||
defer func() {
|
||||
|
||||
@@ -7,8 +7,8 @@ db:
|
||||
nomigrate: false # Do not auto-migrate schema (dev)
|
||||
trace: false # Trace queries (dev)
|
||||
|
||||
storage:
|
||||
location: storage
|
||||
fs:
|
||||
storage: storage
|
||||
|
||||
user:
|
||||
password:
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"github.com/doug-martin/goqu/v9"
|
||||
"github.com/google/uuid"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/shroff/phylum/server/internal/core/storage"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@@ -147,7 +146,7 @@ func (r Resource) Copy(target string, id uuid.UUID, recursive bool, conflictReso
|
||||
}
|
||||
}
|
||||
children := make([]CreateResourcesParams, 0, len(tree))
|
||||
var contents []storage.ContentsCopy
|
||||
var contents []copyParams
|
||||
newIDs := make(map[uuid.UUID]uuid.UUID)
|
||||
|
||||
var targetRoot Resource
|
||||
@@ -177,7 +176,7 @@ func (r Resource) Copy(target string, id uuid.UUID, recursive bool, conflictReso
|
||||
if version, err := r.LatestVersion(); err != nil {
|
||||
logrus.Warn("Unable to copy " + r.id.String() + ": " + err.Error())
|
||||
} else {
|
||||
contents = append(contents, storage.ContentsCopy{
|
||||
contents = append(contents, copyParams{
|
||||
SrcVersion: version.ID,
|
||||
DestResource: id,
|
||||
Size: version.Size,
|
||||
@@ -204,7 +203,7 @@ func (r Resource) Copy(target string, id uuid.UUID, recursive bool, conflictReso
|
||||
if version, err := src.LatestVersion(); err != nil {
|
||||
logrus.Warn("Unable to copy " + src.id.String() + ": " + err.Error())
|
||||
} else {
|
||||
contents = append(contents, storage.ContentsCopy{
|
||||
contents = append(contents, copyParams{
|
||||
SrcVersion: version.ID,
|
||||
DestResource: id,
|
||||
Size: version.Size,
|
||||
@@ -230,10 +229,8 @@ func (r Resource) Copy(target string, id uuid.UUID, recursive bool, conflictReso
|
||||
if err == nil {
|
||||
func() {
|
||||
for _, c := range contents {
|
||||
if id, err := r.f.cs.CopyContents(c); err != nil {
|
||||
if err := r.f.CopyContents(c); err != nil {
|
||||
logrus.Warn("unable to copy " + c.SrcVersion.String() + " to " + c.DestResource.String() + ": " + err.Error())
|
||||
} else if err := r.f.createResourceVersion(c.DestResource, id, c.Size, c.MimeType, c.SHA256); err != nil {
|
||||
logrus.Warn("unable to create version for " + c.DestResource.String() + ": " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -250,3 +247,38 @@ func (r Resource) Copy(target string, id uuid.UUID, recursive bool, conflictReso
|
||||
targetRoot.userPermission = destParent.userPermission
|
||||
return targetRoot, deleted, err
|
||||
}
|
||||
|
||||
type copyParams struct {
|
||||
SrcVersion uuid.UUID
|
||||
DestResource uuid.UUID
|
||||
Size int64
|
||||
MimeType string
|
||||
SHA256 string
|
||||
}
|
||||
|
||||
func (f filesystem) CopyContents(params copyParams) error {
|
||||
versionID, _ := uuid.NewV7()
|
||||
|
||||
if err := f.createResourceVersion(params.DestResource, versionID, params.Size, params.MimeType, params.SHA256); err != nil {
|
||||
return errors.New("failed to create version for " + params.DestResource.String() + ": " + err.Error())
|
||||
}
|
||||
|
||||
// TODO: #implement CopyContents
|
||||
return errors.New("CopyContents not implemented")
|
||||
|
||||
// in, err := f.OpenRead(params.SrcVersion, 0, -1)
|
||||
// if err != nil {
|
||||
// return uuid.Nil, errors.New("Unable to open " + params.SrcVersion.String() + ": " + err.Error())
|
||||
// }
|
||||
// defer in.Close()
|
||||
// id, _ := uuid.NewV7()
|
||||
|
||||
// out, err := s.OpenWrite(id, nil, nil)
|
||||
// if err != nil {
|
||||
// return uuid.Nil, errors.New("Unable to open " + id.String() + ": " + err.Error())
|
||||
// }
|
||||
// defer out.Close()
|
||||
|
||||
// _, err = io.Copy(out, in)
|
||||
// return id, err
|
||||
}
|
||||
|
||||
@@ -3,12 +3,10 @@ package fs
|
||||
import (
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/shroff/phylum/server/internal/core/db"
|
||||
"github.com/shroff/phylum/server/internal/core/storage"
|
||||
)
|
||||
|
||||
type filesystem struct {
|
||||
db db.Handler
|
||||
cs storage.Storage
|
||||
pathRoot pgtype.UUID
|
||||
userID int32
|
||||
fullAccess bool
|
||||
@@ -21,7 +19,6 @@ func (f filesystem) WithDb(db db.Handler) FileSystem {
|
||||
func (f filesystem) withDb(db db.Handler) filesystem {
|
||||
return filesystem{
|
||||
db: db,
|
||||
cs: f.cs,
|
||||
pathRoot: f.pathRoot,
|
||||
userID: f.userID,
|
||||
fullAccess: f.fullAccess,
|
||||
@@ -35,7 +32,6 @@ func (f filesystem) WithPathRoot(pathRoot pgtype.UUID) FileSystem {
|
||||
func (f filesystem) withPathRoot(pathRoot pgtype.UUID) filesystem {
|
||||
return filesystem{
|
||||
db: f.db,
|
||||
cs: f.cs,
|
||||
pathRoot: pathRoot,
|
||||
userID: f.userID,
|
||||
fullAccess: f.fullAccess,
|
||||
|
||||
@@ -12,7 +12,6 @@ import (
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/shroff/phylum/server/internal/core/db"
|
||||
"github.com/shroff/phylum/server/internal/core/storage"
|
||||
"github.com/shroff/phylum/server/internal/core/util/crypt"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
@@ -65,7 +64,6 @@ type FileSystem interface {
|
||||
func Open(ctx context.Context, userID int32, root pgtype.UUID, fullAccess bool) FileSystem {
|
||||
return filesystem{
|
||||
db: db.Get(ctx),
|
||||
cs: storage.Get(),
|
||||
userID: userID,
|
||||
pathRoot: root,
|
||||
fullAccess: fullAccess,
|
||||
@@ -75,7 +73,6 @@ func Open(ctx context.Context, userID int32, root pgtype.UUID, fullAccess bool)
|
||||
func OpenOmniscient(db db.Handler) FileSystem {
|
||||
return filesystem{
|
||||
db: db,
|
||||
cs: storage.Get(),
|
||||
userID: -1,
|
||||
pathRoot: pgtype.UUID{Bytes: rootID(), Valid: true},
|
||||
fullAccess: true,
|
||||
@@ -147,7 +144,6 @@ func OpenFromPublink(ctx context.Context, id string, password string, path strin
|
||||
|
||||
f := filesystem{
|
||||
db: d,
|
||||
cs: storage.Get(),
|
||||
pathRoot: pgtype.UUID{Bytes: link.Root, Valid: true},
|
||||
fullAccess: true, // TODO: #permissions Replace with permissions int
|
||||
}
|
||||
|
||||
@@ -1,22 +1,35 @@
|
||||
package fs
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"hash"
|
||||
"io"
|
||||
|
||||
"github.com/doug-martin/goqu/v9"
|
||||
"github.com/google/uuid"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/shroff/phylum/server/internal/core/storage"
|
||||
)
|
||||
|
||||
func (r Resource) OpenRead(start, length int, versionID uuid.UUID) (io.ReadCloser, error) {
|
||||
if versionID != uuid.Nil {
|
||||
return r.f.cs.OpenRead(versionID, start, length)
|
||||
// TODO: #backend
|
||||
return r.f.openRead("", versionID, start, length)
|
||||
}
|
||||
if version, err := r.LatestVersion(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return r.f.cs.OpenRead(version.ID, start, length)
|
||||
// TODO: #backend
|
||||
return r.f.openRead("", version.ID, start, length)
|
||||
}
|
||||
}
|
||||
|
||||
func (f filesystem) openRead(backend string, versionID uuid.UUID, start, length int) (io.ReadCloser, error) {
|
||||
if b := storage.GetBackend(backend); b == nil {
|
||||
return nil, errors.New("storage backend not found: " + backend)
|
||||
} else {
|
||||
return b.OpenRead(versionID, start, length)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,14 +43,19 @@ func (r Resource) OpenWrite(versionID uuid.UUID) (io.WriteCloser, error) {
|
||||
if versionID == uuid.Nil {
|
||||
versionID, _ = uuid.NewV7()
|
||||
}
|
||||
return r.f.cs.OpenWrite(versionID, sha256.New, func(len int, sum, mime string) error {
|
||||
return r.f.runInTx(func(f filesystem) error {
|
||||
if err := f.createResourceVersion(r.id, versionID, int64(len), mime, sum); err != nil {
|
||||
return err
|
||||
}
|
||||
return f.updateResourceModified(r.id)
|
||||
})
|
||||
})
|
||||
if dest, err := storage.DefaultBackend().OpenWrite(versionID); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return computeProps(dest, func(len int, hash hash.Hash, mimeType string) error {
|
||||
sum := hex.EncodeToString(hash.Sum(nil))
|
||||
return r.f.runInTx(func(f filesystem) error {
|
||||
if err := f.createResourceVersion(r.id, versionID, int64(len), mimeType, sum); err != nil {
|
||||
return err
|
||||
}
|
||||
return f.updateResourceModified(r.id)
|
||||
})
|
||||
}), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (r Resource) ReadDir(recursive bool) ([]Resource, error) {
|
||||
|
||||
@@ -10,7 +10,6 @@ import (
|
||||
"github.com/google/uuid"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func (r Resource) DeleteChildRecursive(name string) (Resource, error) {
|
||||
@@ -176,10 +175,7 @@ func (f filesystem) deleteRecursive(id, parent uuid.UUID, softDelete, preserveRo
|
||||
})
|
||||
|
||||
if err == nil && !softDelete {
|
||||
errors := f.cs.DeleteAll(ids)
|
||||
for _, err := range errors {
|
||||
logrus.Warn(err)
|
||||
}
|
||||
deleteResourcesContents(ids)
|
||||
}
|
||||
|
||||
return err
|
||||
@@ -240,3 +236,14 @@ func (f filesystem) markNotDeleted(id uuid.UUID) ([]int64, error) {
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func deleteResourcesContents(id uuid.UUIDs) {
|
||||
// TODO: #implement
|
||||
}
|
||||
|
||||
func deleteResourceContents(id uuid.UUID) {
|
||||
// TODO: #implement
|
||||
}
|
||||
func deleteVersionContents(id uuid.UUID) {
|
||||
// TODO: #implement
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
|
||||
"github.com/doug-martin/goqu/v9"
|
||||
"github.com/shroff/phylum/server/internal/core/db"
|
||||
"github.com/shroff/phylum/server/internal/core/storage"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@@ -31,9 +30,6 @@ func hardDeleteOldResources(ctx context.Context, t time.Time) (int, error) {
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
errors := storage.Get().DeleteAll(ids)
|
||||
for _, err := range errors {
|
||||
logrus.Warn(err)
|
||||
}
|
||||
deleteResourcesContents(ids)
|
||||
return count, err
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ package fs
|
||||
|
||||
import (
|
||||
"github.com/doug-martin/goqu/v9"
|
||||
"github.com/shroff/phylum/server/internal/core/storage"
|
||||
)
|
||||
|
||||
func (f filesystem) selectDeletedResources() *goqu.SelectDataset {
|
||||
@@ -35,6 +34,6 @@ func (f filesystem) TrashEmpty() (int, error) {
|
||||
Returning("id", "dir")
|
||||
query, args, _ := q.ToSQL()
|
||||
count, ids, err := collectNonDirResourceIDs(f.db.Query(query, args...))
|
||||
storage.Get().DeleteAll(ids)
|
||||
deleteResourcesContents(ids)
|
||||
return count, err
|
||||
}
|
||||
|
||||
54
server/internal/core/fs/util.go
Normal file
54
server/internal/core/fs/util.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package fs
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"hash"
|
||||
"io"
|
||||
|
||||
"github.com/gabriel-vasile/mimetype"
|
||||
)
|
||||
|
||||
type contentPropsComputer struct {
|
||||
dest io.WriteCloser
|
||||
len int
|
||||
sum hash.Hash
|
||||
contents []byte
|
||||
successCallback func(int, hash.Hash, string) error
|
||||
}
|
||||
|
||||
func computeProps(dest io.WriteCloser, successCallback func(int, hash.Hash, string) error) *contentPropsComputer {
|
||||
return &contentPropsComputer{
|
||||
dest: dest,
|
||||
sum: sha256.New(),
|
||||
contents: make([]byte, 0, 3072),
|
||||
successCallback: successCallback,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *contentPropsComputer) Write(p []byte) (n int, err error) {
|
||||
n, err = c.dest.Write(p)
|
||||
c.len += n
|
||||
if c.sum != nil {
|
||||
c.sum.Write(p)
|
||||
}
|
||||
remain := min(cap(c.contents)-len(c.contents), len(p))
|
||||
if remain > 0 {
|
||||
c.contents = append(c.contents, p[:remain]...)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func min(a, b int) int {
|
||||
if b < a {
|
||||
return b
|
||||
}
|
||||
return a
|
||||
|
||||
}
|
||||
|
||||
func (c *contentPropsComputer) Close() error {
|
||||
if err := c.dest.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
return c.successCallback(c.len, c.sum, mimetype.Detect(c.contents).String())
|
||||
}
|
||||
@@ -5,7 +5,9 @@ import "errors"
|
||||
var drivers = map[string]Driver{
|
||||
"local": {
|
||||
Params: []string{"root"},
|
||||
Create: createLocalBackend,
|
||||
Create: func(params map[string]string) (Backend, error) {
|
||||
return createLocalBackend(params)
|
||||
},
|
||||
},
|
||||
"minio": {
|
||||
Params: []string{"endpoint", "key_id", "access_key", "tmp", "bucket_name", "prefix"},
|
||||
@@ -15,7 +17,7 @@ var drivers = map[string]Driver{
|
||||
|
||||
type Driver struct {
|
||||
Params []string
|
||||
Create func(params map[string]string, validate bool) (Backend, error)
|
||||
Create func(params map[string]string) (Backend, error)
|
||||
}
|
||||
|
||||
func FindDriver(driverName string) (Driver, error) {
|
||||
|
||||
@@ -1,63 +1,20 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"hash"
|
||||
"io"
|
||||
|
||||
"github.com/gabriel-vasile/mimetype"
|
||||
)
|
||||
|
||||
type hasher struct {
|
||||
dest io.WriteCloser
|
||||
len int
|
||||
sum hash.Hash
|
||||
contents []byte
|
||||
closeCallback func(int, hash.Hash, string) error
|
||||
}
|
||||
|
||||
func (c *hasher) Write(p []byte) (n int, err error) {
|
||||
n, err = c.dest.Write(p)
|
||||
c.len += n
|
||||
if c.sum != nil {
|
||||
c.sum.Write(p)
|
||||
}
|
||||
remain := min(cap(c.contents)-len(c.contents), len(p))
|
||||
if remain > 0 {
|
||||
c.contents = append(c.contents, p[:remain]...)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func min(a, b int) int {
|
||||
if b < a {
|
||||
return b
|
||||
}
|
||||
return a
|
||||
|
||||
}
|
||||
|
||||
func (c *hasher) Close() error {
|
||||
if err := c.dest.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
return c.closeCallback(c.len, c.sum, mimetype.Detect(c.contents).String())
|
||||
}
|
||||
|
||||
type callbackWriteCloser struct {
|
||||
dest io.WriteCloser
|
||||
len int
|
||||
closeCallback func(int) error
|
||||
closeCallback func(error) error
|
||||
}
|
||||
|
||||
func (c *callbackWriteCloser) Write(p []byte) (n int, err error) {
|
||||
n, err = c.dest.Write(p)
|
||||
c.len += n
|
||||
return
|
||||
}
|
||||
|
||||
func (c *callbackWriteCloser) Close() error {
|
||||
if err := c.dest.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
return c.closeCallback(c.len)
|
||||
err := c.dest.Close()
|
||||
return c.closeCallback(err)
|
||||
}
|
||||
|
||||
@@ -12,19 +12,27 @@ import (
|
||||
|
||||
type localStorage struct {
|
||||
root string
|
||||
tmp string
|
||||
}
|
||||
|
||||
func createLocalBackend(params map[string]string, validateParams bool) (Backend, error) {
|
||||
if validateParams {
|
||||
if params["root"] == "" {
|
||||
return nil, errors.New("local storage root not provided")
|
||||
}
|
||||
err := os.MkdirAll(params["root"], 0750)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
func createLocalBackend(params map[string]string) (LocalBackend, error) {
|
||||
if params["root"] == "" {
|
||||
return nil, errors.New("local storage root not provided")
|
||||
}
|
||||
return localStorage{root: params["root"]}, nil
|
||||
|
||||
l := localStorage{
|
||||
root: params["root"],
|
||||
tmp: filepath.Join(params["root"], "tmp"),
|
||||
}
|
||||
|
||||
if err := os.RemoveAll(l.tmp); err != nil {
|
||||
return nil, errors.New("failed to remove tmp directory: " + err.Error())
|
||||
}
|
||||
if err := os.MkdirAll(l.tmp, 0750); err != nil {
|
||||
return nil, errors.New("failed to create tmp directory: " + err.Error())
|
||||
}
|
||||
|
||||
return l, nil
|
||||
}
|
||||
|
||||
func (l localStorage) OpenRead(id uuid.UUID, start, length int) (io.ReadCloser, error) {
|
||||
@@ -40,7 +48,18 @@ func (l localStorage) OpenRead(id uuid.UUID, start, length int) (io.ReadCloser,
|
||||
}
|
||||
|
||||
func (l localStorage) OpenWrite(id uuid.UUID) (io.WriteCloser, error) {
|
||||
return os.OpenFile(l.path(id), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0640)
|
||||
f, err := os.CreateTemp(l.tmp, "upload-*")
|
||||
tmpFilePath := filepath.Join(l.tmp, f.Name())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &callbackWriteCloser{dest: f, closeCallback: func(err error) error {
|
||||
if err != nil {
|
||||
os.Remove(tmpFilePath)
|
||||
return err
|
||||
}
|
||||
return os.Rename(tmpFilePath, l.path(id))
|
||||
}}, nil
|
||||
}
|
||||
|
||||
func (l localStorage) Delete(id uuid.UUID) error {
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/minio/minio-go/v7"
|
||||
@@ -22,28 +21,27 @@ type minioStorage struct {
|
||||
prefix string
|
||||
}
|
||||
|
||||
func createMinioBackend(params map[string]string, validateParams bool) (Backend, error) {
|
||||
if validateParams {
|
||||
if params["endpoint"] == "" {
|
||||
return nil, errors.New("minio endpoint not provided")
|
||||
}
|
||||
if params["key_id"] == "" {
|
||||
return nil, errors.New("minio key id not provided")
|
||||
}
|
||||
if params["access_key"] == "" {
|
||||
return nil, errors.New("minio access key not provided")
|
||||
}
|
||||
if params["tmp"] == "" {
|
||||
return nil, errors.New("minio tmp not provided")
|
||||
}
|
||||
if params["bucket_name"] == "" {
|
||||
return nil, errors.New("minio bucket name not provided")
|
||||
}
|
||||
err := os.MkdirAll(params["tmp"], 0750)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
func createMinioBackend(params map[string]string) (Backend, error) {
|
||||
if params["endpoint"] == "" {
|
||||
return nil, errors.New("minio endpoint not provided")
|
||||
}
|
||||
if params["key_id"] == "" {
|
||||
return nil, errors.New("minio key id not provided")
|
||||
}
|
||||
if params["access_key"] == "" {
|
||||
return nil, errors.New("minio access key not provided")
|
||||
}
|
||||
if params["tmp"] == "" {
|
||||
return nil, errors.New("minio tmp not provided")
|
||||
}
|
||||
if params["bucket_name"] == "" {
|
||||
return nil, errors.New("minio bucket name not provided")
|
||||
}
|
||||
err := os.MkdirAll(params["tmp"], 0750)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client, err := minio.New(params["endpoint"], &minio.Options{
|
||||
Creds: credentials.NewStaticV4(params["key_id"], params["access_key"], ""),
|
||||
Secure: true,
|
||||
@@ -63,19 +61,23 @@ func (s minioStorage) OpenRead(id uuid.UUID, start, length int) (io.ReadCloser,
|
||||
}
|
||||
}
|
||||
|
||||
func (s minioStorage) OpenWrite(id uuid.UUID) (io.WriteCloser, error) {
|
||||
path := filepath.Join(s.tmp, id.String())
|
||||
file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0640)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &callbackWriteCloser{dest: file, closeCallback: func(len int) error {
|
||||
_, err = s.client.FPutObject(context.Background(), s.bucketName, s.prefix+id.String(), path, minio.PutObjectOptions{Progress: &progressReader{len: len, callback: func() {
|
||||
os.Remove(path)
|
||||
}}})
|
||||
return err
|
||||
}}, nil
|
||||
}
|
||||
// func (s minioStorage) OpenWrite(id uuid.UUID) (io.WriteCloser, error) {
|
||||
// path := filepath.Join(s.tmp, id.String())
|
||||
// file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0640)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// return &callbackWriteCloser{dest: file, closeCallback: func(err error, len int) error {
|
||||
// if err != nil {
|
||||
// os.Remove(path)
|
||||
// return err
|
||||
// }
|
||||
// _, err = s.client.FPutObject(context.Background(), s.bucketName, s.prefix+id.String(), path, minio.PutObjectOptions{Progress: &progressReader{len: len, callback: func() {
|
||||
// os.Remove(path)
|
||||
// }}})
|
||||
// return err
|
||||
// }}, nil
|
||||
// }
|
||||
|
||||
func (s minioStorage) Delete(id uuid.UUID) error {
|
||||
return s.client.RemoveObject(context.Background(), s.bucketName, s.prefix+id.String(), minio.RemoveObjectOptions{})
|
||||
@@ -103,20 +105,20 @@ func (s minioStorage) String() string {
|
||||
return fmt.Sprintf("minio (endpoint: %s, bucket: %s, prefix: %s, tmp: %s)", s.client.EndpointURL(), s.bucketName, s.prefix, s.tmp)
|
||||
}
|
||||
|
||||
type progressReader struct {
|
||||
cur, len int
|
||||
callback func()
|
||||
}
|
||||
// type progressReader struct {
|
||||
// cur, len int
|
||||
// callback func()
|
||||
// }
|
||||
|
||||
func (r *progressReader) Read(p []byte) (n int, err error) {
|
||||
max := r.len - r.cur
|
||||
size := len(p)
|
||||
if size > max {
|
||||
size = max
|
||||
}
|
||||
r.cur += size
|
||||
if r.cur == r.len && size == 0 {
|
||||
r.callback()
|
||||
}
|
||||
return size, nil
|
||||
}
|
||||
// func (r *progressReader) Read(p []byte) (n int, err error) {
|
||||
// max := r.len - r.cur
|
||||
// size := len(p)
|
||||
// if size > max {
|
||||
// size = max
|
||||
// }
|
||||
// r.cur += size
|
||||
// if r.cur == r.len && size == 0 {
|
||||
// r.callback()
|
||||
// }
|
||||
// return size, nil
|
||||
// }
|
||||
|
||||
@@ -2,54 +2,71 @@ package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"hash"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/shroff/phylum/server/internal/core/db"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type Storage struct {
|
||||
backends map[string]Backend
|
||||
defaultBackend Backend
|
||||
}
|
||||
|
||||
var s Storage
|
||||
var Cfg Config
|
||||
var defaultBackend LocalBackend
|
||||
var backends map[string]Backend
|
||||
|
||||
func Get() Storage {
|
||||
if s.defaultBackend == nil {
|
||||
if storage, err := create(Cfg.Location); err != nil {
|
||||
logrus.Fatal(err)
|
||||
} else {
|
||||
s = storage
|
||||
}
|
||||
go processBackendUpdates()
|
||||
}
|
||||
return s
|
||||
type BackendConfig struct {
|
||||
Name string `json:"name"`
|
||||
Driver string `json:"driver"`
|
||||
Params map[string]string `json:"params"`
|
||||
}
|
||||
|
||||
func create(storageLocation string) (Storage, error) {
|
||||
if err := os.Chmod(storageLocation, 0700); err != nil {
|
||||
return Storage{}, errors.New("failed to set permissions: " + err.Error())
|
||||
func Initialize(db db.Handler) error {
|
||||
if err := os.Chmod(Cfg.Location, 0700); err != nil {
|
||||
return errors.New("failed to set permissions: " + err.Error())
|
||||
}
|
||||
if backends, err := restoreBackends(); err != nil {
|
||||
return Storage{}, errors.New("failed to restore backends: " + err.Error())
|
||||
} else if defaultBackend, err := createLocalBackend(map[string]string{"root": path.Join(storageLocation, "default")}, true); err != nil {
|
||||
return Storage{}, errors.New("failed to create default backend: " + err.Error())
|
||||
if restoredBackends, err := restoreBackends(); err != nil {
|
||||
return errors.New("failed to restore backends: " + err.Error())
|
||||
} else if b, err := createLocalBackend(map[string]string{"root": path.Join(Cfg.Location, "default")}); err != nil {
|
||||
return errors.New("failed to create default backend: " + err.Error())
|
||||
} else {
|
||||
return Storage{
|
||||
backends: backends,
|
||||
defaultBackend: defaultBackend,
|
||||
}, nil
|
||||
restoredBackends[""] = b
|
||||
backends = restoredBackends
|
||||
defaultBackend = b
|
||||
}
|
||||
|
||||
go processBackendUpdates(db)
|
||||
return nil
|
||||
}
|
||||
|
||||
func DefaultBackend() LocalBackend {
|
||||
return defaultBackend
|
||||
}
|
||||
|
||||
func GetBackend(name string) Backend {
|
||||
return backends[name]
|
||||
}
|
||||
|
||||
func ListBackends() map[string]Backend {
|
||||
return backends
|
||||
}
|
||||
|
||||
func InsertBackend(db db.Handler, name string, driver Driver, params map[string]string) error {
|
||||
backend, err := driver.Create(params)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
const q = "INSERT INTO storage_backends(name, driver, params) VALUES ($1::TEXT, $2::TEXT, $3::JSONB)"
|
||||
p, err := json.Marshal(params)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := db.Exec(q, name, driver, p); err != nil {
|
||||
return err
|
||||
}
|
||||
backends[name] = backend
|
||||
return nil
|
||||
}
|
||||
|
||||
func restoreBackends() (map[string]Backend, error) {
|
||||
@@ -83,8 +100,8 @@ func restoreBackends() (map[string]Backend, error) {
|
||||
}
|
||||
}
|
||||
|
||||
func processBackendUpdates() {
|
||||
sub := db.Get(context.Background()).Notifier().Listen("backend_updates")
|
||||
func processBackendUpdates(db db.Handler) {
|
||||
sub := db.Notifier().Listen("backend_updates")
|
||||
for {
|
||||
p := <-sub.NotificationC()
|
||||
var c BackendConfig
|
||||
@@ -93,120 +110,15 @@ func processBackendUpdates() {
|
||||
} else if b, err := c.open(); err != nil {
|
||||
logrus.Warn("failed to open backend: " + err.Error())
|
||||
} else {
|
||||
s.backends[c.Name] = b
|
||||
backends[c.Name] = b
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s Storage) OpenRead(id uuid.UUID, start, length int) (io.ReadCloser, error) {
|
||||
if backend, err := s.findStorageBackend(id); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return backend.OpenRead(id, start, length)
|
||||
}
|
||||
}
|
||||
|
||||
func (s Storage) OpenWrite(id uuid.UUID, h func() hash.Hash, callback func(int, string, string) error) (io.WriteCloser, error) {
|
||||
if backend, err := s.findStorageBackend(id); err != nil {
|
||||
return nil, err
|
||||
} else if out, err := backend.OpenWrite(id); err != nil {
|
||||
return nil, err
|
||||
} else if callback == nil {
|
||||
return out, nil
|
||||
} else {
|
||||
return &hasher{dest: out, sum: h(), contents: make([]byte, 0, 10*1024), closeCallback: func(len int, sum hash.Hash, mime string) error {
|
||||
etag := hex.EncodeToString(sum.Sum(nil))
|
||||
return callback(len, etag, mime)
|
||||
}}, nil
|
||||
}
|
||||
}
|
||||
|
||||
type ContentsCopy struct {
|
||||
SrcVersion uuid.UUID
|
||||
DestResource uuid.UUID
|
||||
Size int64
|
||||
MimeType string
|
||||
SHA256 string
|
||||
}
|
||||
|
||||
func (s Storage) CopyContents(params ContentsCopy) (uuid.UUID, error) {
|
||||
in, err := s.OpenRead(params.SrcVersion, 0, -1)
|
||||
if err != nil {
|
||||
return uuid.Nil, errors.New("Unable to open " + params.SrcVersion.String() + ": " + err.Error())
|
||||
}
|
||||
defer in.Close()
|
||||
id, _ := uuid.NewV7()
|
||||
|
||||
out, err := s.OpenWrite(id, nil, nil)
|
||||
if err != nil {
|
||||
return uuid.Nil, errors.New("Unable to open " + id.String() + ": " + err.Error())
|
||||
}
|
||||
defer out.Close()
|
||||
|
||||
_, err = io.Copy(out, in)
|
||||
return id, err
|
||||
|
||||
}
|
||||
|
||||
func (s Storage) Delete(id uuid.UUID) error {
|
||||
if backend, err := s.findStorageBackend(id); err != nil {
|
||||
return err
|
||||
} else {
|
||||
return backend.Delete(id)
|
||||
}
|
||||
}
|
||||
|
||||
func (s Storage) DeleteAll(ids []uuid.UUID) []error {
|
||||
// TODO: Batch delete
|
||||
var errs []error
|
||||
for _, id := range ids {
|
||||
err := s.Delete(id)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
return errs
|
||||
}
|
||||
|
||||
func (s Storage) findStorageBackend(id uuid.UUID) (Backend, error) {
|
||||
// TODO: implement
|
||||
return s.defaultBackend, nil
|
||||
}
|
||||
|
||||
func (s Storage) CreateBackend(db db.Handler, name string, driver Driver, params map[string]string) error {
|
||||
backend, err := driver.Create(params, true)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
const q = "INSERT INTO storage_backends(name, driver, params) VALUES ($1::TEXT, $2::TEXT, $3::JSONB)"
|
||||
p, err := json.Marshal(params)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := db.Exec(q, name, driver, p); err != nil {
|
||||
return err
|
||||
}
|
||||
s.backends[name] = backend
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s Storage) ListBackends() map[string]Backend {
|
||||
return s.backends
|
||||
}
|
||||
|
||||
type BackendConfig struct {
|
||||
Name string `json:"name"`
|
||||
Driver string `json:"driver"`
|
||||
Params map[string]string `json:"params"`
|
||||
}
|
||||
|
||||
func (c BackendConfig) open() (Backend, error) {
|
||||
if driver, err := FindDriver(c.Driver); err != nil {
|
||||
return nil, err
|
||||
} else if backend, err := driver.Create(c.Params, false); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return backend, nil
|
||||
return driver.Create(c.Params)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -8,8 +8,12 @@ import (
|
||||
|
||||
type Backend interface {
|
||||
OpenRead(id uuid.UUID, start, length int) (io.ReadCloser, error)
|
||||
OpenWrite(id uuid.UUID) (io.WriteCloser, error)
|
||||
Delete(id uuid.UUID) error
|
||||
DeleteAll(ids uuid.UUIDs) []error
|
||||
String() string
|
||||
}
|
||||
|
||||
type LocalBackend interface {
|
||||
Backend
|
||||
OpenWrite(id uuid.UUID) (io.WriteCloser, error)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user