[server] finalize partial uploads

This commit is contained in:
Abhishek Shroff
2026-01-24 08:52:05 +05:30
parent 72d105db83
commit ddbea7ef6b
3 changed files with 106 additions and 6 deletions

View File

@@ -1,10 +1,13 @@
package fs
import (
"errors"
"io"
"net/http"
"codeberg.org/shroff/phylum/server/internal/api/authenticator"
"codeberg.org/shroff/phylum/server/internal/api/v1/responses"
"codeberg.org/shroff/phylum/server/internal/core"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
@@ -24,6 +27,15 @@ type partialUploadsUploadParams struct {
SHA256 string `json:"sha256" form:"sha256"`
}
type partialUploadsFinalizeParams struct {
UploadID string `json:"upload_id" form:"upload_id" binding:"required,uuid"`
Path string `json:"path" form:"path" binding:"required"`
ResourceID string `json:"resource_id" form:"resource_id" binding:"omitempty,uuid"`
VersionID string `json:"version_id" form:"version_id" binding:"omitempty,uuid"`
CreateParents bool `json:"create_parents" form:"create_parents"`
Conflict core.ResourceBindConflictResolution `json:"conflict" form:"conflict"`
}
func handlePartialUploadsCreateRequest(c *gin.Context) {
var params partialUploadsCreateParams
err := c.ShouldBind(&params)
@@ -125,4 +137,63 @@ func handlePartialUploadsUploadRequest(c *gin.Context) {
}
func handlePartialUploadsFinalizeRequest(c *gin.Context) {
var params partialUploadsFinalizeParams
err := c.ShouldBind(&params)
if err != nil {
err = c.BindQuery(&params)
if err != nil {
panic(errInvalidParams)
}
}
var uploadID, resourceID, versionID uuid.UUID
if params.UploadID != "" {
uploadID, err = uuid.Parse(params.UploadID)
if err != nil {
panic(err)
}
}
if params.ResourceID != "" {
resourceID, err = uuid.Parse(params.ResourceID)
if err != nil {
panic(err)
}
}
if params.VersionID != "" {
versionID, err = uuid.Parse(params.VersionID)
if err != nil {
panic(err)
}
}
f := authenticator.GetFileSystem(c)
err = func() error {
// TODO: #perf disk I/O in tx
src, err := f.ReadPartialUpload(authenticator.GetAuth(c).UserID(), uploadID)
if err != nil {
return err
}
defer src.Close()
out, err := f.CreateFileByPath(params.Path, resourceID, versionID, params.Conflict)
if err != nil {
return err
}
if _, err := io.Copy(out, src); err != nil {
out.Close()
return err
} else {
return out.Close()
}
}()
if err != nil && !errors.Is(err, core.ErrIDConflict) {
panic(err)
}
// id may have changed if this is an overwrite
r, err := f.ResourceByPathWithRoot(params.Path)
if err != nil {
panic(err)
}
c.JSON(200, responses.ResourceFromFS(r))
}

View File

@@ -54,7 +54,8 @@ var (
ErrPublinkNameConflict = NewError(http.StatusPreconditionFailed, "publink_name_conflict", "Another public share with this name already exists")
ErrVersionNotFound = NewError(http.StatusNotFound, "version_not_found", "Version Not Foud")
ErrResourceVersionLatest = NewError(http.StatusPreconditionFailed, "resource_version_latest", "Cannot delete most recent version of a resource")
ErrContentLengthExceeded = NewError(http.StatusBadRequest, "content_length_exceeded", "Content Length Exceeded")
ErrContentChecksumMismatch = NewError(http.StatusBadRequest, "content_checksum_mismatch", "Content Checksum Mismatch")
ErrContentOffsetExceeded = NewError(http.StatusBadRequest, "content_offset_mismatch", "Content Offset Exceeded")
ErrUploadLengthExceeded = NewError(http.StatusBadRequest, "upload_length_exceeded", "Upload Length Exceeded")
ErrUploadChecksumMismatch = NewError(http.StatusBadRequest, "upload_checksum_mismatch", "Upload Checksum Mismatch")
ErrUploadOffsetExceeded = NewError(http.StatusBadRequest, "upload_offset_exceeded", "Upload Offset Exceeded")
ErrUploadIncomplete = NewError(http.StatusBadRequest, "upload_incomplete", "Upload Incomplete")
)

View File

@@ -54,7 +54,7 @@ func (f *FileSystem) OpenPartialUpload(user int32, id uuid.UUID, offset int64) (
}
}
if offset > uploaded {
return nil, ErrContentOffsetExceeded
return nil, ErrUploadOffsetExceeded
}
file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0o600)
if _, err := file.Seek(int64(offset), 0); err != nil {
@@ -99,6 +99,34 @@ func (f *FileSystem) QueryPartialUpload(user int32, id uuid.UUID) (size, uploade
return
}
func (f *FileSystem) ReadPartialUpload(user int32, id uuid.UUID) (io.ReadCloser, error) {
owner, size, uploaded, _, err := queryPartialUpload(f.db, id)
if err != nil {
return nil, err
}
if user != owner {
return nil, ErrResourceNotFound
}
path := uploadFilePath(user, id)
if stat, err := os.Stat(path); err != nil {
if errors.Is(err, os.ErrNotExist) {
uploaded = 0
} else {
return nil, err
}
} else {
if stat.Size() < uploaded {
uploaded = stat.Size()
}
}
if size != uploaded {
return nil, ErrUploadIncomplete
}
file, err := os.Open(path)
return file, nil
}
func insertPartialUpload(db db.TxHandler, id uuid.UUID, owner int32, size int64, checksum string) error {
const q = `INSERT INTO partial_uploads(id, owner, size, uploaded, checksum)
VALUES (@id, @owner, @size, 0, @checksum)`
@@ -154,7 +182,7 @@ type contentUploader struct {
func (c *contentUploader) Write(p []byte) (n int, err error) {
if len(p) > c.maxBytes {
return 0, ErrContentLengthExceeded
return 0, ErrUploadLengthExceeded
}
if c.checksum != "" {
if _, err := c.hash.Write(p); err != nil {
@@ -171,7 +199,7 @@ func (c *contentUploader) Close() error {
return err
}
if c.checksum != "" && c.checksum != hex.EncodeToString(c.hash.Sum(nil)) {
return ErrContentChecksumMismatch
return ErrUploadChecksumMismatch
}
return updatePartialUpload(c.db, c.id, c.n)
}