mirror of
https://github.com/privatevoid-net/nix-super.git
synced 2024-11-28 08:36:15 +02:00
325 lines
11 KiB
C++
325 lines
11 KiB
C++
#include "worker-protocol-connection.hh"
|
|
#include "worker-protocol-impl.hh"
|
|
#include "build-result.hh"
|
|
#include "derivations.hh"
|
|
|
|
namespace nix {
|
|
|
|
const std::set<WorkerProto::Feature> WorkerProto::allFeatures{};
|
|
|
|
WorkerProto::BasicClientConnection::~BasicClientConnection()
|
|
{
|
|
try {
|
|
to.flush();
|
|
} catch (...) {
|
|
ignoreExceptionInDestructor();
|
|
}
|
|
}
|
|
|
|
static Logger::Fields readFields(Source & from)
|
|
{
|
|
Logger::Fields fields;
|
|
size_t size = readInt(from);
|
|
for (size_t n = 0; n < size; n++) {
|
|
auto type = (decltype(Logger::Field::type)) readInt(from);
|
|
if (type == Logger::Field::tInt)
|
|
fields.push_back(readNum<uint64_t>(from));
|
|
else if (type == Logger::Field::tString)
|
|
fields.push_back(readString(from));
|
|
else
|
|
throw Error("got unsupported field type %x from Nix daemon", (int) type);
|
|
}
|
|
return fields;
|
|
}
|
|
|
|
std::exception_ptr
|
|
WorkerProto::BasicClientConnection::processStderrReturn(Sink * sink, Source * source, bool flush, bool block)
|
|
{
|
|
if (flush)
|
|
to.flush();
|
|
|
|
std::exception_ptr ex;
|
|
|
|
while (true) {
|
|
|
|
if (!block && !from.hasData())
|
|
break;
|
|
|
|
auto msg = readNum<uint64_t>(from);
|
|
|
|
if (msg == STDERR_WRITE) {
|
|
auto s = readString(from);
|
|
if (!sink)
|
|
throw Error("no sink");
|
|
(*sink)(s);
|
|
}
|
|
|
|
else if (msg == STDERR_READ) {
|
|
if (!source)
|
|
throw Error("no source");
|
|
size_t len = readNum<size_t>(from);
|
|
auto buf = std::make_unique<char[]>(len);
|
|
writeString({(const char *) buf.get(), source->read(buf.get(), len)}, to);
|
|
to.flush();
|
|
}
|
|
|
|
else if (msg == STDERR_ERROR) {
|
|
if (GET_PROTOCOL_MINOR(protoVersion) >= 26) {
|
|
ex = std::make_exception_ptr(readError(from));
|
|
} else {
|
|
auto error = readString(from);
|
|
unsigned int status = readInt(from);
|
|
ex = std::make_exception_ptr(Error(status, error));
|
|
}
|
|
break;
|
|
}
|
|
|
|
else if (msg == STDERR_NEXT)
|
|
printError(chomp(readString(from)));
|
|
|
|
else if (msg == STDERR_START_ACTIVITY) {
|
|
auto act = readNum<ActivityId>(from);
|
|
auto lvl = (Verbosity) readInt(from);
|
|
auto type = (ActivityType) readInt(from);
|
|
auto s = readString(from);
|
|
auto fields = readFields(from);
|
|
auto parent = readNum<ActivityId>(from);
|
|
logger->startActivity(act, lvl, type, s, fields, parent);
|
|
}
|
|
|
|
else if (msg == STDERR_STOP_ACTIVITY) {
|
|
auto act = readNum<ActivityId>(from);
|
|
logger->stopActivity(act);
|
|
}
|
|
|
|
else if (msg == STDERR_RESULT) {
|
|
auto act = readNum<ActivityId>(from);
|
|
auto type = (ResultType) readInt(from);
|
|
auto fields = readFields(from);
|
|
logger->result(act, type, fields);
|
|
}
|
|
|
|
else if (msg == STDERR_LAST) {
|
|
assert(block);
|
|
break;
|
|
}
|
|
|
|
else
|
|
throw Error("got unknown message type %x from Nix daemon", msg);
|
|
}
|
|
|
|
if (!ex) {
|
|
return ex;
|
|
} else {
|
|
try {
|
|
std::rethrow_exception(ex);
|
|
} catch (const Error & e) {
|
|
// Nix versions before #4628 did not have an adequate
|
|
// behavior for reporting that the derivation format was
|
|
// upgraded. To avoid having to add compatibility logic in
|
|
// many places, we expect to catch almost all occurrences of
|
|
// the old incomprehensible error here, so that we can
|
|
// explain to users what's going on when their daemon is
|
|
// older than #4628 (2023).
|
|
if (experimentalFeatureSettings.isEnabled(Xp::DynamicDerivations)
|
|
&& GET_PROTOCOL_MINOR(protoVersion) <= 35) {
|
|
auto m = e.msg();
|
|
if (m.find("parsing derivation") != std::string::npos && m.find("expected string") != std::string::npos
|
|
&& m.find("Derive([") != std::string::npos)
|
|
return std::make_exception_ptr(Error(
|
|
"%s, this might be because the daemon is too old to understand dependencies on dynamic derivations. Check to see if the raw derivation is in the form '%s'",
|
|
std::move(m),
|
|
"Drv WithVersion(..)"));
|
|
}
|
|
return std::current_exception();
|
|
}
|
|
}
|
|
}
|
|
|
|
void WorkerProto::BasicClientConnection::processStderr(
|
|
bool * daemonException, Sink * sink, Source * source, bool flush, bool block)
|
|
{
|
|
auto ex = processStderrReturn(sink, source, flush, block);
|
|
if (ex) {
|
|
*daemonException = true;
|
|
std::rethrow_exception(ex);
|
|
}
|
|
}
|
|
|
|
static std::set<WorkerProto::Feature>
|
|
intersectFeatures(const std::set<WorkerProto::Feature> & a, const std::set<WorkerProto::Feature> & b)
|
|
{
|
|
std::set<WorkerProto::Feature> res;
|
|
for (auto & x : a)
|
|
if (b.contains(x))
|
|
res.insert(x);
|
|
return res;
|
|
}
|
|
|
|
std::tuple<WorkerProto::Version, std::set<WorkerProto::Feature>> WorkerProto::BasicClientConnection::handshake(
|
|
BufferedSink & to,
|
|
Source & from,
|
|
WorkerProto::Version localVersion,
|
|
const std::set<WorkerProto::Feature> & supportedFeatures)
|
|
{
|
|
to << WORKER_MAGIC_1 << localVersion;
|
|
to.flush();
|
|
|
|
unsigned int magic = readInt(from);
|
|
if (magic != WORKER_MAGIC_2)
|
|
throw Error("nix-daemon protocol mismatch from");
|
|
auto daemonVersion = readInt(from);
|
|
|
|
if (GET_PROTOCOL_MAJOR(daemonVersion) != GET_PROTOCOL_MAJOR(PROTOCOL_VERSION))
|
|
throw Error("Nix daemon protocol version not supported");
|
|
if (GET_PROTOCOL_MINOR(daemonVersion) < 10)
|
|
throw Error("the Nix daemon version is too old");
|
|
|
|
auto protoVersion = std::min(daemonVersion, localVersion);
|
|
|
|
/* Exchange features. */
|
|
std::set<WorkerProto::Feature> daemonFeatures;
|
|
if (GET_PROTOCOL_MINOR(protoVersion) >= 38) {
|
|
to << supportedFeatures;
|
|
to.flush();
|
|
daemonFeatures = readStrings<std::set<WorkerProto::Feature>>(from);
|
|
}
|
|
|
|
return {protoVersion, intersectFeatures(daemonFeatures, supportedFeatures)};
|
|
}
|
|
|
|
std::tuple<WorkerProto::Version, std::set<WorkerProto::Feature>> WorkerProto::BasicServerConnection::handshake(
|
|
BufferedSink & to,
|
|
Source & from,
|
|
WorkerProto::Version localVersion,
|
|
const std::set<WorkerProto::Feature> & supportedFeatures)
|
|
{
|
|
unsigned int magic = readInt(from);
|
|
if (magic != WORKER_MAGIC_1)
|
|
throw Error("protocol mismatch");
|
|
to << WORKER_MAGIC_2 << localVersion;
|
|
to.flush();
|
|
auto clientVersion = readInt(from);
|
|
|
|
auto protoVersion = std::min(clientVersion, localVersion);
|
|
|
|
/* Exchange features. */
|
|
std::set<WorkerProto::Feature> clientFeatures;
|
|
if (GET_PROTOCOL_MINOR(protoVersion) >= 38) {
|
|
clientFeatures = readStrings<std::set<WorkerProto::Feature>>(from);
|
|
to << supportedFeatures;
|
|
to.flush();
|
|
}
|
|
|
|
return {protoVersion, intersectFeatures(clientFeatures, supportedFeatures)};
|
|
}
|
|
|
|
WorkerProto::ClientHandshakeInfo WorkerProto::BasicClientConnection::postHandshake(const StoreDirConfig & store)
|
|
{
|
|
WorkerProto::ClientHandshakeInfo res;
|
|
|
|
if (GET_PROTOCOL_MINOR(protoVersion) >= 14) {
|
|
// Obsolete CPU affinity.
|
|
to << 0;
|
|
}
|
|
|
|
if (GET_PROTOCOL_MINOR(protoVersion) >= 11)
|
|
to << false; // obsolete reserveSpace
|
|
|
|
if (GET_PROTOCOL_MINOR(protoVersion) >= 33)
|
|
to.flush();
|
|
|
|
return WorkerProto::Serialise<ClientHandshakeInfo>::read(store, *this);
|
|
}
|
|
|
|
void WorkerProto::BasicServerConnection::postHandshake(const StoreDirConfig & store, const ClientHandshakeInfo & info)
|
|
{
|
|
if (GET_PROTOCOL_MINOR(protoVersion) >= 14 && readInt(from)) {
|
|
// Obsolete CPU affinity.
|
|
readInt(from);
|
|
}
|
|
|
|
if (GET_PROTOCOL_MINOR(protoVersion) >= 11)
|
|
readInt(from); // obsolete reserveSpace
|
|
|
|
WorkerProto::write(store, *this, info);
|
|
}
|
|
|
|
UnkeyedValidPathInfo WorkerProto::BasicClientConnection::queryPathInfo(
|
|
const StoreDirConfig & store, bool * daemonException, const StorePath & path)
|
|
{
|
|
to << WorkerProto::Op::QueryPathInfo << store.printStorePath(path);
|
|
try {
|
|
processStderr(daemonException);
|
|
} catch (Error & e) {
|
|
// Ugly backwards compatibility hack.
|
|
if (e.msg().find("is not valid") != std::string::npos)
|
|
throw InvalidPath(std::move(e.info()));
|
|
throw;
|
|
}
|
|
if (GET_PROTOCOL_MINOR(protoVersion) >= 17) {
|
|
bool valid;
|
|
from >> valid;
|
|
if (!valid)
|
|
throw InvalidPath("path '%s' is not valid", store.printStorePath(path));
|
|
}
|
|
return WorkerProto::Serialise<UnkeyedValidPathInfo>::read(store, *this);
|
|
}
|
|
|
|
StorePathSet WorkerProto::BasicClientConnection::queryValidPaths(
|
|
const StoreDirConfig & store, bool * daemonException, const StorePathSet & paths, SubstituteFlag maybeSubstitute)
|
|
{
|
|
assert(GET_PROTOCOL_MINOR(protoVersion) >= 12);
|
|
to << WorkerProto::Op::QueryValidPaths;
|
|
WorkerProto::write(store, *this, paths);
|
|
if (GET_PROTOCOL_MINOR(protoVersion) >= 27) {
|
|
to << maybeSubstitute;
|
|
}
|
|
processStderr(daemonException);
|
|
return WorkerProto::Serialise<StorePathSet>::read(store, *this);
|
|
}
|
|
|
|
void WorkerProto::BasicClientConnection::addTempRoot(
|
|
const StoreDirConfig & store, bool * daemonException, const StorePath & path)
|
|
{
|
|
to << WorkerProto::Op::AddTempRoot << store.printStorePath(path);
|
|
processStderr(daemonException);
|
|
readInt(from);
|
|
}
|
|
|
|
void WorkerProto::BasicClientConnection::putBuildDerivationRequest(
|
|
const StoreDirConfig & store,
|
|
bool * daemonException,
|
|
const StorePath & drvPath,
|
|
const BasicDerivation & drv,
|
|
BuildMode buildMode)
|
|
{
|
|
to << WorkerProto::Op::BuildDerivation << store.printStorePath(drvPath);
|
|
writeDerivation(to, store, drv);
|
|
to << buildMode;
|
|
}
|
|
|
|
BuildResult
|
|
WorkerProto::BasicClientConnection::getBuildDerivationResponse(const StoreDirConfig & store, bool * daemonException)
|
|
{
|
|
return WorkerProto::Serialise<BuildResult>::read(store, *this);
|
|
}
|
|
|
|
void WorkerProto::BasicClientConnection::narFromPath(
|
|
const StoreDirConfig & store, bool * daemonException, const StorePath & path, std::function<void(Source &)> fun)
|
|
{
|
|
to << WorkerProto::Op::NarFromPath << store.printStorePath(path);
|
|
processStderr(daemonException);
|
|
|
|
fun(from);
|
|
}
|
|
|
|
void WorkerProto::BasicClientConnection::importPaths(
|
|
const StoreDirConfig & store, bool * daemonException, Source & source)
|
|
{
|
|
to << WorkerProto::Op::ImportPaths;
|
|
processStderr(daemonException, 0, &source);
|
|
auto importedPaths = WorkerProto::Serialise<StorePathSet>::read(store, *this);
|
|
assert(importedPaths.size() <= importedPaths.size());
|
|
}
|
|
}
|