From ab065b3e2ca76dbd54c393726f3b149ad7f06dde Mon Sep 17 00:00:00 2001 From: Abhishek Shroff Date: Sun, 23 Mar 2025 21:50:17 +0530 Subject: [PATCH] [server][core][fs] Call db directly --- server/internal/command/trash/list.go | 2 +- server/internal/core/db/db_handler.go | 27 ++-- server/internal/core/db/schema.go | 11 +- server/internal/core/db/trash.sql.go | 221 -------------------------- server/internal/core/fs/delete.go | 37 ++++- server/internal/core/fs/fs.go | 11 +- server/internal/core/fs/sql_common.go | 30 ++++ server/internal/core/fs/trash.go | 169 ++++++++++++++++---- 8 files changed, 232 insertions(+), 276 deletions(-) delete mode 100644 server/internal/core/db/trash.sql.go create mode 100644 server/internal/core/fs/sql_common.go diff --git a/server/internal/command/trash/list.go b/server/internal/command/trash/list.go index b0f08482..f1a3fc79 100644 --- a/server/internal/command/trash/list.go +++ b/server/internal/command/trash/list.go @@ -15,7 +15,7 @@ func setupListCommand() *cobra.Command { Args: cobra.NoArgs, Run: func(cmd *cobra.Command, args []string) { f := common.UserFileSystem(cmd) - n, _ := cmd.Flags().GetInt("num") + n, _ := cmd.Flags().GetUint("num") cursor, _ := cmd.Flags().GetString("cursor") deleted, cursor, err := f.TrashList(cursor, n) if err != nil { diff --git a/server/internal/core/db/db_handler.go b/server/internal/core/db/db_handler.go index 4e2a2c1e..b527b6ad 100644 --- a/server/internal/core/db/db_handler.go +++ b/server/internal/core/db/db_handler.go @@ -5,9 +5,8 @@ import ( "errors" "fmt" - "github.com/doug-martin/goqu/v9" - _ "github.com/doug-martin/goqu/v9/dialect/postgres" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" "github.com/sirupsen/logrus" ) @@ -31,13 +30,6 @@ var DatabaseURL string var TraceSQL bool var AutoMigrate bool -var pg *goqu.Database - -func init() { - pg = goqu.New("postgres", nil) - goqu.SetDefaultPrepared(true) -} - func Get() *DbHandler { if d == nil { if db, err := create(context.Background(), DatabaseURL, TraceSQL, AutoMigrate); err != nil { @@ -96,7 +88,18 @@ func (d DbHandler) WithTx(ctx context.Context, fn func(*DbHandler) error) error }) } -func (d DbHandler) Exec(ctx context.Context, stmt string, args ...any) error { - _, err := d.Queries.db.Exec(ctx, stmt, args...) - return err +func (d DbHandler) Exec(ctx context.Context, stmt string, args ...interface{}) (pgconn.CommandTag, error) { + return d.Queries.db.Exec(ctx, stmt, args...) +} + +func (d DbHandler) Query(ctx context.Context, stmt string, args ...interface{}) (pgx.Rows, error) { + return d.Queries.db.Query(ctx, stmt, args...) +} + +func (d DbHandler) QueryRow(ctx context.Context, stmt string, args ...interface{}) pgx.Row { + return d.Queries.db.QueryRow(ctx, stmt, args...) +} + +func (d DbHandler) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) { + return d.Queries.db.CopyFrom(ctx, tableName, columnNames, rowSrc) } diff --git a/server/internal/core/db/schema.go b/server/internal/core/db/schema.go index 0e628794..d1ecc530 100644 --- a/server/internal/core/db/schema.go +++ b/server/internal/core/db/schema.go @@ -80,18 +80,19 @@ func (d DbHandler) Migrate(ctx context.Context, version int) error { func (d DbHandler) DeleteSchema(ctx context.Context) error { return d.WithTx(ctx, func(d *DbHandler) (err error) { user := d.pool.Config().ConnConfig.User - if err = d.Exec(ctx, "DROP SCHEMA public CASCADE"); err != nil { + if _, err = d.Exec(ctx, "DROP SCHEMA public CASCADE"); err != nil { return } - if err = d.Exec(ctx, "CREATE SCHEMA public"); err != nil { + if _, err = d.Exec(ctx, "CREATE SCHEMA public"); err != nil { return } - if err = d.Exec(ctx, "GRANT ALL ON SCHEMA public TO "+user); err != nil { + if _, err = d.Exec(ctx, "GRANT ALL ON SCHEMA public TO "+user); err != nil { return } - if err = d.Exec(ctx, "GRANT ALL ON SCHEMA public TO public"); err != nil { + if _, err = d.Exec(ctx, "GRANT ALL ON SCHEMA public TO public"); err != nil { return } - return d.Exec(ctx, "COMMENT ON SCHEMA public IS 'standard public schema'") + _, err = d.Exec(ctx, "COMMENT ON SCHEMA public IS 'standard public schema'") + return }) } diff --git a/server/internal/core/db/trash.sql.go b/server/internal/core/db/trash.sql.go deleted file mode 100644 index 024b30a4..00000000 --- a/server/internal/core/db/trash.sql.go +++ /dev/null @@ -1,221 +0,0 @@ -package db - -import ( - "context" - - "github.com/doug-martin/goqu/v9" - "github.com/doug-martin/goqu/v9/exp" - "github.com/google/uuid" - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgtype" -) - -const emptyTrash = `DELETE FROM resources - WHERE deleted IS NOT NULL - AND CASE - WHEN $1::TEXT IS NULL THEN TRUE - ELSE permissions[$1::TEXT]::integer & 32 <> 0 END - RETURNING id, dir -` - -func (q *Queries) EmptyTrash(ctx context.Context, username pgtype.Text) (count int, ids []uuid.UUID, err error) { - return collectDeletedRows(q.db.Query(ctx, emptyTrash, username)) -} - -func selectDescendentsCTE(id uuid.UUID, includeDeleted bool) (exp.AliasedExpression, *goqu.SelectDataset) { - resTable := goqu.T("resources").As("r") - rec := pg.From(resTable). - Select(resTable.Col("id"), resTable.Col("parent"), resTable.Col("deleted"), goqu.L("? + 1", goqu.I("n.depth"))). - Join(goqu.T("nodes").As("n"), goqu.On(goqu.I("r.parent").Eq(goqu.I("n.id")))) - - if !includeDeleted { - rec = rec.Where( - goqu.Or( - goqu.I("r.deleted").Eq(goqu.I("n.deleted")), - goqu.And( - goqu.I("r.deleted").IsNull(), - goqu.I("n.deleted").IsNull(), - ))) - } - - return resTable, pg.From(resTable).WithRecursive("nodes(id, parent, deleted, depth)", - pg.From(resTable). - Select(resTable.Col("id"), resTable.Col("parent"), resTable.Col("deleted"), goqu.L("0")). - Where(goqu.I("r.id").Eq(id)). - UnionAll(rec)) -} - -func (q *Queries) Delete(ctx context.Context, id uuid.UUID, softDelete, excludeRoot bool) (int, []uuid.UUID, error) { - table, exp := selectDescendentsCTE(id, !softDelete) - - idSelect := pg.From("nodes").Select("id") - if excludeRoot { - idSelect = idSelect.Where(goqu.C("depth").Gt(0)) - } - exp = exp.Where(table.Col("id").Eq(idSelect)) - - // table - var query string - var params []interface{} - if softDelete { - query, params, _ = exp. - Update(). - Set( - goqu.Record{ - "modified": goqu.L("NOW()"), - "deleted": goqu.L("NOW()"), - }). - Returning(table.Col("id"), table.Col("dir")). - ToSQL() - } else { - query, params, _ = exp. - Delete(). - Returning(table.Col("id"), table.Col("dir")). - ToSQL() - } - - return collectDeletedRows(q.db.Query(ctx, query, params...)) -} - -func collectDeletedRows(rows pgx.Rows, e error) (count int, ids []uuid.UUID, err error) { - if e != nil { - return 0, nil, e - } - defer rows.Close() - var id uuid.UUID - var dir bool - for rows.Next() { - if err := rows.Scan(&id, &dir); err != nil { - return count, nil, err - } - if !dir { - ids = append(ids, id) - } - count++ - } - err = rows.Err() - return count, ids, err -} - -func (q *Queries) RestoreDeleted(ctx context.Context, id uuid.UUID) ([]int64, error) { - table, exp := selectDescendentsCTE(id, false) - - idSelect := pg.From("nodes").Select("id") - exp = exp.Where(table.Col("id").Eq(idSelect)) - - // table - var query string - var params []interface{} - query, params, _ = exp. - Update(). - Set( - goqu.Record{ - "modified": goqu.L("NOW()"), - "deleted": nil, - }). - Returning(table.Col("content_length")). - ToSQL() - - rows, err := q.db.Query(ctx, query, params...) - if err != nil { - return nil, err - } - defer rows.Close() - var items []int64 - for rows.Next() { - var content_length int64 - if err := rows.Scan(&content_length); err != nil { - return nil, err - } - items = append(items, content_length) - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -func (q *Queries) TrashCompact(ctx context.Context, t pgtype.Timestamp) (int, uuid.UUIDs, error) { - return collectDeletedRows(q.db.Query(ctx, "DELETE FROM resources WHERE deleted < $1::TIMESTAMP RETURNING id, dir", t)) -} - -type TrashListParams struct { - LastTimestamp pgtype.Timestamp - LastID uuid.UUID - Username pgtype.Text - N uint -} - -func (q *Queries) TrashList(ctx context.Context, arg TrashListParams) ([]Resource, error) { - exp := pg.From("resources") - if arg.LastTimestamp.Valid { - exp = exp.Where( - goqu.Or( - goqu.I("deleted").Lt(goqu.V(arg.LastTimestamp.Time)), - goqu.And( - goqu.I("deleted").Eq(goqu.V(arg.LastTimestamp.Time)), - goqu.I("id").Lt(goqu.V(arg.LastID)), - ))) - } else { - exp = exp.Where(goqu.C("deleted").IsNotNull()) - } - - var query string - var params []interface{} - query, params, _ = exp. - Order(goqu.C("deleted").Desc()). - OrderAppend(goqu.C("id").Desc()). - Limit(arg.N). - ToSQL() - - rows, err := q.db.Query(ctx, query, params...) - if err != nil { - return nil, err - } - defer rows.Close() - var items []Resource - for rows.Next() { - var i Resource - if err := rows.Scan( - &i.ID, - &i.Name, - &i.Parent, - &i.Dir, - &i.Created, - &i.Modified, - &i.Deleted, - &i.ContentLength, - &i.ContentType, - &i.ContentSha256, - &i.Permissions, - &i.Grants, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -func (q *Queries) TrashSummary(ctx context.Context, username pgtype.Text) (int, int, error) { - var whereCond exp.Expression = goqu.C("deleted").IsNotNull() - if username.Valid { - whereCond = goqu.And(whereCond, goqu.L("permissions[?]::INTEGER <> 0", username.String)) - } - exp := pg. - From("resources"). - Select( - goqu.COALESCE(goqu.SUM("content_length"), 0), - goqu.COUNT("*")). - Where(whereCond) - - query, args, _ := exp.ToSQL() - row := d.db.QueryRow(ctx, query, args...) - var size int - var items int - err := row.Scan(&size, &items) - return items, size, err -} diff --git a/server/internal/core/fs/delete.go b/server/internal/core/fs/delete.go index 1eddc7ae..daf33e97 100644 --- a/server/internal/core/fs/delete.go +++ b/server/internal/core/fs/delete.go @@ -4,6 +4,7 @@ import ( "errors" "time" + "github.com/doug-martin/goqu/v9" "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/shroff/phylum/server/internal/core/db" @@ -53,7 +54,7 @@ func (f filesystem) deleteRecursive(id, parent uuid.UUID, softDelete, preserveRo var ids uuid.UUIDs err := f.db.WithTx(f.ctx, func(d *db.DbHandler) error { var err error - if _, ids, err = d.Delete(f.ctx, id, softDelete, preserveRoot); err != nil { + if _, ids, err = f.Delete(id, softDelete, preserveRoot); err != nil { return err } @@ -69,3 +70,37 @@ func (f filesystem) deleteRecursive(id, parent uuid.UUID, softDelete, preserveRo return err } + +func (f filesystem) Delete(id uuid.UUID, softDelete, excludeRoot bool) (int, []uuid.UUID, error) { + idSelect := pg.From("nodes").Select("id") + if excludeRoot { + idSelect = idSelect.Where(goqu.C("depth").Gt(0)) + } + + table := goqu.T("resource").As("r") + nodes := goqu.T("nodes") + q := selectResourceTree(pg.From(table), nodes, id, !softDelete). + Where(table.Col("id").Eq(idSelect)) + + // table + var query string + var params []interface{} + if softDelete { + query, params, _ = q. + Update(). + Set( + goqu.Record{ + "modified": goqu.L("NOW()"), + "deleted": goqu.L("NOW()"), + }). + Returning(table.Col("id"), table.Col("dir")). + ToSQL() + } else { + query, params, _ = q. + Delete(). + Returning(table.Col("id"), table.Col("dir")). + ToSQL() + } + + return collectDeletedRows(f.db.Query(f.ctx, query, params...)) +} diff --git a/server/internal/core/fs/fs.go b/server/internal/core/fs/fs.go index 4936e5e9..96fb4e83 100644 --- a/server/internal/core/fs/fs.go +++ b/server/internal/core/fs/fs.go @@ -4,6 +4,8 @@ import ( "context" "strings" + "github.com/doug-martin/goqu/v9" + _ "github.com/doug-martin/goqu/v9/dialect/postgres" "github.com/google/uuid" "github.com/shroff/phylum/server/internal/core/db" "github.com/shroff/phylum/server/internal/core/storage" @@ -19,6 +21,13 @@ const ( ResourceBindConflictResolutionDelete = 4 // Delete existing resource before creating ) +var pg *goqu.Database + +func init() { + pg = goqu.New("postgres", nil) + goqu.SetDefaultPrepared(true) +} + type FileSystem interface { // filesystem.go RootID() uuid.UUID @@ -38,7 +47,7 @@ type FileSystem interface { Search(query string) ([]Resource, error) // trash.go - TrashList(cursor string, n int) ([]Resource, string, error) + TrashList(cursor string, n uint) ([]Resource, string, error) TrashSummary() (int, int, error) TrashEmpty() (int, error) } diff --git a/server/internal/core/fs/sql_common.go b/server/internal/core/fs/sql_common.go new file mode 100644 index 00000000..1b941b5f --- /dev/null +++ b/server/internal/core/fs/sql_common.go @@ -0,0 +1,30 @@ +package fs + +import ( + "github.com/doug-martin/goqu/v9" + "github.com/doug-martin/goqu/v9/exp" + "github.com/google/uuid" +) + +func selectResourceTree(ds *goqu.SelectDataset, nodes exp.IdentifierExpression, id uuid.UUID, includeDeleted bool) *goqu.SelectDataset { + resTable := goqu.T("resources").As("r") + rec := pg.From(resTable). + Select(resTable.Col("id"), resTable.Col("parent"), resTable.Col("deleted"), goqu.L("? + 1", goqu.I("n.depth"))). + Join(nodes.As("n"), goqu.On(goqu.I("r.parent").Eq(goqu.I("n.id")))) + + if !includeDeleted { + rec = rec.Where( + goqu.Or( + goqu.I("r.deleted").Eq(goqu.I("n.deleted")), + goqu.And( + goqu.I("r.deleted").IsNull(), + goqu.I("n.deleted").IsNull(), + ))) + } + + return ds.WithRecursive(nodes.GetTable()+"(id, parent, deleted, depth)", + pg.From(resTable). + Select(resTable.Col("id"), resTable.Col("parent"), resTable.Col("deleted"), goqu.L("0")). + Where(goqu.I("r.id").Eq(id)). + UnionAll(rec)) +} diff --git a/server/internal/core/fs/trash.go b/server/internal/core/fs/trash.go index b8744249..6d911e09 100644 --- a/server/internal/core/fs/trash.go +++ b/server/internal/core/fs/trash.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/doug-martin/goqu/v9" "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" @@ -19,61 +20,79 @@ import ( const trashDuration = 30 * 24 * time.Hour -func (f filesystem) TrashList(cursor string, n int) ([]Resource, string, error) { +func (f filesystem) TrashList(cursor string, n uint) ([]Resource, string, error) { var username pgtype.Text if !f.fullAccess { username.String = f.username username.Valid = true } - var lastTimestamp pgtype.Timestamp - var lastID uuid.UUID + q := pg.From("resources"). + Where(goqu.C("deleted").IsNotNull()) + if !f.fullAccess { + q = q.Where(goqu.L("permissions[?]::INTEGER <> 0", f.username)) + } if cursor != "" { if d, err := base64.StdEncoding.DecodeString(cursor); err != nil { return nil, "", err } else if len(d) != 24 { return nil, "", fmt.Errorf("illegal cursor. Length %d not expected", len(d)) } else { - lastID, _ = uuid.FromBytes(d[:16]) t := int64(binary.LittleEndian.Uint64(d[16:])) - lastTimestamp.Time = time.Unix(t/1e9, t%1e9).UTC() - lastTimestamp.Valid = true + lastID, _ := uuid.FromBytes(d[:16]) + lastTimestamp := time.Unix(t/1e9, t%1e9).UTC() + q = q.Where( + goqu.Or( + goqu.I("deleted").Lt(goqu.V(lastTimestamp)), + goqu.And( + goqu.I("deleted").Eq(goqu.V(lastTimestamp)), + goqu.I("id").Lt(goqu.V(lastID)), + ))) } } - if res, err := f.db.TrashList(f.ctx, db.TrashListParams{ - Username: username, - N: uint(n), - LastTimestamp: lastTimestamp, - LastID: lastID, - }); err != nil { + query, params, _ := q. + Order(goqu.C("deleted").Desc()). + OrderAppend(goqu.C("id").Desc()). + Limit(n). + ToSQL() + + if rows, err := f.db.Query(f.ctx, query, params...); err != nil { + return nil, "", err + } else if res, err := pgx.CollectRows(rows, scanResourceRow); err != nil { return nil, "", err } else { if res == nil { return nil, "", nil } - deleted := make([]Resource, len(res)) - for i, r := range res { - deleted[i] = ResourceFromDB(r) - } - last := deleted[len(deleted)-1] + last := res[len(res)-1] c := make([]byte, 24) b, _ := last.id.MarshalBinary() copy(c, b) binary.LittleEndian.PutUint64(c[16:], uint64(last.deleted.UnixNano())) - return deleted, base64.StdEncoding.EncodeToString(c), nil + return res, base64.StdEncoding.EncodeToString(c), nil } } -func (f filesystem) TrashSummary() (int, int, error) { - var username pgtype.Text - if !f.fullAccess { - username.String = f.username - username.Valid = true - } - return f.db.TrashSummary(f.ctx, username) +func scanResourceRow(row pgx.CollectableRow) (Resource, error) { + var r Resource + err := row.Scan( + &r.id, + &r.name, + &r.parentID, + &r.dir, + &r.created, + &r.modified, + &r.deleted, + &r.contentLength, + &r.contentType, + &r.contentSHA256, + &r.permissions, + &r.grants, + ) + return r, err } // RestoreDeleted restores a previously deleted resources @@ -146,7 +165,7 @@ func (r Resource) Restore(parentPathOrUUID string, name string, autoRename bool) r.parentID = update.Parent } } - if del, err := f.db.RestoreDeleted(f.ctx, r.id); err != nil { + if del, err := f.markNotDeleted(f.ctx, r.id); err != nil { return err } else { count = len(del) @@ -161,6 +180,39 @@ func (r Resource) Restore(parentPathOrUUID string, name string, autoRename bool) return } +func (f filesystem) markNotDeleted(ctx context.Context, id uuid.UUID) ([]int64, error) { + table := goqu.T("resources").As("r") + nodes := goqu.T("nodes") + q := selectResourceTree(pg.From(table), nodes, id, false). + Where(table.Col("id").Eq(pg.Select("id").From(nodes))). + Update().Set( + goqu.Record{ + "modified": goqu.L("NOW()"), + "deleted": nil, + }). + Returning(table.Col("content_length")) + + query, params, _ := q.ToSQL() + + rows, err := f.db.Query(ctx, query, params...) + if err != nil { + return nil, err + } + defer rows.Close() + var items []int64 + for rows.Next() { + var content_length int64 + if err := rows.Scan(&content_length); err != nil { + return nil, err + } + items = append(items, content_length) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + func SetupTrashCompactor() { ticker := time.NewTimer(24 * time.Hour) go func() { @@ -182,8 +234,13 @@ func TrashCompact() { } func trashCompact(t time.Time) (int, error) { - d := db.Get() - count, ids, err := d.TrashCompact(context.Background(), pgtype.Timestamp{Valid: true, Time: t.UTC()}) + q := pg. + From("resources"). + Where(goqu.C("deleted").Lt(goqu.V(t.UTC()))). + Delete(). + Returning("id", "dir") + query, args, _ := q.ToSQL() + count, ids, err := collectDeletedRows(db.Get().Query(context.Background(), query, args...)) if err != nil { return 0, err } @@ -194,14 +251,56 @@ func trashCompact(t time.Time) (int, error) { return count, err } +func (f filesystem) TrashSummary() (int, int, error) { + q := f.selectDeletedResources(). + Select( + goqu.COALESCE(goqu.SUM("content_length"), 0), + goqu.COUNT("*")) + + query, args, _ := q.ToSQL() + row := f.db.QueryRow(f.ctx, query, args...) + var size int + var items int + err := row.Scan(&size, &items) + return items, size, err +} + func (f filesystem) TrashEmpty() (int, error) { - var username pgtype.Text - if !f.fullAccess { - username.String = f.username - username.Valid = true - } - d := db.Get() - count, ids, err := d.EmptyTrash(f.ctx, username) + q := f.selectDeletedResources(). + Delete(). + Returning("id", "dir") + query, args, _ := q.ToSQL() + count, ids, err := collectDeletedRows(f.db.Query(f.ctx, query, args...)) storage.Get().DeleteAll(ids) return count, err } + +func (f filesystem) selectDeletedResources() *goqu.SelectDataset { + q := pg. + From("resources"). + Where(goqu.C("deleted").IsNotNull()) + if !f.fullAccess { + q = q.Where(goqu.L("permissions[?]::INTEGER <> 0", f.username)) + } + return q +} + +func collectDeletedRows(rows pgx.Rows, e error) (count int, ids []uuid.UUID, err error) { + if e != nil { + return 0, nil, e + } + defer rows.Close() + var id uuid.UUID + var dir bool + for rows.Next() { + if err := rows.Scan(&id, &dir); err != nil { + return count, nil, err + } + if !dir { + ids = append(ids, id) + } + count++ + } + err = rows.Err() + return count, ids, err +}