mirror of
https://github.com/Zhoros/HexaSend.git
synced 2025-12-17 01:44:27 -06:00
205 lines
7.6 KiB
Python
205 lines
7.6 KiB
Python
from fastapi import FastAPI, UploadFile, File, Form, Request, Response
|
|
from starlette.background import BackgroundTask
|
|
from fastapi.responses import RedirectResponse, FileResponse, PlainTextResponse, StreamingResponse
|
|
from typing import List
|
|
import zipfile
|
|
import os, hashlib, json, io
|
|
from fastapi.templating import Jinja2Templates
|
|
from datetime import datetime
|
|
|
|
app = FastAPI()
|
|
template = Jinja2Templates(directory="./template")
|
|
|
|
bufferSize = 1024 * 64
|
|
mode = "upload"
|
|
uploadType = ""
|
|
textData = ""
|
|
fileName = ""
|
|
password = ""
|
|
passwordHash = ""
|
|
requireAccessPassword = False
|
|
enableLoginPage = True
|
|
|
|
#create folder to store uploaded files
|
|
os.makedirs("./upload", exist_ok=True)
|
|
|
|
with open('./config/config.json', 'r', encoding='utf-8') as file:
|
|
data = json.load(file)
|
|
password = data["password"]
|
|
enableLoginPage = True if data["password"] != "" else False
|
|
passwordHash = hashlib.sha256(("s0mesalt08" + data["password"]).encode()).hexdigest()
|
|
|
|
def sendFiles(path: str):
|
|
if path == "/":
|
|
return FileResponse("./public/index.html", media_type="text/html")
|
|
if path == "/login.html":
|
|
return FileResponse("./public/login.html", media_type="text/html")
|
|
if path == "/style.css":
|
|
return FileResponse("./public/style.css", media_type="text/css")
|
|
return PlainTextResponse("Not found", status_code=404)
|
|
|
|
async def streamFileDownload(buffer: io.BufferedReader, filePath: str):
|
|
|
|
try:
|
|
chunk_size = 1024 * 64
|
|
while True:
|
|
chunk = buffer.read(chunk_size)
|
|
if not chunk:
|
|
break
|
|
yield chunk
|
|
except GeneratorExit:
|
|
print("Client disconnected during download")
|
|
raise
|
|
finally:
|
|
buffer.close()
|
|
os.unlink(filePath)
|
|
|
|
#Handle text upload
|
|
@app.post("/login")
|
|
async def login(request: Request, response: Response):
|
|
|
|
#We are using cookie instead of session, this is by design as session cookies are
|
|
#cleared when the browser is closed and it will be annoying to login again each
|
|
#time you want to upload stuff
|
|
text = (await request.body()).decode("utf-8")
|
|
if text == password:
|
|
response = PlainTextResponse(content="correct")
|
|
response.set_cookie(
|
|
key="loggedIn",
|
|
value=passwordHash, #probably won't matter much but it's better not to store plaintext password in cookie
|
|
max_age=34560000,
|
|
httponly=True,
|
|
secure=False,
|
|
samesite="lax"
|
|
)
|
|
return response
|
|
else:
|
|
return PlainTextResponse(content="wrong")
|
|
|
|
#Handle file, text or URL retrieval
|
|
@app.middleware("http")
|
|
async def download(request: Request, call_next):
|
|
global mode, uploadType, textData, requireAccessPassword
|
|
|
|
if request.method == "GET":
|
|
#We cannot use StaticFile() here because resource request will be treated like text paste request
|
|
#and break the whole of upload -> download -> upload -> download flipping logic
|
|
#so we make our own public file handlers
|
|
|
|
#Handle authentication
|
|
path = request.url.path
|
|
if (not "loggedIn" in request.cookies and (mode == "upload" or requireAccessPassword == True)) and enableLoginPage:
|
|
if path == "/":
|
|
return RedirectResponse(url="/login.html")
|
|
else:
|
|
return sendFiles(path)
|
|
else:
|
|
|
|
if (mode == "upload" or requireAccessPassword == True) and enableLoginPage:
|
|
cookiePasswordHash = request.cookies.get("loggedIn")
|
|
if cookiePasswordHash != passwordHash:
|
|
response = RedirectResponse(url="/login.html")
|
|
response.delete_cookie("loggedIn")
|
|
return response
|
|
|
|
#This part contains actual logic for download & upload
|
|
if mode == "download":
|
|
if request.url.path == "/":
|
|
mode = "upload"
|
|
if uploadType == "text":
|
|
return template.TemplateResponse("copytext.html", {"request": request, "data": textData})
|
|
elif uploadType == "url":
|
|
return RedirectResponse(url=textData)
|
|
elif uploadType == "file":
|
|
filePath = "./upload/" + fileName
|
|
file_size = os.path.getsize(filePath)
|
|
return StreamingResponse(
|
|
content=streamFileDownload(open(filePath, mode="rb"), filePath),
|
|
media_type="application/octet-stream",
|
|
headers={
|
|
"Content-Disposition": f"attachment; filename={fileName}",
|
|
"Content-Length": str(file_size),
|
|
}
|
|
)
|
|
else:
|
|
return sendFiles(path)
|
|
|
|
return await call_next(request)
|
|
|
|
#Handle file uploads
|
|
@app.post("/")
|
|
async def upload(file: List[UploadFile] = File(...), enablePassword: str = Form(...)):
|
|
global mode, fileName, uploadType, requireAccessPassword
|
|
|
|
requireAccessPassword = True if enablePassword == "true" else False
|
|
|
|
#Not actually necessary but it's just to make sure that there is no residual file in case of sudden shutdown
|
|
#or the user upload twice with different browser tabs
|
|
for fileNameToRemove in os.listdir("./upload"):
|
|
file_path = os.path.join("./upload", fileNameToRemove)
|
|
if os.path.isfile(file_path):
|
|
os.remove(file_path)
|
|
|
|
#We have some repetitive variables here but that's by design
|
|
#we don't want to set wrong state in case of upload error
|
|
#or a problem in the zip stream/read stream
|
|
if len(file) > 1:
|
|
with zipfile.ZipFile('./upload/files.zip', 'w') as zipf:
|
|
for f in file:
|
|
if f.filename is not None:
|
|
with zipf.open(f.filename, 'w') as dest:
|
|
while True:
|
|
chunk = await f.read(bufferSize)
|
|
if not chunk:
|
|
break
|
|
dest.write(chunk)
|
|
|
|
mode = "download"
|
|
uploadType = "file"
|
|
fileName = "files.zip"
|
|
|
|
else:
|
|
if file[0].filename != None:
|
|
with open(os.path.join("./upload/", file[0].filename), "wb") as buffer:
|
|
while content := await file[0].read(bufferSize):
|
|
buffer.write(content)
|
|
mode = "download"
|
|
uploadType = "file"
|
|
fileName = file[0].filename
|
|
|
|
#Handle text upload
|
|
@app.post("/text")
|
|
async def text(request: Request):
|
|
global textData, uploadType, mode, requireAccessPassword
|
|
|
|
jsonBody = await request.json()
|
|
|
|
textData = jsonBody.get("text")
|
|
requireAccessPassword = jsonBody.get("enablePassword")
|
|
|
|
if textData.startswith("http:") or textData.startswith("https:"):
|
|
uploadType = "url"
|
|
else:
|
|
uploadType = "text"
|
|
mode = "download"
|
|
|
|
@app.post("/logout")
|
|
async def logout(response: Response):
|
|
response = PlainTextResponse(content="logout")
|
|
response.delete_cookie("loggedIn")
|
|
return response
|
|
|
|
#The site cannot function with cache
|
|
#the reason this middleware is at the bottom is because starlette
|
|
#middleware is a "wrapping" model where it wraps over another middleware defined before it
|
|
#this means that the latest middleware definition will always be called first
|
|
@app.middleware("http")
|
|
async def nocache(request, call_next):
|
|
response = await call_next(request)
|
|
response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate"
|
|
response.headers["Pragma"] = "no-cache"
|
|
response.headers["Expires"] = "0"
|
|
del response.headers["ETag"]
|
|
del response.headers["Last-Modified"]
|
|
return response
|