From b5d842c01d64356f1e3f2cdfeb2a880d07315443 Mon Sep 17 00:00:00 2001 From: Abhishek Shroff Date: Wed, 21 Jan 2026 09:45:03 +0530 Subject: [PATCH] [server] WIP: partial uploads --- server/internal/api/v1/fs/create_file.go | 93 ------------ server/internal/api/v1/fs/routes.go | 6 +- server/internal/api/v1/fs/upload_partial.go | 90 +++++++++++ server/internal/core/errors.go | 3 + server/internal/core/partial_uploads.go | 143 ++++++++++++++++++ .../db/migrations/027_partial_uploads.sql | 13 ++ 6 files changed, 254 insertions(+), 94 deletions(-) delete mode 100644 server/internal/api/v1/fs/create_file.go create mode 100644 server/internal/api/v1/fs/upload_partial.go create mode 100644 server/internal/core/partial_uploads.go create mode 100644 server/internal/db/migrations/027_partial_uploads.sql diff --git a/server/internal/api/v1/fs/create_file.go b/server/internal/api/v1/fs/create_file.go deleted file mode 100644 index 0b6c5e98..00000000 --- a/server/internal/api/v1/fs/create_file.go +++ /dev/null @@ -1,93 +0,0 @@ -package fs - -import ( - "errors" - "io" - "net/http" - - "codeberg.org/shroff/phylum/server/internal/api/authenticator" - "codeberg.org/shroff/phylum/server/internal/api/v1/responses" - "codeberg.org/shroff/phylum/server/internal/core" - "github.com/gin-gonic/gin" - "github.com/google/uuid" -) - -type createFileParams struct { - Path string `json:"path" form:"path" binding:"required"` - ID string `json:"id" form:"id" binding:"omitempty,uuid"` - VersionID string `json:"version_id" form:"version_id" binding:"omitempty,uuid"` - CreateParents bool `json:"create_parents" form:"create_parents"` - Conflict core.ResourceBindConflictResolution `json:"conflict" form:"conflict"` - SHA256 string `json:"sha256" form:"sha256"` -} - -func handleCreateFileRequest(c *gin.Context) { - var params createFileParams - err := c.ShouldBindQuery(¶ms) - if err != nil { - panic(err) - } - var id uuid.UUID - var versionID uuid.UUID - if params.ID != "" { - id, err = uuid.Parse(params.ID) - if err != nil { - panic(err) - } - } - if params.VersionID != "" { - versionID, err = uuid.Parse(params.VersionID) - if err != nil { - panic(err) - } - } - - // TODO: Calculate and verify sha sum - - f := authenticator.GetFileSystem(c) - - file, err := c.FormFile("contents") - if err != nil { - if !errors.Is(err, http.ErrMissingFile) { - panic(err) - } - - r, err := f.CreateResourceByPath(params.Path, id, false, false, params.Conflict) - if err != nil && !errors.Is(err, core.ErrIDConflict) { - panic(err) - } - c.JSON(200, responses.ResourceFromFS(r)) - return - } - - err = func() error { - // TODO: #perf disk I/O in tx - src, err := file.Open() - if err != nil { - return err - } - defer src.Close() - - out, err := f.CreateFileByPath(params.Path, id, versionID, params.Conflict) - if err != nil { - return err - } - - if _, err := io.Copy(out, src); err != nil { - out.Close() - return err - } else { - return out.Close() - } - }() - if err != nil && !errors.Is(err, core.ErrIDConflict) { - panic(err) - } - - // id may have changed if this is an overwrite - r, err := f.ResourceByPathWithRoot(params.Path) - if err != nil { - panic(err) - } - c.JSON(200, responses.ResourceFromFS(r)) -} diff --git a/server/internal/api/v1/fs/routes.go b/server/internal/api/v1/fs/routes.go index 534975ca..4dc6f298 100644 --- a/server/internal/api/v1/fs/routes.go +++ b/server/internal/api/v1/fs/routes.go @@ -27,8 +27,12 @@ func SetupRoutes(r *gin.RouterGroup) { group.POST("/delete_version", handleDeleteVersionRequest) group.POST("/mkdir", handleMkdirRequest) - group.POST("/create_file", handleCreateFileRequest) group.PUT("/upload", handleUploadRequest) + partialUploadsGroup := group.Group("/partial_uploads") + partialUploadsGroup.POST("/start", handlePartialUploadsCreateRequest) + partialUploadsGroup.PUT("/upload", handlePartialUploadsCreateRequest) + partialUploadsGroup.POST("/finalize", handlePartialUploadsCreateRequest) + group.GET("/search", handleSearchRequest) } diff --git a/server/internal/api/v1/fs/upload_partial.go b/server/internal/api/v1/fs/upload_partial.go new file mode 100644 index 00000000..ce778952 --- /dev/null +++ b/server/internal/api/v1/fs/upload_partial.go @@ -0,0 +1,90 @@ +package fs + +import ( + "io" + "net/http" + + "codeberg.org/shroff/phylum/server/internal/api/authenticator" + "github.com/gin-gonic/gin" + "github.com/google/uuid" +) + +type partialUploadsCreateParams struct { + Size int64 `json:"size"` + SHA256 string `json:"sha256" form:"sha256"` +} + +type partialUploadsUploadParams struct { + ID string `json:"id" form:"id" binding:"required,uuid"` + Offset int64 `json:"offset" form:"offset" binding:"required"` + ChunkSHA256 int `json:"chunk_sha256" form:"chunk_sha256"` +} + +func handlePartialUploadsCreateRequest(c *gin.Context) { + var params partialUploadsCreateParams + err := c.ShouldBindQuery(¶ms) + if err != nil { + panic(err) + } + + userID := authenticator.GetAuth(c).UserID() + f := authenticator.GetFileSystem(c) + uploadID, err := f.CreatePartialUpload(userID, params.Size, params.SHA256) + if err != nil { + panic(err) + } + + c.JSON(200, gin.H{ + "upload_id": uploadID, + }) +} + +func handlePartialUploadsUploadRequest(c *gin.Context) { + var params partialUploadsUploadParams + err := c.ShouldBindQuery(¶ms) + if err != nil { + panic(err) + } + + id, err := uuid.Parse(params.ID) + if err != nil { + panic(err) + } + + f := authenticator.GetFileSystem(c) + err = func() error { + file, err := c.FormFile("contents") + if err != nil { + if err == http.ErrMissingFile { + return errInvalidParams + } + return err + } + // TODO: #perf disk I/O in tx + src, err := file.Open() + if err != nil { + return err + } + defer src.Close() + + out, err := f.OpenPartialUpload(id, params.Offset, authenticator.GetAuth(c).UserID()) + if err != nil { + return err + } + + if _, err := io.Copy(out, src); err != nil { + out.Close() + return err + } else { + return out.Close() + } + }() + if err != nil { + panic(err) + } + + c.Status(200) +} + +func handlePartialUploadsFinalizeRequest(c *gin.Context) { +} diff --git a/server/internal/core/errors.go b/server/internal/core/errors.go index 55307b59..cd508803 100644 --- a/server/internal/core/errors.go +++ b/server/internal/core/errors.go @@ -54,4 +54,7 @@ var ( ErrPublinkNameConflict = NewError(http.StatusPreconditionFailed, "publink_name_conflict", "Another public share with this name already exists") ErrVersionNotFound = NewError(http.StatusNotFound, "version_not_found", "Version Not Foud") ErrResourceVersionLatest = NewError(http.StatusPreconditionFailed, "resource_version_latest", "Cannot delete most recent version of a resource") + ErrContentLengthExceeded = NewError(http.StatusBadRequest, "content_length_exceeded", "Content Length Exceeded") + ErrContentChecksumMismatch = NewError(http.StatusBadRequest, "content_checksum_mismatch", "Content Checksum Mismatch") + ErrContentOffsetExceeded = NewError(http.StatusBadRequest, "content_offset_mismatch", "Content Offset Exceeded") ) diff --git a/server/internal/core/partial_uploads.go b/server/internal/core/partial_uploads.go new file mode 100644 index 00000000..129a4602 --- /dev/null +++ b/server/internal/core/partial_uploads.go @@ -0,0 +1,143 @@ +package core + +import ( + "crypto/sha256" + "encoding/hex" + "errors" + "hash" + "io" + "os" + "strings" + + "codeberg.org/shroff/phylum/server/internal/db" + "github.com/google/uuid" + "github.com/jackc/pgx/v5" +) + +func (f *FileSystem) CreatePartialUpload(owner int32, size int64, sha256 string) (uuid.UUID, error) { + id, _ := uuid.NewV7() + + path := "path" + return id, f.db.RunInTx(func(db db.TxHandler) error { + if err := insertPartialUpload(db, id, owner, size, sha256); err != nil { + return err + } + if file, err := os.Create(path); err != nil { + return err + } else { + return file.Close() + } + }) +} + +func (f *FileSystem) OpenPartialUpload(id uuid.UUID, offset int64, user int32) (io.WriteCloser, error) { + owner, size, uploaded, chunkSHA256, err := queryPartialUpload(f.db, id) + if err != nil { + return nil, err + } + if user != owner { + return nil, ErrInsufficientPermissions + } + + path := "path" + if stat, err := os.Stat(path); err != nil { + if errors.Is(err, os.ErrNotExist) { + uploaded = 0 + } else { + return nil, err + } + } else { + if stat.Size() < uploaded { + uploaded = stat.Size() + } + } + if offset > uploaded { + return nil, ErrContentOffsetExceeded + } + file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0o600) + if _, err := file.Seek(int64(offset), 0); err != nil { + return nil, err + } + c := &contentUploader{ + id: id, + db: f.db, + dest: file, + maxBytes: int(size - uploaded), + checksum: chunkSHA256, + hash: sha256.New(), + } + return c, nil +} + +func insertPartialUpload(db db.TxHandler, id uuid.UUID, owner int32, size int64, sha256 string) error { + const q = `INSERT INTO partial_uploads(id, owner, size, @uploaded, sha256) + VALUES (@id, @owner, @size, 0, @sha256)` + + args := pgx.NamedArgs{ + "id": id, + "owner": owner, + "size": size, + "sha256": sha256, + } + _, err := db.Exec(q, args) + if err != nil && strings.Contains(err.Error(), "partial_uploads_pkey") { + return ErrIDConflict + } + return err +} + +func queryPartialUpload(db db.Handler, id uuid.UUID) (owner int32, size, uploaded int64, sha256 string, err error) { + const q = `SELECT (owner, size, uploaded, sha256) FROM partial_uploads WHERE id = @id` + + args := pgx.NamedArgs{ + "id": id, + } + row := db.QueryRow(q, args) + err = row.Scan(owner, size, uploaded, sha256) + return +} + +func updatePartialUpload(db db.Handler, id uuid.UUID, written int) error { + const q = `UPDATE partial_uploads SET uploaded = uploaded + @written WHERE id = @id` + + args := pgx.NamedArgs{ + "id": id, + "written": written, + } + _, err := db.Exec(q, args) + return err +} + +type contentUploader struct { + id uuid.UUID + db db.Handler + dest io.WriteCloser + maxBytes int + n int + checksum string + hash hash.Hash +} + +func (c *contentUploader) Write(p []byte) (n int, err error) { + if len(p) > c.maxBytes { + return 0, ErrContentLengthExceeded + } + if c.checksum != "" { + if _, err := c.hash.Write(p); err != nil { + return 0, err + } + } + n, err = c.dest.Write(p) + c.n += n + return +} + +func (c *contentUploader) Close() error { + if err := c.dest.Close(); err != nil { + return err + } + if c.checksum != "" && c.checksum != hex.EncodeToString(c.hash.Sum(nil)) { + return ErrContentChecksumMismatch + } + return updatePartialUpload(c.db, c.id, c.n) +} diff --git a/server/internal/db/migrations/027_partial_uploads.sql b/server/internal/db/migrations/027_partial_uploads.sql new file mode 100644 index 00000000..ddf9c10e --- /dev/null +++ b/server/internal/db/migrations/027_partial_uploads.sql @@ -0,0 +1,13 @@ +CREATE TABLE partial_uploads( + id UUID PRIMARY KEY, + created TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + owner INT NOT NULL REFERENCES users(id) ON UPDATE CASCADE ON DELETE CASCADE + size BIGINT NOT NULL, + uploaded BIGINT NOT NULL, + sha256 TEXT NOT NULL, +); + + +---- create above / drop below ---- + +DROP TABLE partial_uploads;