More work on the scheduler for windows

- Get a rump derivation goal: hook instance will come later, local
  derivation goal will come after that.

- Start cleaning up the channel / waiting code with an abstraction.
This commit is contained in:
John Ericson 2024-05-27 17:54:02 -04:00
parent 1e2b26734b
commit bcdee80a0d
13 changed files with 331 additions and 204 deletions

View file

@ -223,8 +223,8 @@
''^src/libstore/store-api\.cc$'' ''^src/libstore/store-api\.cc$''
''^src/libstore/store-api\.hh$'' ''^src/libstore/store-api\.hh$''
''^src/libstore/store-dir-config\.hh$'' ''^src/libstore/store-dir-config\.hh$''
''^src/libstore/unix/build/derivation-goal\.cc$'' ''^src/libstore/build/derivation-goal\.cc$''
''^src/libstore/unix/build/derivation-goal\.hh$'' ''^src/libstore/build/derivation-goal\.hh$''
''^src/libstore/build/drv-output-substitution-goal\.cc$'' ''^src/libstore/build/drv-output-substitution-goal\.cc$''
''^src/libstore/build/drv-output-substitution-goal\.hh$'' ''^src/libstore/build/drv-output-substitution-goal\.hh$''
''^src/libstore/build/entry-points\.cc$'' ''^src/libstore/build/entry-points\.cc$''

View file

@ -1,5 +1,8 @@
#include "derivation-goal.hh" #include "derivation-goal.hh"
#include "hook-instance.hh" #ifndef _WIN32 // TODO enable build hook on Windows
# include "hook-instance.hh"
#endif
#include "processes.hh"
#include "worker.hh" #include "worker.hh"
#include "builtins.hh" #include "builtins.hh"
#include "builtins/buildenv.hh" #include "builtins/buildenv.hh"
@ -19,19 +22,8 @@
#include <fstream> #include <fstream>
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <netdb.h>
#include <fcntl.h> #include <fcntl.h>
#include <termios.h>
#include <unistd.h> #include <unistd.h>
#include <sys/mman.h>
#include <sys/utsname.h>
#include <sys/resource.h>
#include <pwd.h>
#include <grp.h>
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
@ -101,7 +93,9 @@ std::string DerivationGoal::key()
void DerivationGoal::killChild() void DerivationGoal::killChild()
{ {
#ifndef _WIN32 // TODO enable build hook on Windows
hook.reset(); hook.reset();
#endif
} }
@ -641,9 +635,17 @@ void DerivationGoal::started()
buildMode == bmCheck ? "checking outputs of '%s'" : buildMode == bmCheck ? "checking outputs of '%s'" :
"building '%s'", worker.store.printStorePath(drvPath)); "building '%s'", worker.store.printStorePath(drvPath));
fmt("building '%s'", worker.store.printStorePath(drvPath)); fmt("building '%s'", worker.store.printStorePath(drvPath));
#ifndef _WIN32 // TODO enable build hook on Windows
if (hook) msg += fmt(" on '%s'", machineName); if (hook) msg += fmt(" on '%s'", machineName);
#endif
act = std::make_unique<Activity>(*logger, lvlInfo, actBuild, msg, act = std::make_unique<Activity>(*logger, lvlInfo, actBuild, msg,
Logger::Fields{worker.store.printStorePath(drvPath), hook ? machineName : "", 1, 1}); Logger::Fields{worker.store.printStorePath(drvPath),
#ifndef _WIN32 // TODO enable build hook on Windows
hook ? machineName :
#endif
"",
1,
1});
mcRunningBuilds = std::make_unique<MaintainCount<uint64_t>>(worker.runningBuilds); mcRunningBuilds = std::make_unique<MaintainCount<uint64_t>>(worker.runningBuilds);
worker.updateProgress(); worker.updateProgress();
} }
@ -778,7 +780,13 @@ static void movePath(const Path & src, const Path & dst)
{ {
auto st = lstat(src); auto st = lstat(src);
bool changePerm = (geteuid() && S_ISDIR(st.st_mode) && !(st.st_mode & S_IWUSR)); bool changePerm = (
#ifndef _WIN32
geteuid()
#else
!isRootUser()
#endif
&& S_ISDIR(st.st_mode) && !(st.st_mode & S_IWUSR));
if (changePerm) if (changePerm)
chmod_(src, st.st_mode | S_IWUSR); chmod_(src, st.st_mode | S_IWUSR);
@ -796,7 +804,7 @@ void replaceValidPath(const Path & storePath, const Path & tmpPath)
tmpPath (the replacement), so we have to move it out of the tmpPath (the replacement), so we have to move it out of the
way first. We'd better not be interrupted here, because if way first. We'd better not be interrupted here, because if
we're repairing (say) Glibc, we end up with a broken system. */ we're repairing (say) Glibc, we end up with a broken system. */
Path oldPath = fmt("%1%.old-%2%-%3%", storePath, getpid(), random()); Path oldPath = fmt("%1%.old-%2%-%3%", storePath, getpid(), rand());
if (pathExists(storePath)) if (pathExists(storePath))
movePath(storePath, oldPath); movePath(storePath, oldPath);
@ -818,14 +826,20 @@ void replaceValidPath(const Path & storePath, const Path & tmpPath)
int DerivationGoal::getChildStatus() int DerivationGoal::getChildStatus()
{ {
#ifndef _WIN32 // TODO enable build hook on Windows
return hook->pid.kill(); return hook->pid.kill();
#else
return 0;
#endif
} }
void DerivationGoal::closeReadPipes() void DerivationGoal::closeReadPipes()
{ {
hook->builderOut.readSide = -1; #ifndef _WIN32 // TODO enable build hook on Windows
hook->fromHook.readSide = -1; hook->builderOut.readSide.close();
hook->fromHook.readSide.close();
#endif
} }
@ -1019,13 +1033,16 @@ void DerivationGoal::buildDone()
BuildResult::Status st = BuildResult::MiscFailure; BuildResult::Status st = BuildResult::MiscFailure;
#ifndef _WIN32
if (hook && WIFEXITED(status) && WEXITSTATUS(status) == 101) if (hook && WIFEXITED(status) && WEXITSTATUS(status) == 101)
st = BuildResult::TimedOut; st = BuildResult::TimedOut;
else if (hook && (!WIFEXITED(status) || WEXITSTATUS(status) != 100)) { else if (hook && (!WIFEXITED(status) || WEXITSTATUS(status) != 100)) {
} }
else { else
#endif
{
assert(derivationType); assert(derivationType);
st = st =
dynamic_cast<NotDeterministic*>(&e) ? BuildResult::NotDeterministic : dynamic_cast<NotDeterministic*>(&e) ? BuildResult::NotDeterministic :
@ -1112,6 +1129,9 @@ void DerivationGoal::resolvedFinished()
HookReply DerivationGoal::tryBuildHook() HookReply DerivationGoal::tryBuildHook()
{ {
#ifdef _WIN32 // TODO enable build hook on Windows
return rpDecline;
#else
if (settings.buildHook.get().empty() || !worker.tryBuildHook || !useDerivation) return rpDecline; if (settings.buildHook.get().empty() || !worker.tryBuildHook || !useDerivation) return rpDecline;
if (!worker.hook) if (!worker.hook)
@ -1205,17 +1225,18 @@ HookReply DerivationGoal::tryBuildHook()
} }
hook->sink = FdSink(); hook->sink = FdSink();
hook->toHook.writeSide = -1; hook->toHook.writeSide.close();
/* Create the log file and pipe. */ /* Create the log file and pipe. */
Path logFile = openLogFile(); Path logFile = openLogFile();
std::set<int> fds; std::set<MuxablePipePollState::CommChannel> fds;
fds.insert(hook->fromHook.readSide.get()); fds.insert(hook->fromHook.readSide.get());
fds.insert(hook->builderOut.readSide.get()); fds.insert(hook->builderOut.readSide.get());
worker.childStarted(shared_from_this(), fds, false, false); worker.childStarted(shared_from_this(), fds, false, false);
return rpAccept; return rpAccept;
#endif
} }
@ -1251,7 +1272,11 @@ Path DerivationGoal::openLogFile()
Path logFileName = fmt("%s/%s%s", dir, baseName.substr(2), Path logFileName = fmt("%s/%s%s", dir, baseName.substr(2),
settings.compressLog ? ".bz2" : ""); settings.compressLog ? ".bz2" : "");
fdLogFile = open(logFileName.c_str(), O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC, 0666); fdLogFile = toDescriptor(open(logFileName.c_str(), O_CREAT | O_WRONLY | O_TRUNC
#ifndef _WIN32
| O_CLOEXEC
#endif
, 0666));
if (!fdLogFile) throw SysError("creating log file '%1%'", logFileName); if (!fdLogFile) throw SysError("creating log file '%1%'", logFileName);
logFileSink = std::make_shared<FdSink>(fdLogFile.get()); logFileSink = std::make_shared<FdSink>(fdLogFile.get());
@ -1271,13 +1296,17 @@ void DerivationGoal::closeLogFile()
if (logSink2) logSink2->finish(); if (logSink2) logSink2->finish();
if (logFileSink) logFileSink->flush(); if (logFileSink) logFileSink->flush();
logSink = logFileSink = 0; logSink = logFileSink = 0;
fdLogFile = -1; fdLogFile.close();
} }
bool DerivationGoal::isReadDesc(int fd) bool DerivationGoal::isReadDesc(Descriptor fd)
{ {
#ifdef _WIN32 // TODO enable build hook on Windows
return false;
#else
return fd == hook->builderOut.readSide.get(); return fd == hook->builderOut.readSide.get();
#endif
} }
void DerivationGoal::handleChildOutput(Descriptor fd, std::string_view data) void DerivationGoal::handleChildOutput(Descriptor fd, std::string_view data)
@ -1310,6 +1339,7 @@ void DerivationGoal::handleChildOutput(Descriptor fd, std::string_view data)
if (logSink) (*logSink)(data); if (logSink) (*logSink)(data);
} }
#ifndef _WIN32 // TODO enable build hook on Windows
if (hook && fd == hook->fromHook.readSide.get()) { if (hook && fd == hook->fromHook.readSide.get()) {
for (auto c : data) for (auto c : data)
if (c == '\n') { if (c == '\n') {
@ -1344,6 +1374,7 @@ void DerivationGoal::handleChildOutput(Descriptor fd, std::string_view data)
} else } else
currentHookLine += c; currentHookLine += c;
} }
#endif
} }

View file

@ -2,7 +2,9 @@
///@file ///@file
#include "parsed-derivations.hh" #include "parsed-derivations.hh"
#include "user-lock.hh" #ifndef _WIN32
# include "user-lock.hh"
#endif
#include "outputs-spec.hh" #include "outputs-spec.hh"
#include "store-api.hh" #include "store-api.hh"
#include "pathlocks.hh" #include "pathlocks.hh"
@ -12,7 +14,9 @@ namespace nix {
using std::map; using std::map;
#ifndef _WIN32 // TODO enable build hook on Windows
struct HookInstance; struct HookInstance;
#endif
typedef enum {rpAccept, rpDecline, rpPostpone} HookReply; typedef enum {rpAccept, rpDecline, rpPostpone} HookReply;
@ -178,10 +182,12 @@ struct DerivationGoal : public Goal
std::string currentHookLine; std::string currentHookLine;
#ifndef _WIN32 // TODO enable build hook on Windows
/** /**
* The build hook. * The build hook.
*/ */
std::unique_ptr<HookInstance> hook; std::unique_ptr<HookInstance> hook;
#endif
/** /**
* The sort of derivation we are building. * The sort of derivation we are building.
@ -287,7 +293,7 @@ struct DerivationGoal : public Goal
virtual void cleanupPostOutputsRegisteredModeCheck(); virtual void cleanupPostOutputsRegisteredModeCheck();
virtual void cleanupPostOutputsRegisteredModeNonCheck(); virtual void cleanupPostOutputsRegisteredModeNonCheck();
virtual bool isReadDesc(int fd); virtual bool isReadDesc(Descriptor fd);
/** /**
* Callback used by the worker to write to the log. * Callback used by the worker to write to the log.

View file

@ -7,10 +7,7 @@
#include "store-api.hh" #include "store-api.hh"
#include "goal.hh" #include "goal.hh"
#include "realisation.hh" #include "realisation.hh"
#include "muxable-pipe.hh"
#ifdef _WIN32
# include "windows-async-pipe.hh"
#endif
namespace nix { namespace nix {
@ -48,11 +45,7 @@ class DrvOutputSubstitutionGoal : public Goal {
struct DownloadState struct DownloadState
{ {
#ifndef _WIN32 MuxablePipe outPipe;
Pipe outPipe;
#else
windows::AsyncPipe outPipe;
#endif
std::promise<std::shared_ptr<const Realisation>> promise; std::promise<std::shared_ptr<const Realisation>> promise;
}; };

View file

@ -3,10 +3,7 @@
#include "store-api.hh" #include "store-api.hh"
#include "goal.hh" #include "goal.hh"
#include "muxable-pipe.hh"
#ifdef _WIN32
# include "windows-async-pipe.hh"
#endif
namespace nix { namespace nix {
@ -48,11 +45,7 @@ struct PathSubstitutionGoal : public Goal
/** /**
* Pipe for the substituter's standard output. * Pipe for the substituter's standard output.
*/ */
#ifndef _WIN32 MuxablePipe outPipe;
Pipe outPipe;
#else
windows::AsyncPipe outPipe;
#endif
/** /**
* The substituter thread. * The substituter thread.

View file

@ -3,19 +3,13 @@
#include "worker.hh" #include "worker.hh"
#include "substitution-goal.hh" #include "substitution-goal.hh"
#include "drv-output-substitution-goal.hh" #include "drv-output-substitution-goal.hh"
#include "derivation-goal.hh"
#ifndef _WIN32 // TODO Enable building on Windows #ifndef _WIN32 // TODO Enable building on Windows
# include "local-derivation-goal.hh" # include "local-derivation-goal.hh"
# include "hook-instance.hh" # include "hook-instance.hh"
#endif #endif
#include "signals.hh" #include "signals.hh"
#ifndef _WIN32
# include <poll.h>
#else
# include <ioapiset.h>
# include "windows-error.hh"
#endif
namespace nix { namespace nix {
Worker::Worker(Store & store, Store & evalStore) Worker::Worker(Store & store, Store & evalStore)
@ -49,7 +43,6 @@ Worker::~Worker()
assert(expectedNarSize == 0); assert(expectedNarSize == 0);
} }
#ifndef _WIN32 // TODO Enable building on Windows
std::shared_ptr<DerivationGoal> Worker::makeDerivationGoalCommon( std::shared_ptr<DerivationGoal> Worker::makeDerivationGoalCommon(
const StorePath & drvPath, const StorePath & drvPath,
@ -73,9 +66,13 @@ std::shared_ptr<DerivationGoal> Worker::makeDerivationGoal(const StorePath & drv
const OutputsSpec & wantedOutputs, BuildMode buildMode) const OutputsSpec & wantedOutputs, BuildMode buildMode)
{ {
return makeDerivationGoalCommon(drvPath, wantedOutputs, [&]() -> std::shared_ptr<DerivationGoal> { return makeDerivationGoalCommon(drvPath, wantedOutputs, [&]() -> std::shared_ptr<DerivationGoal> {
return !dynamic_cast<LocalStore *>(&store) return
? std::make_shared</* */DerivationGoal>(drvPath, wantedOutputs, *this, buildMode) #ifndef _WIN32 // TODO Enable building on Windows
: std::make_shared<LocalDerivationGoal>(drvPath, wantedOutputs, *this, buildMode); dynamic_cast<LocalStore *>(&store)
? std::make_shared<LocalDerivationGoal>(drvPath, wantedOutputs, *this, buildMode)
:
#endif
std::make_shared</* */DerivationGoal>(drvPath, wantedOutputs, *this, buildMode);
}); });
} }
@ -83,14 +80,16 @@ std::shared_ptr<DerivationGoal> Worker::makeBasicDerivationGoal(const StorePath
const BasicDerivation & drv, const OutputsSpec & wantedOutputs, BuildMode buildMode) const BasicDerivation & drv, const OutputsSpec & wantedOutputs, BuildMode buildMode)
{ {
return makeDerivationGoalCommon(drvPath, wantedOutputs, [&]() -> std::shared_ptr<DerivationGoal> { return makeDerivationGoalCommon(drvPath, wantedOutputs, [&]() -> std::shared_ptr<DerivationGoal> {
return !dynamic_cast<LocalStore *>(&store) return
? std::make_shared</* */DerivationGoal>(drvPath, drv, wantedOutputs, *this, buildMode) #ifndef _WIN32 // TODO Enable building on Windows
: std::make_shared<LocalDerivationGoal>(drvPath, drv, wantedOutputs, *this, buildMode); dynamic_cast<LocalStore *>(&store)
? std::make_shared<LocalDerivationGoal>(drvPath, drv, wantedOutputs, *this, buildMode)
:
#endif
std::make_shared</* */DerivationGoal>(drvPath, drv, wantedOutputs, *this, buildMode);
}); });
} }
#endif
std::shared_ptr<PathSubstitutionGoal> Worker::makePathSubstitutionGoal(const StorePath & path, RepairFlag repair, std::optional<ContentAddress> ca) std::shared_ptr<PathSubstitutionGoal> Worker::makePathSubstitutionGoal(const StorePath & path, RepairFlag repair, std::optional<ContentAddress> ca)
{ {
@ -122,14 +121,10 @@ GoalPtr Worker::makeGoal(const DerivedPath & req, BuildMode buildMode)
{ {
return std::visit(overloaded { return std::visit(overloaded {
[&](const DerivedPath::Built & bfd) -> GoalPtr { [&](const DerivedPath::Built & bfd) -> GoalPtr {
#ifndef _WIN32 // TODO Enable building on Windows
if (auto bop = std::get_if<DerivedPath::Opaque>(&*bfd.drvPath)) if (auto bop = std::get_if<DerivedPath::Opaque>(&*bfd.drvPath))
return makeDerivationGoal(bop->path, bfd.outputs, buildMode); return makeDerivationGoal(bop->path, bfd.outputs, buildMode);
else else
throw UnimplementedError("Building dynamic derivations in one shot is not yet implemented."); throw UnimplementedError("Building dynamic derivations in one shot is not yet implemented.");
#else
throw UnimplementedError("Building derivations not yet implemented on Windows");
#endif
}, },
[&](const DerivedPath::Opaque & bo) -> GoalPtr { [&](const DerivedPath::Opaque & bo) -> GoalPtr {
return makePathSubstitutionGoal(bo.path, buildMode == bmRepair ? Repair : NoRepair); return makePathSubstitutionGoal(bo.path, buildMode == bmRepair ? Repair : NoRepair);
@ -155,11 +150,9 @@ static void removeGoal(std::shared_ptr<G> goal, std::map<K, std::weak_ptr<G>> &
void Worker::removeGoal(GoalPtr goal) void Worker::removeGoal(GoalPtr goal)
{ {
#ifndef _WIN32 // TODO Enable building on Windows
if (auto drvGoal = std::dynamic_pointer_cast<DerivationGoal>(goal)) if (auto drvGoal = std::dynamic_pointer_cast<DerivationGoal>(goal))
nix::removeGoal(drvGoal, derivationGoals); nix::removeGoal(drvGoal, derivationGoals);
else else
#endif
if (auto subGoal = std::dynamic_pointer_cast<PathSubstitutionGoal>(goal)) if (auto subGoal = std::dynamic_pointer_cast<PathSubstitutionGoal>(goal))
nix::removeGoal(subGoal, substitutionGoals); nix::removeGoal(subGoal, substitutionGoals);
else if (auto subGoal = std::dynamic_pointer_cast<DrvOutputSubstitutionGoal>(goal)) else if (auto subGoal = std::dynamic_pointer_cast<DrvOutputSubstitutionGoal>(goal))
@ -204,7 +197,7 @@ unsigned Worker::getNrSubstitutions()
} }
void Worker::childStarted(GoalPtr goal, const std::set<Child::CommChannel> & channels, void Worker::childStarted(GoalPtr goal, const std::set<MuxablePipePollState::CommChannel> & channels,
bool inBuildSlot, bool respectTimeouts) bool inBuildSlot, bool respectTimeouts)
{ {
Child child; Child child;
@ -298,14 +291,12 @@ void Worker::run(const Goals & _topGoals)
for (auto & i : _topGoals) { for (auto & i : _topGoals) {
topGoals.insert(i); topGoals.insert(i);
#ifndef _WIN32 // TODO Enable building on Windows
if (auto goal = dynamic_cast<DerivationGoal *>(i.get())) { if (auto goal = dynamic_cast<DerivationGoal *>(i.get())) {
topPaths.push_back(DerivedPath::Built { topPaths.push_back(DerivedPath::Built {
.drvPath = makeConstantStorePathRef(goal->drvPath), .drvPath = makeConstantStorePathRef(goal->drvPath),
.outputs = goal->wantedOutputs, .outputs = goal->wantedOutputs,
}); });
} else } else
#endif
if (auto goal = dynamic_cast<PathSubstitutionGoal *>(i.get())) { if (auto goal = dynamic_cast<PathSubstitutionGoal *>(i.get())) {
topPaths.push_back(DerivedPath::Opaque{goal->storePath}); topPaths.push_back(DerivedPath::Opaque{goal->storePath});
} }
@ -428,47 +419,26 @@ void Worker::waitForInput()
if (useTimeout) if (useTimeout)
vomit("sleeping %d seconds", timeout); vomit("sleeping %d seconds", timeout);
MuxablePipePollState state;
#ifndef _WIN32 #ifndef _WIN32
/* Use select() to wait for the input side of any logger pipe to /* Use select() to wait for the input side of any logger pipe to
become `available'. Note that `available' (i.e., non-blocking) become `available'. Note that `available' (i.e., non-blocking)
includes EOF. */ includes EOF. */
std::vector<struct pollfd> pollStatus;
std::map<int, size_t> fdToPollStatus;
for (auto & i : children) { for (auto & i : children) {
for (auto & j : i.channels) { for (auto & j : i.channels) {
pollStatus.push_back((struct pollfd) { .fd = j, .events = POLLIN }); state.pollStatus.push_back((struct pollfd) { .fd = j, .events = POLLIN });
fdToPollStatus[j] = pollStatus.size() - 1; state.fdToPollStatus[j] = state.pollStatus.size() - 1;
} }
} }
if (poll(pollStatus.data(), pollStatus.size(),
useTimeout ? timeout * 1000 : -1) == -1) {
if (errno == EINTR) return;
throw SysError("waiting for input");
}
#else
OVERLAPPED_ENTRY oentries[0x20] = {0};
ULONG removed;
bool gotEOF = false;
// we are on at least Windows Vista / Server 2008 and can get many (countof(oentries)) statuses in one API call
if (!GetQueuedCompletionStatusEx(
ioport.get(),
oentries,
sizeof(oentries) / sizeof(*oentries),
&removed,
useTimeout ? timeout * 1000 : INFINITE,
false))
{
windows::WinError winError("GetQueuedCompletionStatusEx");
if (winError.lastError != WAIT_TIMEOUT)
throw winError;
assert(removed == 0);
} else {
assert(0 < removed && removed <= sizeof(oentries)/sizeof(*oentries));
}
#endif #endif
state.poll(
#ifdef _WIN32
ioport.get(),
#endif
useTimeout ? (std::optional { timeout * 1000 }) : std::nullopt);
auto after = steady_time_point::clock::now(); auto after = steady_time_point::clock::now();
/* Process all available file descriptors. FIXME: this is /* Process all available file descriptors. FIXME: this is
@ -482,75 +452,18 @@ void Worker::waitForInput()
GoalPtr goal = j->goal.lock(); GoalPtr goal = j->goal.lock();
assert(goal); assert(goal);
#ifndef _WIN32 state.iterate(
std::set<Descriptor> fds2(j->channels); j->channels,
std::vector<unsigned char> buffer(4096); [&](Descriptor k, std::string_view data) {
for (auto & k : fds2) {
const auto fdPollStatusId = get(fdToPollStatus, k);
assert(fdPollStatusId);
assert(*fdPollStatusId < pollStatus.size());
if (pollStatus.at(*fdPollStatusId).revents) {
ssize_t rd = ::read(fromDescriptorReadOnly(k), buffer.data(), buffer.size());
// FIXME: is there a cleaner way to handle pt close
// than EIO? Is this even standard?
if (rd == 0 || (rd == -1 && errno == EIO)) {
debug("%1%: got EOF", goal->getName());
goal->handleEOF(k);
j->channels.erase(k);
} else if (rd == -1) {
if (errno != EINTR)
throw SysError("%s: read failed", goal->getName());
} else {
printMsg(lvlVomit, "%1%: read %2% bytes", printMsg(lvlVomit, "%1%: read %2% bytes",
goal->getName(), rd); goal->getName(), data.size());
std::string_view data((char *) buffer.data(), rd);
j->lastOutput = after; j->lastOutput = after;
goal->handleChildOutput(k, data); goal->handleChildOutput(k, data);
} },
} [&](Descriptor k) {
} debug("%1%: got EOF", goal->getName());
#else goal->handleEOF(k);
decltype(j->channels)::iterator p = j->channels.begin(); });
while (p != j->channels.end()) {
decltype(p) nextp = p;
++nextp;
for (ULONG i = 0; i < removed; i++) {
if (oentries[i].lpCompletionKey == ((ULONG_PTR)((*p)->readSide.get()) ^ 0x5555)) {
printMsg(lvlVomit, "%s: read %s bytes", goal->getName(), oentries[i].dwNumberOfBytesTransferred);
if (oentries[i].dwNumberOfBytesTransferred > 0) {
std::string data {
(char *) (*p)->buffer.data(),
oentries[i].dwNumberOfBytesTransferred,
};
//std::cerr << "read [" << data << "]" << std::endl;
j->lastOutput = after;
goal->handleChildOutput((*p)->readSide.get(), data);
}
if (gotEOF) {
debug("%s: got EOF", goal->getName());
goal->handleEOF((*p)->readSide.get());
nextp = j->channels.erase(p); // no need to maintain `j->channels` ?
} else {
BOOL rc = ReadFile((*p)->readSide.get(), (*p)->buffer.data(), (*p)->buffer.size(), &(*p)->got, &(*p)->overlapped);
if (rc) {
// here is possible (but not obligatory) to call `goal->handleChildOutput` and repeat ReadFile immediately
} else {
windows::WinError winError("ReadFile(%s, ..)", (*p)->readSide.get());
if (winError.lastError == ERROR_BROKEN_PIPE) {
debug("%s: got EOF", goal->getName());
goal->handleEOF((*p)->readSide.get());
nextp = j->channels.erase(p); // no need to maintain `j->channels` ?
} else if (winError.lastError != ERROR_IO_PENDING)
throw winError;
}
}
break;
}
}
p = nextp;
}
#endif
if (goal->exitCode == Goal::ecBusy && if (goal->exitCode == Goal::ecBusy &&
0 != settings.maxSilentTime && 0 != settings.maxSilentTime &&

View file

@ -5,10 +5,7 @@
#include "store-api.hh" #include "store-api.hh"
#include "goal.hh" #include "goal.hh"
#include "realisation.hh" #include "realisation.hh"
#include "muxable-pipe.hh"
#ifdef _WIN32
# include "windows-async-pipe.hh"
#endif
#include <future> #include <future>
#include <thread> #include <thread>
@ -16,9 +13,7 @@
namespace nix { namespace nix {
/* Forward definition. */ /* Forward definition. */
#ifndef _WIN32 // TODO Enable building on Windows
struct DerivationGoal; struct DerivationGoal;
#endif
struct PathSubstitutionGoal; struct PathSubstitutionGoal;
class DrvOutputSubstitutionGoal; class DrvOutputSubstitutionGoal;
@ -46,17 +41,9 @@ typedef std::chrono::time_point<std::chrono::steady_clock> steady_time_point;
*/ */
struct Child struct Child
{ {
using CommChannel =
#ifndef _WIN32
Descriptor
#else
windows::AsyncPipe *
#endif
;
WeakGoalPtr goal; WeakGoalPtr goal;
Goal * goal2; // ugly hackery Goal * goal2; // ugly hackery
std::set<CommChannel> channels; std::set<MuxablePipePollState::CommChannel> channels;
bool respectTimeouts; bool respectTimeouts;
bool inBuildSlot; bool inBuildSlot;
/** /**
@ -116,9 +103,7 @@ private:
* Maps used to prevent multiple instantiations of a goal for the * Maps used to prevent multiple instantiations of a goal for the
* same derivation / path. * same derivation / path.
*/ */
#ifndef _WIN32 // TODO Enable building on Windows
std::map<StorePath, std::weak_ptr<DerivationGoal>> derivationGoals; std::map<StorePath, std::weak_ptr<DerivationGoal>> derivationGoals;
#endif
std::map<StorePath, std::weak_ptr<PathSubstitutionGoal>> substitutionGoals; std::map<StorePath, std::weak_ptr<PathSubstitutionGoal>> substitutionGoals;
std::map<DrvOutput, std::weak_ptr<DrvOutputSubstitutionGoal>> drvOutputSubstitutionGoals; std::map<DrvOutput, std::weak_ptr<DrvOutputSubstitutionGoal>> drvOutputSubstitutionGoals;
@ -207,7 +192,6 @@ public:
* Make a goal (with caching). * Make a goal (with caching).
*/ */
#ifndef _WIN32 // TODO Enable building on Windows
/** /**
* @ref DerivationGoal "derivation goal" * @ref DerivationGoal "derivation goal"
*/ */
@ -222,7 +206,6 @@ public:
std::shared_ptr<DerivationGoal> makeBasicDerivationGoal( std::shared_ptr<DerivationGoal> makeBasicDerivationGoal(
const StorePath & drvPath, const BasicDerivation & drv, const StorePath & drvPath, const BasicDerivation & drv,
const OutputsSpec & wantedOutputs, BuildMode buildMode = bmNormal); const OutputsSpec & wantedOutputs, BuildMode buildMode = bmNormal);
#endif
/** /**
* @ref SubstitutionGoal "substitution goal" * @ref SubstitutionGoal "substitution goal"
@ -263,7 +246,7 @@ public:
* Registers a running child process. `inBuildSlot` means that * Registers a running child process. `inBuildSlot` means that
* the process counts towards the jobs limit. * the process counts towards the jobs limit.
*/ */
void childStarted(GoalPtr goal, const std::set<Child::CommChannel> & channels, void childStarted(GoalPtr goal, const std::set<MuxablePipePollState::CommChannel> & channels,
bool inBuildSlot, bool respectTimeouts); bool inBuildSlot, bool respectTimeouts);
/** /**

View file

@ -0,0 +1,82 @@
#pragma once
///@file
#include "file-descriptor.hh"
#ifdef _WIN32
# include "windows-async-pipe.hh"
#endif
#ifndef _WIN32
# include <poll.h>
#else
# include <ioapiset.h>
# include "windows-error.hh"
#endif
namespace nix {
/**
* An "muxable pipe" is a type of pipe supporting endpoints that wait
* for events on multiple pipes at once.
*
* On Unix, this is just a regular anonymous pipe. On Windows, this has
* to be a named pipe because we need I/O Completion Ports to wait on
* multiple pipes.
*/
using MuxablePipe =
#ifndef _WIN32
Pipe
#else
windows::AsyncPipe
#endif
;
/**
* Use pool() (Unix) / I/O Completion Ports (Windows) to wait for the
* input side of any logger pipe to become `available'. Note that
* `available' (i.e., non-blocking) includes EOF.
*/
struct MuxablePipePollState
{
#ifndef _WIN32
std::vector<struct pollfd> pollStatus;
std::map<int, size_t> fdToPollStatus;
#else
OVERLAPPED_ENTRY oentries[0x20] = {0};
ULONG removed;
bool gotEOF = false;
#endif
/**
* Check for ready (Unix) / completed (Windows) operations
*/
void poll(
#ifdef _WIN32
HANDLE ioport,
#endif
std::optional<unsigned int> timeout);
using CommChannel =
#ifndef _WIN32
Descriptor
#else
windows::AsyncPipe *
#endif
;
/**
* Process for ready (Unix) / completed (Windows) operations,
* calling the callbacks as needed.
*
* @param handleRead callback to be passed read data.
*
* @param handleEOF callback for when the `MuxablePipe` has closed.
*/
void iterate(
std::set<CommChannel> & channels,
std::function<void(Descriptor fd, std::string_view data)> handleRead,
std::function<void(Descriptor fd)> handleEOF);
};
}

View file

@ -118,8 +118,6 @@ public:
{ } { }
}; };
#ifndef _WIN32
/** /**
* Convert the exit status of a child as returned by wait() into an * Convert the exit status of a child as returned by wait() into an
* error string. * error string.
@ -128,6 +126,4 @@ std::string statusToString(int status);
bool statusOk(int status); bool statusOk(int status);
#endif
} }

View file

@ -0,0 +1,47 @@
#include <poll.h>
#include "logging.hh"
#include "util.hh"
#include "muxable-pipe.hh"
namespace nix {
void MuxablePipePollState::poll(std::optional<unsigned int> timeout)
{
if (::poll(pollStatus.data(), pollStatus.size(), timeout ? *timeout : -1) == -1) {
if (errno == EINTR)
return;
throw SysError("waiting for input");
}
}
void MuxablePipePollState::iterate(
std::set<MuxablePipePollState::CommChannel> & channels,
std::function<void(Descriptor fd, std::string_view data)> handleRead,
std::function<void(Descriptor fd)> handleEOF)
{
std::set<Descriptor> fds2(channels);
std::vector<unsigned char> buffer(4096);
for (auto & k : fds2) {
const auto fdPollStatusId = get(fdToPollStatus, k);
assert(fdPollStatusId);
assert(*fdPollStatusId < pollStatus.size());
if (pollStatus.at(*fdPollStatusId).revents) {
ssize_t rd = ::read(fromDescriptorReadOnly(k), buffer.data(), buffer.size());
// FIXME: is there a cleaner way to handle pt close
// than EIO? Is this even standard?
if (rd == 0 || (rd == -1 && errno == EIO)) {
handleEOF(k);
channels.erase(k);
} else if (rd == -1) {
if (errno != EINTR)
throw SysError("read failed");
} else {
std::string_view data((char *) buffer.data(), rd);
handleRead(k, data);
}
}
}
}
}

View file

@ -0,0 +1,70 @@
#include <ioapiset.h>
#include "windows-error.hh"
#include "logging.hh"
#include "util.hh"
#include "muxable-pipe.hh"
namespace nix {
void MuxablePipePollState::poll(HANDLE ioport, std::optional<unsigned int> timeout)
{
/* We are on at least Windows Vista / Server 2008 and can get many
(countof(oentries)) statuses in one API call. */
if (!GetQueuedCompletionStatusEx(
ioport, oentries, sizeof(oentries) / sizeof(*oentries), &removed, timeout ? *timeout : INFINITE, false)) {
windows::WinError winError("GetQueuedCompletionStatusEx");
if (winError.lastError != WAIT_TIMEOUT)
throw winError;
assert(removed == 0);
} else {
assert(0 < removed && removed <= sizeof(oentries) / sizeof(*oentries));
}
}
void MuxablePipePollState::iterate(
std::set<MuxablePipePollState::CommChannel> & channels,
std::function<void(Descriptor fd, std::string_view data)> handleRead,
std::function<void(Descriptor fd)> handleEOF)
{
auto p = channels.begin();
while (p != channels.end()) {
decltype(p) nextp = p;
++nextp;
for (ULONG i = 0; i < removed; i++) {
if (oentries[i].lpCompletionKey == ((ULONG_PTR) ((*p)->readSide.get()) ^ 0x5555)) {
printMsg(lvlVomit, "read %s bytes", oentries[i].dwNumberOfBytesTransferred);
if (oentries[i].dwNumberOfBytesTransferred > 0) {
std::string data{
(char *) (*p)->buffer.data(),
oentries[i].dwNumberOfBytesTransferred,
};
handleRead((*p)->readSide.get(), data);
}
if (gotEOF) {
handleEOF((*p)->readSide.get());
nextp = channels.erase(p); // no need to maintain `channels`?
} else {
BOOL rc = ReadFile(
(*p)->readSide.get(), (*p)->buffer.data(), (*p)->buffer.size(), &(*p)->got, &(*p)->overlapped);
if (rc) {
// here is possible (but not obligatory) to call
// `handleRead` and repeat ReadFile immediately
} else {
windows::WinError winError("ReadFile(%s, ..)", (*p)->readSide.get());
if (winError.lastError == ERROR_BROKEN_PIPE) {
handleEOF((*p)->readSide.get());
nextp = channels.erase(p); // no need to maintain `channels` ?
} else if (winError.lastError != ERROR_IO_PENDING)
throw winError;
}
}
break;
}
}
p = nextp;
}
}
}

View file

@ -16,16 +16,6 @@
#include <sys/types.h> #include <sys/types.h>
#include <unistd.h> #include <unistd.h>
#ifdef __APPLE__
# include <sys/syscall.h>
#endif
#ifdef __linux__
# include <sys/prctl.h>
# include <sys/mman.h>
#endif
namespace nix { namespace nix {
std::string runProgram(Path program, bool lookupPath, const Strings & args, std::string runProgram(Path program, bool lookupPath, const Strings & args,
@ -34,15 +24,31 @@ std::string runProgram(Path program, bool lookupPath, const Strings & args,
throw UnimplementedError("Cannot shell out to git on Windows yet"); throw UnimplementedError("Cannot shell out to git on Windows yet");
} }
// Output = error code + "standard out" output stream // Output = error code + "standard out" output stream
std::pair<int, std::string> runProgram(RunOptions && options) std::pair<int, std::string> runProgram(RunOptions && options)
{ {
throw UnimplementedError("Cannot shell out to git on Windows yet"); throw UnimplementedError("Cannot shell out to git on Windows yet");
} }
void runProgram2(const RunOptions & options) void runProgram2(const RunOptions & options)
{ {
throw UnimplementedError("Cannot shell out to git on Windows yet"); throw UnimplementedError("Cannot shell out to git on Windows yet");
} }
std::string statusToString(int status)
{
if (status != 0)
return fmt("with exit code %d", status);
else
return "succeeded";
}
bool statusOk(int status)
{
return status == 0;
}
} }

View file

@ -5,6 +5,13 @@
namespace nix::windows { namespace nix::windows {
/***
* An "async pipe" is a pipe that supports I/O Completion Ports so
* multiple pipes can be listened too.
*
* Unfortunately, only named pipes support that on windows, so we use
* those with randomized temp file names.
*/
class AsyncPipe class AsyncPipe
{ {
public: public: