mirror of
https://codeberg.org/shroff/phylum.git
synced 2026-02-14 15:38:42 -06:00
[server] WIP: partial uploads
This commit is contained in:
@@ -1,93 +0,0 @@
|
||||
package fs
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
"codeberg.org/shroff/phylum/server/internal/api/authenticator"
|
||||
"codeberg.org/shroff/phylum/server/internal/api/v1/responses"
|
||||
"codeberg.org/shroff/phylum/server/internal/core"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type createFileParams struct {
|
||||
Path string `json:"path" form:"path" binding:"required"`
|
||||
ID string `json:"id" form:"id" binding:"omitempty,uuid"`
|
||||
VersionID string `json:"version_id" form:"version_id" binding:"omitempty,uuid"`
|
||||
CreateParents bool `json:"create_parents" form:"create_parents"`
|
||||
Conflict core.ResourceBindConflictResolution `json:"conflict" form:"conflict"`
|
||||
SHA256 string `json:"sha256" form:"sha256"`
|
||||
}
|
||||
|
||||
func handleCreateFileRequest(c *gin.Context) {
|
||||
var params createFileParams
|
||||
err := c.ShouldBindQuery(¶ms)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
var id uuid.UUID
|
||||
var versionID uuid.UUID
|
||||
if params.ID != "" {
|
||||
id, err = uuid.Parse(params.ID)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
if params.VersionID != "" {
|
||||
versionID, err = uuid.Parse(params.VersionID)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Calculate and verify sha sum
|
||||
|
||||
f := authenticator.GetFileSystem(c)
|
||||
|
||||
file, err := c.FormFile("contents")
|
||||
if err != nil {
|
||||
if !errors.Is(err, http.ErrMissingFile) {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
r, err := f.CreateResourceByPath(params.Path, id, false, false, params.Conflict)
|
||||
if err != nil && !errors.Is(err, core.ErrIDConflict) {
|
||||
panic(err)
|
||||
}
|
||||
c.JSON(200, responses.ResourceFromFS(r))
|
||||
return
|
||||
}
|
||||
|
||||
err = func() error {
|
||||
// TODO: #perf disk I/O in tx
|
||||
src, err := file.Open()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer src.Close()
|
||||
|
||||
out, err := f.CreateFileByPath(params.Path, id, versionID, params.Conflict)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := io.Copy(out, src); err != nil {
|
||||
out.Close()
|
||||
return err
|
||||
} else {
|
||||
return out.Close()
|
||||
}
|
||||
}()
|
||||
if err != nil && !errors.Is(err, core.ErrIDConflict) {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// id may have changed if this is an overwrite
|
||||
r, err := f.ResourceByPathWithRoot(params.Path)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
c.JSON(200, responses.ResourceFromFS(r))
|
||||
}
|
||||
@@ -27,8 +27,12 @@ func SetupRoutes(r *gin.RouterGroup) {
|
||||
group.POST("/delete_version", handleDeleteVersionRequest)
|
||||
|
||||
group.POST("/mkdir", handleMkdirRequest)
|
||||
group.POST("/create_file", handleCreateFileRequest)
|
||||
group.PUT("/upload", handleUploadRequest)
|
||||
|
||||
partialUploadsGroup := group.Group("/partial_uploads")
|
||||
partialUploadsGroup.POST("/start", handlePartialUploadsCreateRequest)
|
||||
partialUploadsGroup.PUT("/upload", handlePartialUploadsCreateRequest)
|
||||
partialUploadsGroup.POST("/finalize", handlePartialUploadsCreateRequest)
|
||||
|
||||
group.GET("/search", handleSearchRequest)
|
||||
}
|
||||
|
||||
90
server/internal/api/v1/fs/upload_partial.go
Normal file
90
server/internal/api/v1/fs/upload_partial.go
Normal file
@@ -0,0 +1,90 @@
|
||||
package fs
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
"codeberg.org/shroff/phylum/server/internal/api/authenticator"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type partialUploadsCreateParams struct {
|
||||
Size int64 `json:"size"`
|
||||
SHA256 string `json:"sha256" form:"sha256"`
|
||||
}
|
||||
|
||||
type partialUploadsUploadParams struct {
|
||||
ID string `json:"id" form:"id" binding:"required,uuid"`
|
||||
Offset int64 `json:"offset" form:"offset" binding:"required"`
|
||||
ChunkSHA256 int `json:"chunk_sha256" form:"chunk_sha256"`
|
||||
}
|
||||
|
||||
func handlePartialUploadsCreateRequest(c *gin.Context) {
|
||||
var params partialUploadsCreateParams
|
||||
err := c.ShouldBindQuery(¶ms)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
userID := authenticator.GetAuth(c).UserID()
|
||||
f := authenticator.GetFileSystem(c)
|
||||
uploadID, err := f.CreatePartialUpload(userID, params.Size, params.SHA256)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
c.JSON(200, gin.H{
|
||||
"upload_id": uploadID,
|
||||
})
|
||||
}
|
||||
|
||||
func handlePartialUploadsUploadRequest(c *gin.Context) {
|
||||
var params partialUploadsUploadParams
|
||||
err := c.ShouldBindQuery(¶ms)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
id, err := uuid.Parse(params.ID)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
f := authenticator.GetFileSystem(c)
|
||||
err = func() error {
|
||||
file, err := c.FormFile("contents")
|
||||
if err != nil {
|
||||
if err == http.ErrMissingFile {
|
||||
return errInvalidParams
|
||||
}
|
||||
return err
|
||||
}
|
||||
// TODO: #perf disk I/O in tx
|
||||
src, err := file.Open()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer src.Close()
|
||||
|
||||
out, err := f.OpenPartialUpload(id, params.Offset, authenticator.GetAuth(c).UserID())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := io.Copy(out, src); err != nil {
|
||||
out.Close()
|
||||
return err
|
||||
} else {
|
||||
return out.Close()
|
||||
}
|
||||
}()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
c.Status(200)
|
||||
}
|
||||
|
||||
func handlePartialUploadsFinalizeRequest(c *gin.Context) {
|
||||
}
|
||||
@@ -54,4 +54,7 @@ var (
|
||||
ErrPublinkNameConflict = NewError(http.StatusPreconditionFailed, "publink_name_conflict", "Another public share with this name already exists")
|
||||
ErrVersionNotFound = NewError(http.StatusNotFound, "version_not_found", "Version Not Foud")
|
||||
ErrResourceVersionLatest = NewError(http.StatusPreconditionFailed, "resource_version_latest", "Cannot delete most recent version of a resource")
|
||||
ErrContentLengthExceeded = NewError(http.StatusBadRequest, "content_length_exceeded", "Content Length Exceeded")
|
||||
ErrContentChecksumMismatch = NewError(http.StatusBadRequest, "content_checksum_mismatch", "Content Checksum Mismatch")
|
||||
ErrContentOffsetExceeded = NewError(http.StatusBadRequest, "content_offset_mismatch", "Content Offset Exceeded")
|
||||
)
|
||||
|
||||
143
server/internal/core/partial_uploads.go
Normal file
143
server/internal/core/partial_uploads.go
Normal file
@@ -0,0 +1,143 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"hash"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"codeberg.org/shroff/phylum/server/internal/db"
|
||||
"github.com/google/uuid"
|
||||
"github.com/jackc/pgx/v5"
|
||||
)
|
||||
|
||||
func (f *FileSystem) CreatePartialUpload(owner int32, size int64, sha256 string) (uuid.UUID, error) {
|
||||
id, _ := uuid.NewV7()
|
||||
|
||||
path := "path"
|
||||
return id, f.db.RunInTx(func(db db.TxHandler) error {
|
||||
if err := insertPartialUpload(db, id, owner, size, sha256); err != nil {
|
||||
return err
|
||||
}
|
||||
if file, err := os.Create(path); err != nil {
|
||||
return err
|
||||
} else {
|
||||
return file.Close()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (f *FileSystem) OpenPartialUpload(id uuid.UUID, offset int64, user int32) (io.WriteCloser, error) {
|
||||
owner, size, uploaded, chunkSHA256, err := queryPartialUpload(f.db, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if user != owner {
|
||||
return nil, ErrInsufficientPermissions
|
||||
}
|
||||
|
||||
path := "path"
|
||||
if stat, err := os.Stat(path); err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
uploaded = 0
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
if stat.Size() < uploaded {
|
||||
uploaded = stat.Size()
|
||||
}
|
||||
}
|
||||
if offset > uploaded {
|
||||
return nil, ErrContentOffsetExceeded
|
||||
}
|
||||
file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0o600)
|
||||
if _, err := file.Seek(int64(offset), 0); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c := &contentUploader{
|
||||
id: id,
|
||||
db: f.db,
|
||||
dest: file,
|
||||
maxBytes: int(size - uploaded),
|
||||
checksum: chunkSHA256,
|
||||
hash: sha256.New(),
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func insertPartialUpload(db db.TxHandler, id uuid.UUID, owner int32, size int64, sha256 string) error {
|
||||
const q = `INSERT INTO partial_uploads(id, owner, size, @uploaded, sha256)
|
||||
VALUES (@id, @owner, @size, 0, @sha256)`
|
||||
|
||||
args := pgx.NamedArgs{
|
||||
"id": id,
|
||||
"owner": owner,
|
||||
"size": size,
|
||||
"sha256": sha256,
|
||||
}
|
||||
_, err := db.Exec(q, args)
|
||||
if err != nil && strings.Contains(err.Error(), "partial_uploads_pkey") {
|
||||
return ErrIDConflict
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func queryPartialUpload(db db.Handler, id uuid.UUID) (owner int32, size, uploaded int64, sha256 string, err error) {
|
||||
const q = `SELECT (owner, size, uploaded, sha256) FROM partial_uploads WHERE id = @id`
|
||||
|
||||
args := pgx.NamedArgs{
|
||||
"id": id,
|
||||
}
|
||||
row := db.QueryRow(q, args)
|
||||
err = row.Scan(owner, size, uploaded, sha256)
|
||||
return
|
||||
}
|
||||
|
||||
func updatePartialUpload(db db.Handler, id uuid.UUID, written int) error {
|
||||
const q = `UPDATE partial_uploads SET uploaded = uploaded + @written WHERE id = @id`
|
||||
|
||||
args := pgx.NamedArgs{
|
||||
"id": id,
|
||||
"written": written,
|
||||
}
|
||||
_, err := db.Exec(q, args)
|
||||
return err
|
||||
}
|
||||
|
||||
type contentUploader struct {
|
||||
id uuid.UUID
|
||||
db db.Handler
|
||||
dest io.WriteCloser
|
||||
maxBytes int
|
||||
n int
|
||||
checksum string
|
||||
hash hash.Hash
|
||||
}
|
||||
|
||||
func (c *contentUploader) Write(p []byte) (n int, err error) {
|
||||
if len(p) > c.maxBytes {
|
||||
return 0, ErrContentLengthExceeded
|
||||
}
|
||||
if c.checksum != "" {
|
||||
if _, err := c.hash.Write(p); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
n, err = c.dest.Write(p)
|
||||
c.n += n
|
||||
return
|
||||
}
|
||||
|
||||
func (c *contentUploader) Close() error {
|
||||
if err := c.dest.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
if c.checksum != "" && c.checksum != hex.EncodeToString(c.hash.Sum(nil)) {
|
||||
return ErrContentChecksumMismatch
|
||||
}
|
||||
return updatePartialUpload(c.db, c.id, c.n)
|
||||
}
|
||||
13
server/internal/db/migrations/027_partial_uploads.sql
Normal file
13
server/internal/db/migrations/027_partial_uploads.sql
Normal file
@@ -0,0 +1,13 @@
|
||||
CREATE TABLE partial_uploads(
|
||||
id UUID PRIMARY KEY,
|
||||
created TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
owner INT NOT NULL REFERENCES users(id) ON UPDATE CASCADE ON DELETE CASCADE
|
||||
size BIGINT NOT NULL,
|
||||
uploaded BIGINT NOT NULL,
|
||||
sha256 TEXT NOT NULL,
|
||||
);
|
||||
|
||||
|
||||
---- create above / drop below ----
|
||||
|
||||
DROP TABLE partial_uploads;
|
||||
Reference in New Issue
Block a user