nix-super/src/libstore/build/substitution-goal.cc
Eelco Dolstra 8a29052cb2 PathSubstitutionGoal: Clean up pipe
If there were many top-level goals (which are not destroyed until the
very end), commands like

  $ nix copy --to 'ssh://localhost?remote-store=/tmp/nix' \
    /run/current-system --no-check-sigs --substitute-on-destination

could fail with "Too many open files". So now we do some explicit
cleanup from amDone(). It would be cleaner to separate goals from
their temporary internal state, but that would be a bigger refactor.
2021-04-07 12:21:31 +02:00

303 lines
8.3 KiB
C++

#include "worker.hh"
#include "substitution-goal.hh"
#include "nar-info.hh"
#include "finally.hh"
namespace nix {
PathSubstitutionGoal::PathSubstitutionGoal(const StorePath & storePath, Worker & worker, RepairFlag repair, std::optional<ContentAddress> ca)
: Goal(worker)
, storePath(storePath)
, repair(repair)
, ca(ca)
{
state = &PathSubstitutionGoal::init;
name = fmt("substitution of '%s'", worker.store.printStorePath(this->storePath));
trace("created");
maintainExpectedSubstitutions = std::make_unique<MaintainCount<uint64_t>>(worker.expectedSubstitutions);
}
PathSubstitutionGoal::~PathSubstitutionGoal()
{
cleanup();
}
void PathSubstitutionGoal::work()
{
(this->*state)();
}
void PathSubstitutionGoal::init()
{
trace("init");
worker.store.addTempRoot(storePath);
/* If the path already exists we're done. */
if (!repair && worker.store.isValidPath(storePath)) {
amDone(ecSuccess);
return;
}
if (settings.readOnlyMode)
throw Error("cannot substitute path '%s' - no write access to the Nix store", worker.store.printStorePath(storePath));
subs = settings.useSubstitutes ? getDefaultSubstituters() : std::list<ref<Store>>();
tryNext();
}
void PathSubstitutionGoal::tryNext()
{
trace("trying next substituter");
cleanup();
if (subs.size() == 0) {
/* None left. Terminate this goal and let someone else deal
with it. */
debug("path '%s' is required, but there is no substituter that can build it", worker.store.printStorePath(storePath));
/* Hack: don't indicate failure if there were no substituters.
In that case the calling derivation should just do a
build. */
amDone(substituterFailed ? ecFailed : ecNoSubstituters);
if (substituterFailed) {
worker.failedSubstitutions++;
worker.updateProgress();
}
return;
}
sub = subs.front();
subs.pop_front();
if (ca) {
subPath = sub->makeFixedOutputPathFromCA(storePath.name(), *ca);
if (sub->storeDir == worker.store.storeDir)
assert(subPath == storePath);
} else if (sub->storeDir != worker.store.storeDir) {
tryNext();
return;
}
try {
// FIXME: make async
info = sub->queryPathInfo(subPath ? *subPath : storePath);
} catch (InvalidPath &) {
tryNext();
return;
} catch (SubstituterDisabled &) {
if (settings.tryFallback) {
tryNext();
return;
}
throw;
} catch (Error & e) {
if (settings.tryFallback) {
logError(e.info());
tryNext();
return;
}
throw;
}
if (info->path != storePath) {
if (info->isContentAddressed(*sub) && info->references.empty()) {
auto info2 = std::make_shared<ValidPathInfo>(*info);
info2->path = storePath;
info = info2;
} else {
printError("asked '%s' for '%s' but got '%s'",
sub->getUri(), worker.store.printStorePath(storePath), sub->printStorePath(info->path));
tryNext();
return;
}
}
/* Update the total expected download size. */
auto narInfo = std::dynamic_pointer_cast<const NarInfo>(info);
maintainExpectedNar = std::make_unique<MaintainCount<uint64_t>>(worker.expectedNarSize, info->narSize);
maintainExpectedDownload =
narInfo && narInfo->fileSize
? std::make_unique<MaintainCount<uint64_t>>(worker.expectedDownloadSize, narInfo->fileSize)
: nullptr;
worker.updateProgress();
/* Bail out early if this substituter lacks a valid
signature. LocalStore::addToStore() also checks for this, but
only after we've downloaded the path. */
if (!sub->isTrusted && worker.store.pathInfoIsUntrusted(*info))
{
warn("substituter '%s' does not have a valid signature for path '%s'",
sub->getUri(), worker.store.printStorePath(storePath));
tryNext();
return;
}
/* To maintain the closure invariant, we first have to realise the
paths referenced by this one. */
for (auto & i : info->references)
if (i != storePath) /* ignore self-references */
addWaitee(worker.makePathSubstitutionGoal(i));
if (waitees.empty()) /* to prevent hang (no wake-up event) */
referencesValid();
else
state = &PathSubstitutionGoal::referencesValid;
}
void PathSubstitutionGoal::referencesValid()
{
trace("all references realised");
if (nrFailed > 0) {
debug("some references of path '%s' could not be realised", worker.store.printStorePath(storePath));
amDone(nrNoSubstituters > 0 || nrIncompleteClosure > 0 ? ecIncompleteClosure : ecFailed);
return;
}
for (auto & i : info->references)
if (i != storePath) /* ignore self-references */
assert(worker.store.isValidPath(i));
state = &PathSubstitutionGoal::tryToRun;
worker.wakeUp(shared_from_this());
}
void PathSubstitutionGoal::tryToRun()
{
trace("trying to run");
/* Make sure that we are allowed to start a build. Note that even
if maxBuildJobs == 0 (no local builds allowed), we still allow
a substituter to run. This is because substitutions cannot be
distributed to another machine via the build hook. */
if (worker.getNrLocalBuilds() >= std::max(1U, (unsigned int) settings.maxBuildJobs)) {
worker.waitForBuildSlot(shared_from_this());
return;
}
maintainRunningSubstitutions = std::make_unique<MaintainCount<uint64_t>>(worker.runningSubstitutions);
worker.updateProgress();
outPipe.create();
promise = std::promise<void>();
thr = std::thread([this]() {
try {
/* Wake up the worker loop when we're done. */
Finally updateStats([this]() { outPipe.writeSide.close(); });
Activity act(*logger, actSubstitute, Logger::Fields{worker.store.printStorePath(storePath), sub->getUri()});
PushActivity pact(act.id);
copyStorePath(ref<Store>(sub), ref<Store>(worker.store.shared_from_this()),
subPath ? *subPath : storePath, repair, sub->isTrusted ? NoCheckSigs : CheckSigs);
promise.set_value();
} catch (...) {
promise.set_exception(std::current_exception());
}
});
worker.childStarted(shared_from_this(), {outPipe.readSide.get()}, true, false);
state = &PathSubstitutionGoal::finished;
}
void PathSubstitutionGoal::finished()
{
trace("substitute finished");
thr.join();
worker.childTerminated(this);
try {
promise.get_future().get();
} catch (std::exception & e) {
printError(e.what());
/* Cause the parent build to fail unless --fallback is given,
or the substitute has disappeared. The latter case behaves
the same as the substitute never having existed in the
first place. */
try {
throw;
} catch (SubstituteGone &) {
} catch (...) {
substituterFailed = true;
}
/* Try the next substitute. */
state = &PathSubstitutionGoal::tryNext;
worker.wakeUp(shared_from_this());
return;
}
worker.markContentsGood(storePath);
printMsg(lvlChatty, "substitution of path '%s' succeeded", worker.store.printStorePath(storePath));
maintainRunningSubstitutions.reset();
maintainExpectedSubstitutions.reset();
worker.doneSubstitutions++;
if (maintainExpectedDownload) {
auto fileSize = maintainExpectedDownload->delta;
maintainExpectedDownload.reset();
worker.doneDownloadSize += fileSize;
}
worker.doneNarSize += maintainExpectedNar->delta;
maintainExpectedNar.reset();
worker.updateProgress();
amDone(ecSuccess);
}
void PathSubstitutionGoal::handleChildOutput(int fd, const string & data)
{
}
void PathSubstitutionGoal::handleEOF(int fd)
{
if (fd == outPipe.readSide.get()) worker.wakeUp(shared_from_this());
}
void PathSubstitutionGoal::cleanup()
{
try {
if (thr.joinable()) {
// FIXME: signal worker thread to quit.
thr.join();
worker.childTerminated(this);
}
outPipe.close();
} catch (...) {
ignoreException();
}
}
}