mirror of
https://codeberg.org/shroff/phylum.git
synced 2026-05-06 12:19:35 -05:00
[server] Delete version contents job
This commit is contained in:
@@ -0,0 +1,50 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"github.com/google/uuid"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/shroff/phylum/server/internal/db"
|
||||
"github.com/shroff/phylum/server/internal/storage"
|
||||
)
|
||||
|
||||
type versionInfo struct {
|
||||
id uuid.UUID
|
||||
storage string
|
||||
}
|
||||
|
||||
func deleteAllVersionContents(db db.Handler, ids uuid.UUIDs) error {
|
||||
const q = `SELECT v.id, v.storage FROM resources r
|
||||
JOIN resource_versions v ON r.id = v.resource_id
|
||||
WHERE r.id = ANY ($1::UUID[])`
|
||||
if rows, err := db.Query(q, ids); err != nil {
|
||||
return err
|
||||
} else if versions, err := pgx.CollectRows(rows, scanDeletedVersion); err != nil {
|
||||
return err
|
||||
} else {
|
||||
return deleteVersionContents(versions)
|
||||
}
|
||||
}
|
||||
|
||||
func deleteVersionContents(versions []versionInfo) error {
|
||||
idsPerBackend := make(map[string][]string)
|
||||
for _, v := range versions {
|
||||
idsPerBackend[v.storage] = append(idsPerBackend[v.storage], v.id.String())
|
||||
}
|
||||
for k, v := range idsPerBackend {
|
||||
if backend, err := storage.GetBackend(k); err != nil {
|
||||
return err
|
||||
} else {
|
||||
backend.DeleteAll(v)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func scanDeletedVersion(row pgx.CollectableRow) (versionInfo, error) {
|
||||
var v versionInfo
|
||||
err := row.Scan(
|
||||
&v.id,
|
||||
&v.storage,
|
||||
)
|
||||
return v, err
|
||||
}
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func moveVersionToTargetBackend(db db.Handler, versionID uuid.UUID) {
|
||||
func migrateVersionContents(db db.Handler, versionID uuid.UUID) {
|
||||
if err := moveVersionToTargetBackendErr(db, versionID); err != nil {
|
||||
logrus.Warn("failed to move " + versionID.String() + " to target storage: " + err.Error())
|
||||
}
|
||||
@@ -175,7 +175,7 @@ func (f filesystem) deleteRecursive(id, parent uuid.UUID, softDelete, preserveRo
|
||||
})
|
||||
|
||||
if err == nil && !softDelete {
|
||||
deleteResourcesContents(ids)
|
||||
go deleteAllVersionContents(f.db, ids)
|
||||
}
|
||||
|
||||
return err
|
||||
@@ -256,14 +256,3 @@ func collectNonDirResourceIDs(rows pgx.Rows, e error) (total int, ids []uuid.UUI
|
||||
err = rows.Err()
|
||||
return total, ids, err
|
||||
}
|
||||
|
||||
func deleteResourcesContents(id uuid.UUIDs) {
|
||||
// TODO: #implement
|
||||
}
|
||||
|
||||
func deleteResourceContents(id uuid.UUID) {
|
||||
// TODO: #implement
|
||||
}
|
||||
func deleteVersionContents(id uuid.UUID) {
|
||||
// TODO: #implement
|
||||
}
|
||||
|
||||
@@ -41,7 +41,7 @@ func (f filesystem) OpenWrite(r Resource, versionID uuid.UUID) (io.WriteCloser,
|
||||
return err
|
||||
}
|
||||
|
||||
go moveVersionToTargetBackend(db.Get(context.Background()), versionID)
|
||||
go migrateVersionContents(db.Get(context.Background()), versionID)
|
||||
return nil
|
||||
}), nil
|
||||
|
||||
|
||||
@@ -108,7 +108,7 @@ func (f filesystem) TrashEmpty() (int, error) {
|
||||
Returning("id", "dir")
|
||||
query, args, _ := q.ToSQL()
|
||||
count, ids, err := collectNonDirResourceIDs(f.db.Query(query, args...))
|
||||
deleteResourcesContents(ids)
|
||||
deleteAllVersionContents(f.db, ids)
|
||||
return count, err
|
||||
}
|
||||
|
||||
@@ -129,10 +129,11 @@ func hardDeleteOldResources(ctx context.Context, t time.Time) (int, error) {
|
||||
Delete().
|
||||
Returning("id", "dir")
|
||||
query, args, _ := q.ToSQL()
|
||||
count, ids, err := collectNonDirResourceIDs(db.Get(ctx).Query(query, args...))
|
||||
db := db.Get(ctx)
|
||||
count, ids, err := collectNonDirResourceIDs(db.Query(query, args...))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
deleteResourcesContents(ids)
|
||||
deleteAllVersionContents(db, ids)
|
||||
return count, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user