Generalize PhylumFs

This commit is contained in:
Abhishek Shroff
2024-03-04 00:27:44 +05:30
parent 033db33e09
commit 3ef4e2f54d
10 changed files with 285 additions and 252 deletions

View File

@@ -2,10 +2,8 @@ package cmds
import (
"context"
"fmt"
"github.com/google/uuid"
"github.com/shroff/phylum/server/internal/phylumsql"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
@@ -15,7 +13,10 @@ func setupAdminCommand() *cobra.Command {
Use: "admin",
Short: "Server Administration",
}
cmdServe.AddCommand([]*cobra.Command{setupAdminMkrootCommand()}...)
cmdServe.AddCommand([]*cobra.Command{
setupAdminMkrootCommand(),
setupAdminRmrootCommand(),
}...)
return cmdServe
}
@@ -26,16 +27,21 @@ func setupAdminMkrootCommand() *cobra.Command {
Short: "Create Root Folder",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
name := args[0]
if _, err := queries.FindRoot(context.Background(), name); err == nil {
log.Fatal(fmt.Sprintf("Root directory already exists: %s", name))
} else {
_, err = queries.CreateDirectory(context.Background(), phylumsql.CreateDirectoryParams{ID: uuid.New(), Parent: nil, Name: name})
if err != nil {
log.Fatal(err)
} else {
log.Info(fmt.Sprintf("Root directory created: %s", name))
}
if err := backend.Mkroot(context.Background(), uuid.New(), args[0]); err != nil {
log.Fatal(err)
}
},
}
}
func setupAdminRmrootCommand() *cobra.Command {
return &cobra.Command{
Use: "rmroot name",
Short: "Delete Root Folder",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
if err := backend.Rmroot(context.Background(), args[0]); err != nil {
log.Fatal(err)
}
},
}

View File

@@ -7,6 +7,7 @@ import (
"path"
"github.com/jackc/pgx/v5"
"github.com/shroff/phylum/server/internal/phylumfs"
"github.com/shroff/phylum/server/internal/phylumsql"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@@ -14,7 +15,7 @@ import (
)
var debugMode bool = false
var queries phylumsql.Queries
var backend phylumfs.PhylumFs
func Setup() {
viper.SetEnvPrefix("phylum")
@@ -58,7 +59,8 @@ func Setup() {
if err != nil {
log.Fatal(fmt.Sprintf("Unable to connect to database: %v\n", err))
}
queries = *phylumsql.New(conn)
queries := *phylumsql.New(conn)
backend = phylumfs.New(queries)
}
defer func() {

View File

@@ -54,12 +54,8 @@ func setupServeCommand() *cobra.Command {
func setupWebdav(r *gin.RouterGroup) {
log.Info(fmt.Sprintf("Setting up WebDAV access at %s", r.BasePath()))
cs, err := pgfs.NewLocalFsContentStore("srv")
if err != nil {
panic(err)
}
webdavHandler := webdav.Handler{
FileSystem: pgfs.New(queries, cs, r.BasePath()),
FileSystem: pgfs.New(backend, r.BasePath()),
}
handler := func(c *gin.Context) {
webdavHandler.ServeHTTP(c.Writer, c.Request)
@@ -72,6 +68,7 @@ func setupWebdav(r *gin.RouterGroup) {
r.Handle("DELETE", "/*path", handler)
r.Handle("MOVE", "/*path", handler)
r.Handle("COPY", "/*path", handler)
r.Handle("MKCOL", "/*path", handler)
r.Handle("PROPFIND", "/*path", handler)
r.Handle("PROPPATCH", "/*path", handler)
}

View File

@@ -1,59 +0,0 @@
package pgfs
import (
"crypto/md5"
"hash"
"io"
"os"
"path/filepath"
"github.com/google/uuid"
)
type LocalFsContentStore string
type contentWriter struct {
file *os.File
hash hash.Hash
callback func(hash.Hash, error)
}
func (c contentWriter) Write(p []byte) (n int, err error) {
n, err = c.file.Write(p)
c.hash.Write(p)
return
}
func (c contentWriter) Close() error {
err := c.file.Close()
c.callback(c.hash, err)
return err
}
func NewLocalFsContentStore(root string) (ContentStore, error) {
err := os.MkdirAll(root, 0750)
if err != nil {
return nil, err
}
return LocalFsContentStore(root), nil
}
func (l LocalFsContentStore) Open(id uuid.UUID) (io.ReadCloser, error) {
return os.Open(l.path(id))
}
func (l LocalFsContentStore) Create(id uuid.UUID, callback func(hash.Hash, error)) (io.WriteCloser, error) {
file, err := os.Create(l.path(id))
if err != nil {
return nil, err
}
return contentWriter{file, md5.New(), callback}, nil
}
func (l LocalFsContentStore) Delete(id uuid.UUID) error {
return os.Remove(filepath.Join(string(l), id.String()))
}
func (l LocalFsContentStore) path(id uuid.UUID) string {
return filepath.Join(string(l), id.String())
}

View File

@@ -2,28 +2,23 @@ package pgfs
import (
"context"
"encoding/base64"
"hash"
"io"
"io/fs"
"os"
"strings"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgtype"
"github.com/shroff/phylum/server/internal/phylumsql"
webdav "github.com/emersion/go-webdav"
"github.com/google/uuid"
"github.com/shroff/phylum/server/internal/phylumfs"
"github.com/shroff/phylum/server/internal/phylumsql"
)
type Pgfs struct {
q phylumsql.Queries
cs ContentStore
p phylumfs.PhylumFs
prefix string
}
func New(q phylumsql.Queries, cs ContentStore, prefix string) Pgfs {
return Pgfs{q: q, cs: cs, prefix: prefix}
func New(p phylumfs.PhylumFs, prefix string) Pgfs {
return Pgfs{p: p, prefix: prefix}
}
func (p Pgfs) Open(ctx context.Context, name string) (io.ReadCloser, error) {
@@ -31,8 +26,9 @@ func (p Pgfs) Open(ctx context.Context, name string) (io.ReadCloser, error) {
if err != nil {
return nil, err
}
return p.cs.Open(resource.ID)
return p.p.Open(resource.ID)
}
func (p Pgfs) Stat(ctx context.Context, name string) (*webdav.FileInfo, error) {
resource, err := p.resourceByPath(ctx, name)
if err != nil {
@@ -56,7 +52,7 @@ func (p Pgfs) ReadDir(ctx context.Context, name string, recursive bool) ([]webda
if !dir.Dir {
return nil, fs.ErrInvalid
}
children, err := p.q.ReadDir(ctx, phylumsql.ReadDirParams{ID: dir.ID, Recursive: recursive})
children, err := p.p.ReadDir(ctx, dir.ID, recursive)
if err != nil {
return nil, err
}
@@ -76,47 +72,25 @@ func (p Pgfs) ReadDir(ctx context.Context, name string, recursive bool) ([]webda
return result, nil
}
func (p Pgfs) Create(ctx context.Context, name string) (io.WriteCloser, error) {
resource, _ := p.resourceByPath(ctx, name)
if resource != nil {
return p.cs.Create(resource.ID, func(h hash.Hash, err error) {
if err != nil {
etag := base64.StdEncoding.EncodeToString(h.Sum(nil))
_, err = p.q.UpdateResourceContents(ctx, phylumsql.UpdateResourceContentsParams{
Size: pgtype.Int4{Int32: int32(h.Size()), Valid: true},
Etag: pgtype.Text{String: string(etag), Valid: true},
})
}
if err != nil {
p.cs.Delete(resource.ID)
}
})
var id uuid.UUID
if resource, err := p.resourceByPath(ctx, name); err == nil {
id = resource.ID
} else {
name = strings.TrimRight(name, "/")
index := strings.LastIndex(name, "/")
parentPath := name[0:index]
parent, err := p.resourceByPath(ctx, parentPath)
if err != nil {
return nil, fs.ErrNotExist
}
fileName := name[index+1:]
id = uuid.New()
if err = p.p.CreateResource(ctx, id, parent.ID, fileName, false); err != nil {
return nil, err
}
}
index := strings.LastIndex(name, "/")
parentPath := name[0:index]
parent, err := p.resourceByPath(ctx, parentPath)
if err != nil {
return nil, fs.ErrNotExist
}
fileName := name[index+1:]
id := uuid.New()
return p.cs.Create(id, func(h hash.Hash, err error) {
etag := base64.StdEncoding.EncodeToString(h.Sum(nil))
if err != nil {
_, err = p.q.CreateFile(
ctx,
phylumsql.CreateFileParams{
ID: id,
Parent: &parent.ID,
Name: fileName,
Size: pgtype.Int4{Int32: int32(h.Size()), Valid: true},
Etag: pgtype.Text{String: string(etag), Valid: true},
})
}
if err != nil {
p.cs.Delete(id)
}
})
return p.p.UpdateContents(ctx, id)
}
func (p Pgfs) RemoveAll(ctx context.Context, name string) error {
@@ -124,7 +98,8 @@ func (p Pgfs) RemoveAll(ctx context.Context, name string) error {
if resource == nil {
return fs.ErrNotExist
}
return p.q.DeleteRecursive(ctx, resource.ID)
err := p.p.DeleteRecursive(ctx, resource.ID)
return err
}
func (p Pgfs) Mkdir(ctx context.Context, name string) error {
@@ -132,6 +107,7 @@ func (p Pgfs) Mkdir(ctx context.Context, name string) error {
if resource != nil {
return fs.ErrExist
}
name = strings.TrimRight(name, "/")
index := strings.LastIndex(name, "/")
parentPath := name[0:index]
parent, err := p.resourceByPath(ctx, parentPath)
@@ -139,7 +115,7 @@ func (p Pgfs) Mkdir(ctx context.Context, name string) error {
return fs.ErrNotExist
}
dirName := name[index+1:]
_, err = p.q.CreateDirectory(ctx, phylumsql.CreateDirectoryParams{ID: uuid.New(), Parent: &parent.ID, Name: dirName})
err = p.p.CreateResource(ctx, uuid.New(), parent.ID, dirName, true)
return err
}
func (p Pgfs) Copy(ctx context.Context, name, dest string, options *webdav.CopyOptions) (created bool, err error) {
@@ -151,23 +127,6 @@ func (p Pgfs) Move(ctx context.Context, name, dest string, options *webdav.MoveO
return false, nil
}
func (p Pgfs) resourceByPath(ctx context.Context, name string) (*phylumsql.ResourceByPathRow, error) {
path := strings.TrimPrefix(name, p.prefix+"/")
segments := strings.Split(strings.TrimRight(path, "/"), "/")
if len(segments) == 0 {
return nil, os.ErrInvalid
}
root, err := p.q.FindRoot(ctx, segments[0])
if err != nil {
return nil, os.ErrNotExist
}
//TODO: Permissions checks
res, err := p.q.ResourceByPath(ctx, phylumsql.ResourceByPathParams{Search: segments, Root: root.ID})
if err != nil {
return nil, os.ErrNotExist
}
return &res, nil
func (p Pgfs) resourceByPath(ctx context.Context, name string) (res *phylumsql.ResourceByPathRow, err error) {
return p.p.ResourceByPath(ctx, strings.TrimPrefix(name, p.prefix+"/"))
}

View File

@@ -1,4 +1,4 @@
package pgfs
package phylumfs
import (
"hash"
@@ -9,6 +9,6 @@ import (
type ContentStore interface {
Open(id uuid.UUID) (io.ReadCloser, error)
Create(id uuid.UUID, callback func(hash.Hash, error)) (io.WriteCloser, error)
Create(id uuid.UUID, callback func(hash.Hash, int, error)) (io.WriteCloser, error)
Delete(id uuid.UUID) error
}

View File

@@ -0,0 +1,68 @@
package phylumfs
import (
"crypto/md5"
"hash"
"io"
"os"
"path/filepath"
"github.com/google/uuid"
)
type LocalContentStore string
type contentWriter struct {
file *os.File
hash hash.Hash
len **int
callback func(hash.Hash, int, error)
}
func (c contentWriter) Write(p []byte) (n int, err error) {
n, err = c.file.Write(p)
len := **c.len + n
*c.len = &len
c.hash.Write(p)
return
}
func (c contentWriter) Close() error {
err := c.file.Sync()
if err != nil {
return err
}
err = c.file.Close()
c.callback(c.hash, **c.len, err)
return err
}
func NewLocalContentStore(root string) (ContentStore, error) {
err := os.MkdirAll(root, 0750)
if err != nil {
return nil, err
}
return LocalContentStore(root), nil
}
func (l LocalContentStore) Open(id uuid.UUID) (io.ReadCloser, error) {
return os.Open(l.path(id))
}
func (l LocalContentStore) Create(id uuid.UUID, callback func(hash.Hash, int, error)) (io.WriteCloser, error) {
file, err := os.Create(l.path(id))
if err != nil {
return nil, err
}
len := 0
addr := &len
return contentWriter{file, md5.New(), &addr, callback}, nil
}
func (l LocalContentStore) Delete(id uuid.UUID) error {
return os.Remove(filepath.Join(string(l), id.String()))
}
func (l LocalContentStore) path(id uuid.UUID) string {
return filepath.Join(string(l), id.String())
}

View File

@@ -0,0 +1,101 @@
package phylumfs
import (
"context"
"encoding/base64"
"fmt"
"hash"
"io"
"io/fs"
"strings"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgtype"
"github.com/shroff/phylum/server/internal/phylumsql"
"github.com/sirupsen/logrus"
)
type PhylumFs struct {
q phylumsql.Queries
cs ContentStore
}
func New(q phylumsql.Queries) PhylumFs {
cs, err := NewLocalContentStore("srv")
if err != nil {
panic(err)
}
return PhylumFs{q: q, cs: cs}
}
func (p PhylumFs) Mkroot(ctx context.Context, id uuid.UUID, name string) error {
if _, err := p.q.FindRoot(ctx, name); err == nil {
return fmt.Errorf("root directory already exists: %s", name)
} else {
return p.q.CreateResource(ctx, phylumsql.CreateResourceParams{ID: id, Parent: nil, Name: name, Dir: true})
}
}
func (p PhylumFs) Rmroot(ctx context.Context, name string) error {
if root, err := p.q.FindRoot(ctx, name); err == nil {
return p.DeleteRecursive(ctx, root.ID)
} else {
return fmt.Errorf("root directory does not exist: %s", name)
}
}
func (p PhylumFs) Open(id uuid.UUID) (io.ReadCloser, error) {
return p.cs.Open(id)
}
func (p PhylumFs) ReadDir(ctx context.Context, id uuid.UUID, recursive bool) ([]phylumsql.ReadDirRow, error) {
return p.q.ReadDir(ctx, phylumsql.ReadDirParams{ID: id, Recursive: recursive})
}
func (p PhylumFs) DeleteRecursive(ctx context.Context, id uuid.UUID) error {
_, err := p.q.DeleteRecursive(ctx, id)
// TODO: Delete Contents
return err
}
func (p PhylumFs) CreateResource(ctx context.Context, id uuid.UUID, parent uuid.UUID, name string, dir bool) error {
return p.q.CreateResource(ctx, phylumsql.CreateResourceParams{ID: id, Parent: &parent, Name: name, Dir: dir})
}
func (p PhylumFs) UpdateContents(ctx context.Context, id uuid.UUID) (io.WriteCloser, error) {
return p.cs.Create(id, func(h hash.Hash, len int, err error) {
if err == nil {
etag := base64.StdEncoding.EncodeToString(h.Sum(nil))
err = p.q.UpdateResourceContents(ctx, phylumsql.UpdateResourceContentsParams{
ID: id,
Size: pgtype.Int4{Int32: int32(len), Valid: true},
Etag: pgtype.Text{String: string(etag), Valid: true},
})
}
if err != nil {
logrus.Warn(fmt.Sprintf("Unable to update contents: %s", err))
p.cs.Delete(id)
}
})
}
func (p PhylumFs) ResourceByPath(ctx context.Context, path string) (*phylumsql.ResourceByPathRow, error) {
segments := strings.Split(strings.TrimRight(path, "/"), "/")
if len(segments) == 0 {
return nil, fs.ErrInvalid
}
root, err := p.q.FindRoot(ctx, segments[0])
if err != nil {
return nil, fs.ErrNotExist
}
res, err := p.q.ResourceByPath(ctx, phylumsql.ResourceByPathParams{Search: segments, Root: root.ID})
if err != nil {
return nil, fs.ErrNotExist
}
//TODO: Permissions checks
return &res, nil
}

View File

@@ -12,79 +12,32 @@ import (
"github.com/jackc/pgx/v5/pgtype"
)
const createDirectory = `-- name: CreateDirectory :one
const createResource = `-- name: CreateResource :exec
INSERT INTO resources(
id, parent, name, dir, created, modified
) VALUES (
$1, $2, $3, true, NOW(), NOW()
$1, $2, $3, $4, NOW(), NOW()
)
RETURNING id, parent, name, dir, created, modified, deleted, size, etag
`
type CreateDirectoryParams struct {
type CreateResourceParams struct {
ID uuid.UUID
Parent *uuid.UUID
Name string
Dir bool
}
func (q *Queries) CreateDirectory(ctx context.Context, arg CreateDirectoryParams) (Resource, error) {
row := q.db.QueryRow(ctx, createDirectory, arg.ID, arg.Parent, arg.Name)
var i Resource
err := row.Scan(
&i.ID,
&i.Parent,
&i.Name,
&i.Dir,
&i.Created,
&i.Modified,
&i.Deleted,
&i.Size,
&i.Etag,
)
return i, err
}
const createFile = `-- name: CreateFile :one
INSERT INTO resources(
id, parent, name, dir, created, modified, size, etag
) VALUES (
$1, $2, $3, false, NOW(), NOW(), $4, $5
)
RETURNING id, parent, name, dir, created, modified, deleted, size, etag
`
type CreateFileParams struct {
ID uuid.UUID
Parent *uuid.UUID
Name string
Size pgtype.Int4
Etag pgtype.Text
}
func (q *Queries) CreateFile(ctx context.Context, arg CreateFileParams) (Resource, error) {
row := q.db.QueryRow(ctx, createFile,
func (q *Queries) CreateResource(ctx context.Context, arg CreateResourceParams) error {
_, err := q.db.Exec(ctx, createResource,
arg.ID,
arg.Parent,
arg.Name,
arg.Size,
arg.Etag,
arg.Dir,
)
var i Resource
err := row.Scan(
&i.ID,
&i.Parent,
&i.Name,
&i.Dir,
&i.Created,
&i.Modified,
&i.Deleted,
&i.Size,
&i.Etag,
)
return i, err
return err
}
const deleteRecursive = `-- name: DeleteRecursive :exec
const deleteRecursive = `-- name: DeleteRecursive :many
WITH RECURSIVE nodes(id, parent) AS (
SELECT r.id, r.parent
FROM resources r WHERE r.id = $1::uuid
@@ -93,12 +46,40 @@ WITH RECURSIVE nodes(id, parent) AS (
FROM resources r JOIN nodes n on r.parent = n.id
WHERE deleted IS NULL
)
UPDATE resources SET deleted = NOW() WHERE id in (SELECT id FROM nodes)
UPDATE resources
SET deleted = NOW()
WHERE id in (SELECT id FROM nodes)
RETURNING id, parent, name, dir, created, modified, deleted, size, etag
`
func (q *Queries) DeleteRecursive(ctx context.Context, id uuid.UUID) error {
_, err := q.db.Exec(ctx, deleteRecursive, id)
return err
func (q *Queries) DeleteRecursive(ctx context.Context, id uuid.UUID) ([]Resource, error) {
rows, err := q.db.Query(ctx, deleteRecursive, id)
if err != nil {
return nil, err
}
defer rows.Close()
var items []Resource
for rows.Next() {
var i Resource
if err := rows.Scan(
&i.ID,
&i.Parent,
&i.Name,
&i.Dir,
&i.Created,
&i.Modified,
&i.Deleted,
&i.Size,
&i.Etag,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const findResource = `-- name: FindResource :one
@@ -151,8 +132,7 @@ WITH RECURSIVE nodes(id, parent, name, dir, created, modified, size, etag, depth
SELECT r.id, r.parent, r.name, r.dir, r.created, r.modified, r.size, r.etag, n.depth + 1, concat(n.path, '/', r.name)
FROM resources r JOIN nodes n on r.parent = n.id
WHERE deleted IS NULL
-- AND depth < @max_depth::int
AND depth < CASE WHEN $2::boolean THEN 1000 ELSE 1 END
AND CASE WHEN $2::boolean THEN true ELSE depth < 1 END
)
SELECT id, parent, name, dir, created, modified, size, etag, depth, path from nodes
`
@@ -257,14 +237,13 @@ func (q *Queries) ResourceByPath(ctx context.Context, arg ResourceByPathParams)
return i, err
}
const updateResourceContents = `-- name: UpdateResourceContents :one
const updateResourceContents = `-- name: UpdateResourceContents :exec
UPDATE resources
SET
size = $1,
etag = $2,
modified = NOW()
WHERE id = $3
RETURNING id, parent, name, dir, created, modified, deleted, size, etag
`
type UpdateResourceContentsParams struct {
@@ -273,19 +252,7 @@ type UpdateResourceContentsParams struct {
ID uuid.UUID
}
func (q *Queries) UpdateResourceContents(ctx context.Context, arg UpdateResourceContentsParams) (Resource, error) {
row := q.db.QueryRow(ctx, updateResourceContents, arg.Size, arg.Etag, arg.ID)
var i Resource
err := row.Scan(
&i.ID,
&i.Parent,
&i.Name,
&i.Dir,
&i.Created,
&i.Modified,
&i.Deleted,
&i.Size,
&i.Etag,
)
return i, err
func (q *Queries) UpdateResourceContents(ctx context.Context, arg UpdateResourceContentsParams) error {
_, err := q.db.Exec(ctx, updateResourceContents, arg.Size, arg.Etag, arg.ID)
return err
}

View File

@@ -4,30 +4,20 @@ SELECT * from resources WHERE id = $1;
-- name: FindRoot :one
SELECT * from resources WHERE deleted IS NULL AND parent IS NULL AND name = $1;
-- name: CreateDirectory :one
-- name: CreateResource :exec
INSERT INTO resources(
id, parent, name, dir, created, modified
) VALUES (
$1, $2, $3, true, NOW(), NOW()
)
RETURNING *;
$1, $2, $3, $4, NOW(), NOW()
);
-- name: CreateFile :one
INSERT INTO resources(
id, parent, name, dir, created, modified, size, etag
) VALUES (
$1, $2, $3, false, NOW(), NOW(), $4, $5
)
RETURNING *;
-- name: UpdateResourceContents :one
-- name: UpdateResourceContents :exec
UPDATE resources
SET
size = $1,
etag = $2,
modified = NOW()
WHERE id = $3
RETURNING *;
WHERE id = $3;
-- name: ReadDir :many
@@ -38,8 +28,7 @@ WITH RECURSIVE nodes(id, parent, name, dir, created, modified, size, etag, depth
SELECT r.id, r.parent, r.name, r.dir, r.created, r.modified, r.size, r.etag, n.depth + 1, concat(n.path, '/', r.name)
FROM resources r JOIN nodes n on r.parent = n.id
WHERE deleted IS NULL
-- AND depth < @max_depth::int
AND depth < CASE WHEN @recursive::boolean THEN 1 ELSE 1000 END
AND CASE WHEN @recursive::boolean THEN true ELSE depth < 1 END
)
SELECT * from nodes;
@@ -55,7 +44,7 @@ WITH RECURSIVE nodes(id, parent, name, dir, created, modified, size, etag, depth
)
SELECT * FROM nodes WHERE cardinality(search) = depth + 1;
-- name: DeleteRecursive :exec
-- name: DeleteRecursive :many
WITH RECURSIVE nodes(id, parent) AS (
SELECT r.id, r.parent
FROM resources r WHERE r.id = @id::uuid
@@ -64,4 +53,7 @@ WITH RECURSIVE nodes(id, parent) AS (
FROM resources r JOIN nodes n on r.parent = n.id
WHERE deleted IS NULL
)
UPDATE resources SET deleted = NOW() WHERE id in (SELECT id FROM nodes);
UPDATE resources
SET deleted = NOW()
WHERE id in (SELECT id FROM nodes)
RETURNING *;