Merge topic 'ctest-jobserver-client'

80fe56c481 ctest: Add support for running under a make job server on POSIX systems
5396f4a9a3 cmUVJobServerClient: Add libuv-based job server integration client

Acked-by: Kitware Robot <kwrobot@kitware.com>
Tested-by: buildbot <buildbot@kitware.com>
Merge-request: !9021
This commit is contained in:
Brad King
2023-12-04 14:47:05 +00:00
committed by Kitware Robot
18 changed files with 933 additions and 0 deletions
+3
View File
@@ -1091,6 +1091,9 @@ add_library(
CTest/cmCTestP4.cxx
CTest/cmCTestP4.h
CTest/cmUVJobServerClient.cxx
CTest/cmUVJobServerClient.h
LexerParser/cmCTestResourceGroupsLexer.cxx
LexerParser/cmCTestResourceGroupsLexer.h
LexerParser/cmCTestResourceGroupsLexer.in.l
@@ -40,6 +40,7 @@
#include "cmRange.h"
#include "cmStringAlgorithms.h"
#include "cmSystemTools.h"
#include "cmUVJobServerClient.h"
#include "cmWorkingDirectory.h"
namespace cmsys {
@@ -130,10 +131,19 @@ void cmCTestMultiProcessHandler::InitializeLoop()
this->Loop.init();
this->StartNextTestsOnIdle_.init(*this->Loop, this);
this->StartNextTestsOnTimer_.init(*this->Loop, this);
this->JobServerClient = cmUVJobServerClient::Connect(
*this->Loop, /*onToken=*/[this]() { this->JobServerReceivedToken(); },
/*onDisconnect=*/nullptr);
if (this->JobServerClient) {
cmCTestLog(this->CTest, OUTPUT,
"Connected to MAKE jobserver" << std::endl);
}
}
void cmCTestMultiProcessHandler::FinalizeLoop()
{
this->JobServerClient.reset();
this->StartNextTestsOnTimer_.reset();
this->StartNextTestsOnIdle_.reset();
this->Loop.reset();
@@ -461,6 +471,26 @@ std::string cmCTestMultiProcessHandler::GetName(int test)
void cmCTestMultiProcessHandler::StartTest(int test)
{
if (this->JobServerClient) {
// There is a job server. Request a token and queue the test to run
// when a token is received. Note that if we do not get a token right
// away it's possible that the system load will be higher when the
// token is received and we may violate the test-load limit. However,
// this is unlikely because if we do not get a token right away, some
// other job that's currently running must finish before we get one.
this->JobServerClient->RequestToken();
this->JobServerQueuedTests.emplace_back(test);
} else {
// There is no job server. Start the test now.
this->StartTestProcess(test);
}
}
void cmCTestMultiProcessHandler::JobServerReceivedToken()
{
assert(!this->JobServerQueuedTests.empty());
int test = this->JobServerQueuedTests.front();
this->JobServerQueuedTests.pop_front();
this->StartTestProcess(test);
}
@@ -692,6 +722,9 @@ void cmCTestMultiProcessHandler::FinishTestProcess(
runner.reset();
if (this->JobServerClient) {
this->JobServerClient->ReleaseToken();
}
this->StartNextTestsOnIdle();
}
+10
View File
@@ -19,6 +19,7 @@
#include "cmCTestResourceSpec.h"
#include "cmCTestTestHandler.h"
#include "cmUVHandlePtr.h"
#include "cmUVJobServerClient.h"
struct cmCTestBinPackerAllocation;
class cmCTestRunTest;
@@ -204,6 +205,15 @@ protected:
cmCTestResourceAllocator ResourceAllocator;
std::vector<cmCTestTestHandler::cmCTestTestResult>* TestResults;
size_t ParallelLevel; // max number of process that can be run at once
// 'make' jobserver client. If connected, we acquire a token
// for each test before running its process.
cm::optional<cmUVJobServerClient> JobServerClient;
// List of tests that are queued to run when a token is available.
std::list<int> JobServerQueuedTests;
// Callback invoked when a token is received.
void JobServerReceivedToken();
unsigned long TestLoad;
unsigned long FakeLoadForTesting;
cm::uv_loop_ptr Loop;
+518
View File
@@ -0,0 +1,518 @@
/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
file Copyright.txt or https://cmake.org/licensing for details. */
#include "cmUVJobServerClient.h"
#include <cassert>
#include <utility>
#ifndef _WIN32
# include <cstdio>
# include <string>
# include <vector>
# include <fcntl.h>
# include <unistd.h>
#endif
#include <cm/memory>
#include <cm/optional>
#include <cm/string_view>
#include "cmRange.h"
#include "cmStringAlgorithms.h"
#include "cmSystemTools.h"
#include "cmUVHandlePtr.h"
class cmUVJobServerClient::Impl
{
public:
uv_loop_t& Loop;
cm::uv_idle_ptr ImplicitToken;
std::function<void()> OnToken;
std::function<void(int)> OnDisconnect;
// The number of tokens held by this client.
unsigned int HeldTokens = 0;
// The number of tokens we need to receive from the job server.
unsigned int NeedTokens = 0;
Impl(uv_loop_t& loop);
virtual ~Impl();
virtual void SendToken() = 0;
virtual void StartReceivingTokens() = 0;
virtual void StopReceivingTokens() = 0;
void RequestToken();
void ReleaseToken();
void RequestExplicitToken();
void DecrementNeedTokens();
void HoldToken();
void RequestImplicitToken();
void ReleaseImplicitToken();
void ReceivedToken();
void Disconnected(int status);
};
cmUVJobServerClient::Impl::Impl(uv_loop_t& loop)
: Loop(loop)
{
this->ImplicitToken.init(this->Loop, this);
}
cmUVJobServerClient::Impl::~Impl() = default;
void cmUVJobServerClient::Impl::RequestToken()
{
if (this->HeldTokens == 0 && !uv_is_active(this->ImplicitToken)) {
this->RequestImplicitToken();
} else {
this->RequestExplicitToken();
}
}
void cmUVJobServerClient::Impl::ReleaseToken()
{
assert(this->HeldTokens > 0);
--this->HeldTokens;
if (this->HeldTokens == 0) {
// This was the token implicitly owned by our process.
this->ReleaseImplicitToken();
} else {
// This was a token we received from the job server. Send it back.
this->SendToken();
}
}
void cmUVJobServerClient::Impl::RequestExplicitToken()
{
++this->NeedTokens;
this->StartReceivingTokens();
}
void cmUVJobServerClient::Impl::DecrementNeedTokens()
{
assert(this->NeedTokens > 0);
--this->NeedTokens;
if (this->NeedTokens == 0) {
this->StopReceivingTokens();
}
}
void cmUVJobServerClient::Impl::HoldToken()
{
++this->HeldTokens;
if (this->OnToken) {
this->OnToken();
} else {
this->ReleaseToken();
}
}
void cmUVJobServerClient::Impl::RequestImplicitToken()
{
assert(this->HeldTokens == 0);
this->ImplicitToken.start([](uv_idle_t* handle) {
uv_idle_stop(handle);
auto* self = static_cast<Impl*>(handle->data);
self->HoldToken();
});
}
void cmUVJobServerClient::Impl::ReleaseImplicitToken()
{
assert(this->HeldTokens == 0);
// Use the implicit token in place of receiving one from the job server.
if (this->NeedTokens > 0) {
this->DecrementNeedTokens();
this->RequestImplicitToken();
}
}
void cmUVJobServerClient::Impl::ReceivedToken()
{
this->DecrementNeedTokens();
this->HoldToken();
}
void cmUVJobServerClient::Impl::Disconnected(int status)
{
if (this->OnDisconnect) {
this->OnDisconnect(status);
}
}
//---------------------------------------------------------------------------
// Implementation on POSIX platforms.
// https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html
#ifndef _WIN32
namespace {
class ImplPosix : public cmUVJobServerClient::Impl
{
public:
enum class Connection
{
None,
FDs,
FIFO,
};
Connection Conn = Connection::None;
cm::uv_pipe_ptr ConnRead;
cm::uv_pipe_ptr ConnWrite;
cm::uv_pipe_ptr ConnFIFO;
std::shared_ptr<std::function<void(int)>> OnWrite;
void Connect();
void ConnectFDs(int rfd, int wfd);
void ConnectFIFO(const char* path);
void Disconnect(int status);
cm::uv_pipe_ptr OpenFD(int fd);
uv_stream_t* GetWriter() const;
uv_stream_t* GetReader() const;
static void OnAllocateCB(uv_handle_t* handle, size_t suggested_size,
uv_buf_t* buf);
static void OnReadCB(uv_stream_t* stream, ssize_t nread,
const uv_buf_t* buf);
void OnAllocate(size_t suggested_size, uv_buf_t* buf);
void OnRead(ssize_t nread, const uv_buf_t* buf);
char ReadBuf = '.';
bool ReceivingTokens = false;
bool IsConnected() const;
void SendToken() override;
void StartReceivingTokens() override;
void StopReceivingTokens() override;
ImplPosix(uv_loop_t& loop);
~ImplPosix() override;
};
ImplPosix::ImplPosix(uv_loop_t& loop)
: Impl(loop)
, OnWrite(std::make_shared<std::function<void(int)>>([this](int status) {
if (status != 0) {
this->Disconnect(status);
}
}))
{
this->Connect();
}
ImplPosix::~ImplPosix()
{
this->Disconnect(0);
}
void ImplPosix::Connect()
{
// --jobserver-auth= for gnu make versions >= 4.2
// --jobserver-fds= for gnu make versions < 4.2
// -J for bsd make
static const std::vector<cm::string_view> prefixes = {
"--jobserver-auth=", "--jobserver-fds=", "-J"
};
cm::optional<std::string> makeflags = cmSystemTools::GetEnvVar("MAKEFLAGS");
if (!makeflags) {
return;
}
// Look for the *last* occurrence of jobserver flags.
cm::optional<std::string> auth;
std::vector<std::string> args;
cmSystemTools::ParseUnixCommandLine(makeflags->c_str(), args);
for (cm::string_view arg : cmReverseRange(args)) {
for (cm::string_view prefix : prefixes) {
if (cmHasPrefix(arg, prefix)) {
auth = cmTrimWhitespace(arg.substr(prefix.length()));
break;
}
}
if (auth) {
break;
}
}
if (!auth) {
return;
}
// fifo:PATH
if (cmHasLiteralPrefix(*auth, "fifo:")) {
ConnectFIFO(auth->substr(cmStrLen("fifo:")).c_str());
return;
}
// reader,writer
int reader;
int writer;
if (std::sscanf(auth->c_str(), "%d,%d", &reader, &writer) == 2) {
ConnectFDs(reader, writer);
}
}
cm::uv_pipe_ptr ImplPosix::OpenFD(int fd)
{
// Create a CLOEXEC duplicate so `uv_pipe_ptr` can close it
// without closing the original file descriptor, which our
// child processes might want to use too.
cm::uv_pipe_ptr p;
int fd_dup = dup(fd);
if (fd_dup < 0) {
return p;
}
if (fcntl(fd_dup, F_SETFD, FD_CLOEXEC) == -1) {
close(fd_dup);
return p;
}
p.init(this->Loop, 0, this);
if (uv_pipe_open(p, fd_dup) < 0) {
close(fd_dup);
}
return p;
}
void ImplPosix::ConnectFDs(int rfd, int wfd)
{
cm::uv_pipe_ptr connRead = this->OpenFD(rfd);
cm::uv_pipe_ptr connWrite = this->OpenFD(wfd);
// Verify that the read end is readable and the write end is writable.
if (!connRead || !uv_is_readable(connRead) || //
!connWrite || !uv_is_writable(connWrite)) {
return;
}
this->ConnRead = std::move(connRead);
this->ConnWrite = std::move(connWrite);
this->Conn = Connection::FDs;
}
void ImplPosix::ConnectFIFO(const char* path)
{
int fd = open(path, O_RDWR);
if (fd < 0) {
return;
}
cm::uv_pipe_ptr connFIFO;
connFIFO.init(this->Loop, 0, this);
if (uv_pipe_open(connFIFO, fd) != 0) {
close(fd);
return;
}
// Verify that the fifo is both readable and writable.
if (!connFIFO || !uv_is_readable(connFIFO) || !uv_is_writable(connFIFO)) {
return;
}
this->ConnFIFO = std::move(connFIFO);
this->Conn = Connection::FIFO;
}
void ImplPosix::Disconnect(int status)
{
if (this->Conn == Connection::None) {
return;
}
this->StopReceivingTokens();
switch (this->Conn) {
case Connection::FDs:
this->ConnRead.reset();
this->ConnWrite.reset();
break;
case Connection::FIFO:
this->ConnFIFO.reset();
break;
default:
break;
}
this->Conn = Connection::None;
if (status != 0) {
this->Disconnected(status);
}
}
uv_stream_t* ImplPosix::GetWriter() const
{
switch (this->Conn) {
case Connection::FDs:
return this->ConnWrite;
case Connection::FIFO:
return this->ConnFIFO;
default:
return nullptr;
}
}
uv_stream_t* ImplPosix::GetReader() const
{
switch (this->Conn) {
case Connection::FDs:
return this->ConnRead;
case Connection::FIFO:
return this->ConnFIFO;
default:
return nullptr;
}
}
void ImplPosix::OnAllocateCB(uv_handle_t* handle, size_t suggested_size,
uv_buf_t* buf)
{
auto* self = static_cast<ImplPosix*>(handle->data);
self->OnAllocate(suggested_size, buf);
}
void ImplPosix::OnReadCB(uv_stream_t* stream, ssize_t nread,
const uv_buf_t* buf)
{
auto* self = static_cast<ImplPosix*>(stream->data);
self->OnRead(nread, buf);
}
void ImplPosix::OnAllocate(size_t /*suggested_size*/, uv_buf_t* buf)
{
*buf = uv_buf_init(&this->ReadBuf, 1);
}
void ImplPosix::OnRead(ssize_t nread, const uv_buf_t* /*buf*/)
{
if (nread == 0) {
return;
}
if (nread < 0) {
auto status = static_cast<int>(nread);
this->Disconnect(status);
return;
}
assert(nread == 1);
this->ReceivedToken();
}
bool ImplPosix::IsConnected() const
{
return this->Conn != Connection::None;
}
void ImplPosix::SendToken()
{
if (this->Conn == Connection::None) {
return;
}
static char token = '.';
uv_buf_t const buf = uv_buf_init(&token, sizeof(token));
int status = cm::uv_write(this->GetWriter(), &buf, 1, this->OnWrite);
if (status != 0) {
this->Disconnect(status);
}
}
void ImplPosix::StartReceivingTokens()
{
if (this->Conn == Connection::None) {
return;
}
if (this->ReceivingTokens) {
return;
}
int status = uv_read_start(this->GetReader(), &ImplPosix::OnAllocateCB,
&ImplPosix::OnReadCB);
if (status != 0) {
this->Disconnect(status);
return;
}
this->ReceivingTokens = true;
}
void ImplPosix::StopReceivingTokens()
{
if (this->Conn == Connection::None) {
return;
}
if (!this->ReceivingTokens) {
return;
}
this->ReceivingTokens = false;
uv_read_stop(this->GetReader());
}
}
#endif
//---------------------------------------------------------------------------
// Implementation of public interface.
cmUVJobServerClient::cmUVJobServerClient(std::unique_ptr<Impl> impl)
: Impl_(std::move(impl))
{
}
cmUVJobServerClient::~cmUVJobServerClient() = default;
cmUVJobServerClient::cmUVJobServerClient(cmUVJobServerClient&&) noexcept =
default;
cmUVJobServerClient& cmUVJobServerClient::operator=(
cmUVJobServerClient&&) noexcept = default;
void cmUVJobServerClient::RequestToken()
{
this->Impl_->RequestToken();
}
void cmUVJobServerClient::ReleaseToken()
{
this->Impl_->ReleaseToken();
}
int cmUVJobServerClient::GetHeldTokens() const
{
return this->Impl_->HeldTokens;
}
int cmUVJobServerClient::GetNeedTokens() const
{
return this->Impl_->NeedTokens;
}
cm::optional<cmUVJobServerClient> cmUVJobServerClient::Connect(
uv_loop_t& loop, std::function<void()> onToken,
std::function<void(int)> onDisconnect)
{
#if defined(_WIN32)
// FIXME: Windows job server client not yet implemented.
static_cast<void>(loop);
static_cast<void>(onToken);
static_cast<void>(onDisconnect);
#else
auto impl = cm::make_unique<ImplPosix>(loop);
if (impl && impl->IsConnected()) {
impl->OnToken = std::move(onToken);
impl->OnDisconnect = std::move(onDisconnect);
return cmUVJobServerClient(std::move(impl));
}
#endif
return cm::nullopt;
}
+96
View File
@@ -0,0 +1,96 @@
/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
file Copyright.txt or https://cmake.org/licensing for details. */
#pragma once
#include "cmConfigure.h" // IWYU pragma: keep
#include <functional>
#include <memory>
#include <cm/optional>
#include <cm3p/uv.h>
/** \class cmUVJobServerClient
* \brief Job server client that can integrate with a libuv event loop.
*
* Use the \a Connect method to connect to an ambient job server as
* described by the MAKEFLAGS environment variable, if any. Request
* a token using the \a RequestToken method. The \a onToken callback
* will be invoked asynchronously when the token is received. Act
* on the token, and then use \a ReleaseToken to release it.
*
* The job server protocol states that a client process implicitly
* has one free token available, corresponding to the token its
* parent used to start it. \a cmUVJobServerClient will use the
* implicit token whenever it is available instead of requesting
* an explicit token from the job server. However, clients of
* this class must still request and receive the token before
* acting on it, and cannot assume that it is always held.
*
* If the job server connection breaks, \a onDisconnect will be
* called with the libuv error. No further tokens can be received
* from the job server, but progress can still be made serially
* using the implicit token.
*/
class cmUVJobServerClient
{
public:
class Impl;
private:
std::unique_ptr<Impl> Impl_;
cmUVJobServerClient(std::unique_ptr<Impl> impl);
public:
/**
* Disconnect from the job server.
*/
~cmUVJobServerClient();
cmUVJobServerClient(cmUVJobServerClient&&) noexcept;
cmUVJobServerClient(cmUVJobServerClient const&) = delete;
cmUVJobServerClient& operator=(cmUVJobServerClient&&) noexcept;
cmUVJobServerClient& operator=(cmUVJobServerClient const&) = delete;
/**
* Request a token from the job server.
* When the token is held, the \a onToken callback will be invoked.
*/
void RequestToken();
/**
* Release a token to the job server.
* This may be called only after a corresponding \a onToken callback.
*/
void ReleaseToken();
/**
* Get the number of implicit and explicit tokens currently held.
* This is the number of times \a onToken has been called but not
* yet followed by a call to \a ReleaseToken.
* This is meant for testing and debugging.
*/
int GetHeldTokens() const;
/**
* Get the number of explicit tokens currently requested from the
* job server but not yet received. If the implicit token becomes
* available, it is used in place of a requested token, and this
* is decremented without receiving an explicit token.
* This is meant for testing and debugging.
*/
int GetNeedTokens() const;
/**
* Connect to an ambient job server, if any.
* \param loop The libuv event loop on which to schedule events.
* \param onToken Function to call when a new token is held.
* \param onDisconnect Function to call on disconnect, with libuv error.
* \returns Connected instance, or cm::nullopt.
*/
static cm::optional<cmUVJobServerClient> Connect(
uv_loop_t& loop, std::function<void()> onToken,
std::function<void(int)> onDisconnect);
};