Files
HexaSend/main.py
2025-05-19 11:42:50 +07:00

205 lines
7.6 KiB
Python

from fastapi import FastAPI, UploadFile, File, Form, Request, Response
from starlette.background import BackgroundTask
from fastapi.responses import RedirectResponse, FileResponse, PlainTextResponse, StreamingResponse
from typing import List
import zipfile
import os, hashlib, json, io
from fastapi.templating import Jinja2Templates
from datetime import datetime
# --- Application objects and transfer settings ---
app = FastAPI()
template = Jinja2Templates(directory="./template")
bufferSize = 64 * 1024  # bytes read per chunk while storing uploads

# --- Mutable state of the single transfer slot ---
mode = "upload"  # switched between "upload" and "download" by the handlers
uploadType = ""  # set to "text", "url" or "file" by the upload handlers
textData = ""
fileName = ""
password = ""
passwordHash = ""
requireAccessPassword = False
enableLoginPage = True

# Make sure the folder that stores uploaded files exists
os.makedirs("./upload", exist_ok=True)

# Read the password from the config file; an empty password disables the login page
with open('./config/config.json', 'r', encoding='utf-8') as file:
    data = json.load(file)
password = data["password"]
enableLoginPage = data["password"] != ""
passwordHash = hashlib.sha256(("s0mesalt08" + data["password"]).encode()).hexdigest()
def sendFiles(path: str):
    """Serve one of the bundled public files for *path*.

    Only "/", "/login.html" and "/style.css" are known; every other path is
    answered with a plain-text 404 response.
    """
    # request path -> (file on disk, media type)
    publicFiles = {
        "/": ("./public/index.html", "text/html"),
        "/login.html": ("./public/login.html", "text/html"),
        "/style.css": ("./public/style.css", "text/css"),
    }
    entry = publicFiles.get(path)
    if entry is None:
        return PlainTextResponse("Not found", status_code=404)
    location, mediaType = entry
    return FileResponse(location, media_type=mediaType)
async def streamFileDownload(buffer: io.BufferedReader, filePath: str, chunkSize: int = 1024 * 64):
    """Yield the contents of *buffer* chunk by chunk, then delete the file.

    Args:
        buffer: Open binary reader for the file being sent. It is always
            closed when the generator finishes or is closed early.
        filePath: Path of the file to delete once streaming ends.
        chunkSize: Maximum number of bytes per yielded chunk.

    Yields:
        Non-empty ``bytes`` chunks until the reader is exhausted.
    """
    try:
        while chunk := buffer.read(chunkSize):
            yield chunk
    except GeneratorExit:
        print("Client disconnected during download")
        raise
    finally:
        try:
            buffer.close()
        finally:
            # The file may already be gone (the upload handler clears the
            # whole upload folder), so a missing file is not an error here.
            try:
                os.unlink(filePath)
            except FileNotFoundError:
                pass
#Handle login
@app.post("/login")
async def login(request: Request, response: Response):
    """Check the submitted password and, if it matches, set the login cookie.

    The raw request body is the password attempt. The reply body is the
    plain text "correct" (with the ``loggedIn`` cookie set) or "wrong".
    """
    import hmac  # imported locally so this handler does not depend on the file's import block

    #We are using cookie instead of session, this is by design as session cookies are
    #cleared when the browser is closed and it will be annoying to login again each
    #time you want to upload stuff
    submitted = await request.body()
    # Compare raw bytes in constant time: this avoids leaking how much of the
    # password matched, and a body that is not valid UTF-8 simply counts as wrong
    # instead of raising while decoding.
    if not hmac.compare_digest(submitted, password.encode("utf-8")):
        return PlainTextResponse(content="wrong")

    reply = PlainTextResponse(content="correct")
    reply.set_cookie(
        key="loggedIn",
        value=passwordHash, #probably won't matter much but it's better not to store plaintext password in cookie
        max_age=34560000,
        httponly=True,
        secure=False,
        samesite="lax"
    )
    return reply
def _contentDisposition(name: str) -> str:
    """Build an ``attachment`` Content-Disposition value for file name *name*.

    Names that need percent-encoding (spaces, quotes, non-ASCII characters, ...)
    use the RFC 5987 ``filename*`` form; plain names are sent as a quoted string.
    """
    from urllib.parse import quote  # local import keeps this block self-contained

    encoded = quote(name)
    if encoded != name:
        return f"attachment; filename*=utf-8''{encoded}"
    return f'attachment; filename="{name}"'


#Handle file, text or URL retrieval
@app.middleware("http")
async def download(request: Request, call_next):
    """Middleware that answers every GET request itself.

    It first enforces the login cookie when a password is required, then
    either serves the public pages (upload mode) or hands out the pending
    text, URL redirect or file exactly once (download mode). Non-GET requests
    are passed on to the next handler unchanged.
    """
    global mode, uploadType, textData, requireAccessPassword
    # NOTE(review): only GET requests are checked for the login cookie here;
    # other methods go straight to call_next - confirm that this is intended.
    if request.method != "GET":
        return await call_next(request)

    #We cannot use StaticFile() here because resource request will be treated like text paste request
    #and break the whole of upload -> download -> upload -> download flipping logic
    #so we make our own public file handlers
    path = request.url.path

    #Handle authentication
    # "== True" is kept on purpose: the text endpoint stores the raw JSON value,
    # so plain truthiness would treat other values differently.
    authRequired = enableLoginPage and (mode == "upload" or requireAccessPassword == True)
    if authRequired:
        if "loggedIn" not in request.cookies:
            if path == "/":
                return RedirectResponse(url="/login.html")
            # Let the login page and its stylesheet through without a cookie
            return sendFiles(path)
        if request.cookies.get("loggedIn") != passwordHash:
            # Stale or forged cookie: drop it and send the user back to the login page
            response = RedirectResponse(url="/login.html")
            response.delete_cookie("loggedIn")
            return response

    #This part contains actual logic for download & upload
    if mode != "download":
        return sendFiles(path)

    if path == "/":
        # Flip back first so the pending item is handed out only once
        mode = "upload"
        if uploadType == "text":
            return template.TemplateResponse("copytext.html", {"request": request, "data": textData})
        if uploadType == "url":
            return RedirectResponse(url=textData)
        if uploadType == "file":
            filePath = "./upload/" + fileName
            file_size = os.path.getsize(filePath)
            return StreamingResponse(
                content=streamFileDownload(open(filePath, mode="rb"), filePath),
                media_type="application/octet-stream",
                headers={
                    "Content-Disposition": _contentDisposition(fileName),
                    "Content-Length": str(file_size),
                }
            )
    return await call_next(request)
def _safeUploadName(rawName):
    """Return the bare file name of *rawName*, or None if nothing usable remains.

    The name comes from the client, so any directory part is dropped to keep
    the stored file (and zip entries) from escaping the upload folder.
    """
    if not rawName:
        return None
    # Treat backslashes as separators too so a Windows-style path cannot slip through
    baseName = os.path.basename(rawName.replace("\\", "/"))
    if baseName in ("", ".", ".."):
        return None
    return baseName


#Handle file uploads
@app.post("/")
async def upload(file: List[UploadFile] = File(...), enablePassword: str = Form(...)):
    """Store the uploaded file(s) and switch the app to download mode.

    Several files are packed into ``./upload/files.zip``; a single file is
    stored under its own (sanitised) name.

    Args:
        file: The uploaded files.
        enablePassword: The string "true" makes the download require login.

    Returns:
        None on success, or a 400 plain-text response when a single uploaded
        file has no usable name.
    """
    global mode, fileName, uploadType, requireAccessPassword
    isArchive = len(file) > 1

    # Validate the single-file name before touching any state or stored files
    storedName = None
    if not isArchive and file[0].filename is not None:
        storedName = _safeUploadName(file[0].filename)
        if storedName is None:
            return PlainTextResponse("Invalid file name", status_code=400)

    requireAccessPassword = enablePassword == "true"

    #Not actually necessary but it's just to make sure that there is no residual file in case of sudden shutdown
    #or the user upload twice with different browser tabs
    for leftover in os.listdir("./upload"):
        leftoverPath = os.path.join("./upload", leftover)
        if os.path.isfile(leftoverPath):
            os.remove(leftoverPath)

    #We have some repetitive variables here but that's by design
    #we don't want to set wrong state in case of upload error
    #or a problem in the zip stream/read stream
    if isArchive:
        with zipfile.ZipFile('./upload/files.zip', 'w') as zipf:
            for f in file:
                entryName = _safeUploadName(f.filename)
                if entryName is None:
                    # Entries without a usable name are skipped
                    continue
                with zipf.open(entryName, 'w') as dest:
                    while chunk := await f.read(bufferSize):
                        dest.write(chunk)
        mode = "download"
        uploadType = "file"
        fileName = "files.zip"
    elif storedName is not None:
        with open(os.path.join("./upload/", storedName), "wb") as buffer:
            while content := await file[0].read(bufferSize):
                buffer.write(content)
        mode = "download"
        uploadType = "file"
        # Remember the sanitised name: the download handler builds its path from it
        fileName = storedName
#Handle text upload
@app.post("/text")
async def text(request: Request):
    """Store pasted text (or a URL) and switch the app to download mode.

    Expects a JSON object with a string ``text`` and an ``enablePassword``
    value. Text starting with "http:" or "https:" is stored as a URL.

    Returns:
        None on success, or a 400 plain-text response when the body is not
        valid JSON or carries no string ``text``.
    """
    global textData, uploadType, mode, requireAccessPassword
    try:
        jsonBody = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses
        return PlainTextResponse("Invalid JSON body", status_code=400)

    # Validate before mutating any global so a bad request cannot leave
    # half-updated state behind
    newText = jsonBody.get("text") if isinstance(jsonBody, dict) else None
    if not isinstance(newText, str):
        return PlainTextResponse("Missing text", status_code=400)

    textData = newText
    # Stored as received; the download middleware compares it with "== True"
    requireAccessPassword = jsonBody.get("enablePassword")
    uploadType = "url" if newText.startswith(("http:", "https:")) else "text"
    mode = "download"
@app.post("/logout")
async def logout(response: Response):
    """Remove the login cookie and reply with the plain text "logout"."""
    # A dedicated reply is built and returned; the injected response is not used
    reply = PlainTextResponse(content="logout")
    reply.delete_cookie("loggedIn")
    return reply
#The site cannot work with caching enabled.
#This middleware is defined last on purpose: starlette middleware uses a
#"wrapping" model in which each middleware wraps the ones defined before it,
#so the most recently defined middleware is always called first.
@app.middleware("http")
async def nocache(request, call_next):
    """Mark every response as non-cacheable and drop its cache validators."""
    response = await call_next(request)
    noCacheHeaders = (
        ("Cache-Control", "no-store, no-cache, must-revalidate"),
        ("Pragma", "no-cache"),
        ("Expires", "0"),
    )
    for headerName, headerValue in noCacheHeaders:
        response.headers[headerName] = headerValue
    # Validators would otherwise allow conditional requests
    for headerName in ("ETag", "Last-Modified"):
        del response.headers[headerName]
    return response