Improves and fixes (#19)

* Add support log subsystem

* Replace logger calls to LOGi (LL_INFO)

* Add new global object: server_t g_srv

* Make current_header as static char array

* Make global struct g_cv which contain static Py strings

* Add new const values to global struct g_cv

* Add loglevel option to fastwsgi.py

* Fix bug then response body contain 0x00 symbol

* Add new header file: common.h

* Replace response buffer type to xbuf_t (new expanded buffer)

* Cleanup function build_response (remove waste alloc mem)

* Add extended logging into server.c

* Update submodule llhttp to 8.1.0 release

* Add functions build_response_ex and send_fatal

* Fix send error message then use Keep-Alive connection

* Fix call uv__socket_sockopt (first arg incorrect)
This commit is contained in:
Oleg S
2022-10-17 09:14:46 +03:00
committed by GitHub
parent 6f8791a424
commit 882fe7972f
14 changed files with 624 additions and 249 deletions

View File

@@ -1,3 +1,3 @@
server: fastwsgi/server.c
mkdir -p bin
gcc -Illhttp/include -Ilibuv/include -Ilibuv/src llhttp/src/*.c fastwsgi/request.c fastwsgi/server.c fastwsgi/constants.c -o bin/server -luv -I/usr/include/python3.8 -lpython3.8 -O3
gcc -Illhttp/include -Ilibuv/include -Ilibuv/src llhttp/src/*.c fastwsgi/request.c fastwsgi/server.c fastwsgi/constants.c fastwsgi/logx.c -o bin/server -luv -I/usr/include/python3.8 -lpython3.8 -O3

View File

@@ -10,7 +10,7 @@ NUM_WORKERS = 4
HOST = "0.0.0.0"
PORT = 5000
BACKLOG = 1024
LOGGING = 0
LOGLEVEL = 0
def run_multi_process_server(app):
@@ -60,7 +60,7 @@ def print_server_details(host, port):
@click.version_option(version=get_distribution("fastwsgi").version, message="%(version)s")
@click.option("--host", help="Host the socket is bound to.", type=str, default=HOST, show_default=True)
@click.option("-p", "--port", help="Port the socket is bound to.", type=int, default=PORT, show_default=True)
@click.option("-l", "--logging", help="Enable logging.", default=bool(LOGGING), is_flag=True, type=bool, show_default=True)
@click.option("-l", "--logging", help="Logging level.", type=int, default=LOGLEVEL, show_default=True)
@click.argument(
"wsgi_app_import_string",
type=str,
@@ -78,12 +78,12 @@ def run_from_cli(host, port, wsgi_app_import_string, logging):
print_server_details(host, port)
print(f"Server listening at http://{host}:{port}")
_fastwsgi.run_server(wsgi_app, host, port, BACKLOG, int(logging))
_fastwsgi.run_server(wsgi_app, host, port, BACKLOG, logging)
def run(wsgi_app, host=HOST, port=PORT, backlog=1024):
def run(wsgi_app, host=HOST, port=PORT, backlog=1024, loglevel=LOGLEVEL):
print_server_details(host, port)
print(f"Server listening at http://{host}:{port}")
print(f"Running on PID:", os.getpid())
_fastwsgi.run_server(wsgi_app, host, port, backlog, LOGGING)
_fastwsgi.run_server(wsgi_app, host, port, backlog, loglevel)
# run_multi_process_server(wsgi_app)

30
fastwsgi/common.h Normal file
View File

@@ -0,0 +1,30 @@
#ifndef FASTWSGI_COMMON_H_
#define FASTWSGI_COMMON_H_
#include <Python.h>
#include "uv.h"
#include "uv-common.h"
#if defined(_MSC_VER)
#define vsnprintf _vsnprintf
#define strncasecmp _strnicmp
#define strcasecmp _stricmp
#endif
#if defined(_MSC_VER)
# define INLINE __inline
#elif defined(__GNUC__) || defined(__MVS__)
# define INLINE __inline__
#else
# define INLINE inline
#endif
// forced use this only for alpha version!
#ifndef FASTWSGI_DEBUG
#define FASTWSGI_DEBUG
#endif
#include "logx.h"
#endif

View File

@@ -1,29 +1,40 @@
#include "constants.h"
cvar_t g_cv;
void init_constants() {
REQUEST_METHOD = PyUnicode_FromString("REQUEST_METHOD");
SCRIPT_NAME = PyUnicode_FromString("SCRIPT_NAME");
SERVER_NAME = PyUnicode_FromString("SERVER_NAME");
SERVER_PORT = PyUnicode_FromString("SERVER_PORT");
SERVER_PROTOCOL = PyUnicode_FromString("SERVER_PROTOCOL");
QUERY_STRING = PyUnicode_FromString("QUERY_STRING");
PATH_INFO = Py_BuildValue("s", "PATH_INFO");
HTTP_ = PyUnicode_FromString("HTTP_");
g_cv.REQUEST_METHOD = PyUnicode_FromString("REQUEST_METHOD");
g_cv.SCRIPT_NAME = PyUnicode_FromString("SCRIPT_NAME");
g_cv.SERVER_NAME = PyUnicode_FromString("SERVER_NAME");
g_cv.SERVER_PORT = PyUnicode_FromString("SERVER_PORT");
g_cv.SERVER_PROTOCOL = PyUnicode_FromString("SERVER_PROTOCOL");
g_cv.QUERY_STRING = PyUnicode_FromString("QUERY_STRING");
g_cv.PATH_INFO = Py_BuildValue("s", "PATH_INFO");
g_cv.HTTP_ = PyUnicode_FromString("HTTP_");
wsgi_version = PyUnicode_FromString("wsgi.version");
wsgi_url_scheme = PyUnicode_FromString("wsgi.url_scheme");
wsgi_errors = PyUnicode_FromString("wsgi.errors");
wsgi_run_once = PyUnicode_FromString("wsgi.run_once");
wsgi_multithread = PyUnicode_FromString("wsgi.multithread");
wsgi_multiprocess = PyUnicode_FromString("wsgi.multiprocess");
wsgi_input = PyUnicode_FromString("wsgi.input");
version = PyTuple_Pack(2, PyLong_FromLong(1), PyLong_FromLong(0));
g_cv.wsgi_version = PyUnicode_FromString("wsgi.version");
g_cv.wsgi_url_scheme = PyUnicode_FromString("wsgi.url_scheme");
g_cv.wsgi_errors = PyUnicode_FromString("wsgi.errors");
g_cv.wsgi_run_once = PyUnicode_FromString("wsgi.run_once");
g_cv.wsgi_multithread = PyUnicode_FromString("wsgi.multithread");
g_cv.wsgi_multiprocess = PyUnicode_FromString("wsgi.multiprocess");
g_cv.wsgi_input = PyUnicode_FromString("wsgi.input");
g_cv.version = PyTuple_Pack(2, PyLong_FromLong(1), PyLong_FromLong(0));
http_scheme = PyUnicode_FromString("http");
HTTP_1_1 = PyUnicode_FromString("HTTP/1.1");
HTTP_1_0 = PyUnicode_FromString("HTTP/1.0");
g_cv.http_scheme = PyUnicode_FromString("http");
g_cv.HTTP_1_1 = PyUnicode_FromString("HTTP/1.1");
g_cv.HTTP_1_0 = PyUnicode_FromString("HTTP/1.0");
server_host = PyUnicode_FromString("0.0.0.0");
server_port = PyUnicode_FromString("5000");
empty_string = PyUnicode_FromString("");
}
g_cv.server_host = PyUnicode_FromString("0.0.0.0");
g_cv.server_port = PyUnicode_FromString("5000");
g_cv.empty_string = PyUnicode_FromString("");
g_cv.module_io = PyImport_ImportModule("io");
g_cv.BytesIO = PyUnicode_FromString("BytesIO");
g_cv.write = PyUnicode_FromString("write");
g_cv.read = PyUnicode_FromString("read");
g_cv.truncate = PyUnicode_FromString("truncate");
g_cv.seek = PyUnicode_FromString("seek");
g_cv.getvalue = PyUnicode_FromString("getvalue");
g_cv.comma = PyUnicode_FromString(",");
}

View File

@@ -3,11 +3,44 @@
#include <Python.h>
PyObject* REQUEST_METHOD, * SCRIPT_NAME, * SERVER_NAME, * SERVER_PORT, * SERVER_PROTOCOL, * QUERY_STRING;
PyObject* wsgi_version, * wsgi_url_scheme, * wsgi_errors, * wsgi_run_once, * wsgi_multithread, * wsgi_multiprocess, * version;
PyObject* http_scheme, * HTTP_1_1, * HTTP_1_0;
PyObject* server_host, * server_port, * empty_string;
PyObject* HTTP_, * PATH_INFO, * wsgi_input;
typedef struct {
PyObject* REQUEST_METHOD;
PyObject* SCRIPT_NAME;
PyObject* SERVER_NAME;
PyObject* SERVER_PORT;
PyObject* SERVER_PROTOCOL;
PyObject* QUERY_STRING;
PyObject* PATH_INFO;
PyObject* HTTP_;
PyObject* wsgi_version;
PyObject* wsgi_url_scheme;
PyObject* wsgi_errors;
PyObject* wsgi_run_once;
PyObject* wsgi_multithread;
PyObject* wsgi_multiprocess;
PyObject* wsgi_input;
PyObject* version;
PyObject* http_scheme;
PyObject* HTTP_1_1;
PyObject* HTTP_1_0;
PyObject* server_host;
PyObject* server_port;
PyObject* empty_string;
PyObject* module_io;
PyObject* BytesIO;
PyObject* write;
PyObject* read;
PyObject* truncate;
PyObject* seek;
PyObject* getvalue;
PyObject* comma;
} cvar_t;
extern cvar_t g_cv;
void init_constants();

74
fastwsgi/logx.c Normal file
View File

@@ -0,0 +1,74 @@
#include "logx.h"
#ifdef _WIN32
#include <vadefs.h>
#endif
int g_log_level = 0;
log_type_t g_log_type = FW_LOG_TO_STDOUT;
static const char g_log_level_str[] = LL_STRING "--------------------------";
void set_log_type(int type)
{
g_log_type = (log_type_t)type;
}
void set_log_level(int level)
{
if (level < 0) {
g_log_level = 0;
return;
}
if (level >= 1000) {
int type = level / 1000;
if (type < 9)
set_log_type(type);
level = level % 1000;
}
if (level > LL_TRACE) {
g_log_level = LL_TRACE;
return;
}
g_log_level = level;
}
static const char log_prefix[] = "[FWSGI-X]";
void logmsg(int level, const char * fmt, ...)
{
char buf[4096];
if (level <= g_log_level) {
va_list argptr;
va_start(argptr, fmt);
const size_t prefix_len = sizeof(log_prefix); // include space
memcpy(buf, log_prefix, prefix_len); // add prefix
buf[prefix_len - 3] = g_log_level_str[level]; // replace X to error type
buf[prefix_len - 1] = 0x20; // add space delimiter
int len = vsnprintf(buf + prefix_len, sizeof(buf) - prefix_len - 8, fmt, argptr);
va_end(argptr);
if (len < 0) {
strcat(buf, "<INCORRECT-INPUT-DATA> ");
strcat(buf, fmt);
len = strlen(buf);
} else {
len += prefix_len;
buf[len] = 0;
}
if (buf[len - 1] != '\n') {
buf[len++] = '\n';
buf[len] = 0;
}
if (g_log_type == FW_LOG_TO_SYSLOG) {
#ifdef _WIN32
OutputDebugStringA(buf);
#else
//FIXME: openlog, syslog
#endif
} else {
if (level <= LL_ERROR)
fputs(buf, stderr);
else
fputs(buf, stdout);
}
}
}

108
fastwsgi/logx.h Normal file
View File

@@ -0,0 +1,108 @@
#ifndef FASTWSGI_LOGX_H_
#define FASTWSGI_LOGX_H_
#include "common.h"
// log levels the same as syslog
#define LL_FATAL_ERROR 1
#define LL_CRIT_ERROR 2
#define LL_ERROR 3
#define LL_WARNING 4
#define LL_NOTICE 5
#define LL_INFO 6
#define LL_DEBUG 7
#define LL_TRACE 8
#ifndef MAX_LOG_LEVEL
#ifdef FASTWSGI_DEBUG
#define MAX_LOG_LEVEL LL_TRACE
#else
#define MAX_LOG_LEVEL LL_INFO
#endif
#endif
#define LL_STRING "#FCEwnidt"
typedef enum {
FW_LOG_TO_STDOUT = 0,
FW_LOG_TO_SYSLOG = 1
} log_type_t;
extern int g_log_level;
void set_log_level(int level);
void logmsg(int level, const char * fmt, ...);
#define LOGMSG(_level_, ...) if (_level_ <= g_log_level) logmsg(_level_, __VA_ARGS__)
#define logger(_msg_) LOGMSG(LL_INFO, _msg_)
#define LOGX_IF(_level_, _cond_, ...) if (_cond_) LOGMSG(_level_, __VA_ARGS__)
#define LOGX(_level_, ...) LOGMSG(_level_, __VA_ARGS__)
#if defined(MAX_LOG_LEVEL) && MAX_LOG_LEVEL >= LL_TRACE
#define LOGt(...) LOGX(LL_TRACE, __VA_ARGS__)
#define LOGt_IF(_cond_, ...) LOGX_IF(LL_TRACE, (_cond_), __VA_ARGS__)
#else
#define LOGt(...) do{}while(0)
#define LOGt_IF(_cond_, ...) do{}while(0)
#endif
#if defined(MAX_LOG_LEVEL) && MAX_LOG_LEVEL >= LL_DEBUG
#define LOGd(...) LOGX(LL_DEBUG, __VA_ARGS__)
#define LOGd_IF(_cond_, ...) LOGX_IF(LL_DEBUG, (_cond_), __VA_ARGS__)
#else
#define LOGd(...) do{}while(0)
#define LOGd_IF(_cond_, ...) do{}while(0)
#endif
#if defined(MAX_LOG_LEVEL) && MAX_LOG_LEVEL >= LL_INFO
#define LOGi(...) LOGX(LL_INFO, __VA_ARGS__)
#define LOGi_IF(_cond_, ...) LOGX_IF(LL_INFO, (_cond_), __VA_ARGS__)
#else
#define LOGi(...) do{}while(0)
#define LOGi_IF(_cond_, ...) do{}while(0)
#endif
#if defined(MAX_LOG_LEVEL) && MAX_LOG_LEVEL >= LL_NOTICE
#define LOGn(...) LOGX(LL_NOTICE, __VA_ARGS__)
#define LOGn_IF(_cond_, ...) LOGX_IF(LL_NOTICE, (_cond_), __VA_ARGS__)
#else
#define LOGn(...) do{}while(0)
#define LOGn_IF(_cond_, ...) do{}while(0)
#endif
#if defined(MAX_LOG_LEVEL) && MAX_LOG_LEVEL >= LL_WARNING
#define LOGw(...) LOGX(LL_WARNING, __VA_ARGS__)
#define LOGw_IF(_cond_, ...) LOGX_IF(LL_WARNING, (_cond_), __VA_ARGS__)
#else
#define LOGw(...) do{}while(0)
#define LOGw_IF(_cond_, ...) do{}while(0)
#endif
#if defined(MAX_LOG_LEVEL) && MAX_LOG_LEVEL >= LL_ERROR
#define LOGe(...) LOGX(LL_ERROR, __VA_ARGS__)
#define LOGe_IF(_cond_, ...) LOGX_IF(LL_ERROR, (_cond_), __VA_ARGS__)
#else
#define LOGe(...) do{}while(0)
#define LOGe_IF(_cond_, ...) do{}while(0)
#endif
#if defined(MAX_LOG_LEVEL) && MAX_LOG_LEVEL >= LL_CRIT_ERROR
#define LOGc(...) LOGX(LL_CRIT_ERROR, __VA_ARGS__)
#define LOGc_IF(_cond_, ...) LOGX_IF(LL_CRIT_ERROR, (_cond_), __VA_ARGS__)
#else
#define LOGc(...) do{}while(0)
#define LOGc_IF(_cond_, ...) do{}while(0)
#endif
#if defined(MAX_LOG_LEVEL) && MAX_LOG_LEVEL >= LL_FATAL_ERROR
#define LOGf(...) LOGX(LL_FATAL_ERROR, __VA_ARGS__)
#define LOGf_IF(_cond_, ...) LOGX_IF(LL_FATAL_ERROR, (_cond_), __VA_ARGS__)
#else
#define LOGf(...) do{}while(0)
#define LOGf_IF(_cond_, ...) do{}while(0)
#endif
#endif

View File

@@ -4,32 +4,30 @@
#include "constants.h"
#include "start_response.h"
static void reprint(PyObject* obj) {
PyObject* base_dict = NULL;
void logrepr(int level, PyObject* obj) {
PyObject* repr = PyObject_Repr(obj);
PyObject* str = PyUnicode_AsEncodedString(repr, "utf-8", "~E~");
const char* bytes = PyBytes_AS_STRING(str);
printf("REPR: %s\n", bytes);
LOGX(level, "REPR: %s", bytes);
Py_XDECREF(repr);
Py_XDECREF(str);
}
static void set_header(PyObject* headers, const char* key, const char* value, size_t length) {
logger("setting header");
LOGi("setting header");
int vlen = (length > 0) ? (int)length : (int)strlen(value);
PyObject* item = PyUnicode_FromStringAndSize(value, vlen);
PyObject* existing_item = PyDict_GetItemString(headers, key);
if (existing_item) {
PyObject* comma = PyUnicode_FromString(",");
PyObject* value_list = Py_BuildValue("[SS]", existing_item, item);
PyObject* updated_item = PyUnicode_Join(comma, value_list);
PyObject* updated_item = PyUnicode_Join(g_cv.comma, value_list);
PyDict_SetItemString(headers, key, updated_item);
Py_DECREF(updated_item);
Py_DECREF(value_list);
Py_DECREF(comma);
}
else {
PyDict_SetItemString(headers, key, item);
@@ -38,41 +36,30 @@ static void set_header(PyObject* headers, const char* key, const char* value, si
}
int on_message_begin(llhttp_t* parser) {
logger("on message begin");
LOGi("on message begin");
client_t * client = (client_t *)parser->data;
client->request.state.keep_alive = 0;
client->request.state.error = 0;
if (client->response.buffer.base)
free(client->response.buffer.base);
client->response.buffer.base = NULL;
client->response.buffer.len = 0;
client->response.buf.size = 0;
if (client->request.headers == NULL) {
PyObject* headers = PyDict_Copy(base_dict);
// Sets up base request dict for new incoming requests
// https://www.python.org/dev/peps/pep-3333/#specification-details
PyObject* io = PyImport_ImportModule("io");
PyObject* BytesIO = PyUnicode_FromString("BytesIO");
PyObject* io_BytesIO = PyObject_CallMethodObjArgs(io, BytesIO, NULL);
PyDict_SetItem(headers, wsgi_input, io_BytesIO);
PyObject* io_BytesIO = PyObject_CallMethodObjArgs(g_cv.module_io, g_cv.BytesIO, NULL);
PyDict_SetItem(headers, g_cv.wsgi_input, io_BytesIO);
client->request.headers = headers;
Py_DECREF(BytesIO);
Py_DECREF(io);
} else {
PyObject* input = PyDict_GetItem(client->request.headers, wsgi_input);
PyObject* truncate = PyUnicode_FromString("truncate");
PyObject* result1 = PyObject_CallMethodObjArgs(input, truncate, PyLong_FromLong(0L), NULL);
Py_DECREF(truncate);
PyObject* input = PyDict_GetItem(client->request.headers, g_cv.wsgi_input);
PyObject* result1 = PyObject_CallMethodObjArgs(input, g_cv.truncate, PyLong_FromLong(0L), NULL);
Py_DECREF(result1);
PyObject* seek = PyUnicode_FromString("seek");
PyObject* result2 = PyObject_CallMethodObjArgs(input, seek, PyLong_FromLong(0L), NULL);
Py_DECREF(seek);
PyObject* result2 = PyObject_CallMethodObjArgs(input, g_cv.seek, PyLong_FromLong(0L), NULL);
Py_DECREF(result2);
}
return 0;
};
int on_url(llhttp_t* parser, const char* data, size_t length) {
logger("on url");
LOGi("on url");
client_t * client = (client_t *)parser->data;
char* url = malloc(length + 1);
@@ -91,15 +78,14 @@ int on_url(llhttp_t* parser, const char* data, size_t length) {
};
int on_body(llhttp_t* parser, const char* body, size_t length) {
logger("on body");
LOGi("on body (len = %d)", (int)length);
client_t * client = (client_t *)parser->data;
PyObject* input = PyDict_GetItem(client->request.headers, wsgi_input);
PyObject* input = PyDict_GetItem(client->request.headers, g_cv.wsgi_input);
PyObject* write = PyUnicode_FromString("write");
PyObject* body_content = PyBytes_FromStringAndSize(body, length);
PyObject* result = PyObject_CallMethodObjArgs(input, write, body_content, NULL);
Py_DECREF(write);
LOGREPR(LL_TRACE, body_content);
PyObject* result = PyObject_CallMethodObjArgs(input, g_cv.write, body_content, NULL);
Py_XDECREF(result);
Py_XDECREF(body_content);
@@ -107,14 +93,18 @@ int on_body(llhttp_t* parser, const char* body, size_t length) {
};
int on_header_field(llhttp_t* parser, const char* header, size_t length) {
logger("on header field");
LOGi("on header field");
client_t * client = (client_t *)parser->data;
client->request.current_header[0] = 0; // CVE-2015-0219
char* upperHeader = malloc(length + 1);
const size_t max_len = sizeof(client->request.current_header) - 1;
if (length >= max_len - 8)
return 0;
char upperHeader[sizeof(client->request.current_header)];
for (size_t i = 0; i < length; i++) {
char current = header[i];
if (current == '_') {
client->request.current_header = NULL; // CVE-2015-0219
return 0;
}
if (current == '-') {
@@ -125,32 +115,27 @@ int on_header_field(llhttp_t* parser, const char* header, size_t length) {
}
}
upperHeader[length] = 0;
char* old_header = client->request.current_header;
if ((strcmp(upperHeader, "CONTENT_LENGTH") == 0) || (strcmp(upperHeader, "CONTENT_TYPE") == 0)) {
client->request.current_header = upperHeader;
strcpy(client->request.current_header, upperHeader);
}
else {
client->request.current_header = malloc(strlen(upperHeader) + 5);
sprintf(client->request.current_header, "HTTP_%s", upperHeader);
strcpy(client->request.current_header, "HTTP_");
strcat(client->request.current_header, upperHeader);
}
if (old_header)
free(old_header);
return 0;
};
int on_header_value(llhttp_t* parser, const char* value, size_t length) {
logger("on header value");
LOGi("on header value");
client_t * client = (client_t *)parser->data;
if (client->request.current_header != NULL) {
if (client->request.current_header[0]) {
set_header(client->request.headers, client->request.current_header, value, length);
}
return 0;
};
void set_type_error(char* type) {
void set_type_error(const char* type) {
PyErr_Format(
PyExc_TypeError, "response type should be bytes or a byte iterator, got '%s'", type
);
@@ -197,28 +182,26 @@ PyObject* extract_response(PyObject* wsgi_response) {
}
int on_message_complete(llhttp_t* parser) {
logger("on message complete");
LOGi("on message complete");
client_t * client = (client_t *)parser->data;
PyObject * headers = client->request.headers;
// Sets the input byte stream position back to 0
PyObject* body = PyDict_GetItem(headers, wsgi_input);
PyObject* seek = PyUnicode_FromString("seek");
PyObject* res = PyObject_CallMethodObjArgs(body, seek, PyLong_FromLong(0L), NULL);
PyObject* body = PyDict_GetItem(headers, g_cv.wsgi_input);
PyObject* res = PyObject_CallMethodObjArgs(body, g_cv.seek, PyLong_FromLong(0L), NULL);
Py_DECREF(res);
Py_DECREF(seek);
build_wsgi_environ(parser);
StartResponse* start_response = PyObject_NEW(StartResponse, &StartResponse_Type);
start_response->called = 0;
logger("calling wsgi application");
LOGi("calling wsgi application");
PyObject* wsgi_response;
wsgi_response = PyObject_CallFunctionObjArgs(
wsgi_app, headers, start_response, NULL
g_srv.wsgi_app, headers, start_response, NULL
);
logger("called wsgi application");
LOGi("called wsgi application");
if (PyErr_Occurred()) {
client->request.state.error = 1;
@@ -249,89 +232,95 @@ int on_message_complete(llhttp_t* parser) {
return 0;
};
void build_response(PyObject* response_body, StartResponse* response, llhttp_t* parser) {
// This function needs a clean up
int build_response_ex(void * _client, int flags, int status, const void * headers, const char * body, int body_size) {
client_t * client = (client_t *)_client;
xbuf_t * xbuf = &client->response.buf;
xbuf->size = 0; // reset buffer
StartResponse * response = NULL;
if (flags & RF_HEADERS_PYLIST) {
response = (StartResponse *)headers;
headers = NULL;
}
if (response) {
size_t status_len = 0;
const char * status_code = PyUnicode_AsUTF8AndSize(response->status, &status_len);
status = atoi(status_code);
}
if (status == 204 || status == 304)
body_size = 0; // response has no content
logger("building response");
client_t * client = (client_t *)parser->data;
const char * status_name = llhttp_status_name(status);
if (!status_name)
return -1;
int response_has_no_content = 0;
char * buf = xbuf_expand(xbuf, 128);
xbuf->size += sprintf(buf, "HTTP/1.1 %d %s\r\n", status, status_name);
PyObject* status = PyUnicode_AsUTF8String(response->status);
char* status_code = PyBytes_AS_STRING(status);
if (strncmp(status_code, "204", 3) == 0 || strncmp(status_code, "304", 3) == 0) {
response_has_no_content = 1;
if (flags & RF_SET_KEEP_ALIVE) {
xbuf_add_str(xbuf, "Connection: Keep-Alive\r\n");
} else {
xbuf_add_str(xbuf, "Connection: close\r\n");
}
char* buf = malloc(strlen(status_code) + 10);
sprintf(buf, "HTTP/1.1 %s", status_code);
Py_DECREF(status);
if (response) {
Py_ssize_t hsize = PyList_GET_SIZE(response->headers);
for (Py_ssize_t i = 0; i < hsize; i++) {
PyObject* tuple = PyList_GET_ITEM(response->headers, i);
char* connection_header = "\r\nConnection: close";
if (llhttp_should_keep_alive(parser)) {
connection_header = "\r\nConnection: Keep-Alive";
client->request.state.keep_alive = 1;
}
char* old_buf = buf;
buf = malloc(strlen(old_buf) + strlen(connection_header));
sprintf(buf, "%s%s", old_buf, connection_header);
free(old_buf);
size_t key_len = 0;
const char * key = PyUnicode_AsUTF8AndSize(PyTuple_GET_ITEM(tuple, 0), &key_len);
int content_length_header_present = 0;
for (Py_ssize_t i = 0; i < PyList_GET_SIZE(response->headers); i++) {
PyObject* tuple = PyList_GET_ITEM(response->headers, i);
if (key_len == 14 && key[7] == '-')
if (strcasecmp(key, "Content-Length") == 0)
continue; // skip "Content-Length" header
PyObject* field = PyUnicode_AsUTF8String(PyTuple_GET_ITEM(tuple, 0));
PyObject* value = PyUnicode_AsUTF8String(PyTuple_GET_ITEM(tuple, 1));
size_t value_len = 0;
const char * value = PyUnicode_AsUTF8AndSize(PyTuple_GET_ITEM(tuple, 1), &value_len);
char* header_field = PyBytes_AS_STRING(field);
char* header_value = PyBytes_AS_STRING(value);
xbuf_add(xbuf, key, key_len);
xbuf_add(xbuf, ": ", 2);
xbuf_add(xbuf, value, value_len);
xbuf_add(xbuf, "\r\n", 2);
if (!content_length_header_present)
if (strcasecmp("Content-Length", header_field) == 0)
content_length_header_present = 1;
char* old_buf = buf;
buf = malloc(strlen(old_buf) + strlen(header_field) + strlen(header_value) + 5);
sprintf(buf, "%s\r\n%s: %s", old_buf, header_field, header_value);
free(old_buf);
Py_DECREF(field);
Py_DECREF(value);
logger("added header");
}
if (response_has_no_content) {
char* old_buf = buf;
buf = malloc(strlen(old_buf) + 26);
sprintf(buf, "%s\r\nContent-Length: 0\r\n\r\n", old_buf);
free(old_buf);
}
else {
char* response_body_str = PyBytes_AS_STRING(response_body);
if (content_length_header_present == 0) {
char* old_buf = buf;
buf = malloc(strlen(old_buf) + 32);
sprintf(buf, "%s\r\nContent-Length: %ld", old_buf, strlen(response_body_str));
free(old_buf);
LOGi("added header \"%s: %s\"", key, value);
}
char* old_buf = buf;
buf = malloc(strlen(old_buf) + strlen(response_body_str) + 5);
sprintf(buf, "%s\r\n\r\n%s", old_buf, response_body_str);
free(old_buf);
}
else if (headers) {
xbuf_add_str(xbuf, (const char *)headers);
}
logger(buf);
client->response.buffer.base = buf;
client->response.buffer.len = strlen(buf);
if (!body || body_size == 0) {
xbuf_add_str(xbuf, "Content-Length: 0\r\n");
body_size = 0;
//LOGi("added header \"Content-Length: 0\"");
} else {
char * buf = xbuf_expand(xbuf, 48);
xbuf->size += sprintf(buf, "Content-Length: %d\r\n", (int)body_size);
//LOGi("added header \"Content-Length: %d\"", (int)body_size);
}
xbuf_add(xbuf, "\r\n", 2); // end of headers
if (body && body_size)
xbuf_add(xbuf, body, body_size);
LOGt(xbuf->data);
return xbuf->size;
}
void build_response(PyObject* response_body, StartResponse* response, llhttp_t* parser) {
LOGi("building response");
client_t * client = (client_t *)parser->data;
int flags = RF_HEADERS_PYLIST;
if (client->request.state.keep_alive)
flags |= RF_SET_KEEP_ALIVE;
size_t body_size = PyBytes_GET_SIZE(response_body);
char * body_data = PyBytes_AS_STRING(response_body);
build_response_ex(client, flags, 0, response, body_data, body_size);
}
void build_wsgi_environ(llhttp_t* parser) {
logger("building wsgi environ");
LOGi("building wsgi environ");
client_t * client = (client_t *)parser->data;
PyObject * headers = client->request.headers;
@@ -345,16 +334,16 @@ void build_wsgi_environ(llhttp_t* parser) {
void init_request_dict() {
// only constant values!!!
base_dict = PyDict_New();
PyDict_SetItem(base_dict, SCRIPT_NAME, empty_string);
PyDict_SetItem(base_dict, SERVER_NAME, server_host);
PyDict_SetItem(base_dict, SERVER_PORT, server_port);
//PyDict_SetItem(base_dict, wsgi_input, io_BytesIO); // not const!!!
PyDict_SetItem(base_dict, wsgi_version, version);
PyDict_SetItem(base_dict, wsgi_url_scheme, http_scheme);
PyDict_SetItem(base_dict, wsgi_errors, PySys_GetObject("stderr"));
PyDict_SetItem(base_dict, wsgi_run_once, Py_False);
PyDict_SetItem(base_dict, wsgi_multithread, Py_False);
PyDict_SetItem(base_dict, wsgi_multiprocess, Py_True);
PyDict_SetItem(base_dict, g_cv.SCRIPT_NAME, g_cv.empty_string);
PyDict_SetItem(base_dict, g_cv.SERVER_NAME, g_cv.server_host);
PyDict_SetItem(base_dict, g_cv.SERVER_PORT, g_cv.server_port);
//PyDict_SetItem(base_dict, g_cv.wsgi_input, io_BytesIO); // not const!!!
PyDict_SetItem(base_dict, g_cv.wsgi_version, g_cv.version);
PyDict_SetItem(base_dict, g_cv.wsgi_url_scheme, g_cv.http_scheme);
PyDict_SetItem(base_dict, g_cv.wsgi_errors, PySys_GetObject("stderr"));
PyDict_SetItem(base_dict, g_cv.wsgi_run_once, Py_False);
PyDict_SetItem(base_dict, g_cv.wsgi_multithread, Py_False);
PyDict_SetItem(base_dict, g_cv.wsgi_multiprocess, Py_True);
}
void configure_parser_settings() {

View File

@@ -1,21 +1,28 @@
#ifndef FASTWSGI_REQUEST_H_
#define FASTWSGI_REQUEST_H_
#ifdef _MSC_VER
// strncasecmp is not available on Windows
#define strncasecmp _strnicmp
#define strcasecmp _stricmp
#endif
#include "common.h"
#include "start_response.h"
PyObject* base_dict;
extern PyObject* base_dict;
void init_request_dict();
void build_wsgi_environ(llhttp_t* parser);
typedef enum {
RF_EMPTY = 0x00,
RF_SET_KEEP_ALIVE = 0x01,
RF_HEADERS_PYLIST = 0x02
} response_flag_t;
int build_response_ex(void * client, int flags, int status, const void * headers, const char * body, int body_size);
void build_response(PyObject* wsgi_response, StartResponse* response, llhttp_t* parser);
llhttp_settings_t parser_settings;
void configure_parser_settings();
void logrepr(int level, PyObject* obj);
#define LOGREPR(_level_, _msg_) if (g_log_level >= _level_) logrepr(_level_, _msg_)
#endif

View File

@@ -10,31 +10,14 @@
#include "request.h"
#include "constants.h"
PyObject* wsgi_app;
char* host;
int port;
int backlog;
server_t g_srv;
uv_tcp_t server;
uv_loop_t* loop;
uv_os_fd_t file_descriptor;
struct sockaddr_in addr;
static const char* BAD_REQUEST = "HTTP/1.1 400 Bad Request\r\n\r\n";
static const char* INTERNAL_ERROR = "HTTP/1.1 500 Internal Server Error\r\n\r\n";
void logger(char* message) {
if (LOGGING_ENABLED)
fprintf(stdout, ">>> %s\n", message);
}
void close_cb(uv_handle_t* handle) {
logger("disconnected");
client_t * client = (client_t *)handle->data;
LOGi("disconnected");
client_t * client = (client_t *)handle;
Py_XDECREF(client->request.headers);
if (client->response.buffer.base)
free(client->response.buffer.base);
xbuf_free(&client->response.buf);
free(client);
}
@@ -54,66 +37,97 @@ void shutdown_connection(uv_stream_t* handle) {
}
void write_cb(uv_write_t* req, int status) {
if (status) {
fprintf(stderr, "Write error %s\n", uv_strerror(status));
}
LOGe_IF(status, "Write error %s\n", uv_strerror(status));
//write_req_t* write_req = (write_req_t*)req;
free(req);
}
void stream_write(uv_stream_t* handle, const void* data, size_t size) {
if (!data || size == 0)
return;
void stream_write(client_t * client, const void* data, size_t size) {
if (!data || !size) {
data = (const void*)client->response.buf.data;
size = (size_t)client->response.buf.size;
if (!data || size == 0)
return;
}
size_t req_size = _Py_SIZE_ROUND_UP(sizeof(write_req_t), 16);
write_req_t* req = (write_req_t*)malloc(req_size + size);
req->buf.base = (char *)req + req_size;
req->buf.len = size;
memcpy(req->buf.base, data, size);
uv_write((uv_write_t*)req, handle, &req->buf, 1, write_cb);
uv_write((uv_write_t*)req, (uv_stream_t*)client, &req->buf, 1, write_cb);
}
void send_error(uv_stream_t* handle, const char* error_string) {
stream_write(handle, error_string, strlen(error_string));
shutdown_connection(handle); // fixme: maybe check keep_alive???
void send_fatal(client_t * client, int status, const char* error_string) {
if (!status)
status = HTTP_STATUS_BAD_REQUEST;
LOGe("send_fatal: %d", status);
int body_size = error_string ? strlen(error_string) : 0;
build_response_ex(client, 0, status, NULL, error_string, body_size);
stream_write(client, NULL, 0);
shutdown_connection((uv_stream_t*)client);
}
void send_response(uv_stream_t* handle, client_t* client) {
uv_buf_t * resbuf = &client->response.buffer;
stream_write(handle, resbuf->base, resbuf->len);
void send_error(client_t * client, int status, const char* error_string) {
if (!status)
status = HTTP_STATUS_INTERNAL_SERVER_ERROR;
LOGe("send_error: %d", status);
int flags = (client->request.state.keep_alive) ? RF_SET_KEEP_ALIVE : 0;
int body_size = error_string ? strlen(error_string) : 0;
build_response_ex(client, flags, status, NULL, error_string, body_size);
stream_write(client, NULL, 0);
if (!client->request.state.keep_alive)
shutdown_connection(handle);
shutdown_connection((uv_stream_t*)client);
}
void send_response(client_t * client) {
LOGi("send_response %d bytes", client->response.buf.size);
stream_write(client, NULL, 0);
if (!client->request.state.keep_alive)
shutdown_connection((uv_stream_t*)client);
}
void read_cb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
int continue_read = 0;
client_t* client = (client_t*)handle->data;
client_t * client = (client_t *)handle;
llhttp_t * parser = &client->request.parser;
if (client->response.buf.data == NULL)
xbuf_init(&client->response.buf, NULL, 64*1024);
if (nread > 0) {
if ((ssize_t)buf->len > nread)
buf->base[nread] = 0;
LOGd("read_cb: [nread = %d]", (int)nread);
LOGt(buf->base);
enum llhttp_errno err = llhttp_execute(parser, buf->base, nread);
if (err == HPE_OK) {
logger("Successfully parsed");
if (client->response.buffer.len > 0)
send_response(handle, client);
LOGi("Successfully parsed (response len = %d)", client->response.buf.size);
if (client->response.buf.size > 0)
send_response(client);
else if (client->request.state.error)
send_error(handle, INTERNAL_ERROR);
send_error(client, HTTP_STATUS_INTERNAL_SERVER_ERROR, NULL);
else
continue_read = 1;
}
else {
fprintf(stderr, "Parse error: %s %s\n", llhttp_errno_name(err), client->request.parser.reason);
send_error(handle, BAD_REQUEST);
LOGe("Parse error: %s %s\n", llhttp_errno_name(err), client->request.parser.reason);
send_fatal(client, HTTP_STATUS_BAD_REQUEST, NULL);
}
}
LOGd_IF(!nread, "read_cb: nread = 0");
if (nread < 0) {
char err_name[128];
uv_err_name_r((int)nread, err_name, sizeof(err_name) - 1);
LOGd("read_cb: nread = %d error: %s", (int)nread, err_name);
uv_read_stop(handle);
if (nread == UV_EOF) { // remote peer disconnected
close_connection(handle);
} else {
if (nread != UV_ECONNRESET)
fprintf(stderr, "Read error %s\n", uv_err_name(nread));
LOGe("Read error: %s", err_name);
shutdown_connection(handle);
}
}
@@ -126,20 +140,22 @@ void read_cb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
}
void alloc_cb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
logger("Allocating buffer");
buf->base = (char*)malloc(suggested_size);
LOGi("Allocating buffer (size = %d)", (int)suggested_size);
buf->base = (char*)malloc(suggested_size + 1);
buf->len = suggested_size;
buf->base[suggested_size] = 0;
}
void connection_cb(uv_stream_t* server, int status) {
if (status < 0) {
fprintf(stderr, "Connection error %s\n", uv_strerror(status));
LOGe("Connection error %s\n", uv_strerror(status));
return;
}
LOGi("new connection...");
client_t* client = calloc(1, sizeof(client_t));
client->srv = &g_srv;
uv_tcp_init(loop, &client->handle);
uv_tcp_init(g_srv.loop, &client->handle);
uv_tcp_nodelay(&client->handle, 0);
uv_tcp_keepalive(&client->handle, 1, 60);
@@ -164,57 +180,61 @@ void connection_cb(uv_stream_t* server, int status) {
void signal_handler(uv_signal_t* req, int signum) {
if (signum == SIGINT) {
uv_stop(loop);
uv_stop(g_srv.loop);
uv_signal_stop(req);
exit(0);
}
}
int main() {
loop = uv_default_loop();
g_srv.loop = uv_default_loop();
configure_parser_settings();
init_constants();
init_request_dict();
uv_ip4_addr(host, port, &addr);
struct sockaddr_in addr;
uv_ip4_addr(g_srv.host, g_srv.port, &addr);
uv_tcp_init_ex(loop, &server, AF_INET);
uv_tcp_init_ex(g_srv.loop, &g_srv.server, AF_INET);
uv_fileno((const uv_handle_t*)&server, &file_descriptor);
uv_fileno((const uv_handle_t*)&g_srv.server, &g_srv.file_descriptor);
int enabled = 1;
#ifdef _WIN32
//uv__socket_sockopt((uv_handle_t*)&server, SO_REUSEADDR, &enabled);
//uv__socket_sockopt((uv_handle_t*)&g_srv.server, SO_REUSEADDR, &enabled);
#else
int so_reuseport = 15; // SO_REUSEPORT
uv__socket_sockopt((uv_handle_t*)&server, so_reuseport, &enabled);
uv__socket_sockopt((uv_handle_t*)&g_srv.server, so_reuseport, &enabled);
#endif
int err = uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
int err = uv_tcp_bind(&g_srv.server, (const struct sockaddr*)&addr, 0);
if (err) {
fprintf(stderr, "Bind error %s\n", uv_strerror(err));
LOGe("Bind error %s\n", uv_strerror(err));
return 1;
}
err = uv_listen((uv_stream_t*)&server, backlog, connection_cb);
err = uv_listen((uv_stream_t*)&g_srv.server, g_srv.backlog, connection_cb);
if (err) {
fprintf(stderr, "Listen error %s\n", uv_strerror(err));
LOGe("Listen error %s\n", uv_strerror(err));
return 1;
}
uv_signal_t signal;
uv_signal_init(loop, &signal);
uv_signal_init(g_srv.loop, &signal);
uv_signal_start(&signal, signal_handler, SIGINT);
uv_run(loop, UV_RUN_DEFAULT);
uv_loop_close(loop);
free(loop);
uv_run(g_srv.loop, UV_RUN_DEFAULT);
uv_loop_close(g_srv.loop);
free(g_srv.loop);
return 0;
}
PyObject* run_server(PyObject* self, PyObject* args) {
PyArg_ParseTuple(args, "Osiii", &wsgi_app, &host, &port, &backlog, &LOGGING_ENABLED);
memset(&g_srv, 0, sizeof(g_srv));
int log_level = 0;
PyArg_ParseTuple(args, "Osiii", &g_srv.wsgi_app, &g_srv.host, &g_srv.port, &g_srv.backlog, &log_level);
set_log_level(log_level);
main();
Py_RETURN_NONE;
}

View File

@@ -1,42 +1,48 @@
#ifndef FASTWSGI_SERVER_H_
#define FASTWSGI_SERVER_H_
#include <Python.h>
#include "uv.h"
#include "uv-common.h"
#include "common.h"
#include "llhttp.h"
#include "request.h"
extern PyObject* wsgi_app;
#include "xbuf.h"
typedef struct {
uv_write_t req;
uv_buf_t buf;
} write_req_t;
typedef struct {
uv_tcp_t server;
uv_loop_t* loop;
uv_os_fd_t file_descriptor;
PyObject* wsgi_app;
char* host;
int port;
int backlog;
} server_t;
typedef struct {
int error;
int keep_alive;
} RequestState;
typedef struct {
uv_tcp_t handle; // peer connection
uv_tcp_t handle; // peer connection. Placement strictly at the beginning of the structure!
server_t * srv;
char remote_addr[24];
struct {
PyObject* headers;
char* current_header;
char current_header[128];
llhttp_t parser;
RequestState state;
} request;
struct {
uv_buf_t buffer;
xbuf_t buf;
} response;
} client_t;
extern server_t g_srv;
PyObject* run_server(PyObject* self, PyObject* args);
int LOGGING_ENABLED;
void logger(char* message);
#endif

View File

@@ -1,6 +1,7 @@
#ifndef START_RESPONSE_H_
#define START_RESPONSE_H_
#include <Python.h>
#include "common.h"
typedef struct {
PyObject ob_base;

96
fastwsgi/xbuf.h Normal file
View File

@@ -0,0 +1,96 @@
#ifndef FASTWSGI_XBUF_H_
#define FASTWSGI_XBUF_H_
#include "common.h"
typedef struct {
char * data;
int size;
int capacity;
} xbuf_t;
INLINE
char * xbuf_expand(xbuf_t * buf, size_t expand_size)
{
int need_size = buf->size + (int)expand_size + 8;
if (need_size >= buf->capacity) {
int new_cap = need_size;
if (new_cap < 1*1024*1024) {
new_cap *= 2;
} else {
new_cap += 512*1024;
}
char * new_ptr = (char *)malloc(new_cap + 2);
if (!new_ptr)
return NULL; // error
new_ptr[0] = 0;
if (buf->data) {
if (buf->size) {
memcpy(new_ptr, buf->data, buf->size);
new_ptr[buf->size] = 0;
}
free(buf->data);
}
buf->data = new_ptr;
buf->capacity = new_cap;
}
return buf->data + buf->size;
}
INLINE
int xbuf_init(xbuf_t * buf, const void * data, size_t size)
{
buf->data = NULL;
buf->size = 0;
buf->capacity = 0;
if (size > 0) {
char * ptr = xbuf_expand(buf, size);
if (!ptr)
return -1; // error
if (data) {
memcpy(buf->data, data, size);
buf->data[size] = 0;
buf->size = size;
}
}
return 0;
}
INLINE
int xbuf_init_str(xbuf_t * buf, const char * str)
{
return xbuf_init(buf, str, strlen(str));
}
INLINE
int xbuf_add(xbuf_t * buf, const void * data, size_t size)
{
char * ptr = xbuf_expand(buf, size);
if (!ptr)
return -1; // error
if (data && size) {
memcpy(ptr, data, size);
ptr[size] = 0;
buf->size += size;
}
return buf->size;
}
INLINE
int xbuf_add_str(xbuf_t * buf, const char * str)
{
return xbuf_add(buf, str, strlen(str));
}
INLINE
void xbuf_free(xbuf_t * buf)
{
if (buf->data)
free(buf->data);
buf->data = NULL;
buf->size = 0;
buf->capacity = 0;
}
#endif

2
llhttp

Submodule llhttp updated: 99c1055266...a294239338