From 846869da0ed0580beb7f827b303fef9a8386de37 Mon Sep 17 00:00:00 2001 From: Las Safin Date: Mon, 15 Jul 2024 21:49:15 +0100 Subject: [PATCH] Make goals use C++20 coroutines (#11005) undefined --- src/libstore/build/derivation-goal.cc | 141 +++++---- src/libstore/build/derivation-goal.hh | 32 +- .../build/drv-output-substitution-goal.cc | 204 ++++++------- .../build/drv-output-substitution-goal.hh | 37 +-- src/libstore/build/goal.cc | 114 ++++++- src/libstore/build/goal.hh | 283 +++++++++++++++++- src/libstore/build/substitution-goal.cc | 244 +++++++-------- src/libstore/build/substitution-goal.hh | 63 +--- src/libstore/build/worker.cc | 40 ++- .../unix/build/local-derivation-goal.cc | 19 +- .../unix/build/local-derivation-goal.hh | 2 +- 11 files changed, 708 insertions(+), 471 deletions(-) diff --git a/src/libstore/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc index f795b05a1..010f905d6 100644 --- a/src/libstore/build/derivation-goal.cc +++ b/src/libstore/build/derivation-goal.cc @@ -36,6 +36,14 @@ namespace nix { +Goal::Co DerivationGoal::init() { + if (useDerivation) { + co_return getDerivation(); + } else { + co_return haveDerivation(); + } +} + DerivationGoal::DerivationGoal(const StorePath & drvPath, const OutputsSpec & wantedOutputs, Worker & worker, BuildMode buildMode) : Goal(worker, DerivedPath::Built { .drvPath = makeConstantStorePathRef(drvPath), .outputs = wantedOutputs }) @@ -44,7 +52,6 @@ DerivationGoal::DerivationGoal(const StorePath & drvPath, , wantedOutputs(wantedOutputs) , buildMode(buildMode) { - state = &DerivationGoal::getDerivation; name = fmt( "building of '%s' from .drv file", DerivedPath::Built { makeConstantStorePathRef(drvPath), wantedOutputs }.to_string(worker.store)); @@ -65,7 +72,6 @@ DerivationGoal::DerivationGoal(const StorePath & drvPath, const BasicDerivation { this->drv = std::make_unique(drv); - state = &DerivationGoal::haveDerivation; name = fmt( "building of '%s' from in-memory derivation", DerivedPath::Built { makeConstantStorePathRef(drvPath), drv.outputNames() }.to_string(worker.store)); @@ -109,13 +115,9 @@ void DerivationGoal::killChild() void DerivationGoal::timedOut(Error && ex) { killChild(); - done(BuildResult::TimedOut, {}, std::move(ex)); -} - - -void DerivationGoal::work() -{ - (this->*state)(); + // We're not inside a coroutine, hence we can't use co_return here. + // Thus we ignore the return value. + [[maybe_unused]] Done _ = done(BuildResult::TimedOut, {}, std::move(ex)); } void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs) @@ -139,7 +141,7 @@ void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs) } -void DerivationGoal::getDerivation() +Goal::Co DerivationGoal::getDerivation() { trace("init"); @@ -147,23 +149,22 @@ void DerivationGoal::getDerivation() exists. If it doesn't, it may be created through a substitute. */ if (buildMode == bmNormal && worker.evalStore.isValidPath(drvPath)) { - loadDerivation(); - return; + co_return loadDerivation(); } addWaitee(upcast_goal(worker.makePathSubstitutionGoal(drvPath))); - state = &DerivationGoal::loadDerivation; + co_await Suspend{}; + co_return loadDerivation(); } -void DerivationGoal::loadDerivation() +Goal::Co DerivationGoal::loadDerivation() { trace("loading derivation"); if (nrFailed != 0) { - done(BuildResult::MiscFailure, {}, Error("cannot build missing derivation '%s'", worker.store.printStorePath(drvPath))); - return; + co_return done(BuildResult::MiscFailure, {}, Error("cannot build missing derivation '%s'", worker.store.printStorePath(drvPath))); } /* `drvPath' should already be a root, but let's be on the safe @@ -185,11 +186,11 @@ void DerivationGoal::loadDerivation() } assert(drv); - haveDerivation(); + co_return haveDerivation(); } -void DerivationGoal::haveDerivation() +Goal::Co DerivationGoal::haveDerivation() { trace("have derivation"); @@ -217,8 +218,7 @@ void DerivationGoal::haveDerivation() }); } - gaveUpOnSubstitution(); - return; + co_return gaveUpOnSubstitution(); } for (auto & i : drv->outputsAndOptPaths(worker.store)) @@ -240,8 +240,7 @@ void DerivationGoal::haveDerivation() /* If they are all valid, then we're done. */ if (allValid && buildMode == bmNormal) { - done(BuildResult::AlreadyValid, std::move(validOutputs)); - return; + co_return done(BuildResult::AlreadyValid, std::move(validOutputs)); } /* We are first going to try to create the invalid output paths @@ -268,24 +267,21 @@ void DerivationGoal::haveDerivation() } } - if (waitees.empty()) /* to prevent hang (no wake-up event) */ - outputsSubstitutionTried(); - else - state = &DerivationGoal::outputsSubstitutionTried; + if (!waitees.empty()) co_await Suspend{}; /* to prevent hang (no wake-up event) */ + co_return outputsSubstitutionTried(); } -void DerivationGoal::outputsSubstitutionTried() +Goal::Co DerivationGoal::outputsSubstitutionTried() { trace("all outputs substituted (maybe)"); assert(!drv->type().isImpure()); if (nrFailed > 0 && nrFailed > nrNoSubstituters + nrIncompleteClosure && !settings.tryFallback) { - done(BuildResult::TransientFailure, {}, + co_return done(BuildResult::TransientFailure, {}, Error("some substitutes for the outputs of derivation '%s' failed (usually happens due to networking issues); try '--fallback' to build derivation from source ", worker.store.printStorePath(drvPath))); - return; } /* If the substitutes form an incomplete closure, then we should @@ -319,32 +315,29 @@ void DerivationGoal::outputsSubstitutionTried() if (needRestart == NeedRestartForMoreOutputs::OutputsAddedDoNeed) { needRestart = NeedRestartForMoreOutputs::OutputsUnmodifedDontNeed; - haveDerivation(); - return; + co_return haveDerivation(); } auto [allValid, validOutputs] = checkPathValidity(); if (buildMode == bmNormal && allValid) { - done(BuildResult::Substituted, std::move(validOutputs)); - return; + co_return done(BuildResult::Substituted, std::move(validOutputs)); } if (buildMode == bmRepair && allValid) { - repairClosure(); - return; + co_return repairClosure(); } if (buildMode == bmCheck && !allValid) throw Error("some outputs of '%s' are not valid, so checking is not possible", worker.store.printStorePath(drvPath)); /* Nothing to wait for; tail call */ - gaveUpOnSubstitution(); + co_return gaveUpOnSubstitution(); } /* At least one of the output paths could not be produced using a substitute. So we have to build instead. */ -void DerivationGoal::gaveUpOnSubstitution() +Goal::Co DerivationGoal::gaveUpOnSubstitution() { /* At this point we are building all outputs, so if more are wanted there is no need to restart. */ @@ -405,14 +398,12 @@ void DerivationGoal::gaveUpOnSubstitution() addWaitee(upcast_goal(worker.makePathSubstitutionGoal(i))); } - if (waitees.empty()) /* to prevent hang (no wake-up event) */ - inputsRealised(); - else - state = &DerivationGoal::inputsRealised; + if (!waitees.empty()) co_await Suspend{}; /* to prevent hang (no wake-up event) */ + co_return inputsRealised(); } -void DerivationGoal::repairClosure() +Goal::Co DerivationGoal::repairClosure() { assert(!drv->type().isImpure()); @@ -466,41 +457,39 @@ void DerivationGoal::repairClosure() } if (waitees.empty()) { - done(BuildResult::AlreadyValid, assertPathValidity()); - return; + co_return done(BuildResult::AlreadyValid, assertPathValidity()); + } else { + co_await Suspend{}; + co_return closureRepaired(); } - - state = &DerivationGoal::closureRepaired; } -void DerivationGoal::closureRepaired() +Goal::Co DerivationGoal::closureRepaired() { trace("closure repaired"); if (nrFailed > 0) throw Error("some paths in the output closure of derivation '%s' could not be repaired", worker.store.printStorePath(drvPath)); - done(BuildResult::AlreadyValid, assertPathValidity()); + co_return done(BuildResult::AlreadyValid, assertPathValidity()); } -void DerivationGoal::inputsRealised() +Goal::Co DerivationGoal::inputsRealised() { trace("all inputs realised"); if (nrFailed != 0) { if (!useDerivation) throw Error("some dependencies of '%s' are missing", worker.store.printStorePath(drvPath)); - done(BuildResult::DependencyFailed, {}, Error( + co_return done(BuildResult::DependencyFailed, {}, Error( "%s dependencies of derivation '%s' failed to build", nrFailed, worker.store.printStorePath(drvPath))); - return; } if (retrySubstitution == RetrySubstitution::YesNeed) { retrySubstitution = RetrySubstitution::AlreadyRetried; - haveDerivation(); - return; + co_return haveDerivation(); } /* Gather information necessary for computing the closure and/or @@ -566,8 +555,8 @@ void DerivationGoal::inputsRealised() pathResolved, wantedOutputs, buildMode); addWaitee(resolvedDrvGoal); - state = &DerivationGoal::resolvedFinished; - return; + co_await Suspend{}; + co_return resolvedFinished(); } std::function::ChildNode &)> accumInputPaths; @@ -631,8 +620,9 @@ void DerivationGoal::inputsRealised() /* Okay, try to build. Note that here we don't wait for a build slot to become available, since we don't need one if there is a build hook. */ - state = &DerivationGoal::tryToBuild; worker.wakeUp(shared_from_this()); + co_await Suspend{}; + co_return tryToBuild(); } void DerivationGoal::started() @@ -657,7 +647,7 @@ void DerivationGoal::started() worker.updateProgress(); } -void DerivationGoal::tryToBuild() +Goal::Co DerivationGoal::tryToBuild() { trace("trying to build"); @@ -693,7 +683,8 @@ void DerivationGoal::tryToBuild() actLock = std::make_unique(*logger, lvlWarn, actBuildWaiting, fmt("waiting for lock on %s", Magenta(showPaths(lockFiles)))); worker.waitForAWhile(shared_from_this()); - return; + co_await Suspend{}; + co_return tryToBuild(); } actLock.reset(); @@ -710,8 +701,7 @@ void DerivationGoal::tryToBuild() if (buildMode != bmCheck && allValid) { debug("skipping build of derivation '%s', someone beat us to it", worker.store.printStorePath(drvPath)); outputLocks.setDeletion(true); - done(BuildResult::AlreadyValid, std::move(validOutputs)); - return; + co_return done(BuildResult::AlreadyValid, std::move(validOutputs)); } /* If any of the outputs already exist but are not valid, delete @@ -737,9 +727,9 @@ void DerivationGoal::tryToBuild() EOF from the hook. */ actLock.reset(); buildResult.startTime = time(0); // inexact - state = &DerivationGoal::buildDone; started(); - return; + co_await Suspend{}; + co_return buildDone(); case rpPostpone: /* Not now; wait until at least one child finishes or the wake-up timeout expires. */ @@ -748,7 +738,8 @@ void DerivationGoal::tryToBuild() fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath)))); worker.waitForAWhile(shared_from_this()); outputLocks.unlock(); - return; + co_await Suspend{}; + co_return tryToBuild(); case rpDecline: /* We should do it ourselves. */ break; @@ -757,11 +748,12 @@ void DerivationGoal::tryToBuild() actLock.reset(); - state = &DerivationGoal::tryLocalBuild; worker.wakeUp(shared_from_this()); + co_await Suspend{}; + co_return tryLocalBuild(); } -void DerivationGoal::tryLocalBuild() { +Goal::Co DerivationGoal::tryLocalBuild() { throw Error( R"( Unable to build with a primary store that isn't a local store; @@ -938,7 +930,7 @@ void runPostBuildHook( }); } -void DerivationGoal::buildDone() +Goal::Co DerivationGoal::buildDone() { trace("build done"); @@ -1033,7 +1025,7 @@ void DerivationGoal::buildDone() outputLocks.setDeletion(true); outputLocks.unlock(); - done(BuildResult::Built, std::move(builtOutputs)); + co_return done(BuildResult::Built, std::move(builtOutputs)); } catch (BuildError & e) { outputLocks.unlock(); @@ -1058,12 +1050,11 @@ void DerivationGoal::buildDone() BuildResult::PermanentFailure; } - done(st, {}, std::move(e)); - return; + co_return done(st, {}, std::move(e)); } } -void DerivationGoal::resolvedFinished() +Goal::Co DerivationGoal::resolvedFinished() { trace("resolved derivation finished"); @@ -1131,7 +1122,7 @@ void DerivationGoal::resolvedFinished() if (status == BuildResult::AlreadyValid) status = BuildResult::ResolvesToAlreadyValid; - done(status, std::move(builtOutputs)); + co_return done(status, std::move(builtOutputs)); } HookReply DerivationGoal::tryBuildHook() @@ -1325,7 +1316,9 @@ void DerivationGoal::handleChildOutput(Descriptor fd, std::string_view data) logSize += data.size(); if (settings.maxLogSize && logSize > settings.maxLogSize) { killChild(); - done( + // We're not inside a coroutine, hence we can't use co_return here. + // Thus we ignore the return value. + [[maybe_unused]] Done _ = done( BuildResult::LogLimitExceeded, {}, Error("%s killed after writing more than %d bytes of log output", getName(), settings.maxLogSize)); @@ -1531,7 +1524,7 @@ SingleDrvOutputs DerivationGoal::assertPathValidity() } -void DerivationGoal::done( +Goal::Done DerivationGoal::done( BuildResult::Status status, SingleDrvOutputs builtOutputs, std::optional ex) @@ -1568,7 +1561,7 @@ void DerivationGoal::done( fs << worker.store.printStorePath(drvPath) << "\t" << buildResult.toString() << std::endl; } - amDone(buildResult.success() ? ecSuccess : ecFailed, std::move(ex)); + return amDone(buildResult.success() ? ecSuccess : ecFailed, std::move(ex)); } diff --git a/src/libstore/build/derivation-goal.hh b/src/libstore/build/derivation-goal.hh index 04f13aedd..ad3d9ca2a 100644 --- a/src/libstore/build/derivation-goal.hh +++ b/src/libstore/build/derivation-goal.hh @@ -194,9 +194,6 @@ struct DerivationGoal : public Goal */ std::optional derivationType; - typedef void (DerivationGoal::*GoalState)(); - GoalState state; - BuildMode buildMode; std::unique_ptr> mcExpectedBuilds, mcRunningBuilds; @@ -227,8 +224,6 @@ struct DerivationGoal : public Goal std::string key() override; - void work() override; - /** * Add wanted outputs to an already existing derivation goal. */ @@ -237,18 +232,19 @@ struct DerivationGoal : public Goal /** * The states. */ - void getDerivation(); - void loadDerivation(); - void haveDerivation(); - void outputsSubstitutionTried(); - void gaveUpOnSubstitution(); - void closureRepaired(); - void inputsRealised(); - void tryToBuild(); - virtual void tryLocalBuild(); - void buildDone(); + Co init() override; + Co getDerivation(); + Co loadDerivation(); + Co haveDerivation(); + Co outputsSubstitutionTried(); + Co gaveUpOnSubstitution(); + Co closureRepaired(); + Co inputsRealised(); + Co tryToBuild(); + virtual Co tryLocalBuild(); + Co buildDone(); - void resolvedFinished(); + Co resolvedFinished(); /** * Is the build hook willing to perform the build? @@ -329,11 +325,11 @@ struct DerivationGoal : public Goal */ virtual void killChild(); - void repairClosure(); + Co repairClosure(); void started(); - void done( + Done done( BuildResult::Status status, SingleDrvOutputs builtOutputs = {}, std::optional ex = {}); diff --git a/src/libstore/build/drv-output-substitution-goal.cc b/src/libstore/build/drv-output-substitution-goal.cc index 13a07e4ea..02284d93c 100644 --- a/src/libstore/build/drv-output-substitution-goal.cc +++ b/src/libstore/build/drv-output-substitution-goal.cc @@ -14,146 +14,135 @@ DrvOutputSubstitutionGoal::DrvOutputSubstitutionGoal( : Goal(worker, DerivedPath::Opaque { StorePath::dummy }) , id(id) { - state = &DrvOutputSubstitutionGoal::init; name = fmt("substitution of '%s'", id.to_string()); trace("created"); } -void DrvOutputSubstitutionGoal::init() +Goal::Co DrvOutputSubstitutionGoal::init() { trace("init"); /* If the derivation already exists, we’re done */ if (worker.store.queryRealisation(id)) { - amDone(ecSuccess); - return; + co_return amDone(ecSuccess); } - subs = settings.useSubstitutes ? getDefaultSubstituters() : std::list>(); - tryNext(); -} + auto subs = settings.useSubstitutes ? getDefaultSubstituters() : std::list>(); -void DrvOutputSubstitutionGoal::tryNext() -{ - trace("trying next substituter"); + bool substituterFailed = false; - if (subs.size() == 0) { - /* None left. Terminate this goal and let someone else deal - with it. */ - debug("derivation output '%s' is required, but there is no substituter that can provide it", id.to_string()); + for (auto sub : subs) { + trace("trying next substituter"); - /* Hack: don't indicate failure if there were no substituters. - In that case the calling derivation should just do a - build. */ - amDone(substituterFailed ? ecFailed : ecNoSubstituters); + /* The callback of the curl download below can outlive `this` (if + some other error occurs), so it must not touch `this`. So put + the shared state in a separate refcounted object. */ + auto outPipe = std::make_shared(); + #ifndef _WIN32 + outPipe->create(); + #else + outPipe->createAsyncPipe(worker.ioport.get()); + #endif - if (substituterFailed) { - worker.failedSubstitutions++; - worker.updateProgress(); + auto promise = std::make_shared>>(); + + sub->queryRealisation( + id, + { [outPipe(outPipe), promise(promise)](std::future> res) { + try { + Finally updateStats([&]() { outPipe->writeSide.close(); }); + promise->set_value(res.get()); + } catch (...) { + promise->set_exception(std::current_exception()); + } + } }); + + worker.childStarted(shared_from_this(), { + #ifndef _WIN32 + outPipe->readSide.get() + #else + &outPipe + #endif + }, true, false); + + co_await Suspend{}; + + worker.childTerminated(this); + + /* + * The realisation corresponding to the given output id. + * Will be filled once we can get it. + */ + std::shared_ptr outputInfo; + + try { + outputInfo = promise->get_future().get(); + } catch (std::exception & e) { + printError(e.what()); + substituterFailed = true; } - return; + if (!outputInfo) continue; + + bool failed = false; + + for (const auto & [depId, depPath] : outputInfo->dependentRealisations) { + if (depId != id) { + if (auto localOutputInfo = worker.store.queryRealisation(depId); + localOutputInfo && localOutputInfo->outPath != depPath) { + warn( + "substituter '%s' has an incompatible realisation for '%s', ignoring.\n" + "Local: %s\n" + "Remote: %s", + sub->getUri(), + depId.to_string(), + worker.store.printStorePath(localOutputInfo->outPath), + worker.store.printStorePath(depPath) + ); + failed = true; + break; + } + addWaitee(worker.makeDrvOutputSubstitutionGoal(depId)); + } + } + + if (failed) continue; + + co_return realisationFetched(outputInfo, sub); } - sub = subs.front(); - subs.pop_front(); + /* None left. Terminate this goal and let someone else deal + with it. */ + debug("derivation output '%s' is required, but there is no substituter that can provide it", id.to_string()); - // FIXME: Make async - // outputInfo = sub->queryRealisation(id); + if (substituterFailed) { + worker.failedSubstitutions++; + worker.updateProgress(); + } - /* The callback of the curl download below can outlive `this` (if - some other error occurs), so it must not touch `this`. So put - the shared state in a separate refcounted object. */ - downloadState = std::make_shared(); -#ifndef _WIN32 - downloadState->outPipe.create(); -#else - downloadState->outPipe.createAsyncPipe(worker.ioport.get()); -#endif - - sub->queryRealisation( - id, - { [downloadState(downloadState)](std::future> res) { - try { - Finally updateStats([&]() { downloadState->outPipe.writeSide.close(); }); - downloadState->promise.set_value(res.get()); - } catch (...) { - downloadState->promise.set_exception(std::current_exception()); - } - } }); - - worker.childStarted(shared_from_this(), { -#ifndef _WIN32 - downloadState->outPipe.readSide.get() -#else - &downloadState->outPipe -#endif - }, true, false); - - state = &DrvOutputSubstitutionGoal::realisationFetched; + /* Hack: don't indicate failure if there were no substituters. + In that case the calling derivation should just do a + build. */ + co_return amDone(substituterFailed ? ecFailed : ecNoSubstituters); } -void DrvOutputSubstitutionGoal::realisationFetched() -{ - worker.childTerminated(this); - - try { - outputInfo = downloadState->promise.get_future().get(); - } catch (std::exception & e) { - printError(e.what()); - substituterFailed = true; - } - - if (!outputInfo) { - return tryNext(); - } - - for (const auto & [depId, depPath] : outputInfo->dependentRealisations) { - if (depId != id) { - if (auto localOutputInfo = worker.store.queryRealisation(depId); - localOutputInfo && localOutputInfo->outPath != depPath) { - warn( - "substituter '%s' has an incompatible realisation for '%s', ignoring.\n" - "Local: %s\n" - "Remote: %s", - sub->getUri(), - depId.to_string(), - worker.store.printStorePath(localOutputInfo->outPath), - worker.store.printStorePath(depPath) - ); - tryNext(); - return; - } - addWaitee(worker.makeDrvOutputSubstitutionGoal(depId)); - } - } - +Goal::Co DrvOutputSubstitutionGoal::realisationFetched(std::shared_ptr outputInfo, nix::ref sub) { addWaitee(worker.makePathSubstitutionGoal(outputInfo->outPath)); - if (waitees.empty()) outPathValid(); - else state = &DrvOutputSubstitutionGoal::outPathValid; -} + if (!waitees.empty()) co_await Suspend{}; -void DrvOutputSubstitutionGoal::outPathValid() -{ - assert(outputInfo); trace("output path substituted"); if (nrFailed > 0) { debug("The output path of the derivation output '%s' could not be substituted", id.to_string()); - amDone(nrNoSubstituters > 0 || nrIncompleteClosure > 0 ? ecIncompleteClosure : ecFailed); - return; + co_return amDone(nrNoSubstituters > 0 || nrIncompleteClosure > 0 ? ecIncompleteClosure : ecFailed); } worker.store.registerDrvOutput(*outputInfo); - finished(); -} -void DrvOutputSubstitutionGoal::finished() -{ trace("finished"); - amDone(ecSuccess); + co_return amDone(ecSuccess); } std::string DrvOutputSubstitutionGoal::key() @@ -163,14 +152,9 @@ std::string DrvOutputSubstitutionGoal::key() return "a$" + std::string(id.to_string()); } -void DrvOutputSubstitutionGoal::work() -{ - (this->*state)(); -} - void DrvOutputSubstitutionGoal::handleEOF(Descriptor fd) { - if (fd == downloadState->outPipe.readSide.get()) worker.wakeUp(shared_from_this()); + worker.wakeUp(shared_from_this()); } diff --git a/src/libstore/build/drv-output-substitution-goal.hh b/src/libstore/build/drv-output-substitution-goal.hh index 6967ca84f..807054926 100644 --- a/src/libstore/build/drv-output-substitution-goal.hh +++ b/src/libstore/build/drv-output-substitution-goal.hh @@ -27,52 +27,19 @@ class DrvOutputSubstitutionGoal : public Goal { */ DrvOutput id; - /** - * The realisation corresponding to the given output id. - * Will be filled once we can get it. - */ - std::shared_ptr outputInfo; - - /** - * The remaining substituters. - */ - std::list> subs; - - /** - * The current substituter. - */ - std::shared_ptr sub; - - struct DownloadState - { - MuxablePipe outPipe; - std::promise> promise; - }; - - std::shared_ptr downloadState; - - /** - * Whether a substituter failed. - */ - bool substituterFailed = false; - public: DrvOutputSubstitutionGoal(const DrvOutput& id, Worker & worker, RepairFlag repair = NoRepair, std::optional ca = std::nullopt); typedef void (DrvOutputSubstitutionGoal::*GoalState)(); GoalState state; - void init(); - void tryNext(); - void realisationFetched(); - void outPathValid(); - void finished(); + Co init() override; + Co realisationFetched(std::shared_ptr outputInfo, nix::ref sub); void timedOut(Error && ex) override { abort(); }; std::string key() override; - void work() override; void handleEOF(Descriptor fd) override; JobCategory jobCategory() const override { diff --git a/src/libstore/build/goal.cc b/src/libstore/build/goal.cc index f8db98280..9a16da145 100644 --- a/src/libstore/build/goal.cc +++ b/src/libstore/build/goal.cc @@ -3,6 +3,97 @@ namespace nix { +using Co = nix::Goal::Co; +using promise_type = nix::Goal::promise_type; +using handle_type = nix::Goal::handle_type; +using Suspend = nix::Goal::Suspend; + +Co::Co(Co&& rhs) { + this->handle = rhs.handle; + rhs.handle = nullptr; +} +void Co::operator=(Co&& rhs) { + this->handle = rhs.handle; + rhs.handle = nullptr; +} +Co::~Co() { + if (handle) { + handle.promise().alive = false; + handle.destroy(); + } +} + +Co promise_type::get_return_object() { + auto handle = handle_type::from_promise(*this); + return Co{handle}; +}; + +std::coroutine_handle<> promise_type::final_awaiter::await_suspend(handle_type h) noexcept { + auto& p = h.promise(); + auto goal = p.goal; + assert(goal); + goal->trace("in final_awaiter"); + auto c = std::move(p.continuation); + + if (c) { + // We still have a continuation, i.e. work to do. + // We assert that the goal is still busy. + assert(goal->exitCode == ecBusy); + assert(goal->top_co); // Goal must have an active coroutine. + assert(goal->top_co->handle == h); // The active coroutine must be us. + assert(p.alive); // We must not have been destructed. + + // we move continuation to the top, + // note: previous top_co is actually h, so by moving into it, + // we're calling the destructor on h, DON'T use h and p after this! + + // We move our continuation into `top_co`, i.e. the marker for the active continuation. + // By doing this we destruct the old `top_co`, i.e. us, so `h` can't be used anymore. + // Be careful not to access freed memory! + goal->top_co = std::move(c); + + // We resume `top_co`. + return goal->top_co->handle; + } else { + // We have no continuation, i.e. no more work to do, + // so the goal must not be busy anymore. + assert(goal->exitCode != ecBusy); + + // We reset `top_co` for good measure. + p.goal->top_co = {}; + + // We jump to the noop coroutine, which doesn't do anything and immediately suspends. + // This passes control back to the caller of goal.work(). + return std::noop_coroutine(); + } +} + +void promise_type::return_value(Co&& next) { + goal->trace("return_value(Co&&)"); + // Save old continuation. + auto old_continuation = std::move(continuation); + // We set next as our continuation. + continuation = std::move(next); + // We set next's goal, and thus it must not have one already. + assert(!continuation->handle.promise().goal); + continuation->handle.promise().goal = goal; + // Nor can next have a continuation, as we set it to our old one. + assert(!continuation->handle.promise().continuation); + continuation->handle.promise().continuation = std::move(old_continuation); +} + +std::coroutine_handle<> nix::Goal::Co::await_suspend(handle_type caller) { + assert(handle); // we must be a valid coroutine + auto& p = handle.promise(); + assert(!p.continuation); // we must have no continuation + assert(!p.goal); // we must not have a goal yet + auto goal = caller.promise().goal; + assert(goal); + p.goal = goal; + p.continuation = std::move(goal->top_co); // we set our continuation to be top_co (i.e. caller) + goal->top_co = std::move(*this); // we set top_co to ourselves, don't use this anymore after this! + return p.goal->top_co->handle; // we execute ourselves +} bool CompareGoalPtrs::operator() (const GoalPtr & a, const GoalPtr & b) const { std::string s1 = a->key(); @@ -75,10 +166,10 @@ void Goal::waiteeDone(GoalPtr waitee, ExitCode result) } } - -void Goal::amDone(ExitCode result, std::optional ex) +Goal::Done Goal::amDone(ExitCode result, std::optional ex) { trace("done"); + assert(top_co); assert(exitCode == ecBusy); assert(result == ecSuccess || result == ecFailed || result == ecNoSubstituters || result == ecIncompleteClosure); exitCode = result; @@ -98,6 +189,13 @@ void Goal::amDone(ExitCode result, std::optional ex) worker.removeGoal(shared_from_this()); cleanup(); + + // We drop the continuation. + // In `final_awaiter` this will signal that there is no more work to be done. + top_co->handle.promise().continuation = {}; + + // won't return to caller because of logic in final_awaiter + return Done{}; } @@ -106,4 +204,16 @@ void Goal::trace(std::string_view s) debug("%1%: %2%", name, s); } +void Goal::work() +{ + assert(top_co); + assert(top_co->handle); + assert(top_co->handle.promise().alive); + top_co->handle.resume(); + // We either should be in a state where we can be work()-ed again, + // or we should be done. + assert(top_co || exitCode != ecBusy); +} + + } diff --git a/src/libstore/build/goal.hh b/src/libstore/build/goal.hh index 0d9b828e1..162c392d0 100644 --- a/src/libstore/build/goal.hh +++ b/src/libstore/build/goal.hh @@ -1,10 +1,11 @@ #pragma once ///@file -#include "types.hh" #include "store-api.hh" #include "build-result.hh" +#include + namespace nix { /** @@ -103,9 +104,263 @@ protected: * Build result. */ BuildResult buildResult; - public: + /** + * Suspend our goal and wait until we get @ref work()-ed again. + * `co_await`-able by @ref Co. + */ + struct Suspend {}; + + /** + * Return from the current coroutine and suspend our goal + * if we're not busy anymore, or jump to the next coroutine + * set to be executed/resumed. + */ + struct Return {}; + + /** + * `co_return`-ing this will end the goal. + * If you're not inside a coroutine, you can safely discard this. + */ + struct [[nodiscard]] Done { + private: + Done(){} + + friend Goal; + }; + + // forward declaration of promise_type, see below + struct promise_type; + + /** + * Handle to coroutine using @ref Co and @ref promise_type. + */ + using handle_type = std::coroutine_handle; + + /** + * C++20 coroutine wrapper for use in goal logic. + * Coroutines are functions that use `co_await`/`co_return` (and `co_yield`, but not supported by @ref Co). + * + * @ref Co is meant to be used by methods of subclasses of @ref Goal. + * The main functionality provided by `Co` is + * - `co_await Suspend{}`: Suspends the goal. + * - `co_await f()`: Waits until `f()` finishes. + * - `co_return f()`: Tail-calls `f()`. + * - `co_return Return{}`: Ends coroutine. + * + * The idea is that you implement the goal logic using coroutines, + * and do the core thing a goal can do, suspension, when you have + * children you're waiting for. + * Coroutines allow you to resume the work cleanly. + * + * @note Brief explanation of C++20 coroutines: + * When you `Co f()`, a `std::coroutine_handle` is created, + * alongside its @ref promise_type. + * There are suspension points at the beginning of the coroutine, + * at every `co_await`, and at the final (possibly implicit) `co_return`. + * Once suspended, you can resume the `std::coroutine_handle` by doing `coroutine_handle.resume()`. + * Suspension points are implemented by passing a struct to the compiler + * that implements `await_sus`pend. + * `await_suspend` can either say "cancel suspension", in which case execution resumes, + * "suspend", in which case control is passed back to the caller of `coroutine_handle.resume()` + * or the place where the coroutine function is initially executed in the case of the initial + * suspension, or `await_suspend` can specify another coroutine to jump to, which is + * how tail calls are implemented. + * + * @note Resources: + * - https://lewissbaker.github.io/ + * - https://www.chiark.greenend.org.uk/~sgtatham/quasiblog/coroutines-c++20/ + * - https://www.scs.stanford.edu/~dm/blog/c++-coroutines.html + * + * @todo Allocate explicitly on stack since HALO thing doesn't really work, + * specifically, there's no way to uphold the requirements when trying to do + * tail-calls without using a trampoline AFAICT. + * + * @todo Support returning data natively + */ + struct [[nodiscard]] Co { + /** + * The underlying handle. + */ + handle_type handle; + + explicit Co(handle_type handle) : handle(handle) {}; + void operator=(Co&&); + Co(Co&& rhs); + ~Co(); + + bool await_ready() { return false; }; + /** + * When we `co_await` another @ref Co-returning coroutine, + * we tell the caller of `caller_coroutine.resume()` to switch to our coroutine (@ref handle). + * To make sure we return to the original coroutine, we set it as the continuation of our + * coroutine. In @ref promise_type::final_awaiter we check if it's set and if so we return to it. + * + * To explain in more understandable terms: + * When we `co_await Co_returning_function()`, this function is called on the resultant @ref Co of + * the _called_ function, and C++ automatically passes the caller in. + * + * `goal` field of @ref promise_type is also set here by copying it from the caller. + */ + std::coroutine_handle<> await_suspend(handle_type handle); + void await_resume() {}; + }; + + /** + * Used on initial suspend, does the same as @ref std::suspend_always, + * but asserts that everything has been set correctly. + */ + struct InitialSuspend { + /** + * Handle of coroutine that does the + * initial suspend + */ + handle_type handle; + + bool await_ready() { return false; }; + void await_suspend(handle_type handle_) { + handle = handle_; + } + void await_resume() { + assert(handle); + assert(handle.promise().goal); // goal must be set + assert(handle.promise().goal->top_co); // top_co of goal must be set + assert(handle.promise().goal->top_co->handle == handle); // top_co of goal must be us + } + }; + + /** + * Promise type for coroutines defined using @ref Co. + * Attached to coroutine handle. + */ + struct promise_type { + /** + * Either this is who called us, or it is who we will tail-call. + * It is what we "jump" to once we are done. + */ + std::optional continuation; + + /** + * The goal that we're a part of. + * Set either in @ref Co::await_suspend or in constructor of @ref Goal. + */ + Goal* goal = nullptr; + + /** + * Is set to false when destructed to ensure we don't use a + * destructed coroutine by accident + */ + bool alive = true; + + /** + * The awaiter used by @ref final_suspend. + */ + struct final_awaiter { + bool await_ready() noexcept { return false; }; + /** + * Here we execute our continuation, by passing it back to the caller. + * C++ compiler will create code that takes that and executes it promptly. + * `h` is the handle for the coroutine that is finishing execution, + * thus it must be destroyed. + */ + std::coroutine_handle<> await_suspend(handle_type h) noexcept; + void await_resume() noexcept { assert(false); }; + }; + + /** + * Called by compiler generated code to construct the @ref Co + * that is returned from a @ref Co-returning coroutine. + */ + Co get_return_object(); + + /** + * Called by compiler generated code before body of coroutine. + * We use this opportunity to set the @ref goal field + * and `top_co` field of @ref Goal. + */ + InitialSuspend initial_suspend() { return {}; }; + + /** + * Called on `co_return`. Creates @ref final_awaiter which + * either jumps to continuation or suspends goal. + */ + final_awaiter final_suspend() noexcept { return {}; }; + + /** + * Does nothing, but provides an opportunity for + * @ref final_suspend to happen. + */ + void return_value(Return) {} + + /** + * Does nothing, but provides an opportunity for + * @ref final_suspend to happen. + */ + void return_value(Done) {} + + /** + * When "returning" another coroutine, what happens is that + * we set it as our own continuation, thus once the final suspend + * happens, we transfer control to it. + * The original continuation we had is set as the continuation + * of the coroutine passed in. + * @ref final_suspend is called after this, and @ref final_awaiter will + * pass control off to @ref continuation. + * + * If we already have a continuation, that continuation is set as + * the continuation of the new continuation. Thus, the continuation + * passed to @ref return_value must not have a continuation set. + */ + void return_value(Co&&); + + /** + * If an exception is thrown inside a coroutine, + * we re-throw it in the context of the "resumer" of the continuation. + */ + void unhandled_exception() { throw; }; + + /** + * Allows awaiting a @ref Co. + */ + Co&& await_transform(Co&& co) { return static_cast(co); } + + /** + * Allows awaiting a @ref Suspend. + * Always suspends. + */ + std::suspend_always await_transform(Suspend) { return {}; }; + }; + + /** + * The coroutine being currently executed. + * MUST be updated when switching the coroutine being executed. + * This is used both for memory management and to resume the last + * coroutine executed. + * Destroying this should destroy all coroutines created for this goal. + */ + std::optional top_co; + + /** + * The entry point for the goal + */ + virtual Co init() = 0; + + /** + * Wrapper around @ref init since virtual functions + * can't be used in constructors. + */ + inline Co init_wrapper(); + + /** + * Signals that the goal is done. + * `co_return` the result. If you're not inside a coroutine, you can ignore + * the return value safely. + */ + Done amDone(ExitCode result, std::optional ex = {}); + + virtual void cleanup() { } + /** * Project a `BuildResult` with just the information that pertains * to the given request. @@ -124,15 +379,20 @@ public: std::optional ex; Goal(Worker & worker, DerivedPath path) - : worker(worker) - { } + : worker(worker), top_co(init_wrapper()) + { + // top_co shouldn't have a goal already, should be nullptr. + assert(!top_co->handle.promise().goal); + // we set it such that top_co can pass it down to its subcoroutines. + top_co->handle.promise().goal = this; + } virtual ~Goal() { trace("goal destroyed"); } - virtual void work() = 0; + void work(); void addWaitee(GoalPtr waitee); @@ -164,10 +424,6 @@ public: virtual std::string key() = 0; - void amDone(ExitCode result, std::optional ex = {}); - - virtual void cleanup() { } - /** * @brief Hint for the scheduler, which concurrency limit applies. * @see JobCategory @@ -178,3 +434,12 @@ public: void addToWeakGoals(WeakGoals & goals, GoalPtr p); } + +template +struct std::coroutine_traits { + using promise_type = nix::Goal::promise_type; +}; + +nix::Goal::Co nix::Goal::init_wrapper() { + co_return init(); +} diff --git a/src/libstore/build/substitution-goal.cc b/src/libstore/build/substitution-goal.cc index 0be3d1e8d..7deeb4748 100644 --- a/src/libstore/build/substitution-goal.cc +++ b/src/libstore/build/substitution-goal.cc @@ -3,6 +3,7 @@ #include "nar-info.hh" #include "finally.hh" #include "signals.hh" +#include namespace nix { @@ -12,7 +13,6 @@ PathSubstitutionGoal::PathSubstitutionGoal(const StorePath & storePath, Worker & , repair(repair) , ca(ca) { - state = &PathSubstitutionGoal::init; name = fmt("substitution of '%s'", worker.store.printStorePath(this->storePath)); trace("created"); maintainExpectedSubstitutions = std::make_unique>(worker.expectedSubstitutions); @@ -25,7 +25,7 @@ PathSubstitutionGoal::~PathSubstitutionGoal() } -void PathSubstitutionGoal::done( +Goal::Done PathSubstitutionGoal::done( ExitCode result, BuildResult::Status status, std::optional errorMsg) @@ -35,17 +35,11 @@ void PathSubstitutionGoal::done( debug(*errorMsg); buildResult.errorMsg = *errorMsg; } - amDone(result); + return amDone(result); } -void PathSubstitutionGoal::work() -{ - (this->*state)(); -} - - -void PathSubstitutionGoal::init() +Goal::Co PathSubstitutionGoal::init() { trace("init"); @@ -53,152 +47,135 @@ void PathSubstitutionGoal::init() /* If the path already exists we're done. */ if (!repair && worker.store.isValidPath(storePath)) { - done(ecSuccess, BuildResult::AlreadyValid); - return; + co_return done(ecSuccess, BuildResult::AlreadyValid); } if (settings.readOnlyMode) throw Error("cannot substitute path '%s' - no write access to the Nix store", worker.store.printStorePath(storePath)); - subs = settings.useSubstitutes ? getDefaultSubstituters() : std::list>(); + auto subs = settings.useSubstitutes ? getDefaultSubstituters() : std::list>(); - tryNext(); -} + bool substituterFailed = false; + for (auto sub : subs) { + trace("trying next substituter"); -void PathSubstitutionGoal::tryNext() -{ - trace("trying next substituter"); + cleanup(); - cleanup(); + /* The path the substituter refers to the path as. This will be + * different when the stores have different names. */ + std::optional subPath; - if (subs.size() == 0) { - /* None left. Terminate this goal and let someone else deal - with it. */ + /* Path info returned by the substituter's query info operation. */ + std::shared_ptr info; - /* Hack: don't indicate failure if there were no substituters. - In that case the calling derivation should just do a - build. */ - done( - substituterFailed ? ecFailed : ecNoSubstituters, - BuildResult::NoSubstituters, - fmt("path '%s' is required, but there is no substituter that can build it", worker.store.printStorePath(storePath))); - - if (substituterFailed) { - worker.failedSubstitutions++; - worker.updateProgress(); + if (ca) { + subPath = sub->makeFixedOutputPathFromCA( + std::string { storePath.name() }, + ContentAddressWithReferences::withoutRefs(*ca)); + if (sub->storeDir == worker.store.storeDir) + assert(subPath == storePath); + } else if (sub->storeDir != worker.store.storeDir) { + continue; } - return; - } - - sub = subs.front(); - subs.pop_front(); - - if (ca) { - subPath = sub->makeFixedOutputPathFromCA( - std::string { storePath.name() }, - ContentAddressWithReferences::withoutRefs(*ca)); - if (sub->storeDir == worker.store.storeDir) - assert(subPath == storePath); - } else if (sub->storeDir != worker.store.storeDir) { - tryNext(); - return; - } - - try { - // FIXME: make async - info = sub->queryPathInfo(subPath ? *subPath : storePath); - } catch (InvalidPath &) { - tryNext(); - return; - } catch (SubstituterDisabled &) { - if (settings.tryFallback) { - tryNext(); - return; + try { + // FIXME: make async + info = sub->queryPathInfo(subPath ? *subPath : storePath); + } catch (InvalidPath &) { + continue; + } catch (SubstituterDisabled & e) { + if (settings.tryFallback) continue; + else throw e; + } catch (Error & e) { + if (settings.tryFallback) { + logError(e.info()); + continue; + } else throw e; } - throw; - } catch (Error & e) { - if (settings.tryFallback) { - logError(e.info()); - tryNext(); - return; + + if (info->path != storePath) { + if (info->isContentAddressed(*sub) && info->references.empty()) { + auto info2 = std::make_shared(*info); + info2->path = storePath; + info = info2; + } else { + printError("asked '%s' for '%s' but got '%s'", + sub->getUri(), worker.store.printStorePath(storePath), sub->printStorePath(info->path)); + continue; + } } - throw; + + /* Update the total expected download size. */ + auto narInfo = std::dynamic_pointer_cast(info); + + maintainExpectedNar = std::make_unique>(worker.expectedNarSize, info->narSize); + + maintainExpectedDownload = + narInfo && narInfo->fileSize + ? std::make_unique>(worker.expectedDownloadSize, narInfo->fileSize) + : nullptr; + + worker.updateProgress(); + + /* Bail out early if this substituter lacks a valid + signature. LocalStore::addToStore() also checks for this, but + only after we've downloaded the path. */ + if (!sub->isTrusted && worker.store.pathInfoIsUntrusted(*info)) + { + warn("ignoring substitute for '%s' from '%s', as it's not signed by any of the keys in 'trusted-public-keys'", + worker.store.printStorePath(storePath), sub->getUri()); + continue; + } + + /* To maintain the closure invariant, we first have to realise the + paths referenced by this one. */ + for (auto & i : info->references) + if (i != storePath) /* ignore self-references */ + addWaitee(worker.makePathSubstitutionGoal(i)); + + if (!waitees.empty()) co_await Suspend{}; + + // FIXME: consider returning boolean instead of passing in reference + bool out = false; // is mutated by tryToRun + co_await tryToRun(subPath ? *subPath : storePath, sub, info, out); + substituterFailed = substituterFailed || out; } - if (info->path != storePath) { - if (info->isContentAddressed(*sub) && info->references.empty()) { - auto info2 = std::make_shared(*info); - info2->path = storePath; - info = info2; - } else { - printError("asked '%s' for '%s' but got '%s'", - sub->getUri(), worker.store.printStorePath(storePath), sub->printStorePath(info->path)); - tryNext(); - return; - } - } - - /* Update the total expected download size. */ - auto narInfo = std::dynamic_pointer_cast(info); - - maintainExpectedNar = std::make_unique>(worker.expectedNarSize, info->narSize); - - maintainExpectedDownload = - narInfo && narInfo->fileSize - ? std::make_unique>(worker.expectedDownloadSize, narInfo->fileSize) - : nullptr; + /* None left. Terminate this goal and let someone else deal + with it. */ + worker.failedSubstitutions++; worker.updateProgress(); - /* Bail out early if this substituter lacks a valid - signature. LocalStore::addToStore() also checks for this, but - only after we've downloaded the path. */ - if (!sub->isTrusted && worker.store.pathInfoIsUntrusted(*info)) - { - warn("ignoring substitute for '%s' from '%s', as it's not signed by any of the keys in 'trusted-public-keys'", - worker.store.printStorePath(storePath), sub->getUri()); - tryNext(); - return; - } - - /* To maintain the closure invariant, we first have to realise the - paths referenced by this one. */ - for (auto & i : info->references) - if (i != storePath) /* ignore self-references */ - addWaitee(worker.makePathSubstitutionGoal(i)); - - if (waitees.empty()) /* to prevent hang (no wake-up event) */ - referencesValid(); - else - state = &PathSubstitutionGoal::referencesValid; + /* Hack: don't indicate failure if there were no substituters. + In that case the calling derivation should just do a + build. */ + co_return done( + substituterFailed ? ecFailed : ecNoSubstituters, + BuildResult::NoSubstituters, + fmt("path '%s' is required, but there is no substituter that can build it", worker.store.printStorePath(storePath))); } -void PathSubstitutionGoal::referencesValid() +Goal::Co PathSubstitutionGoal::tryToRun(StorePath subPath, nix::ref sub, std::shared_ptr info, bool& substituterFailed) { trace("all references realised"); if (nrFailed > 0) { - done( + co_return done( nrNoSubstituters > 0 || nrIncompleteClosure > 0 ? ecIncompleteClosure : ecFailed, BuildResult::DependencyFailed, fmt("some references of path '%s' could not be realised", worker.store.printStorePath(storePath))); - return; } for (auto & i : info->references) if (i != storePath) /* ignore self-references */ assert(worker.store.isValidPath(i)); - state = &PathSubstitutionGoal::tryToRun; worker.wakeUp(shared_from_this()); -} + co_await Suspend{}; - -void PathSubstitutionGoal::tryToRun() -{ trace("trying to run"); /* Make sure that we are allowed to start a substitution. Note that even @@ -206,10 +183,10 @@ void PathSubstitutionGoal::tryToRun() prevents infinite waiting. */ if (worker.getNrSubstitutions() >= std::max(1U, (unsigned int) settings.maxSubstitutionJobs)) { worker.waitForBuildSlot(shared_from_this()); - return; + co_await Suspend{}; } - maintainRunningSubstitutions = std::make_unique>(worker.runningSubstitutions); + auto maintainRunningSubstitutions = std::make_unique>(worker.runningSubstitutions); worker.updateProgress(); #ifndef _WIN32 @@ -218,9 +195,9 @@ void PathSubstitutionGoal::tryToRun() outPipe.createAsyncPipe(worker.ioport.get()); #endif - promise = std::promise(); + auto promise = std::promise(); - thr = std::thread([this]() { + thr = std::thread([this, &promise, &subPath, &sub]() { try { ReceiveInterrupts receiveInterrupts; @@ -231,7 +208,7 @@ void PathSubstitutionGoal::tryToRun() PushActivity pact(act.id); copyStorePath(*sub, worker.store, - subPath ? *subPath : storePath, repair, sub->isTrusted ? NoCheckSigs : CheckSigs); + subPath, repair, sub->isTrusted ? NoCheckSigs : CheckSigs); promise.set_value(); } catch (...) { @@ -247,12 +224,8 @@ void PathSubstitutionGoal::tryToRun() #endif }, true, false); - state = &PathSubstitutionGoal::finished; -} + co_await Suspend{}; - -void PathSubstitutionGoal::finished() -{ trace("substitute finished"); thr.join(); @@ -274,10 +247,7 @@ void PathSubstitutionGoal::finished() substituterFailed = true; } - /* Try the next substitute. */ - state = &PathSubstitutionGoal::tryNext; - worker.wakeUp(shared_from_this()); - return; + co_return Return{}; } worker.markContentsGood(storePath); @@ -295,23 +265,19 @@ void PathSubstitutionGoal::finished() worker.doneDownloadSize += fileSize; } + assert(maintainExpectedNar); worker.doneNarSize += maintainExpectedNar->delta; maintainExpectedNar.reset(); worker.updateProgress(); - done(ecSuccess, BuildResult::Substituted); -} - - -void PathSubstitutionGoal::handleChildOutput(Descriptor fd, std::string_view data) -{ + co_return done(ecSuccess, BuildResult::Substituted); } void PathSubstitutionGoal::handleEOF(Descriptor fd) { - if (fd == outPipe.readSide.get()) worker.wakeUp(shared_from_this()); + worker.wakeUp(shared_from_this()); } diff --git a/src/libstore/build/substitution-goal.hh b/src/libstore/build/substitution-goal.hh index 1a051fc1f..86e4f5423 100644 --- a/src/libstore/build/substitution-goal.hh +++ b/src/libstore/build/substitution-goal.hh @@ -1,14 +1,16 @@ #pragma once ///@file +#include "worker.hh" #include "store-api.hh" #include "goal.hh" #include "muxable-pipe.hh" +#include +#include +#include namespace nix { -class Worker; - struct PathSubstitutionGoal : public Goal { /** @@ -17,30 +19,9 @@ struct PathSubstitutionGoal : public Goal StorePath storePath; /** - * The path the substituter refers to the path as. This will be - * different when the stores have different names. + * Whether to try to repair a valid path. */ - std::optional subPath; - - /** - * The remaining substituters. - */ - std::list> subs; - - /** - * The current substituter. - */ - std::shared_ptr sub; - - /** - * Whether a substituter failed. - */ - bool substituterFailed = false; - - /** - * Path info returned by the substituter's query info operation. - */ - std::shared_ptr info; + RepairFlag repair; /** * Pipe for the substituter's standard output. @@ -52,31 +33,15 @@ struct PathSubstitutionGoal : public Goal */ std::thread thr; - std::promise promise; - - /** - * Whether to try to repair a valid path. - */ - RepairFlag repair; - - /** - * Location where we're downloading the substitute. Differs from - * storePath when doing a repair. - */ - Path destPath; - std::unique_ptr> maintainExpectedSubstitutions, maintainRunningSubstitutions, maintainExpectedNar, maintainExpectedDownload; - typedef void (PathSubstitutionGoal::*GoalState)(); - GoalState state; - /** * Content address for recomputing store path */ std::optional ca; - void done( + Done done( ExitCode result, BuildResult::Status status, std::optional errorMsg = {}); @@ -96,22 +61,18 @@ public: return "a$" + std::string(storePath.name()) + "$" + worker.store.printStorePath(storePath); } - void work() override; - /** * The states. */ - void init(); - void tryNext(); - void gotInfo(); - void referencesValid(); - void tryToRun(); - void finished(); + Co init() override; + Co gotInfo(); + Co tryToRun(StorePath subPath, nix::ref sub, std::shared_ptr info, bool& substituterFailed); + Co finished(); /** * Callback used by the worker to write to the log. */ - void handleChildOutput(Descriptor fd, std::string_view data) override; + void handleChildOutput(Descriptor fd, std::string_view data) override {}; void handleEOF(Descriptor fd) override; /* Called by destructor, can't be overridden */ diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc index 8a5d6de72..7fc41b121 100644 --- a/src/libstore/build/worker.cc +++ b/src/libstore/build/worker.cc @@ -337,31 +337,27 @@ void Worker::run(const Goals & _topGoals) /* Wait for input. */ if (!children.empty() || !waitingForAWhile.empty()) waitForInput(); - else { - if (awake.empty() && 0U == settings.maxBuildJobs) - { - if (getMachines().empty()) - throw Error( - R"( - Unable to start any build; - either increase '--max-jobs' or enable remote builds. + else if (awake.empty() && 0U == settings.maxBuildJobs) { + if (getMachines().empty()) + throw Error( + R"( + Unable to start any build; + either increase '--max-jobs' or enable remote builds. - For more information run 'man nix.conf' and search for '/machines'. - )" - ); - else - throw Error( - R"( - Unable to start any build; - remote machines may not have all required system features. + For more information run 'man nix.conf' and search for '/machines'. + )" + ); + else + throw Error( + R"( + Unable to start any build; + remote machines may not have all required system features. - For more information run 'man nix.conf' and search for '/machines'. - )" - ); + For more information run 'man nix.conf' and search for '/machines'. + )" + ); - } - assert(!awake.empty()); - } + } else assert(!awake.empty()); } /* If --keep-going is not set, it's possible that the main goal diff --git a/src/libstore/unix/build/local-derivation-goal.cc b/src/libstore/unix/build/local-derivation-goal.cc index c3a65e34b..f968bbc5b 100644 --- a/src/libstore/unix/build/local-derivation-goal.cc +++ b/src/libstore/unix/build/local-derivation-goal.cc @@ -177,7 +177,7 @@ void LocalDerivationGoal::killSandbox(bool getStats) } -void LocalDerivationGoal::tryLocalBuild() +Goal::Co LocalDerivationGoal::tryLocalBuild() { #if __APPLE__ additionalSandboxProfile = parsedDrv->getStringAttr("__sandboxProfile").value_or(""); @@ -185,10 +185,10 @@ void LocalDerivationGoal::tryLocalBuild() unsigned int curBuilds = worker.getNrLocalBuilds(); if (curBuilds >= settings.maxBuildJobs) { - state = &DerivationGoal::tryToBuild; worker.waitForBuildSlot(shared_from_this()); outputLocks.unlock(); - return; + co_await Suspend{}; + co_return tryToBuild(); } assert(derivationType); @@ -242,7 +242,8 @@ void LocalDerivationGoal::tryLocalBuild() actLock = std::make_unique(*logger, lvlWarn, actBuildWaiting, fmt("waiting for a free build user ID for '%s'", Magenta(worker.store.printStorePath(drvPath)))); worker.waitForAWhile(shared_from_this()); - return; + co_await Suspend{}; + co_return tryLocalBuild(); } } @@ -257,15 +258,13 @@ void LocalDerivationGoal::tryLocalBuild() outputLocks.unlock(); buildUser.reset(); worker.permanentFailure = true; - done(BuildResult::InputRejected, {}, std::move(e)); - return; + co_return done(BuildResult::InputRejected, {}, std::move(e)); } - /* This state will be reached when we get EOF on the child's - log pipe. */ - state = &DerivationGoal::buildDone; - started(); + co_await Suspend{}; + // after EOF on child + co_return buildDone(); } static void chmod_(const Path & path, mode_t mode) diff --git a/src/libstore/unix/build/local-derivation-goal.hh b/src/libstore/unix/build/local-derivation-goal.hh index 4bcf5c9d4..bf25cf2a6 100644 --- a/src/libstore/unix/build/local-derivation-goal.hh +++ b/src/libstore/unix/build/local-derivation-goal.hh @@ -198,7 +198,7 @@ struct LocalDerivationGoal : public DerivationGoal /** * The additional states. */ - void tryLocalBuild() override; + Goal::Co tryLocalBuild() override; /** * Start building a derivation.