Make goals use C++20 coroutines (#11005)

undefined
This commit is contained in:
Las Safin 2024-07-15 21:49:15 +01:00 committed by GitHub
parent c6b5503190
commit 846869da0e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 708 additions and 471 deletions

View file

@ -36,6 +36,14 @@
namespace nix { namespace nix {
Goal::Co DerivationGoal::init() {
if (useDerivation) {
co_return getDerivation();
} else {
co_return haveDerivation();
}
}
DerivationGoal::DerivationGoal(const StorePath & drvPath, DerivationGoal::DerivationGoal(const StorePath & drvPath,
const OutputsSpec & wantedOutputs, Worker & worker, BuildMode buildMode) const OutputsSpec & wantedOutputs, Worker & worker, BuildMode buildMode)
: Goal(worker, DerivedPath::Built { .drvPath = makeConstantStorePathRef(drvPath), .outputs = wantedOutputs }) : Goal(worker, DerivedPath::Built { .drvPath = makeConstantStorePathRef(drvPath), .outputs = wantedOutputs })
@ -44,7 +52,6 @@ DerivationGoal::DerivationGoal(const StorePath & drvPath,
, wantedOutputs(wantedOutputs) , wantedOutputs(wantedOutputs)
, buildMode(buildMode) , buildMode(buildMode)
{ {
state = &DerivationGoal::getDerivation;
name = fmt( name = fmt(
"building of '%s' from .drv file", "building of '%s' from .drv file",
DerivedPath::Built { makeConstantStorePathRef(drvPath), wantedOutputs }.to_string(worker.store)); DerivedPath::Built { makeConstantStorePathRef(drvPath), wantedOutputs }.to_string(worker.store));
@ -65,7 +72,6 @@ DerivationGoal::DerivationGoal(const StorePath & drvPath, const BasicDerivation
{ {
this->drv = std::make_unique<Derivation>(drv); this->drv = std::make_unique<Derivation>(drv);
state = &DerivationGoal::haveDerivation;
name = fmt( name = fmt(
"building of '%s' from in-memory derivation", "building of '%s' from in-memory derivation",
DerivedPath::Built { makeConstantStorePathRef(drvPath), drv.outputNames() }.to_string(worker.store)); DerivedPath::Built { makeConstantStorePathRef(drvPath), drv.outputNames() }.to_string(worker.store));
@ -109,13 +115,9 @@ void DerivationGoal::killChild()
void DerivationGoal::timedOut(Error && ex) void DerivationGoal::timedOut(Error && ex)
{ {
killChild(); killChild();
done(BuildResult::TimedOut, {}, std::move(ex)); // We're not inside a coroutine, hence we can't use co_return here.
} // Thus we ignore the return value.
[[maybe_unused]] Done _ = done(BuildResult::TimedOut, {}, std::move(ex));
void DerivationGoal::work()
{
(this->*state)();
} }
void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs) void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs)
@ -139,7 +141,7 @@ void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs)
} }
void DerivationGoal::getDerivation() Goal::Co DerivationGoal::getDerivation()
{ {
trace("init"); trace("init");
@ -147,23 +149,22 @@ void DerivationGoal::getDerivation()
exists. If it doesn't, it may be created through a exists. If it doesn't, it may be created through a
substitute. */ substitute. */
if (buildMode == bmNormal && worker.evalStore.isValidPath(drvPath)) { if (buildMode == bmNormal && worker.evalStore.isValidPath(drvPath)) {
loadDerivation(); co_return loadDerivation();
return;
} }
addWaitee(upcast_goal(worker.makePathSubstitutionGoal(drvPath))); addWaitee(upcast_goal(worker.makePathSubstitutionGoal(drvPath)));
state = &DerivationGoal::loadDerivation; co_await Suspend{};
co_return loadDerivation();
} }
void DerivationGoal::loadDerivation() Goal::Co DerivationGoal::loadDerivation()
{ {
trace("loading derivation"); trace("loading derivation");
if (nrFailed != 0) { if (nrFailed != 0) {
done(BuildResult::MiscFailure, {}, Error("cannot build missing derivation '%s'", worker.store.printStorePath(drvPath))); co_return done(BuildResult::MiscFailure, {}, Error("cannot build missing derivation '%s'", worker.store.printStorePath(drvPath)));
return;
} }
/* `drvPath' should already be a root, but let's be on the safe /* `drvPath' should already be a root, but let's be on the safe
@ -185,11 +186,11 @@ void DerivationGoal::loadDerivation()
} }
assert(drv); assert(drv);
haveDerivation(); co_return haveDerivation();
} }
void DerivationGoal::haveDerivation() Goal::Co DerivationGoal::haveDerivation()
{ {
trace("have derivation"); trace("have derivation");
@ -217,8 +218,7 @@ void DerivationGoal::haveDerivation()
}); });
} }
gaveUpOnSubstitution(); co_return gaveUpOnSubstitution();
return;
} }
for (auto & i : drv->outputsAndOptPaths(worker.store)) for (auto & i : drv->outputsAndOptPaths(worker.store))
@ -240,8 +240,7 @@ void DerivationGoal::haveDerivation()
/* If they are all valid, then we're done. */ /* If they are all valid, then we're done. */
if (allValid && buildMode == bmNormal) { if (allValid && buildMode == bmNormal) {
done(BuildResult::AlreadyValid, std::move(validOutputs)); co_return done(BuildResult::AlreadyValid, std::move(validOutputs));
return;
} }
/* We are first going to try to create the invalid output paths /* We are first going to try to create the invalid output paths
@ -268,24 +267,21 @@ void DerivationGoal::haveDerivation()
} }
} }
if (waitees.empty()) /* to prevent hang (no wake-up event) */ if (!waitees.empty()) co_await Suspend{}; /* to prevent hang (no wake-up event) */
outputsSubstitutionTried(); co_return outputsSubstitutionTried();
else
state = &DerivationGoal::outputsSubstitutionTried;
} }
void DerivationGoal::outputsSubstitutionTried() Goal::Co DerivationGoal::outputsSubstitutionTried()
{ {
trace("all outputs substituted (maybe)"); trace("all outputs substituted (maybe)");
assert(!drv->type().isImpure()); assert(!drv->type().isImpure());
if (nrFailed > 0 && nrFailed > nrNoSubstituters + nrIncompleteClosure && !settings.tryFallback) { if (nrFailed > 0 && nrFailed > nrNoSubstituters + nrIncompleteClosure && !settings.tryFallback) {
done(BuildResult::TransientFailure, {}, co_return done(BuildResult::TransientFailure, {},
Error("some substitutes for the outputs of derivation '%s' failed (usually happens due to networking issues); try '--fallback' to build derivation from source ", Error("some substitutes for the outputs of derivation '%s' failed (usually happens due to networking issues); try '--fallback' to build derivation from source ",
worker.store.printStorePath(drvPath))); worker.store.printStorePath(drvPath)));
return;
} }
/* If the substitutes form an incomplete closure, then we should /* If the substitutes form an incomplete closure, then we should
@ -319,32 +315,29 @@ void DerivationGoal::outputsSubstitutionTried()
if (needRestart == NeedRestartForMoreOutputs::OutputsAddedDoNeed) { if (needRestart == NeedRestartForMoreOutputs::OutputsAddedDoNeed) {
needRestart = NeedRestartForMoreOutputs::OutputsUnmodifedDontNeed; needRestart = NeedRestartForMoreOutputs::OutputsUnmodifedDontNeed;
haveDerivation(); co_return haveDerivation();
return;
} }
auto [allValid, validOutputs] = checkPathValidity(); auto [allValid, validOutputs] = checkPathValidity();
if (buildMode == bmNormal && allValid) { if (buildMode == bmNormal && allValid) {
done(BuildResult::Substituted, std::move(validOutputs)); co_return done(BuildResult::Substituted, std::move(validOutputs));
return;
} }
if (buildMode == bmRepair && allValid) { if (buildMode == bmRepair && allValid) {
repairClosure(); co_return repairClosure();
return;
} }
if (buildMode == bmCheck && !allValid) if (buildMode == bmCheck && !allValid)
throw Error("some outputs of '%s' are not valid, so checking is not possible", throw Error("some outputs of '%s' are not valid, so checking is not possible",
worker.store.printStorePath(drvPath)); worker.store.printStorePath(drvPath));
/* Nothing to wait for; tail call */ /* Nothing to wait for; tail call */
gaveUpOnSubstitution(); co_return gaveUpOnSubstitution();
} }
/* At least one of the output paths could not be /* At least one of the output paths could not be
produced using a substitute. So we have to build instead. */ produced using a substitute. So we have to build instead. */
void DerivationGoal::gaveUpOnSubstitution() Goal::Co DerivationGoal::gaveUpOnSubstitution()
{ {
/* At this point we are building all outputs, so if more are wanted there /* At this point we are building all outputs, so if more are wanted there
is no need to restart. */ is no need to restart. */
@ -405,14 +398,12 @@ void DerivationGoal::gaveUpOnSubstitution()
addWaitee(upcast_goal(worker.makePathSubstitutionGoal(i))); addWaitee(upcast_goal(worker.makePathSubstitutionGoal(i)));
} }
if (waitees.empty()) /* to prevent hang (no wake-up event) */ if (!waitees.empty()) co_await Suspend{}; /* to prevent hang (no wake-up event) */
inputsRealised(); co_return inputsRealised();
else
state = &DerivationGoal::inputsRealised;
} }
void DerivationGoal::repairClosure() Goal::Co DerivationGoal::repairClosure()
{ {
assert(!drv->type().isImpure()); assert(!drv->type().isImpure());
@ -466,41 +457,39 @@ void DerivationGoal::repairClosure()
} }
if (waitees.empty()) { if (waitees.empty()) {
done(BuildResult::AlreadyValid, assertPathValidity()); co_return done(BuildResult::AlreadyValid, assertPathValidity());
return; } else {
co_await Suspend{};
co_return closureRepaired();
} }
state = &DerivationGoal::closureRepaired;
} }
void DerivationGoal::closureRepaired() Goal::Co DerivationGoal::closureRepaired()
{ {
trace("closure repaired"); trace("closure repaired");
if (nrFailed > 0) if (nrFailed > 0)
throw Error("some paths in the output closure of derivation '%s' could not be repaired", throw Error("some paths in the output closure of derivation '%s' could not be repaired",
worker.store.printStorePath(drvPath)); worker.store.printStorePath(drvPath));
done(BuildResult::AlreadyValid, assertPathValidity()); co_return done(BuildResult::AlreadyValid, assertPathValidity());
} }
void DerivationGoal::inputsRealised() Goal::Co DerivationGoal::inputsRealised()
{ {
trace("all inputs realised"); trace("all inputs realised");
if (nrFailed != 0) { if (nrFailed != 0) {
if (!useDerivation) if (!useDerivation)
throw Error("some dependencies of '%s' are missing", worker.store.printStorePath(drvPath)); throw Error("some dependencies of '%s' are missing", worker.store.printStorePath(drvPath));
done(BuildResult::DependencyFailed, {}, Error( co_return done(BuildResult::DependencyFailed, {}, Error(
"%s dependencies of derivation '%s' failed to build", "%s dependencies of derivation '%s' failed to build",
nrFailed, worker.store.printStorePath(drvPath))); nrFailed, worker.store.printStorePath(drvPath)));
return;
} }
if (retrySubstitution == RetrySubstitution::YesNeed) { if (retrySubstitution == RetrySubstitution::YesNeed) {
retrySubstitution = RetrySubstitution::AlreadyRetried; retrySubstitution = RetrySubstitution::AlreadyRetried;
haveDerivation(); co_return haveDerivation();
return;
} }
/* Gather information necessary for computing the closure and/or /* Gather information necessary for computing the closure and/or
@ -566,8 +555,8 @@ void DerivationGoal::inputsRealised()
pathResolved, wantedOutputs, buildMode); pathResolved, wantedOutputs, buildMode);
addWaitee(resolvedDrvGoal); addWaitee(resolvedDrvGoal);
state = &DerivationGoal::resolvedFinished; co_await Suspend{};
return; co_return resolvedFinished();
} }
std::function<void(const StorePath &, const DerivedPathMap<StringSet>::ChildNode &)> accumInputPaths; std::function<void(const StorePath &, const DerivedPathMap<StringSet>::ChildNode &)> accumInputPaths;
@ -631,8 +620,9 @@ void DerivationGoal::inputsRealised()
/* Okay, try to build. Note that here we don't wait for a build /* Okay, try to build. Note that here we don't wait for a build
slot to become available, since we don't need one if there is a slot to become available, since we don't need one if there is a
build hook. */ build hook. */
state = &DerivationGoal::tryToBuild;
worker.wakeUp(shared_from_this()); worker.wakeUp(shared_from_this());
co_await Suspend{};
co_return tryToBuild();
} }
void DerivationGoal::started() void DerivationGoal::started()
@ -657,7 +647,7 @@ void DerivationGoal::started()
worker.updateProgress(); worker.updateProgress();
} }
void DerivationGoal::tryToBuild() Goal::Co DerivationGoal::tryToBuild()
{ {
trace("trying to build"); trace("trying to build");
@ -693,7 +683,8 @@ void DerivationGoal::tryToBuild()
actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting, actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting,
fmt("waiting for lock on %s", Magenta(showPaths(lockFiles)))); fmt("waiting for lock on %s", Magenta(showPaths(lockFiles))));
worker.waitForAWhile(shared_from_this()); worker.waitForAWhile(shared_from_this());
return; co_await Suspend{};
co_return tryToBuild();
} }
actLock.reset(); actLock.reset();
@ -710,8 +701,7 @@ void DerivationGoal::tryToBuild()
if (buildMode != bmCheck && allValid) { if (buildMode != bmCheck && allValid) {
debug("skipping build of derivation '%s', someone beat us to it", worker.store.printStorePath(drvPath)); debug("skipping build of derivation '%s', someone beat us to it", worker.store.printStorePath(drvPath));
outputLocks.setDeletion(true); outputLocks.setDeletion(true);
done(BuildResult::AlreadyValid, std::move(validOutputs)); co_return done(BuildResult::AlreadyValid, std::move(validOutputs));
return;
} }
/* If any of the outputs already exist but are not valid, delete /* If any of the outputs already exist but are not valid, delete
@ -737,9 +727,9 @@ void DerivationGoal::tryToBuild()
EOF from the hook. */ EOF from the hook. */
actLock.reset(); actLock.reset();
buildResult.startTime = time(0); // inexact buildResult.startTime = time(0); // inexact
state = &DerivationGoal::buildDone;
started(); started();
return; co_await Suspend{};
co_return buildDone();
case rpPostpone: case rpPostpone:
/* Not now; wait until at least one child finishes or /* Not now; wait until at least one child finishes or
the wake-up timeout expires. */ the wake-up timeout expires. */
@ -748,7 +738,8 @@ void DerivationGoal::tryToBuild()
fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath)))); fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath))));
worker.waitForAWhile(shared_from_this()); worker.waitForAWhile(shared_from_this());
outputLocks.unlock(); outputLocks.unlock();
return; co_await Suspend{};
co_return tryToBuild();
case rpDecline: case rpDecline:
/* We should do it ourselves. */ /* We should do it ourselves. */
break; break;
@ -757,11 +748,12 @@ void DerivationGoal::tryToBuild()
actLock.reset(); actLock.reset();
state = &DerivationGoal::tryLocalBuild;
worker.wakeUp(shared_from_this()); worker.wakeUp(shared_from_this());
co_await Suspend{};
co_return tryLocalBuild();
} }
void DerivationGoal::tryLocalBuild() { Goal::Co DerivationGoal::tryLocalBuild() {
throw Error( throw Error(
R"( R"(
Unable to build with a primary store that isn't a local store; Unable to build with a primary store that isn't a local store;
@ -938,7 +930,7 @@ void runPostBuildHook(
}); });
} }
void DerivationGoal::buildDone() Goal::Co DerivationGoal::buildDone()
{ {
trace("build done"); trace("build done");
@ -1033,7 +1025,7 @@ void DerivationGoal::buildDone()
outputLocks.setDeletion(true); outputLocks.setDeletion(true);
outputLocks.unlock(); outputLocks.unlock();
done(BuildResult::Built, std::move(builtOutputs)); co_return done(BuildResult::Built, std::move(builtOutputs));
} catch (BuildError & e) { } catch (BuildError & e) {
outputLocks.unlock(); outputLocks.unlock();
@ -1058,12 +1050,11 @@ void DerivationGoal::buildDone()
BuildResult::PermanentFailure; BuildResult::PermanentFailure;
} }
done(st, {}, std::move(e)); co_return done(st, {}, std::move(e));
return;
} }
} }
void DerivationGoal::resolvedFinished() Goal::Co DerivationGoal::resolvedFinished()
{ {
trace("resolved derivation finished"); trace("resolved derivation finished");
@ -1131,7 +1122,7 @@ void DerivationGoal::resolvedFinished()
if (status == BuildResult::AlreadyValid) if (status == BuildResult::AlreadyValid)
status = BuildResult::ResolvesToAlreadyValid; status = BuildResult::ResolvesToAlreadyValid;
done(status, std::move(builtOutputs)); co_return done(status, std::move(builtOutputs));
} }
HookReply DerivationGoal::tryBuildHook() HookReply DerivationGoal::tryBuildHook()
@ -1325,7 +1316,9 @@ void DerivationGoal::handleChildOutput(Descriptor fd, std::string_view data)
logSize += data.size(); logSize += data.size();
if (settings.maxLogSize && logSize > settings.maxLogSize) { if (settings.maxLogSize && logSize > settings.maxLogSize) {
killChild(); killChild();
done( // We're not inside a coroutine, hence we can't use co_return here.
// Thus we ignore the return value.
[[maybe_unused]] Done _ = done(
BuildResult::LogLimitExceeded, {}, BuildResult::LogLimitExceeded, {},
Error("%s killed after writing more than %d bytes of log output", Error("%s killed after writing more than %d bytes of log output",
getName(), settings.maxLogSize)); getName(), settings.maxLogSize));
@ -1531,7 +1524,7 @@ SingleDrvOutputs DerivationGoal::assertPathValidity()
} }
void DerivationGoal::done( Goal::Done DerivationGoal::done(
BuildResult::Status status, BuildResult::Status status,
SingleDrvOutputs builtOutputs, SingleDrvOutputs builtOutputs,
std::optional<Error> ex) std::optional<Error> ex)
@ -1568,7 +1561,7 @@ void DerivationGoal::done(
fs << worker.store.printStorePath(drvPath) << "\t" << buildResult.toString() << std::endl; fs << worker.store.printStorePath(drvPath) << "\t" << buildResult.toString() << std::endl;
} }
amDone(buildResult.success() ? ecSuccess : ecFailed, std::move(ex)); return amDone(buildResult.success() ? ecSuccess : ecFailed, std::move(ex));
} }

View file

@ -194,9 +194,6 @@ struct DerivationGoal : public Goal
*/ */
std::optional<DerivationType> derivationType; std::optional<DerivationType> derivationType;
typedef void (DerivationGoal::*GoalState)();
GoalState state;
BuildMode buildMode; BuildMode buildMode;
std::unique_ptr<MaintainCount<uint64_t>> mcExpectedBuilds, mcRunningBuilds; std::unique_ptr<MaintainCount<uint64_t>> mcExpectedBuilds, mcRunningBuilds;
@ -227,8 +224,6 @@ struct DerivationGoal : public Goal
std::string key() override; std::string key() override;
void work() override;
/** /**
* Add wanted outputs to an already existing derivation goal. * Add wanted outputs to an already existing derivation goal.
*/ */
@ -237,18 +232,19 @@ struct DerivationGoal : public Goal
/** /**
* The states. * The states.
*/ */
void getDerivation(); Co init() override;
void loadDerivation(); Co getDerivation();
void haveDerivation(); Co loadDerivation();
void outputsSubstitutionTried(); Co haveDerivation();
void gaveUpOnSubstitution(); Co outputsSubstitutionTried();
void closureRepaired(); Co gaveUpOnSubstitution();
void inputsRealised(); Co closureRepaired();
void tryToBuild(); Co inputsRealised();
virtual void tryLocalBuild(); Co tryToBuild();
void buildDone(); virtual Co tryLocalBuild();
Co buildDone();
void resolvedFinished(); Co resolvedFinished();
/** /**
* Is the build hook willing to perform the build? * Is the build hook willing to perform the build?
@ -329,11 +325,11 @@ struct DerivationGoal : public Goal
*/ */
virtual void killChild(); virtual void killChild();
void repairClosure(); Co repairClosure();
void started(); void started();
void done( Done done(
BuildResult::Status status, BuildResult::Status status,
SingleDrvOutputs builtOutputs = {}, SingleDrvOutputs builtOutputs = {},
std::optional<Error> ex = {}); std::optional<Error> ex = {});

View file

@ -14,100 +14,78 @@ DrvOutputSubstitutionGoal::DrvOutputSubstitutionGoal(
: Goal(worker, DerivedPath::Opaque { StorePath::dummy }) : Goal(worker, DerivedPath::Opaque { StorePath::dummy })
, id(id) , id(id)
{ {
state = &DrvOutputSubstitutionGoal::init;
name = fmt("substitution of '%s'", id.to_string()); name = fmt("substitution of '%s'", id.to_string());
trace("created"); trace("created");
} }
void DrvOutputSubstitutionGoal::init() Goal::Co DrvOutputSubstitutionGoal::init()
{ {
trace("init"); trace("init");
/* If the derivation already exists, were done */ /* If the derivation already exists, were done */
if (worker.store.queryRealisation(id)) { if (worker.store.queryRealisation(id)) {
amDone(ecSuccess); co_return amDone(ecSuccess);
return;
} }
subs = settings.useSubstitutes ? getDefaultSubstituters() : std::list<ref<Store>>(); auto subs = settings.useSubstitutes ? getDefaultSubstituters() : std::list<ref<Store>>();
tryNext();
}
void DrvOutputSubstitutionGoal::tryNext() bool substituterFailed = false;
{
for (auto sub : subs) {
trace("trying next substituter"); trace("trying next substituter");
if (subs.size() == 0) {
/* None left. Terminate this goal and let someone else deal
with it. */
debug("derivation output '%s' is required, but there is no substituter that can provide it", id.to_string());
/* Hack: don't indicate failure if there were no substituters.
In that case the calling derivation should just do a
build. */
amDone(substituterFailed ? ecFailed : ecNoSubstituters);
if (substituterFailed) {
worker.failedSubstitutions++;
worker.updateProgress();
}
return;
}
sub = subs.front();
subs.pop_front();
// FIXME: Make async
// outputInfo = sub->queryRealisation(id);
/* The callback of the curl download below can outlive `this` (if /* The callback of the curl download below can outlive `this` (if
some other error occurs), so it must not touch `this`. So put some other error occurs), so it must not touch `this`. So put
the shared state in a separate refcounted object. */ the shared state in a separate refcounted object. */
downloadState = std::make_shared<DownloadState>(); auto outPipe = std::make_shared<MuxablePipe>();
#ifndef _WIN32 #ifndef _WIN32
downloadState->outPipe.create(); outPipe->create();
#else #else
downloadState->outPipe.createAsyncPipe(worker.ioport.get()); outPipe->createAsyncPipe(worker.ioport.get());
#endif #endif
auto promise = std::make_shared<std::promise<std::shared_ptr<const Realisation>>>();
sub->queryRealisation( sub->queryRealisation(
id, id,
{ [downloadState(downloadState)](std::future<std::shared_ptr<const Realisation>> res) { { [outPipe(outPipe), promise(promise)](std::future<std::shared_ptr<const Realisation>> res) {
try { try {
Finally updateStats([&]() { downloadState->outPipe.writeSide.close(); }); Finally updateStats([&]() { outPipe->writeSide.close(); });
downloadState->promise.set_value(res.get()); promise->set_value(res.get());
} catch (...) { } catch (...) {
downloadState->promise.set_exception(std::current_exception()); promise->set_exception(std::current_exception());
} }
} }); } });
worker.childStarted(shared_from_this(), { worker.childStarted(shared_from_this(), {
#ifndef _WIN32 #ifndef _WIN32
downloadState->outPipe.readSide.get() outPipe->readSide.get()
#else #else
&downloadState->outPipe &outPipe
#endif #endif
}, true, false); }, true, false);
state = &DrvOutputSubstitutionGoal::realisationFetched; co_await Suspend{};
}
void DrvOutputSubstitutionGoal::realisationFetched()
{
worker.childTerminated(this); worker.childTerminated(this);
/*
* The realisation corresponding to the given output id.
* Will be filled once we can get it.
*/
std::shared_ptr<const Realisation> outputInfo;
try { try {
outputInfo = downloadState->promise.get_future().get(); outputInfo = promise->get_future().get();
} catch (std::exception & e) { } catch (std::exception & e) {
printError(e.what()); printError(e.what());
substituterFailed = true; substituterFailed = true;
} }
if (!outputInfo) { if (!outputInfo) continue;
return tryNext();
} bool failed = false;
for (const auto & [depId, depPath] : outputInfo->dependentRealisations) { for (const auto & [depId, depPath] : outputInfo->dependentRealisations) {
if (depId != id) { if (depId != id) {
@ -122,38 +100,49 @@ void DrvOutputSubstitutionGoal::realisationFetched()
worker.store.printStorePath(localOutputInfo->outPath), worker.store.printStorePath(localOutputInfo->outPath),
worker.store.printStorePath(depPath) worker.store.printStorePath(depPath)
); );
tryNext(); failed = true;
return; break;
} }
addWaitee(worker.makeDrvOutputSubstitutionGoal(depId)); addWaitee(worker.makeDrvOutputSubstitutionGoal(depId));
} }
} }
addWaitee(worker.makePathSubstitutionGoal(outputInfo->outPath)); if (failed) continue;
if (waitees.empty()) outPathValid(); co_return realisationFetched(outputInfo, sub);
else state = &DrvOutputSubstitutionGoal::outPathValid; }
/* None left. Terminate this goal and let someone else deal
with it. */
debug("derivation output '%s' is required, but there is no substituter that can provide it", id.to_string());
if (substituterFailed) {
worker.failedSubstitutions++;
worker.updateProgress();
}
/* Hack: don't indicate failure if there were no substituters.
In that case the calling derivation should just do a
build. */
co_return amDone(substituterFailed ? ecFailed : ecNoSubstituters);
} }
void DrvOutputSubstitutionGoal::outPathValid() Goal::Co DrvOutputSubstitutionGoal::realisationFetched(std::shared_ptr<const Realisation> outputInfo, nix::ref<nix::Store> sub) {
{ addWaitee(worker.makePathSubstitutionGoal(outputInfo->outPath));
assert(outputInfo);
if (!waitees.empty()) co_await Suspend{};
trace("output path substituted"); trace("output path substituted");
if (nrFailed > 0) { if (nrFailed > 0) {
debug("The output path of the derivation output '%s' could not be substituted", id.to_string()); debug("The output path of the derivation output '%s' could not be substituted", id.to_string());
amDone(nrNoSubstituters > 0 || nrIncompleteClosure > 0 ? ecIncompleteClosure : ecFailed); co_return amDone(nrNoSubstituters > 0 || nrIncompleteClosure > 0 ? ecIncompleteClosure : ecFailed);
return;
} }
worker.store.registerDrvOutput(*outputInfo); worker.store.registerDrvOutput(*outputInfo);
finished();
}
void DrvOutputSubstitutionGoal::finished()
{
trace("finished"); trace("finished");
amDone(ecSuccess); co_return amDone(ecSuccess);
} }
std::string DrvOutputSubstitutionGoal::key() std::string DrvOutputSubstitutionGoal::key()
@ -163,14 +152,9 @@ std::string DrvOutputSubstitutionGoal::key()
return "a$" + std::string(id.to_string()); return "a$" + std::string(id.to_string());
} }
void DrvOutputSubstitutionGoal::work()
{
(this->*state)();
}
void DrvOutputSubstitutionGoal::handleEOF(Descriptor fd) void DrvOutputSubstitutionGoal::handleEOF(Descriptor fd)
{ {
if (fd == downloadState->outPipe.readSide.get()) worker.wakeUp(shared_from_this()); worker.wakeUp(shared_from_this());
} }

View file

@ -27,52 +27,19 @@ class DrvOutputSubstitutionGoal : public Goal {
*/ */
DrvOutput id; DrvOutput id;
/**
* The realisation corresponding to the given output id.
* Will be filled once we can get it.
*/
std::shared_ptr<const Realisation> outputInfo;
/**
* The remaining substituters.
*/
std::list<ref<Store>> subs;
/**
* The current substituter.
*/
std::shared_ptr<Store> sub;
struct DownloadState
{
MuxablePipe outPipe;
std::promise<std::shared_ptr<const Realisation>> promise;
};
std::shared_ptr<DownloadState> downloadState;
/**
* Whether a substituter failed.
*/
bool substituterFailed = false;
public: public:
DrvOutputSubstitutionGoal(const DrvOutput& id, Worker & worker, RepairFlag repair = NoRepair, std::optional<ContentAddress> ca = std::nullopt); DrvOutputSubstitutionGoal(const DrvOutput& id, Worker & worker, RepairFlag repair = NoRepair, std::optional<ContentAddress> ca = std::nullopt);
typedef void (DrvOutputSubstitutionGoal::*GoalState)(); typedef void (DrvOutputSubstitutionGoal::*GoalState)();
GoalState state; GoalState state;
void init(); Co init() override;
void tryNext(); Co realisationFetched(std::shared_ptr<const Realisation> outputInfo, nix::ref<nix::Store> sub);
void realisationFetched();
void outPathValid();
void finished();
void timedOut(Error && ex) override { abort(); }; void timedOut(Error && ex) override { abort(); };
std::string key() override; std::string key() override;
void work() override;
void handleEOF(Descriptor fd) override; void handleEOF(Descriptor fd) override;
JobCategory jobCategory() const override { JobCategory jobCategory() const override {

View file

@ -3,6 +3,97 @@
namespace nix { namespace nix {
using Co = nix::Goal::Co;
using promise_type = nix::Goal::promise_type;
using handle_type = nix::Goal::handle_type;
using Suspend = nix::Goal::Suspend;
Co::Co(Co&& rhs) {
this->handle = rhs.handle;
rhs.handle = nullptr;
}
void Co::operator=(Co&& rhs) {
this->handle = rhs.handle;
rhs.handle = nullptr;
}
Co::~Co() {
if (handle) {
handle.promise().alive = false;
handle.destroy();
}
}
Co promise_type::get_return_object() {
auto handle = handle_type::from_promise(*this);
return Co{handle};
};
std::coroutine_handle<> promise_type::final_awaiter::await_suspend(handle_type h) noexcept {
auto& p = h.promise();
auto goal = p.goal;
assert(goal);
goal->trace("in final_awaiter");
auto c = std::move(p.continuation);
if (c) {
// We still have a continuation, i.e. work to do.
// We assert that the goal is still busy.
assert(goal->exitCode == ecBusy);
assert(goal->top_co); // Goal must have an active coroutine.
assert(goal->top_co->handle == h); // The active coroutine must be us.
assert(p.alive); // We must not have been destructed.
// we move continuation to the top,
// note: previous top_co is actually h, so by moving into it,
// we're calling the destructor on h, DON'T use h and p after this!
// We move our continuation into `top_co`, i.e. the marker for the active continuation.
// By doing this we destruct the old `top_co`, i.e. us, so `h` can't be used anymore.
// Be careful not to access freed memory!
goal->top_co = std::move(c);
// We resume `top_co`.
return goal->top_co->handle;
} else {
// We have no continuation, i.e. no more work to do,
// so the goal must not be busy anymore.
assert(goal->exitCode != ecBusy);
// We reset `top_co` for good measure.
p.goal->top_co = {};
// We jump to the noop coroutine, which doesn't do anything and immediately suspends.
// This passes control back to the caller of goal.work().
return std::noop_coroutine();
}
}
void promise_type::return_value(Co&& next) {
goal->trace("return_value(Co&&)");
// Save old continuation.
auto old_continuation = std::move(continuation);
// We set next as our continuation.
continuation = std::move(next);
// We set next's goal, and thus it must not have one already.
assert(!continuation->handle.promise().goal);
continuation->handle.promise().goal = goal;
// Nor can next have a continuation, as we set it to our old one.
assert(!continuation->handle.promise().continuation);
continuation->handle.promise().continuation = std::move(old_continuation);
}
std::coroutine_handle<> nix::Goal::Co::await_suspend(handle_type caller) {
assert(handle); // we must be a valid coroutine
auto& p = handle.promise();
assert(!p.continuation); // we must have no continuation
assert(!p.goal); // we must not have a goal yet
auto goal = caller.promise().goal;
assert(goal);
p.goal = goal;
p.continuation = std::move(goal->top_co); // we set our continuation to be top_co (i.e. caller)
goal->top_co = std::move(*this); // we set top_co to ourselves, don't use this anymore after this!
return p.goal->top_co->handle; // we execute ourselves
}
bool CompareGoalPtrs::operator() (const GoalPtr & a, const GoalPtr & b) const { bool CompareGoalPtrs::operator() (const GoalPtr & a, const GoalPtr & b) const {
std::string s1 = a->key(); std::string s1 = a->key();
@ -75,10 +166,10 @@ void Goal::waiteeDone(GoalPtr waitee, ExitCode result)
} }
} }
Goal::Done Goal::amDone(ExitCode result, std::optional<Error> ex)
void Goal::amDone(ExitCode result, std::optional<Error> ex)
{ {
trace("done"); trace("done");
assert(top_co);
assert(exitCode == ecBusy); assert(exitCode == ecBusy);
assert(result == ecSuccess || result == ecFailed || result == ecNoSubstituters || result == ecIncompleteClosure); assert(result == ecSuccess || result == ecFailed || result == ecNoSubstituters || result == ecIncompleteClosure);
exitCode = result; exitCode = result;
@ -98,6 +189,13 @@ void Goal::amDone(ExitCode result, std::optional<Error> ex)
worker.removeGoal(shared_from_this()); worker.removeGoal(shared_from_this());
cleanup(); cleanup();
// We drop the continuation.
// In `final_awaiter` this will signal that there is no more work to be done.
top_co->handle.promise().continuation = {};
// won't return to caller because of logic in final_awaiter
return Done{};
} }
@ -106,4 +204,16 @@ void Goal::trace(std::string_view s)
debug("%1%: %2%", name, s); debug("%1%: %2%", name, s);
} }
void Goal::work()
{
assert(top_co);
assert(top_co->handle);
assert(top_co->handle.promise().alive);
top_co->handle.resume();
// We either should be in a state where we can be work()-ed again,
// or we should be done.
assert(top_co || exitCode != ecBusy);
}
} }

View file

@ -1,10 +1,11 @@
#pragma once #pragma once
///@file ///@file
#include "types.hh"
#include "store-api.hh" #include "store-api.hh"
#include "build-result.hh" #include "build-result.hh"
#include <coroutine>
namespace nix { namespace nix {
/** /**
@ -103,9 +104,263 @@ protected:
* Build result. * Build result.
*/ */
BuildResult buildResult; BuildResult buildResult;
public: public:
/**
* Suspend our goal and wait until we get @ref work()-ed again.
* `co_await`-able by @ref Co.
*/
struct Suspend {};
/**
* Return from the current coroutine and suspend our goal
* if we're not busy anymore, or jump to the next coroutine
* set to be executed/resumed.
*/
struct Return {};
/**
* `co_return`-ing this will end the goal.
* If you're not inside a coroutine, you can safely discard this.
*/
struct [[nodiscard]] Done {
private:
Done(){}
friend Goal;
};
// forward declaration of promise_type, see below
struct promise_type;
/**
* Handle to coroutine using @ref Co and @ref promise_type.
*/
using handle_type = std::coroutine_handle<promise_type>;
/**
* C++20 coroutine wrapper for use in goal logic.
* Coroutines are functions that use `co_await`/`co_return` (and `co_yield`, but not supported by @ref Co).
*
* @ref Co is meant to be used by methods of subclasses of @ref Goal.
* The main functionality provided by `Co` is
* - `co_await Suspend{}`: Suspends the goal.
* - `co_await f()`: Waits until `f()` finishes.
* - `co_return f()`: Tail-calls `f()`.
* - `co_return Return{}`: Ends coroutine.
*
* The idea is that you implement the goal logic using coroutines,
* and do the core thing a goal can do, suspension, when you have
* children you're waiting for.
* Coroutines allow you to resume the work cleanly.
*
* @note Brief explanation of C++20 coroutines:
* When you `Co f()`, a `std::coroutine_handle<promise_type>` is created,
* alongside its @ref promise_type.
* There are suspension points at the beginning of the coroutine,
* at every `co_await`, and at the final (possibly implicit) `co_return`.
* Once suspended, you can resume the `std::coroutine_handle` by doing `coroutine_handle.resume()`.
* Suspension points are implemented by passing a struct to the compiler
* that implements `await_sus`pend.
* `await_suspend` can either say "cancel suspension", in which case execution resumes,
* "suspend", in which case control is passed back to the caller of `coroutine_handle.resume()`
* or the place where the coroutine function is initially executed in the case of the initial
* suspension, or `await_suspend` can specify another coroutine to jump to, which is
* how tail calls are implemented.
*
* @note Resources:
* - https://lewissbaker.github.io/
* - https://www.chiark.greenend.org.uk/~sgtatham/quasiblog/coroutines-c++20/
* - https://www.scs.stanford.edu/~dm/blog/c++-coroutines.html
*
* @todo Allocate explicitly on stack since HALO thing doesn't really work,
* specifically, there's no way to uphold the requirements when trying to do
* tail-calls without using a trampoline AFAICT.
*
* @todo Support returning data natively
*/
struct [[nodiscard]] Co {
/**
* The underlying handle.
*/
handle_type handle;
explicit Co(handle_type handle) : handle(handle) {};
void operator=(Co&&);
Co(Co&& rhs);
~Co();
bool await_ready() { return false; };
/**
* When we `co_await` another @ref Co-returning coroutine,
* we tell the caller of `caller_coroutine.resume()` to switch to our coroutine (@ref handle).
* To make sure we return to the original coroutine, we set it as the continuation of our
* coroutine. In @ref promise_type::final_awaiter we check if it's set and if so we return to it.
*
* To explain in more understandable terms:
* When we `co_await Co_returning_function()`, this function is called on the resultant @ref Co of
* the _called_ function, and C++ automatically passes the caller in.
*
* `goal` field of @ref promise_type is also set here by copying it from the caller.
*/
std::coroutine_handle<> await_suspend(handle_type handle);
void await_resume() {};
};
/**
* Used on initial suspend, does the same as @ref std::suspend_always,
* but asserts that everything has been set correctly.
*/
struct InitialSuspend {
/**
* Handle of coroutine that does the
* initial suspend
*/
handle_type handle;
bool await_ready() { return false; };
void await_suspend(handle_type handle_) {
handle = handle_;
}
void await_resume() {
assert(handle);
assert(handle.promise().goal); // goal must be set
assert(handle.promise().goal->top_co); // top_co of goal must be set
assert(handle.promise().goal->top_co->handle == handle); // top_co of goal must be us
}
};
/**
* Promise type for coroutines defined using @ref Co.
* Attached to coroutine handle.
*/
struct promise_type {
/**
* Either this is who called us, or it is who we will tail-call.
* It is what we "jump" to once we are done.
*/
std::optional<Co> continuation;
/**
* The goal that we're a part of.
* Set either in @ref Co::await_suspend or in constructor of @ref Goal.
*/
Goal* goal = nullptr;
/**
* Is set to false when destructed to ensure we don't use a
* destructed coroutine by accident
*/
bool alive = true;
/**
* The awaiter used by @ref final_suspend.
*/
struct final_awaiter {
bool await_ready() noexcept { return false; };
/**
* Here we execute our continuation, by passing it back to the caller.
* C++ compiler will create code that takes that and executes it promptly.
* `h` is the handle for the coroutine that is finishing execution,
* thus it must be destroyed.
*/
std::coroutine_handle<> await_suspend(handle_type h) noexcept;
void await_resume() noexcept { assert(false); };
};
/**
* Called by compiler generated code to construct the @ref Co
* that is returned from a @ref Co-returning coroutine.
*/
Co get_return_object();
/**
* Called by compiler generated code before body of coroutine.
* We use this opportunity to set the @ref goal field
* and `top_co` field of @ref Goal.
*/
InitialSuspend initial_suspend() { return {}; };
/**
* Called on `co_return`. Creates @ref final_awaiter which
* either jumps to continuation or suspends goal.
*/
final_awaiter final_suspend() noexcept { return {}; };
/**
* Does nothing, but provides an opportunity for
* @ref final_suspend to happen.
*/
void return_value(Return) {}
/**
* Does nothing, but provides an opportunity for
* @ref final_suspend to happen.
*/
void return_value(Done) {}
/**
* When "returning" another coroutine, what happens is that
* we set it as our own continuation, thus once the final suspend
* happens, we transfer control to it.
* The original continuation we had is set as the continuation
* of the coroutine passed in.
* @ref final_suspend is called after this, and @ref final_awaiter will
* pass control off to @ref continuation.
*
* If we already have a continuation, that continuation is set as
* the continuation of the new continuation. Thus, the continuation
* passed to @ref return_value must not have a continuation set.
*/
void return_value(Co&&);
/**
* If an exception is thrown inside a coroutine,
* we re-throw it in the context of the "resumer" of the continuation.
*/
void unhandled_exception() { throw; };
/**
* Allows awaiting a @ref Co.
*/
Co&& await_transform(Co&& co) { return static_cast<Co&&>(co); }
/**
* Allows awaiting a @ref Suspend.
* Always suspends.
*/
std::suspend_always await_transform(Suspend) { return {}; };
};
/**
* The coroutine being currently executed.
* MUST be updated when switching the coroutine being executed.
* This is used both for memory management and to resume the last
* coroutine executed.
* Destroying this should destroy all coroutines created for this goal.
*/
std::optional<Co> top_co;
/**
* The entry point for the goal
*/
virtual Co init() = 0;
/**
* Wrapper around @ref init since virtual functions
* can't be used in constructors.
*/
inline Co init_wrapper();
/**
* Signals that the goal is done.
* `co_return` the result. If you're not inside a coroutine, you can ignore
* the return value safely.
*/
Done amDone(ExitCode result, std::optional<Error> ex = {});
virtual void cleanup() { }
/** /**
* Project a `BuildResult` with just the information that pertains * Project a `BuildResult` with just the information that pertains
* to the given request. * to the given request.
@ -124,15 +379,20 @@ public:
std::optional<Error> ex; std::optional<Error> ex;
Goal(Worker & worker, DerivedPath path) Goal(Worker & worker, DerivedPath path)
: worker(worker) : worker(worker), top_co(init_wrapper())
{ } {
// top_co shouldn't have a goal already, should be nullptr.
assert(!top_co->handle.promise().goal);
// we set it such that top_co can pass it down to its subcoroutines.
top_co->handle.promise().goal = this;
}
virtual ~Goal() virtual ~Goal()
{ {
trace("goal destroyed"); trace("goal destroyed");
} }
virtual void work() = 0; void work();
void addWaitee(GoalPtr waitee); void addWaitee(GoalPtr waitee);
@ -164,10 +424,6 @@ public:
virtual std::string key() = 0; virtual std::string key() = 0;
void amDone(ExitCode result, std::optional<Error> ex = {});
virtual void cleanup() { }
/** /**
* @brief Hint for the scheduler, which concurrency limit applies. * @brief Hint for the scheduler, which concurrency limit applies.
* @see JobCategory * @see JobCategory
@ -178,3 +434,12 @@ public:
void addToWeakGoals(WeakGoals & goals, GoalPtr p); void addToWeakGoals(WeakGoals & goals, GoalPtr p);
} }
template<typename... ArgTypes>
struct std::coroutine_traits<nix::Goal::Co, ArgTypes...> {
using promise_type = nix::Goal::promise_type;
};
nix::Goal::Co nix::Goal::init_wrapper() {
co_return init();
}

View file

@ -3,6 +3,7 @@
#include "nar-info.hh" #include "nar-info.hh"
#include "finally.hh" #include "finally.hh"
#include "signals.hh" #include "signals.hh"
#include <coroutine>
namespace nix { namespace nix {
@ -12,7 +13,6 @@ PathSubstitutionGoal::PathSubstitutionGoal(const StorePath & storePath, Worker &
, repair(repair) , repair(repair)
, ca(ca) , ca(ca)
{ {
state = &PathSubstitutionGoal::init;
name = fmt("substitution of '%s'", worker.store.printStorePath(this->storePath)); name = fmt("substitution of '%s'", worker.store.printStorePath(this->storePath));
trace("created"); trace("created");
maintainExpectedSubstitutions = std::make_unique<MaintainCount<uint64_t>>(worker.expectedSubstitutions); maintainExpectedSubstitutions = std::make_unique<MaintainCount<uint64_t>>(worker.expectedSubstitutions);
@ -25,7 +25,7 @@ PathSubstitutionGoal::~PathSubstitutionGoal()
} }
void PathSubstitutionGoal::done( Goal::Done PathSubstitutionGoal::done(
ExitCode result, ExitCode result,
BuildResult::Status status, BuildResult::Status status,
std::optional<std::string> errorMsg) std::optional<std::string> errorMsg)
@ -35,17 +35,11 @@ void PathSubstitutionGoal::done(
debug(*errorMsg); debug(*errorMsg);
buildResult.errorMsg = *errorMsg; buildResult.errorMsg = *errorMsg;
} }
amDone(result); return amDone(result);
} }
void PathSubstitutionGoal::work() Goal::Co PathSubstitutionGoal::init()
{
(this->*state)();
}
void PathSubstitutionGoal::init()
{ {
trace("init"); trace("init");
@ -53,47 +47,27 @@ void PathSubstitutionGoal::init()
/* If the path already exists we're done. */ /* If the path already exists we're done. */
if (!repair && worker.store.isValidPath(storePath)) { if (!repair && worker.store.isValidPath(storePath)) {
done(ecSuccess, BuildResult::AlreadyValid); co_return done(ecSuccess, BuildResult::AlreadyValid);
return;
} }
if (settings.readOnlyMode) if (settings.readOnlyMode)
throw Error("cannot substitute path '%s' - no write access to the Nix store", worker.store.printStorePath(storePath)); throw Error("cannot substitute path '%s' - no write access to the Nix store", worker.store.printStorePath(storePath));
subs = settings.useSubstitutes ? getDefaultSubstituters() : std::list<ref<Store>>(); auto subs = settings.useSubstitutes ? getDefaultSubstituters() : std::list<ref<Store>>();
tryNext(); bool substituterFailed = false;
}
for (auto sub : subs) {
void PathSubstitutionGoal::tryNext()
{
trace("trying next substituter"); trace("trying next substituter");
cleanup(); cleanup();
if (subs.size() == 0) { /* The path the substituter refers to the path as. This will be
/* None left. Terminate this goal and let someone else deal * different when the stores have different names. */
with it. */ std::optional<StorePath> subPath;
/* Hack: don't indicate failure if there were no substituters. /* Path info returned by the substituter's query info operation. */
In that case the calling derivation should just do a std::shared_ptr<const ValidPathInfo> info;
build. */
done(
substituterFailed ? ecFailed : ecNoSubstituters,
BuildResult::NoSubstituters,
fmt("path '%s' is required, but there is no substituter that can build it", worker.store.printStorePath(storePath)));
if (substituterFailed) {
worker.failedSubstitutions++;
worker.updateProgress();
}
return;
}
sub = subs.front();
subs.pop_front();
if (ca) { if (ca) {
subPath = sub->makeFixedOutputPathFromCA( subPath = sub->makeFixedOutputPathFromCA(
@ -102,29 +76,22 @@ void PathSubstitutionGoal::tryNext()
if (sub->storeDir == worker.store.storeDir) if (sub->storeDir == worker.store.storeDir)
assert(subPath == storePath); assert(subPath == storePath);
} else if (sub->storeDir != worker.store.storeDir) { } else if (sub->storeDir != worker.store.storeDir) {
tryNext(); continue;
return;
} }
try { try {
// FIXME: make async // FIXME: make async
info = sub->queryPathInfo(subPath ? *subPath : storePath); info = sub->queryPathInfo(subPath ? *subPath : storePath);
} catch (InvalidPath &) { } catch (InvalidPath &) {
tryNext(); continue;
return; } catch (SubstituterDisabled & e) {
} catch (SubstituterDisabled &) { if (settings.tryFallback) continue;
if (settings.tryFallback) { else throw e;
tryNext();
return;
}
throw;
} catch (Error & e) { } catch (Error & e) {
if (settings.tryFallback) { if (settings.tryFallback) {
logError(e.info()); logError(e.info());
tryNext(); continue;
return; } else throw e;
}
throw;
} }
if (info->path != storePath) { if (info->path != storePath) {
@ -135,8 +102,7 @@ void PathSubstitutionGoal::tryNext()
} else { } else {
printError("asked '%s' for '%s' but got '%s'", printError("asked '%s' for '%s' but got '%s'",
sub->getUri(), worker.store.printStorePath(storePath), sub->printStorePath(info->path)); sub->getUri(), worker.store.printStorePath(storePath), sub->printStorePath(info->path));
tryNext(); continue;
return;
} }
} }
@ -159,8 +125,7 @@ void PathSubstitutionGoal::tryNext()
{ {
warn("ignoring substitute for '%s' from '%s', as it's not signed by any of the keys in 'trusted-public-keys'", warn("ignoring substitute for '%s' from '%s', as it's not signed by any of the keys in 'trusted-public-keys'",
worker.store.printStorePath(storePath), sub->getUri()); worker.store.printStorePath(storePath), sub->getUri());
tryNext(); continue;
return;
} }
/* To maintain the closure invariant, we first have to realise the /* To maintain the closure invariant, we first have to realise the
@ -169,36 +134,48 @@ void PathSubstitutionGoal::tryNext()
if (i != storePath) /* ignore self-references */ if (i != storePath) /* ignore self-references */
addWaitee(worker.makePathSubstitutionGoal(i)); addWaitee(worker.makePathSubstitutionGoal(i));
if (waitees.empty()) /* to prevent hang (no wake-up event) */ if (!waitees.empty()) co_await Suspend{};
referencesValid();
else // FIXME: consider returning boolean instead of passing in reference
state = &PathSubstitutionGoal::referencesValid; bool out = false; // is mutated by tryToRun
co_await tryToRun(subPath ? *subPath : storePath, sub, info, out);
substituterFailed = substituterFailed || out;
}
/* None left. Terminate this goal and let someone else deal
with it. */
worker.failedSubstitutions++;
worker.updateProgress();
/* Hack: don't indicate failure if there were no substituters.
In that case the calling derivation should just do a
build. */
co_return done(
substituterFailed ? ecFailed : ecNoSubstituters,
BuildResult::NoSubstituters,
fmt("path '%s' is required, but there is no substituter that can build it", worker.store.printStorePath(storePath)));
} }
void PathSubstitutionGoal::referencesValid() Goal::Co PathSubstitutionGoal::tryToRun(StorePath subPath, nix::ref<Store> sub, std::shared_ptr<const ValidPathInfo> info, bool& substituterFailed)
{ {
trace("all references realised"); trace("all references realised");
if (nrFailed > 0) { if (nrFailed > 0) {
done( co_return done(
nrNoSubstituters > 0 || nrIncompleteClosure > 0 ? ecIncompleteClosure : ecFailed, nrNoSubstituters > 0 || nrIncompleteClosure > 0 ? ecIncompleteClosure : ecFailed,
BuildResult::DependencyFailed, BuildResult::DependencyFailed,
fmt("some references of path '%s' could not be realised", worker.store.printStorePath(storePath))); fmt("some references of path '%s' could not be realised", worker.store.printStorePath(storePath)));
return;
} }
for (auto & i : info->references) for (auto & i : info->references)
if (i != storePath) /* ignore self-references */ if (i != storePath) /* ignore self-references */
assert(worker.store.isValidPath(i)); assert(worker.store.isValidPath(i));
state = &PathSubstitutionGoal::tryToRun;
worker.wakeUp(shared_from_this()); worker.wakeUp(shared_from_this());
} co_await Suspend{};
void PathSubstitutionGoal::tryToRun()
{
trace("trying to run"); trace("trying to run");
/* Make sure that we are allowed to start a substitution. Note that even /* Make sure that we are allowed to start a substitution. Note that even
@ -206,10 +183,10 @@ void PathSubstitutionGoal::tryToRun()
prevents infinite waiting. */ prevents infinite waiting. */
if (worker.getNrSubstitutions() >= std::max(1U, (unsigned int) settings.maxSubstitutionJobs)) { if (worker.getNrSubstitutions() >= std::max(1U, (unsigned int) settings.maxSubstitutionJobs)) {
worker.waitForBuildSlot(shared_from_this()); worker.waitForBuildSlot(shared_from_this());
return; co_await Suspend{};
} }
maintainRunningSubstitutions = std::make_unique<MaintainCount<uint64_t>>(worker.runningSubstitutions); auto maintainRunningSubstitutions = std::make_unique<MaintainCount<uint64_t>>(worker.runningSubstitutions);
worker.updateProgress(); worker.updateProgress();
#ifndef _WIN32 #ifndef _WIN32
@ -218,9 +195,9 @@ void PathSubstitutionGoal::tryToRun()
outPipe.createAsyncPipe(worker.ioport.get()); outPipe.createAsyncPipe(worker.ioport.get());
#endif #endif
promise = std::promise<void>(); auto promise = std::promise<void>();
thr = std::thread([this]() { thr = std::thread([this, &promise, &subPath, &sub]() {
try { try {
ReceiveInterrupts receiveInterrupts; ReceiveInterrupts receiveInterrupts;
@ -231,7 +208,7 @@ void PathSubstitutionGoal::tryToRun()
PushActivity pact(act.id); PushActivity pact(act.id);
copyStorePath(*sub, worker.store, copyStorePath(*sub, worker.store,
subPath ? *subPath : storePath, repair, sub->isTrusted ? NoCheckSigs : CheckSigs); subPath, repair, sub->isTrusted ? NoCheckSigs : CheckSigs);
promise.set_value(); promise.set_value();
} catch (...) { } catch (...) {
@ -247,12 +224,8 @@ void PathSubstitutionGoal::tryToRun()
#endif #endif
}, true, false); }, true, false);
state = &PathSubstitutionGoal::finished; co_await Suspend{};
}
void PathSubstitutionGoal::finished()
{
trace("substitute finished"); trace("substitute finished");
thr.join(); thr.join();
@ -274,10 +247,7 @@ void PathSubstitutionGoal::finished()
substituterFailed = true; substituterFailed = true;
} }
/* Try the next substitute. */ co_return Return{};
state = &PathSubstitutionGoal::tryNext;
worker.wakeUp(shared_from_this());
return;
} }
worker.markContentsGood(storePath); worker.markContentsGood(storePath);
@ -295,23 +265,19 @@ void PathSubstitutionGoal::finished()
worker.doneDownloadSize += fileSize; worker.doneDownloadSize += fileSize;
} }
assert(maintainExpectedNar);
worker.doneNarSize += maintainExpectedNar->delta; worker.doneNarSize += maintainExpectedNar->delta;
maintainExpectedNar.reset(); maintainExpectedNar.reset();
worker.updateProgress(); worker.updateProgress();
done(ecSuccess, BuildResult::Substituted); co_return done(ecSuccess, BuildResult::Substituted);
}
void PathSubstitutionGoal::handleChildOutput(Descriptor fd, std::string_view data)
{
} }
void PathSubstitutionGoal::handleEOF(Descriptor fd) void PathSubstitutionGoal::handleEOF(Descriptor fd)
{ {
if (fd == outPipe.readSide.get()) worker.wakeUp(shared_from_this()); worker.wakeUp(shared_from_this());
} }

View file

@ -1,14 +1,16 @@
#pragma once #pragma once
///@file ///@file
#include "worker.hh"
#include "store-api.hh" #include "store-api.hh"
#include "goal.hh" #include "goal.hh"
#include "muxable-pipe.hh" #include "muxable-pipe.hh"
#include <coroutine>
#include <future>
#include <source_location>
namespace nix { namespace nix {
class Worker;
struct PathSubstitutionGoal : public Goal struct PathSubstitutionGoal : public Goal
{ {
/** /**
@ -17,30 +19,9 @@ struct PathSubstitutionGoal : public Goal
StorePath storePath; StorePath storePath;
/** /**
* The path the substituter refers to the path as. This will be * Whether to try to repair a valid path.
* different when the stores have different names.
*/ */
std::optional<StorePath> subPath; RepairFlag repair;
/**
* The remaining substituters.
*/
std::list<ref<Store>> subs;
/**
* The current substituter.
*/
std::shared_ptr<Store> sub;
/**
* Whether a substituter failed.
*/
bool substituterFailed = false;
/**
* Path info returned by the substituter's query info operation.
*/
std::shared_ptr<const ValidPathInfo> info;
/** /**
* Pipe for the substituter's standard output. * Pipe for the substituter's standard output.
@ -52,31 +33,15 @@ struct PathSubstitutionGoal : public Goal
*/ */
std::thread thr; std::thread thr;
std::promise<void> promise;
/**
* Whether to try to repair a valid path.
*/
RepairFlag repair;
/**
* Location where we're downloading the substitute. Differs from
* storePath when doing a repair.
*/
Path destPath;
std::unique_ptr<MaintainCount<uint64_t>> maintainExpectedSubstitutions, std::unique_ptr<MaintainCount<uint64_t>> maintainExpectedSubstitutions,
maintainRunningSubstitutions, maintainExpectedNar, maintainExpectedDownload; maintainRunningSubstitutions, maintainExpectedNar, maintainExpectedDownload;
typedef void (PathSubstitutionGoal::*GoalState)();
GoalState state;
/** /**
* Content address for recomputing store path * Content address for recomputing store path
*/ */
std::optional<ContentAddress> ca; std::optional<ContentAddress> ca;
void done( Done done(
ExitCode result, ExitCode result,
BuildResult::Status status, BuildResult::Status status,
std::optional<std::string> errorMsg = {}); std::optional<std::string> errorMsg = {});
@ -96,22 +61,18 @@ public:
return "a$" + std::string(storePath.name()) + "$" + worker.store.printStorePath(storePath); return "a$" + std::string(storePath.name()) + "$" + worker.store.printStorePath(storePath);
} }
void work() override;
/** /**
* The states. * The states.
*/ */
void init(); Co init() override;
void tryNext(); Co gotInfo();
void gotInfo(); Co tryToRun(StorePath subPath, nix::ref<Store> sub, std::shared_ptr<const ValidPathInfo> info, bool& substituterFailed);
void referencesValid(); Co finished();
void tryToRun();
void finished();
/** /**
* Callback used by the worker to write to the log. * Callback used by the worker to write to the log.
*/ */
void handleChildOutput(Descriptor fd, std::string_view data) override; void handleChildOutput(Descriptor fd, std::string_view data) override {};
void handleEOF(Descriptor fd) override; void handleEOF(Descriptor fd) override;
/* Called by destructor, can't be overridden */ /* Called by destructor, can't be overridden */

View file

@ -337,9 +337,7 @@ void Worker::run(const Goals & _topGoals)
/* Wait for input. */ /* Wait for input. */
if (!children.empty() || !waitingForAWhile.empty()) if (!children.empty() || !waitingForAWhile.empty())
waitForInput(); waitForInput();
else { else if (awake.empty() && 0U == settings.maxBuildJobs) {
if (awake.empty() && 0U == settings.maxBuildJobs)
{
if (getMachines().empty()) if (getMachines().empty())
throw Error( throw Error(
R"( R"(
@ -359,9 +357,7 @@ void Worker::run(const Goals & _topGoals)
)" )"
); );
} } else assert(!awake.empty());
assert(!awake.empty());
}
} }
/* If --keep-going is not set, it's possible that the main goal /* If --keep-going is not set, it's possible that the main goal

View file

@ -177,7 +177,7 @@ void LocalDerivationGoal::killSandbox(bool getStats)
} }
void LocalDerivationGoal::tryLocalBuild() Goal::Co LocalDerivationGoal::tryLocalBuild()
{ {
#if __APPLE__ #if __APPLE__
additionalSandboxProfile = parsedDrv->getStringAttr("__sandboxProfile").value_or(""); additionalSandboxProfile = parsedDrv->getStringAttr("__sandboxProfile").value_or("");
@ -185,10 +185,10 @@ void LocalDerivationGoal::tryLocalBuild()
unsigned int curBuilds = worker.getNrLocalBuilds(); unsigned int curBuilds = worker.getNrLocalBuilds();
if (curBuilds >= settings.maxBuildJobs) { if (curBuilds >= settings.maxBuildJobs) {
state = &DerivationGoal::tryToBuild;
worker.waitForBuildSlot(shared_from_this()); worker.waitForBuildSlot(shared_from_this());
outputLocks.unlock(); outputLocks.unlock();
return; co_await Suspend{};
co_return tryToBuild();
} }
assert(derivationType); assert(derivationType);
@ -242,7 +242,8 @@ void LocalDerivationGoal::tryLocalBuild()
actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting, actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting,
fmt("waiting for a free build user ID for '%s'", Magenta(worker.store.printStorePath(drvPath)))); fmt("waiting for a free build user ID for '%s'", Magenta(worker.store.printStorePath(drvPath))));
worker.waitForAWhile(shared_from_this()); worker.waitForAWhile(shared_from_this());
return; co_await Suspend{};
co_return tryLocalBuild();
} }
} }
@ -257,15 +258,13 @@ void LocalDerivationGoal::tryLocalBuild()
outputLocks.unlock(); outputLocks.unlock();
buildUser.reset(); buildUser.reset();
worker.permanentFailure = true; worker.permanentFailure = true;
done(BuildResult::InputRejected, {}, std::move(e)); co_return done(BuildResult::InputRejected, {}, std::move(e));
return;
} }
/* This state will be reached when we get EOF on the child's
log pipe. */
state = &DerivationGoal::buildDone;
started(); started();
co_await Suspend{};
// after EOF on child
co_return buildDone();
} }
static void chmod_(const Path & path, mode_t mode) static void chmod_(const Path & path, mode_t mode)

View file

@ -198,7 +198,7 @@ struct LocalDerivationGoal : public DerivationGoal
/** /**
* The additional states. * The additional states.
*/ */
void tryLocalBuild() override; Goal::Co tryLocalBuild() override;
/** /**
* Start building a derivation. * Start building a derivation.