[server][core][fs] Call db directly

This commit is contained in:
Abhishek Shroff
2025-03-23 21:50:17 +05:30
parent 0c4b00ef62
commit ab065b3e2c
8 changed files with 232 additions and 276 deletions

View File

@@ -15,7 +15,7 @@ func setupListCommand() *cobra.Command {
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
f := common.UserFileSystem(cmd)
n, _ := cmd.Flags().GetInt("num")
n, _ := cmd.Flags().GetUint("num")
cursor, _ := cmd.Flags().GetString("cursor")
deleted, cursor, err := f.TrashList(cursor, n)
if err != nil {

View File

@@ -5,9 +5,8 @@ import (
"errors"
"fmt"
"github.com/doug-martin/goqu/v9"
_ "github.com/doug-martin/goqu/v9/dialect/postgres"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/sirupsen/logrus"
)
@@ -31,13 +30,6 @@ var DatabaseURL string
var TraceSQL bool
var AutoMigrate bool
var pg *goqu.Database
func init() {
pg = goqu.New("postgres", nil)
goqu.SetDefaultPrepared(true)
}
func Get() *DbHandler {
if d == nil {
if db, err := create(context.Background(), DatabaseURL, TraceSQL, AutoMigrate); err != nil {
@@ -96,7 +88,18 @@ func (d DbHandler) WithTx(ctx context.Context, fn func(*DbHandler) error) error
})
}
func (d DbHandler) Exec(ctx context.Context, stmt string, args ...any) error {
_, err := d.Queries.db.Exec(ctx, stmt, args...)
return err
func (d DbHandler) Exec(ctx context.Context, stmt string, args ...interface{}) (pgconn.CommandTag, error) {
return d.Queries.db.Exec(ctx, stmt, args...)
}
func (d DbHandler) Query(ctx context.Context, stmt string, args ...interface{}) (pgx.Rows, error) {
return d.Queries.db.Query(ctx, stmt, args...)
}
func (d DbHandler) QueryRow(ctx context.Context, stmt string, args ...interface{}) pgx.Row {
return d.Queries.db.QueryRow(ctx, stmt, args...)
}
func (d DbHandler) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) {
return d.Queries.db.CopyFrom(ctx, tableName, columnNames, rowSrc)
}

View File

@@ -80,18 +80,19 @@ func (d DbHandler) Migrate(ctx context.Context, version int) error {
func (d DbHandler) DeleteSchema(ctx context.Context) error {
return d.WithTx(ctx, func(d *DbHandler) (err error) {
user := d.pool.Config().ConnConfig.User
if err = d.Exec(ctx, "DROP SCHEMA public CASCADE"); err != nil {
if _, err = d.Exec(ctx, "DROP SCHEMA public CASCADE"); err != nil {
return
}
if err = d.Exec(ctx, "CREATE SCHEMA public"); err != nil {
if _, err = d.Exec(ctx, "CREATE SCHEMA public"); err != nil {
return
}
if err = d.Exec(ctx, "GRANT ALL ON SCHEMA public TO "+user); err != nil {
if _, err = d.Exec(ctx, "GRANT ALL ON SCHEMA public TO "+user); err != nil {
return
}
if err = d.Exec(ctx, "GRANT ALL ON SCHEMA public TO public"); err != nil {
if _, err = d.Exec(ctx, "GRANT ALL ON SCHEMA public TO public"); err != nil {
return
}
return d.Exec(ctx, "COMMENT ON SCHEMA public IS 'standard public schema'")
_, err = d.Exec(ctx, "COMMENT ON SCHEMA public IS 'standard public schema'")
return
})
}

View File

@@ -1,221 +0,0 @@
package db
import (
"context"
"github.com/doug-martin/goqu/v9"
"github.com/doug-martin/goqu/v9/exp"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
)
const emptyTrash = `DELETE FROM resources
WHERE deleted IS NOT NULL
AND CASE
WHEN $1::TEXT IS NULL THEN TRUE
ELSE permissions[$1::TEXT]::integer & 32 <> 0 END
RETURNING id, dir
`
func (q *Queries) EmptyTrash(ctx context.Context, username pgtype.Text) (count int, ids []uuid.UUID, err error) {
return collectDeletedRows(q.db.Query(ctx, emptyTrash, username))
}
func selectDescendentsCTE(id uuid.UUID, includeDeleted bool) (exp.AliasedExpression, *goqu.SelectDataset) {
resTable := goqu.T("resources").As("r")
rec := pg.From(resTable).
Select(resTable.Col("id"), resTable.Col("parent"), resTable.Col("deleted"), goqu.L("? + 1", goqu.I("n.depth"))).
Join(goqu.T("nodes").As("n"), goqu.On(goqu.I("r.parent").Eq(goqu.I("n.id"))))
if !includeDeleted {
rec = rec.Where(
goqu.Or(
goqu.I("r.deleted").Eq(goqu.I("n.deleted")),
goqu.And(
goqu.I("r.deleted").IsNull(),
goqu.I("n.deleted").IsNull(),
)))
}
return resTable, pg.From(resTable).WithRecursive("nodes(id, parent, deleted, depth)",
pg.From(resTable).
Select(resTable.Col("id"), resTable.Col("parent"), resTable.Col("deleted"), goqu.L("0")).
Where(goqu.I("r.id").Eq(id)).
UnionAll(rec))
}
func (q *Queries) Delete(ctx context.Context, id uuid.UUID, softDelete, excludeRoot bool) (int, []uuid.UUID, error) {
table, exp := selectDescendentsCTE(id, !softDelete)
idSelect := pg.From("nodes").Select("id")
if excludeRoot {
idSelect = idSelect.Where(goqu.C("depth").Gt(0))
}
exp = exp.Where(table.Col("id").Eq(idSelect))
// table
var query string
var params []interface{}
if softDelete {
query, params, _ = exp.
Update().
Set(
goqu.Record{
"modified": goqu.L("NOW()"),
"deleted": goqu.L("NOW()"),
}).
Returning(table.Col("id"), table.Col("dir")).
ToSQL()
} else {
query, params, _ = exp.
Delete().
Returning(table.Col("id"), table.Col("dir")).
ToSQL()
}
return collectDeletedRows(q.db.Query(ctx, query, params...))
}
func collectDeletedRows(rows pgx.Rows, e error) (count int, ids []uuid.UUID, err error) {
if e != nil {
return 0, nil, e
}
defer rows.Close()
var id uuid.UUID
var dir bool
for rows.Next() {
if err := rows.Scan(&id, &dir); err != nil {
return count, nil, err
}
if !dir {
ids = append(ids, id)
}
count++
}
err = rows.Err()
return count, ids, err
}
func (q *Queries) RestoreDeleted(ctx context.Context, id uuid.UUID) ([]int64, error) {
table, exp := selectDescendentsCTE(id, false)
idSelect := pg.From("nodes").Select("id")
exp = exp.Where(table.Col("id").Eq(idSelect))
// table
var query string
var params []interface{}
query, params, _ = exp.
Update().
Set(
goqu.Record{
"modified": goqu.L("NOW()"),
"deleted": nil,
}).
Returning(table.Col("content_length")).
ToSQL()
rows, err := q.db.Query(ctx, query, params...)
if err != nil {
return nil, err
}
defer rows.Close()
var items []int64
for rows.Next() {
var content_length int64
if err := rows.Scan(&content_length); err != nil {
return nil, err
}
items = append(items, content_length)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
func (q *Queries) TrashCompact(ctx context.Context, t pgtype.Timestamp) (int, uuid.UUIDs, error) {
return collectDeletedRows(q.db.Query(ctx, "DELETE FROM resources WHERE deleted < $1::TIMESTAMP RETURNING id, dir", t))
}
type TrashListParams struct {
LastTimestamp pgtype.Timestamp
LastID uuid.UUID
Username pgtype.Text
N uint
}
func (q *Queries) TrashList(ctx context.Context, arg TrashListParams) ([]Resource, error) {
exp := pg.From("resources")
if arg.LastTimestamp.Valid {
exp = exp.Where(
goqu.Or(
goqu.I("deleted").Lt(goqu.V(arg.LastTimestamp.Time)),
goqu.And(
goqu.I("deleted").Eq(goqu.V(arg.LastTimestamp.Time)),
goqu.I("id").Lt(goqu.V(arg.LastID)),
)))
} else {
exp = exp.Where(goqu.C("deleted").IsNotNull())
}
var query string
var params []interface{}
query, params, _ = exp.
Order(goqu.C("deleted").Desc()).
OrderAppend(goqu.C("id").Desc()).
Limit(arg.N).
ToSQL()
rows, err := q.db.Query(ctx, query, params...)
if err != nil {
return nil, err
}
defer rows.Close()
var items []Resource
for rows.Next() {
var i Resource
if err := rows.Scan(
&i.ID,
&i.Name,
&i.Parent,
&i.Dir,
&i.Created,
&i.Modified,
&i.Deleted,
&i.ContentLength,
&i.ContentType,
&i.ContentSha256,
&i.Permissions,
&i.Grants,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
func (q *Queries) TrashSummary(ctx context.Context, username pgtype.Text) (int, int, error) {
var whereCond exp.Expression = goqu.C("deleted").IsNotNull()
if username.Valid {
whereCond = goqu.And(whereCond, goqu.L("permissions[?]::INTEGER <> 0", username.String))
}
exp := pg.
From("resources").
Select(
goqu.COALESCE(goqu.SUM("content_length"), 0),
goqu.COUNT("*")).
Where(whereCond)
query, args, _ := exp.ToSQL()
row := d.db.QueryRow(ctx, query, args...)
var size int
var items int
err := row.Scan(&size, &items)
return items, size, err
}

View File

@@ -4,6 +4,7 @@ import (
"errors"
"time"
"github.com/doug-martin/goqu/v9"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/shroff/phylum/server/internal/core/db"
@@ -53,7 +54,7 @@ func (f filesystem) deleteRecursive(id, parent uuid.UUID, softDelete, preserveRo
var ids uuid.UUIDs
err := f.db.WithTx(f.ctx, func(d *db.DbHandler) error {
var err error
if _, ids, err = d.Delete(f.ctx, id, softDelete, preserveRoot); err != nil {
if _, ids, err = f.Delete(id, softDelete, preserveRoot); err != nil {
return err
}
@@ -69,3 +70,37 @@ func (f filesystem) deleteRecursive(id, parent uuid.UUID, softDelete, preserveRo
return err
}
func (f filesystem) Delete(id uuid.UUID, softDelete, excludeRoot bool) (int, []uuid.UUID, error) {
idSelect := pg.From("nodes").Select("id")
if excludeRoot {
idSelect = idSelect.Where(goqu.C("depth").Gt(0))
}
table := goqu.T("resource").As("r")
nodes := goqu.T("nodes")
q := selectResourceTree(pg.From(table), nodes, id, !softDelete).
Where(table.Col("id").Eq(idSelect))
// table
var query string
var params []interface{}
if softDelete {
query, params, _ = q.
Update().
Set(
goqu.Record{
"modified": goqu.L("NOW()"),
"deleted": goqu.L("NOW()"),
}).
Returning(table.Col("id"), table.Col("dir")).
ToSQL()
} else {
query, params, _ = q.
Delete().
Returning(table.Col("id"), table.Col("dir")).
ToSQL()
}
return collectDeletedRows(f.db.Query(f.ctx, query, params...))
}

View File

@@ -4,6 +4,8 @@ import (
"context"
"strings"
"github.com/doug-martin/goqu/v9"
_ "github.com/doug-martin/goqu/v9/dialect/postgres"
"github.com/google/uuid"
"github.com/shroff/phylum/server/internal/core/db"
"github.com/shroff/phylum/server/internal/core/storage"
@@ -19,6 +21,13 @@ const (
ResourceBindConflictResolutionDelete = 4 // Delete existing resource before creating
)
var pg *goqu.Database
func init() {
pg = goqu.New("postgres", nil)
goqu.SetDefaultPrepared(true)
}
type FileSystem interface {
// filesystem.go
RootID() uuid.UUID
@@ -38,7 +47,7 @@ type FileSystem interface {
Search(query string) ([]Resource, error)
// trash.go
TrashList(cursor string, n int) ([]Resource, string, error)
TrashList(cursor string, n uint) ([]Resource, string, error)
TrashSummary() (int, int, error)
TrashEmpty() (int, error)
}

View File

@@ -0,0 +1,30 @@
package fs
import (
"github.com/doug-martin/goqu/v9"
"github.com/doug-martin/goqu/v9/exp"
"github.com/google/uuid"
)
func selectResourceTree(ds *goqu.SelectDataset, nodes exp.IdentifierExpression, id uuid.UUID, includeDeleted bool) *goqu.SelectDataset {
resTable := goqu.T("resources").As("r")
rec := pg.From(resTable).
Select(resTable.Col("id"), resTable.Col("parent"), resTable.Col("deleted"), goqu.L("? + 1", goqu.I("n.depth"))).
Join(nodes.As("n"), goqu.On(goqu.I("r.parent").Eq(goqu.I("n.id"))))
if !includeDeleted {
rec = rec.Where(
goqu.Or(
goqu.I("r.deleted").Eq(goqu.I("n.deleted")),
goqu.And(
goqu.I("r.deleted").IsNull(),
goqu.I("n.deleted").IsNull(),
)))
}
return ds.WithRecursive(nodes.GetTable()+"(id, parent, deleted, depth)",
pg.From(resTable).
Select(resTable.Col("id"), resTable.Col("parent"), resTable.Col("deleted"), goqu.L("0")).
Where(goqu.I("r.id").Eq(id)).
UnionAll(rec))
}

View File

@@ -9,6 +9,7 @@ import (
"strings"
"time"
"github.com/doug-martin/goqu/v9"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
@@ -19,61 +20,79 @@ import (
const trashDuration = 30 * 24 * time.Hour
func (f filesystem) TrashList(cursor string, n int) ([]Resource, string, error) {
func (f filesystem) TrashList(cursor string, n uint) ([]Resource, string, error) {
var username pgtype.Text
if !f.fullAccess {
username.String = f.username
username.Valid = true
}
var lastTimestamp pgtype.Timestamp
var lastID uuid.UUID
q := pg.From("resources").
Where(goqu.C("deleted").IsNotNull())
if !f.fullAccess {
q = q.Where(goqu.L("permissions[?]::INTEGER <> 0", f.username))
}
if cursor != "" {
if d, err := base64.StdEncoding.DecodeString(cursor); err != nil {
return nil, "", err
} else if len(d) != 24 {
return nil, "", fmt.Errorf("illegal cursor. Length %d not expected", len(d))
} else {
lastID, _ = uuid.FromBytes(d[:16])
t := int64(binary.LittleEndian.Uint64(d[16:]))
lastTimestamp.Time = time.Unix(t/1e9, t%1e9).UTC()
lastTimestamp.Valid = true
lastID, _ := uuid.FromBytes(d[:16])
lastTimestamp := time.Unix(t/1e9, t%1e9).UTC()
q = q.Where(
goqu.Or(
goqu.I("deleted").Lt(goqu.V(lastTimestamp)),
goqu.And(
goqu.I("deleted").Eq(goqu.V(lastTimestamp)),
goqu.I("id").Lt(goqu.V(lastID)),
)))
}
}
if res, err := f.db.TrashList(f.ctx, db.TrashListParams{
Username: username,
N: uint(n),
LastTimestamp: lastTimestamp,
LastID: lastID,
}); err != nil {
query, params, _ := q.
Order(goqu.C("deleted").Desc()).
OrderAppend(goqu.C("id").Desc()).
Limit(n).
ToSQL()
if rows, err := f.db.Query(f.ctx, query, params...); err != nil {
return nil, "", err
} else if res, err := pgx.CollectRows(rows, scanResourceRow); err != nil {
return nil, "", err
} else {
if res == nil {
return nil, "", nil
}
deleted := make([]Resource, len(res))
for i, r := range res {
deleted[i] = ResourceFromDB(r)
}
last := deleted[len(deleted)-1]
last := res[len(res)-1]
c := make([]byte, 24)
b, _ := last.id.MarshalBinary()
copy(c, b)
binary.LittleEndian.PutUint64(c[16:], uint64(last.deleted.UnixNano()))
return deleted, base64.StdEncoding.EncodeToString(c), nil
return res, base64.StdEncoding.EncodeToString(c), nil
}
}
func (f filesystem) TrashSummary() (int, int, error) {
var username pgtype.Text
if !f.fullAccess {
username.String = f.username
username.Valid = true
}
return f.db.TrashSummary(f.ctx, username)
func scanResourceRow(row pgx.CollectableRow) (Resource, error) {
var r Resource
err := row.Scan(
&r.id,
&r.name,
&r.parentID,
&r.dir,
&r.created,
&r.modified,
&r.deleted,
&r.contentLength,
&r.contentType,
&r.contentSHA256,
&r.permissions,
&r.grants,
)
return r, err
}
// RestoreDeleted restores a previously deleted resources
@@ -146,7 +165,7 @@ func (r Resource) Restore(parentPathOrUUID string, name string, autoRename bool)
r.parentID = update.Parent
}
}
if del, err := f.db.RestoreDeleted(f.ctx, r.id); err != nil {
if del, err := f.markNotDeleted(f.ctx, r.id); err != nil {
return err
} else {
count = len(del)
@@ -161,6 +180,39 @@ func (r Resource) Restore(parentPathOrUUID string, name string, autoRename bool)
return
}
func (f filesystem) markNotDeleted(ctx context.Context, id uuid.UUID) ([]int64, error) {
table := goqu.T("resources").As("r")
nodes := goqu.T("nodes")
q := selectResourceTree(pg.From(table), nodes, id, false).
Where(table.Col("id").Eq(pg.Select("id").From(nodes))).
Update().Set(
goqu.Record{
"modified": goqu.L("NOW()"),
"deleted": nil,
}).
Returning(table.Col("content_length"))
query, params, _ := q.ToSQL()
rows, err := f.db.Query(ctx, query, params...)
if err != nil {
return nil, err
}
defer rows.Close()
var items []int64
for rows.Next() {
var content_length int64
if err := rows.Scan(&content_length); err != nil {
return nil, err
}
items = append(items, content_length)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
func SetupTrashCompactor() {
ticker := time.NewTimer(24 * time.Hour)
go func() {
@@ -182,8 +234,13 @@ func TrashCompact() {
}
func trashCompact(t time.Time) (int, error) {
d := db.Get()
count, ids, err := d.TrashCompact(context.Background(), pgtype.Timestamp{Valid: true, Time: t.UTC()})
q := pg.
From("resources").
Where(goqu.C("deleted").Lt(goqu.V(t.UTC()))).
Delete().
Returning("id", "dir")
query, args, _ := q.ToSQL()
count, ids, err := collectDeletedRows(db.Get().Query(context.Background(), query, args...))
if err != nil {
return 0, err
}
@@ -194,14 +251,56 @@ func trashCompact(t time.Time) (int, error) {
return count, err
}
func (f filesystem) TrashSummary() (int, int, error) {
q := f.selectDeletedResources().
Select(
goqu.COALESCE(goqu.SUM("content_length"), 0),
goqu.COUNT("*"))
query, args, _ := q.ToSQL()
row := f.db.QueryRow(f.ctx, query, args...)
var size int
var items int
err := row.Scan(&size, &items)
return items, size, err
}
func (f filesystem) TrashEmpty() (int, error) {
var username pgtype.Text
if !f.fullAccess {
username.String = f.username
username.Valid = true
}
d := db.Get()
count, ids, err := d.EmptyTrash(f.ctx, username)
q := f.selectDeletedResources().
Delete().
Returning("id", "dir")
query, args, _ := q.ToSQL()
count, ids, err := collectDeletedRows(f.db.Query(f.ctx, query, args...))
storage.Get().DeleteAll(ids)
return count, err
}
func (f filesystem) selectDeletedResources() *goqu.SelectDataset {
q := pg.
From("resources").
Where(goqu.C("deleted").IsNotNull())
if !f.fullAccess {
q = q.Where(goqu.L("permissions[?]::INTEGER <> 0", f.username))
}
return q
}
func collectDeletedRows(rows pgx.Rows, e error) (count int, ids []uuid.UUID, err error) {
if e != nil {
return 0, nil, e
}
defer rows.Close()
var id uuid.UUID
var dir bool
for rows.Next() {
if err := rows.Scan(&id, &dir); err != nil {
return count, nil, err
}
if !dir {
ids = append(ids, id)
}
count++
}
err = rows.Err()
return count, ids, err
}