From 74ee1463fe7fe63b76c565e6a1717174487dbf78 Mon Sep 17 00:00:00 2001
From: Ettore Di Giacinto
Date: Tue, 9 Dec 2025 15:41:37 +0100
Subject: [PATCH] chore(deps/llama-cpp): bump to '2fa51c19b028180b35d316e9ed06f5f0f7ada2c1' (#7484)

Signed-off-by: Ettore Di Giacinto
---
 backend/cpp/llama-cpp/Makefile        |  2 +-
 backend/cpp/llama-cpp/grpc-server.cpp | 69 ++++++++++-----------------
 2 files changed, 26 insertions(+), 45 deletions(-)

diff --git a/backend/cpp/llama-cpp/Makefile b/backend/cpp/llama-cpp/Makefile
index ff4989293..0d82b83b1 100644
--- a/backend/cpp/llama-cpp/Makefile
+++ b/backend/cpp/llama-cpp/Makefile
@@ -1,5 +1,5 @@
-LLAMA_VERSION?=db97837385edfbc772230debbd49e5efae843a71
+LLAMA_VERSION?=2fa51c19b028180b35d316e9ed06f5f0f7ada2c1
 
 LLAMA_REPO?=https://github.com/ggerganov/llama.cpp
 
 CMAKE_ARGS?=
diff --git a/backend/cpp/llama-cpp/grpc-server.cpp b/backend/cpp/llama-cpp/grpc-server.cpp
index cb48e172b..aa5ea7e81 100644
--- a/backend/cpp/llama-cpp/grpc-server.cpp
+++ b/backend/cpp/llama-cpp/grpc-server.cpp
@@ -688,9 +688,8 @@ public:
 
         auto completion_id = gen_chatcmplid();
 
-        // need to store the reader as a pointer, so that it won't be destroyed when the handle returns
-        auto queues = ctx_server.get_queues();
-        const auto rd = std::make_shared<server_response_reader>(queues, 1); // HTTP_POLLING_SECONDS = 1
+        // get response reader - it contains references to the queues and will stay valid
+        auto rd = ctx_server.get_response_reader();
 
         try {
             std::vector<server_task> tasks;
@@ -1169,7 +1168,6 @@ public:
             }
         }
 
-        const auto & prompt = prompt_str;
        const auto type = SERVER_TASK_TYPE_COMPLETION;
         // TODO: this log can become very long, put it behind a flag or think about a more compact format
         //SRV_DBG("Prompt: %s\n", prompt.is_string() ? prompt.get<std::string>().c_str() : prompt.dump(2).c_str());
@@ -1211,12 +1209,10 @@ public:
             }
             tasks.reserve(inputs.size());
-            std::vector<common_chat_syntax> states;
-            states.reserve(inputs.size());
 
             for (size_t i = 0; i < inputs.size(); i++) {
                 server_task task = server_task(type);
 
-                task.id = queues.first.get_new_id();
+                task.id = rd.queue_tasks.get_new_id();
                 task.index = i;
 
                 task.tokens = std::move(inputs[i]);
@@ -1231,20 +1227,16 @@ public:
                 task.params.oaicompat_cmpl_id = completion_id;
                 // oaicompat_model is already populated by params_from_json_cmpl
-                // Extract oaicompat_chat_syntax for state tracking before moving task
-                states.push_back(task.params.oaicompat_chat_syntax);
-
                 tasks.push_back(std::move(task));
             }
 
-            rd->set_states(std::move(states));
-            rd->post_tasks(std::move(tasks));
+            rd.post_tasks(std::move(tasks));
         } catch (const std::exception & e) {
             return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, e.what());
         }
 
         // Get first result for error checking (following server.cpp pattern)
-        server_task_result_ptr first_result = rd->next([&context]() { return context->IsCancelled(); });
+        server_task_result_ptr first_result = rd.next([&context]() { return context->IsCancelled(); });
         if (first_result == nullptr) {
             // connection is closed
             return grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client");
         }
@@ -1313,13 +1305,13 @@ public:
         }
 
         // Process subsequent results
-        while (rd->has_next()) {
+        while (rd.has_next()) {
             // Check if context is cancelled before processing result
             if (context->IsCancelled()) {
                 break;
             }
-            auto result = rd->next([&context]() { return context->IsCancelled(); });
+            auto result = rd.next([&context]() { return context->IsCancelled(); });
             if (result == nullptr) {
                 // connection is closed
                 break;
             }
@@ -1402,8 +1394,7 @@ public:
         }
         std::cout << "[PREDICT] Received result: " << data.dump(2) << std::endl;
         auto completion_id = gen_chatcmplid();
-        auto queues = ctx_server.get_queues();
-        const auto rd = std::make_shared<server_response_reader>(queues, 1); // HTTP_POLLING_SECONDS = 1
+        auto rd = ctx_server.get_response_reader();
 
         try {
             std::vector<server_task> tasks;
@@ -1907,7 +1898,6 @@ public:
             }
         }
 
-        const auto & prompt = prompt_str;
         const auto type = SERVER_TASK_TYPE_COMPLETION;
         // TODO: this log can become very long, put it behind a flag or think about a more compact format
         //SRV_DBG("Prompt: %s\n", prompt.is_string() ? prompt.get<std::string>().c_str() : prompt.dump(2).c_str());
@@ -1952,12 +1942,10 @@ public:
             }
             tasks.reserve(inputs.size());
-            std::vector<common_chat_syntax> states;
-            states.reserve(inputs.size());
 
             for (size_t i = 0; i < inputs.size(); i++) {
                 server_task task = server_task(type);
 
-                task.id = queues.first.get_new_id();
+                task.id = rd.queue_tasks.get_new_id();
                 task.index = i;
 
                 task.tokens = std::move(inputs[i]);
@@ -1972,14 +1960,10 @@ public:
                 task.params.oaicompat_cmpl_id = completion_id;
                 // oaicompat_model is already populated by params_from_json_cmpl
 
-                // Extract oaicompat_chat_syntax for state tracking before moving task
-                states.push_back(task.params.oaicompat_chat_syntax);
-
                 tasks.push_back(std::move(task));
             }
 
-            rd->set_states(std::move(states));
-            rd->post_tasks(std::move(tasks));
+            rd.post_tasks(std::move(tasks));
         } catch (const std::exception & e) {
             return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, e.what());
         }
@@ -1988,7 +1972,7 @@ public:
         std::cout << "[DEBUG] Waiting for results..." << std::endl;
 
         // Wait for all results
-        auto all_results = rd->wait_for_all([&context]() { return context->IsCancelled(); });
+        auto all_results = rd.wait_for_all([&context]() { return context->IsCancelled(); });
 
         if (all_results.is_terminated) {
             return grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client");
@@ -2090,14 +2074,13 @@ public:
         int embd_normalize = 2; // default to Euclidean/L2 norm
 
         // create and queue the task
-        auto queues = ctx_server.get_queues();
-        const auto rd = std::make_shared<server_response_reader>(queues, 1); // HTTP_POLLING_SECONDS = 1
+        auto rd = ctx_server.get_response_reader();
         {
             std::vector<server_task> tasks;
             for (size_t i = 0; i < tokenized_prompts.size(); i++) {
                 server_task task = server_task(SERVER_TASK_TYPE_EMBEDDING);
 
-                task.id = queues.first.get_new_id();
+                task.id = rd.queue_tasks.get_new_id();
                 task.index = i;
 
                 task.tokens = std::move(tokenized_prompts[i]);
@@ -2106,11 +2089,11 @@ public:
 
                 tasks.push_back(std::move(task));
             }
-            rd->post_tasks(std::move(tasks));
+            rd.post_tasks(std::move(tasks));
         }
 
         // Wait for all results
-        auto all_results = rd->wait_for_all([&context]() { return context->IsCancelled(); });
+        auto all_results = rd.wait_for_all([&context]() { return context->IsCancelled(); });
 
         if (all_results.is_terminated) {
             return grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client");
@@ -2173,8 +2156,7 @@ public:
         }
 
         // Create and queue the task
-        auto queues = ctx_server.get_queues();
-        const auto rd = std::make_shared<server_response_reader>(queues, 1); // HTTP_POLLING_SECONDS = 1
+        auto rd = ctx_server.get_response_reader();
         {
             std::vector<server_task> tasks;
             std::vector<std::string> documents;
@@ -2186,17 +2168,17 @@ public:
             for (size_t i = 0; i < documents.size(); i++) {
                 auto tmp = format_prompt_rerank(ctx_server.impl->model, ctx_server.impl->vocab, ctx_server.impl->mctx, request->query(), documents[i]);
                 server_task task = server_task(SERVER_TASK_TYPE_RERANK);
 
-                task.id = queues.first.get_new_id();
+                task.id = rd.queue_tasks.get_new_id();
                 task.index = i;
                 task.tokens = std::move(tmp);
                 tasks.push_back(std::move(task));
             }
-            rd->post_tasks(std::move(tasks));
+            rd.post_tasks(std::move(tasks));
         }
 
         // Wait for all results
-        auto all_results = rd->wait_for_all([&context]() { return context->IsCancelled(); });
+        auto all_results = rd.wait_for_all([&context]() { return context->IsCancelled(); });
 
         if (all_results.is_terminated) {
             return grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client");
@@ -2255,7 +2237,6 @@ public:
 
         json tokens_response = json::array();
         if (body.count("prompt") != 0) {
             const bool add_special = json_value(body, "add_special", false);
-            const bool with_pieces = json_value(body, "with_pieces", false);
 
             llama_tokens tokens = tokenize_mixed(ctx_server.impl->vocab, body.at("content"), add_special, true);
@@ -2272,18 +2253,18 @@
 
     grpc::Status GetMetrics(ServerContext* context, const backend::MetricsRequest* request, backend::MetricsResponse* response) {
         // request slots data using task queue
-        auto queues = ctx_server.get_queues();
-        int task_id = queues.first.get_new_id();
+        auto rd = ctx_server.get_response_reader();
+        int task_id = rd.queue_tasks.get_new_id();
         {
             server_task task(SERVER_TASK_TYPE_METRICS);
             task.id = task_id;
-            queues.second.add_waiting_task_id(task_id);
-            queues.first.post(std::move(task), true); // high-priority task
+            rd.queue_results.add_waiting_task_id(task_id);
+            rd.queue_tasks.post(std::move(task), true); // high-priority task
         }
 
         // get the result
-        server_task_result_ptr result = queues.second.recv(task_id);
-        queues.second.remove_waiting_task_id(task_id);
+        server_task_result_ptr result = rd.queue_results.recv(task_id);
+        rd.queue_results.remove_waiting_task_id(task_id);
 
         if (result->is_error()) {
             // Handle case when no active slot exists
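
Note (not part of the patch): every handler above migrates from the old pair-of-queues API (ctx_server.get_queues() plus a heap-allocated server_response_reader polled on an interval) to a value-typed reader returned by ctx_server.get_response_reader(). Below is a minimal sketch of the resulting request flow, for review purposes only. It assumes the upstream llama.cpp server types (server_task, server_response_reader, llama_tokens) behave exactly as they are used in the diff; the function name and the server_context type of ctx_server are illustrative, not taken from the source.

    // Sketch only - mirrors the post_tasks / wait_for_all pattern applied in the diff.
    grpc::Status run_completion_sketch(server_context & ctx_server,     // assumed type of ctx_server
                                       grpc::ServerContext * context,   // gRPC call context
                                       std::vector<llama_tokens> inputs) {
        // The reader holds references to the task/result queues, so it can live
        // on the stack; no shared_ptr or HTTP polling interval is needed anymore.
        auto rd = ctx_server.get_response_reader();

        std::vector<server_task> tasks;
        tasks.reserve(inputs.size());
        for (size_t i = 0; i < inputs.size(); i++) {
            server_task task(SERVER_TASK_TYPE_COMPLETION);
            task.id     = rd.queue_tasks.get_new_id(); // task ids now come via the reader
            task.index  = i;
            task.tokens = std::move(inputs[i]);
            tasks.push_back(std::move(task));
        }
        rd.post_tasks(std::move(tasks));

        // Block until all tasks finish; the predicate lets the reader bail out
        // early when the client cancels the gRPC call.
        auto all_results = rd.wait_for_all([&]() { return context->IsCancelled(); });
        if (all_results.is_terminated) {
            return grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client");
        }
        return grpc::Status::OK;
    }

The same shape covers the streaming path by replacing wait_for_all with the rd.has_next() / rd.next(...) loop from the first hunk; dropping the shared_ptr also removes the set_states bookkeeping the old reader needed.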