From: Stefan Becker Date: Fri, 27 May 2016 16:47:10 +0300 Subject: [PATCH] Add TokenPool monitoring to SubprocessSet::DoWork() Improve on the original jobserver client implementation. This makes ninja a more aggressive GNU make jobserver client. - add monitor interface to TokenPool - TokenPool is passed down when main loop indicates that more work is ready and would be allowed to start if a token becomes available - posix: update DoWork() to monitor TokenPool read file descriptor - WaitForCommand() exits when DoWork() sets token flag - Main loop starts over when WaitForCommand() sets token exit status --- src/build.cc | 53 +++++++++++++++++++++++++++++++++++------------ src/build.h | 3 ++- src/build_test.cc | 9 ++++++-- src/exit_status.h | 3 ++- src/subprocess-posix.cc | 33 +++++++++++++++++++++++++++-- src/subprocess-win32.cc | 2 +- src/subprocess.h | 8 ++++++- src/subprocess_test.cc | 47 ++++++++++++++++++++++++++++------------- src/tokenpool-gnu-make.cc | 5 +++++ src/tokenpool.h | 6 ++++++ 10 files changed, 134 insertions(+), 35 deletions(-) diff --git a/src/build.cc b/src/build.cc index cc796ff838fa..219bb9f1ff48 100644 --- a/src/build.cc +++ b/src/build.cc @@ -49,8 +49,9 @@ struct DryRunCommandRunner : public CommandRunner { // Overridden from CommandRunner: virtual bool CanRunMore(); + virtual bool AcquireToken(); virtual bool StartCommand(Edge* edge); - virtual bool WaitForCommand(Result* result); + virtual bool WaitForCommand(Result* result, bool more_ready); private: queue finished_; @@ -60,12 +61,16 @@ bool DryRunCommandRunner::CanRunMore() { return true; } +bool DryRunCommandRunner::AcquireToken() { + return true; +} + bool DryRunCommandRunner::StartCommand(Edge* edge) { finished_.push(edge); return true; } -bool DryRunCommandRunner::WaitForCommand(Result* result) { +bool DryRunCommandRunner::WaitForCommand(Result* result, bool more_ready) { if (finished_.empty()) return false; @@ -489,8 +494,9 @@ struct RealCommandRunner : public CommandRunner { explicit RealCommandRunner(const BuildConfig& config); virtual ~RealCommandRunner(); virtual bool CanRunMore(); + virtual bool AcquireToken(); virtual bool StartCommand(Edge* edge); - virtual bool WaitForCommand(Result* result); + virtual bool WaitForCommand(Result* result, bool more_ready); virtual vector GetActiveEdges(); virtual void Abort(); @@ -527,9 +533,12 @@ bool RealCommandRunner::CanRunMore() { subprocs_.running_.size() + subprocs_.finished_.size(); return (int)subproc_number < config_.parallelism && (subprocs_.running_.empty() || - ((config_.max_load_average <= 0.0f || - GetLoadAverage() < config_.max_load_average) - && (!tokens_ || tokens_->Acquire()))); + (config_.max_load_average <= 0.0f || + GetLoadAverage() < config_.max_load_average)); +} + +bool RealCommandRunner::AcquireToken() { + return (!tokens_ || tokens_->Acquire()); } bool RealCommandRunner::StartCommand(Edge* edge) { @@ -544,14 +553,23 @@ bool RealCommandRunner::StartCommand(Edge* edge) { return true; } -bool RealCommandRunner::WaitForCommand(Result* result) { +bool RealCommandRunner::WaitForCommand(Result* result, bool more_ready) { Subprocess* subproc; - while ((subproc = subprocs_.NextFinished()) == NULL) { - bool interrupted = subprocs_.DoWork(); + subprocs_.ResetTokenAvailable(); + while (((subproc = subprocs_.NextFinished()) == NULL) && + !subprocs_.IsTokenAvailable()) { + bool interrupted = subprocs_.DoWork(more_ready ? tokens_ : NULL); if (interrupted) return false; } + // token became available + if (subproc == NULL) { + result->status = ExitTokenAvailable; + return true; + } + + // command completed if (tokens_) tokens_->Release(); @@ -662,9 +680,14 @@ bool Builder::Build(string* err) { // command runner. // Second, we attempt to wait for / reap the next finished command. while (plan_.more_to_do()) { - // See if we can start any more commands. - if (failures_allowed && plan_.more_ready() && - command_runner_->CanRunMore()) { + // See if we can start any more commands... + bool can_run_more = + failures_allowed && + plan_.more_ready() && + command_runner_->CanRunMore(); + + // ... but we also need a token to do that. + if (can_run_more && command_runner_->AcquireToken()) { Edge* edge = plan_.FindWork(); if (!StartEdge(edge, err)) { Cleanup(); @@ -685,7 +708,7 @@ bool Builder::Build(string* err) { // See if we can reap any finished commands. if (pending_commands) { CommandRunner::Result result; - if (!command_runner_->WaitForCommand(&result) || + if (!command_runner_->WaitForCommand(&result, can_run_more) || result.status == ExitInterrupted) { Cleanup(); status_->BuildFinished(); @@ -693,6 +716,10 @@ bool Builder::Build(string* err) { return false; } + // We might be able to start another command; start the main loop over. + if (result.status == ExitTokenAvailable) + continue; + --pending_commands; if (!FinishCommand(&result, err)) { Cleanup(); diff --git a/src/build.h b/src/build.h index cca7e8d8181d..ca605e62e0e3 100644 --- a/src/build.h +++ b/src/build.h @@ -108,6 +108,7 @@ private: struct CommandRunner { virtual ~CommandRunner() {} virtual bool CanRunMore() = 0; + virtual bool AcquireToken() = 0; virtual bool StartCommand(Edge* edge) = 0; /// The result of waiting for a command. @@ -119,7 +120,7 @@ struct CommandRunner { bool success() const { return status == ExitSuccess; } }; /// Wait for a command to complete, or return false if interrupted. - virtual bool WaitForCommand(Result* result) = 0; + virtual bool WaitForCommand(Result* result, bool more_ready) = 0; virtual vector GetActiveEdges() { return vector(); } virtual void Abort() {} diff --git a/src/build_test.cc b/src/build_test.cc index 46ab33ef86c8..a1022edcf546 100644 --- a/src/build_test.cc +++ b/src/build_test.cc @@ -445,8 +445,9 @@ struct FakeCommandRunner : public CommandRunner { // CommandRunner impl virtual bool CanRunMore(); + virtual bool AcquireToken(); virtual bool StartCommand(Edge* edge); - virtual bool WaitForCommand(Result* result); + virtual bool WaitForCommand(Result* result, bool more_ready); virtual vector GetActiveEdges(); virtual void Abort(); @@ -547,6 +548,10 @@ bool FakeCommandRunner::CanRunMore() { return last_command_ == NULL; } +bool FakeCommandRunner::AcquireToken() { + return true; +} + bool FakeCommandRunner::StartCommand(Edge* edge) { assert(!last_command_); commands_ran_.push_back(edge->EvaluateCommand()); @@ -575,7 +580,7 @@ bool FakeCommandRunner::StartCommand(Edge* edge) { return true; } -bool FakeCommandRunner::WaitForCommand(Result* result) { +bool FakeCommandRunner::WaitForCommand(Result* result, bool more_ready) { if (!last_command_) return false; diff --git a/src/exit_status.h b/src/exit_status.h index a714ece791f7..75ebf6a7a0ce 100644 --- a/src/exit_status.h +++ b/src/exit_status.h @@ -18,7 +18,8 @@ enum ExitStatus { ExitSuccess, ExitFailure, - ExitInterrupted + ExitTokenAvailable, + ExitInterrupted, }; #endif // NINJA_EXIT_STATUS_H_ diff --git a/src/subprocess-posix.cc b/src/subprocess-posix.cc index 1de22c38f7fa..980fadf78e0d 100644 --- a/src/subprocess-posix.cc +++ b/src/subprocess-posix.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "subprocess.h" +#include "tokenpool.h" #include #include @@ -219,7 +220,7 @@ Subprocess *SubprocessSet::Add(const string& command, bool use_console) { } #ifdef USE_PPOLL -bool SubprocessSet::DoWork() { +bool SubprocessSet::DoWork(struct TokenPool* tokens) { vector fds; nfds_t nfds = 0; @@ -233,6 +234,12 @@ bool SubprocessSet::DoWork() { ++nfds; } + if (tokens) { + pollfd pfd = { tokens->GetMonitorFd(), POLLIN | POLLPRI, 0 }; + fds.push_back(pfd); + ++nfds; + } + interrupted_ = 0; int ret = ppoll(&fds.front(), nfds, NULL, &old_mask_); if (ret == -1) { @@ -265,11 +272,20 @@ bool SubprocessSet::DoWork() { ++i; } + if (tokens) { + pollfd *pfd = &fds[nfds - 1]; + if (pfd->fd >= 0) { + assert(pfd->fd == tokens->GetMonitorFd()); + if (pfd->revents != 0) + token_available_ = true; + } + } + return IsInterrupted(); } #else // !defined(USE_PPOLL) -bool SubprocessSet::DoWork() { +bool SubprocessSet::DoWork(struct TokenPool* tokens) { fd_set set; int nfds = 0; FD_ZERO(&set); @@ -284,6 +300,13 @@ bool SubprocessSet::DoWork() { } } + if (tokens) { + int fd = tokens->GetMonitorFd(); + FD_SET(fd, &set); + if (nfds < fd+1) + nfds = fd+1; + } + interrupted_ = 0; int ret = pselect(nfds, &set, 0, 0, 0, &old_mask_); if (ret == -1) { @@ -312,6 +335,12 @@ bool SubprocessSet::DoWork() { ++i; } + if (tokens) { + int fd = tokens->GetMonitorFd(); + if ((fd >= 0) && FD_ISSET(fd, &set)) + token_available_ = true; + } + return IsInterrupted(); } #endif // !defined(USE_PPOLL) diff --git a/src/subprocess-win32.cc b/src/subprocess-win32.cc index 4bab71939d6d..9b415b0b7bc3 100644 --- a/src/subprocess-win32.cc +++ b/src/subprocess-win32.cc @@ -236,7 +236,7 @@ Subprocess *SubprocessSet::Add(const string& command, bool use_console) { return subprocess; } -bool SubprocessSet::DoWork() { +bool SubprocessSet::DoWork(struct TokenPool* tokens) { DWORD bytes_read; Subprocess* subproc; OVERLAPPED* overlapped; diff --git a/src/subprocess.h b/src/subprocess.h index b2d486ca400c..bf1a46090bc1 100644 --- a/src/subprocess.h +++ b/src/subprocess.h @@ -77,6 +77,8 @@ struct Subprocess { friend struct SubprocessSet; }; +struct TokenPool; + /// SubprocessSet runs a ppoll/pselect() loop around a set of Subprocesses. /// DoWork() waits for any state change in subprocesses; finished_ /// is a queue of subprocesses as they finish. @@ -85,13 +87,17 @@ struct SubprocessSet { ~SubprocessSet(); Subprocess* Add(const string& command, bool use_console = false); - bool DoWork(); + bool DoWork(struct TokenPool* tokens); Subprocess* NextFinished(); void Clear(); vector running_; queue finished_; + bool token_available_; + bool IsTokenAvailable() { return token_available_; } + void ResetTokenAvailable() { token_available_ = false; } + #ifdef _WIN32 static BOOL WINAPI NotifyInterrupted(DWORD dwCtrlType); static HANDLE ioport_; diff --git a/src/subprocess_test.cc b/src/subprocess_test.cc index 0a8c2061b7f2..e759ea4574bc 100644 --- a/src/subprocess_test.cc +++ b/src/subprocess_test.cc @@ -43,10 +43,12 @@ TEST_F(SubprocessTest, BadCommandStderr) { Subprocess* subproc = subprocs_.Add("cmd /c ninja_no_such_command"); ASSERT_NE((Subprocess *) 0, subproc); + subprocs_.ResetTokenAvailable(); while (!subproc->Done()) { // Pretend we discovered that stderr was ready for writing. - subprocs_.DoWork(); + subprocs_.DoWork(NULL); } + ASSERT_EQ(false, subprocs_.IsTokenAvailable()); EXPECT_EQ(ExitFailure, subproc->Finish()); EXPECT_NE("", subproc->GetOutput()); @@ -57,10 +59,12 @@ TEST_F(SubprocessTest, NoSuchCommand) { Subprocess* subproc = subprocs_.Add("ninja_no_such_command"); ASSERT_NE((Subprocess *) 0, subproc); + subprocs_.ResetTokenAvailable(); while (!subproc->Done()) { // Pretend we discovered that stderr was ready for writing. - subprocs_.DoWork(); + subprocs_.DoWork(NULL); } + ASSERT_EQ(false, subprocs_.IsTokenAvailable()); EXPECT_EQ(ExitFailure, subproc->Finish()); EXPECT_NE("", subproc->GetOutput()); @@ -76,9 +80,11 @@ TEST_F(SubprocessTest, InterruptChild) { Subprocess* subproc = subprocs_.Add("kill -INT $$"); ASSERT_NE((Subprocess *) 0, subproc); + subprocs_.ResetTokenAvailable(); while (!subproc->Done()) { - subprocs_.DoWork(); + subprocs_.DoWork(NULL); } + ASSERT_EQ(false, subprocs_.IsTokenAvailable()); EXPECT_EQ(ExitInterrupted, subproc->Finish()); } @@ -88,7 +94,7 @@ TEST_F(SubprocessTest, InterruptParent) { ASSERT_NE((Subprocess *) 0, subproc); while (!subproc->Done()) { - bool interrupted = subprocs_.DoWork(); + bool interrupted = subprocs_.DoWork(NULL); if (interrupted) return; } @@ -100,9 +106,11 @@ TEST_F(SubprocessTest, InterruptChildWithSigTerm) { Subprocess* subproc = subprocs_.Add("kill -TERM $$"); ASSERT_NE((Subprocess *) 0, subproc); + subprocs_.ResetTokenAvailable(); while (!subproc->Done()) { - subprocs_.DoWork(); + subprocs_.DoWork(NULL); } + ASSERT_EQ(false, subprocs_.IsTokenAvailable()); EXPECT_EQ(ExitInterrupted, subproc->Finish()); } @@ -112,7 +120,7 @@ TEST_F(SubprocessTest, InterruptParentWithSigTerm) { ASSERT_NE((Subprocess *) 0, subproc); while (!subproc->Done()) { - bool interrupted = subprocs_.DoWork(); + bool interrupted = subprocs_.DoWork(NULL); if (interrupted) return; } @@ -124,9 +132,11 @@ TEST_F(SubprocessTest, InterruptChildWithSigHup) { Subprocess* subproc = subprocs_.Add("kill -HUP $$"); ASSERT_NE((Subprocess *) 0, subproc); + subprocs_.ResetTokenAvailable(); while (!subproc->Done()) { - subprocs_.DoWork(); + subprocs_.DoWork(NULL); } + ASSERT_EQ(false, subprocs_.IsTokenAvailable()); EXPECT_EQ(ExitInterrupted, subproc->Finish()); } @@ -136,7 +146,7 @@ TEST_F(SubprocessTest, InterruptParentWithSigHup) { ASSERT_NE((Subprocess *) 0, subproc); while (!subproc->Done()) { - bool interrupted = subprocs_.DoWork(); + bool interrupted = subprocs_.DoWork(NULL); if (interrupted) return; } @@ -151,9 +161,11 @@ TEST_F(SubprocessTest, Console) { subprocs_.Add("test -t 0 -a -t 1 -a -t 2", /*use_console=*/true); ASSERT_NE((Subprocess*)0, subproc); + subprocs_.ResetTokenAvailable(); while (!subproc->Done()) { - subprocs_.DoWork(); + subprocs_.DoWork(NULL); } + ASSERT_EQ(false, subprocs_.IsTokenAvailable()); EXPECT_EQ(ExitSuccess, subproc->Finish()); } @@ -165,9 +177,11 @@ TEST_F(SubprocessTest, SetWithSingle) { Subprocess* subproc = subprocs_.Add(kSimpleCommand); ASSERT_NE((Subprocess *) 0, subproc); + subprocs_.ResetTokenAvailable(); while (!subproc->Done()) { - subprocs_.DoWork(); + subprocs_.DoWork(NULL); } + ASSERT_EQ(false, subprocs_.IsTokenAvailable()); ASSERT_EQ(ExitSuccess, subproc->Finish()); ASSERT_NE("", subproc->GetOutput()); @@ -198,12 +212,13 @@ TEST_F(SubprocessTest, SetWithMulti) { ASSERT_EQ("", processes[i]->GetOutput()); } + subprocs_.ResetTokenAvailable(); while (!processes[0]->Done() || !processes[1]->Done() || !processes[2]->Done()) { ASSERT_GT(subprocs_.running_.size(), 0u); - subprocs_.DoWork(); + subprocs_.DoWork(NULL); } - + ASSERT_EQ(false, subprocs_.IsTokenAvailable()); ASSERT_EQ(0u, subprocs_.running_.size()); ASSERT_EQ(3u, subprocs_.finished_.size()); @@ -235,8 +250,10 @@ TEST_F(SubprocessTest, SetWithLots) { ASSERT_NE((Subprocess *) 0, subproc); procs.push_back(subproc); } + subprocs_.ResetTokenAvailable(); while (!subprocs_.running_.empty()) - subprocs_.DoWork(); + subprocs_.DoWork(NULL); + ASSERT_EQ(false, subprocs_.IsTokenAvailable()); for (size_t i = 0; i < procs.size(); ++i) { ASSERT_EQ(ExitSuccess, procs[i]->Finish()); ASSERT_NE("", procs[i]->GetOutput()); @@ -252,9 +269,11 @@ TEST_F(SubprocessTest, SetWithLots) { // that stdin is closed. TEST_F(SubprocessTest, ReadStdin) { Subprocess* subproc = subprocs_.Add("cat -"); + subprocs_.ResetTokenAvailable(); while (!subproc->Done()) { - subprocs_.DoWork(); + subprocs_.DoWork(NULL); } + ASSERT_EQ(false, subprocs_.IsTokenAvailable()); ASSERT_EQ(ExitSuccess, subproc->Finish()); ASSERT_EQ(1u, subprocs_.finished_.size()); } diff --git a/src/tokenpool-gnu-make.cc b/src/tokenpool-gnu-make.cc index a8f9b7139d23..396bb7d87443 100644 --- a/src/tokenpool-gnu-make.cc +++ b/src/tokenpool-gnu-make.cc @@ -33,6 +33,7 @@ struct GNUmakeTokenPool : public TokenPool { virtual void Reserve(); virtual void Release(); virtual void Clear(); + virtual int GetMonitorFd(); bool Setup(); @@ -201,6 +202,10 @@ void GNUmakeTokenPool::Clear() { Return(); } +int GNUmakeTokenPool::GetMonitorFd() { + return(rfd_); +} + struct TokenPool *TokenPool::Get(void) { GNUmakeTokenPool *tokenpool = new GNUmakeTokenPool; if (tokenpool->Setup()) diff --git a/src/tokenpool.h b/src/tokenpool.h index f560b1083b65..301e1998ee8e 100644 --- a/src/tokenpool.h +++ b/src/tokenpool.h @@ -21,6 +21,12 @@ struct TokenPool { virtual void Release() = 0; virtual void Clear() = 0; +#ifdef _WIN32 + // @TODO +#else + virtual int GetMonitorFd() = 0; +#endif + // returns NULL if token pool is not available static struct TokenPool *Get(void); };