Added support generator for response body (#33)

* Added function create_start_response

* Added support generator for WSGI response body

* Fix memory leak (wsgi_input_empty)

* Fix memory leak (shutdown_connection)
This commit is contained in:
Oleg S
2023-03-15 23:32:57 +03:00
committed by GitHub
parent d3ddd6a9f8
commit 53cb01a248
3 changed files with 62 additions and 6 deletions

View File

@@ -455,7 +455,7 @@ int on_message_complete(llhttp_t * parser)
client->request.wsgi_input_size = 0;
if (client->request.wsgi_input_empty == NULL) {
wsgi_input = PyObject_CallMethodObjArgs(g_cv.module_io, g_cv.BytesIO, NULL);
client->request.wsgi_input = wsgi_input; // object cached
client->request.wsgi_input_empty = wsgi_input; // object cached
} else {
wsgi_input = client->request.wsgi_input_empty;
}
@@ -491,8 +491,7 @@ int call_wsgi_app(client_t * client)
{
PyObject * headers = client->request.headers;
StartResponse * start_response = PyObject_NEW(StartResponse, &StartResponse_Type);
start_response->called = 0;
StartResponse * start_response = create_start_response();
client->start_response = start_response;
LOGi("calling wsgi application...");
@@ -524,7 +523,31 @@ int process_wsgi_response(client_t * client)
if (PyBytes_CheckExact(wsgi_body)) {
LOGd("wsgi_body: is PyBytes (size = %d)", (int)PyBytes_GET_SIZE(wsgi_body));
} else {
if (PyIter_Check(wsgi_body)) {
if (PyGen_Check(wsgi_body)) {
LOGd("wsgi_body: is GENERATOR '%s'", body_type);
/*
* If the application we called was a generator, we have to call .next() on
* it before we do anything else because that may execute code that
* invokes `start_response` (which might not have been invoked yet).
* Think of the following scenario:
*
* def app(environ, start_response):
* start_response('200 Ok', ...)
* yield 'Hello World'
*/
client->response.body_iterator = wsgi_body;
PyObject * item = wsgi_iterator_get_next_chunk(client, 1);
if (!item && client->error) {
err = HTTP_STATUS_INTERNAL_SERVER_ERROR;
goto fin;
}
Py_ssize_t size = (item == NULL) ? 0 : PyBytes_GET_SIZE(item);
client->response.body[0] = item;
client->response.body_chunk_num = (item == NULL) ? 0 : 1;
client->response.body_total_size = -111; // generator
client->response.body_preloaded_size = size;
}
else if (PyIter_Check(wsgi_body)) {
LOGd("wsgi_body: is ITERATOR '%s'", body_type);
client->response.body_iterator = wsgi_body;
} else {
@@ -754,6 +777,22 @@ PyObject* wsgi_iterator_get_next_chunk(client_t * client, int outpyerr)
{
if (client->response.body_iterator == NULL)
return NULL;
if (client->response.body_total_size == -111) { // generator
client->response.body_total_size = 0;
int64_t size = client->response.body_preloaded_size;
client->response.body_preloaded_size = 0;
if (client->response.body_chunk_num > 0) {
client->response.body_chunk_num = 0;
PyObject * item = client->response.body[0];
if (item && size == 0) {
Py_DECREF(item); // skip empty items
return NULL;
}
return item;
}
return NULL;
}
PyObject* item;
while (item = PyIter_Next(client->response.body_iterator)) {
if (!PyBytes_Check(item)) {

View File

@@ -73,7 +73,13 @@ void shutdown_cb(uv_shutdown_t * req, int status)
void shutdown_connection(client_t * client)
{
uv_shutdown_t* shutdown = malloc(sizeof(uv_shutdown_t));
uv_shutdown(shutdown, (uv_stream_t *)client, shutdown_cb);
if (shutdown) {
int rc = uv_shutdown(shutdown, (uv_stream_t *)client, shutdown_cb);
if (rc == 0)
return;
free(shutdown); // uv_shutdown returned UV_ENOTCONN
}
close_connection(client);
}
typedef struct {

View File

@@ -13,9 +13,20 @@ typedef struct {
extern PyTypeObject StartResponse_Type;
INLINE
static StartResponse * create_start_response(void)
{
StartResponse * response = PyObject_NEW(StartResponse, &StartResponse_Type);
response->status = NULL;
response->headers = NULL;
response->exc_info = NULL;
response->called = 0;
return response;
}
void set_status_error();
void set_header_tuple_error();
void set_header_list_error(PyObject* headers);
void exc_info_error(PyObject* exc_info);
void set_exc_info_type_error(PyObject* exc_info);
#endif