Merge pull request #11125 from DeterminateSystems/basic-connection

Factor out commonality between WorkerProto::Basic{Client,Server}Connection
This commit is contained in:
Eelco Dolstra 2024-07-19 14:26:04 +02:00 committed by GitHub
commit c4213f0e6c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 225 additions and 267 deletions

View file

@ -269,26 +269,21 @@ struct ClientSettings
}; };
static void performOp(TunnelLogger * logger, ref<Store> store, static void performOp(TunnelLogger * logger, ref<Store> store,
TrustedFlag trusted, RecursiveFlag recursive, WorkerProto::Version clientVersion, TrustedFlag trusted, RecursiveFlag recursive,
Source & from, BufferedSink & to, WorkerProto::Op op) WorkerProto::BasicServerConnection & conn,
WorkerProto::Op op)
{ {
WorkerProto::ReadConn rconn { WorkerProto::ReadConn rconn(conn);
.from = from, WorkerProto::WriteConn wconn(conn);
.version = clientVersion,
};
WorkerProto::WriteConn wconn {
.to = to,
.version = clientVersion,
};
switch (op) { switch (op) {
case WorkerProto::Op::IsValidPath: { case WorkerProto::Op::IsValidPath: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(conn.from));
logger->startWork(); logger->startWork();
bool result = store->isValidPath(path); bool result = store->isValidPath(path);
logger->stopWork(); logger->stopWork();
to << result; conn.to << result;
break; break;
} }
@ -296,8 +291,8 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn); auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
SubstituteFlag substitute = NoSubstitute; SubstituteFlag substitute = NoSubstitute;
if (GET_PROTOCOL_MINOR(clientVersion) >= 27) { if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 27) {
substitute = readInt(from) ? Substitute : NoSubstitute; substitute = readInt(conn.from) ? Substitute : NoSubstitute;
} }
logger->startWork(); logger->startWork();
@ -311,13 +306,13 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
} }
case WorkerProto::Op::HasSubstitutes: { case WorkerProto::Op::HasSubstitutes: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(conn.from));
logger->startWork(); logger->startWork();
StorePathSet paths; // FIXME StorePathSet paths; // FIXME
paths.insert(path); paths.insert(path);
auto res = store->querySubstitutablePaths(paths); auto res = store->querySubstitutablePaths(paths);
logger->stopWork(); logger->stopWork();
to << (res.count(path) != 0); conn.to << (res.count(path) != 0);
break; break;
} }
@ -331,11 +326,11 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
} }
case WorkerProto::Op::QueryPathHash: { case WorkerProto::Op::QueryPathHash: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(conn.from));
logger->startWork(); logger->startWork();
auto hash = store->queryPathInfo(path)->narHash; auto hash = store->queryPathInfo(path)->narHash;
logger->stopWork(); logger->stopWork();
to << hash.to_string(HashFormat::Base16, false); conn.to << hash.to_string(HashFormat::Base16, false);
break; break;
} }
@ -343,7 +338,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
case WorkerProto::Op::QueryReferrers: case WorkerProto::Op::QueryReferrers:
case WorkerProto::Op::QueryValidDerivers: case WorkerProto::Op::QueryValidDerivers:
case WorkerProto::Op::QueryDerivationOutputs: { case WorkerProto::Op::QueryDerivationOutputs: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(conn.from));
logger->startWork(); logger->startWork();
StorePathSet paths; StorePathSet paths;
if (op == WorkerProto::Op::QueryReferences) if (op == WorkerProto::Op::QueryReferences)
@ -360,16 +355,16 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
} }
case WorkerProto::Op::QueryDerivationOutputNames: { case WorkerProto::Op::QueryDerivationOutputNames: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(conn.from));
logger->startWork(); logger->startWork();
auto names = store->readDerivation(path).outputNames(); auto names = store->readDerivation(path).outputNames();
logger->stopWork(); logger->stopWork();
to << names; conn.to << names;
break; break;
} }
case WorkerProto::Op::QueryDerivationOutputMap: { case WorkerProto::Op::QueryDerivationOutputMap: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(conn.from));
logger->startWork(); logger->startWork();
auto outputs = store->queryPartialDerivationOutputMap(path); auto outputs = store->queryPartialDerivationOutputMap(path);
logger->stopWork(); logger->stopWork();
@ -378,37 +373,37 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
} }
case WorkerProto::Op::QueryDeriver: { case WorkerProto::Op::QueryDeriver: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(conn.from));
logger->startWork(); logger->startWork();
auto info = store->queryPathInfo(path); auto info = store->queryPathInfo(path);
logger->stopWork(); logger->stopWork();
to << (info->deriver ? store->printStorePath(*info->deriver) : ""); conn.to << (info->deriver ? store->printStorePath(*info->deriver) : "");
break; break;
} }
case WorkerProto::Op::QueryPathFromHashPart: { case WorkerProto::Op::QueryPathFromHashPart: {
auto hashPart = readString(from); auto hashPart = readString(conn.from);
logger->startWork(); logger->startWork();
auto path = store->queryPathFromHashPart(hashPart); auto path = store->queryPathFromHashPart(hashPart);
logger->stopWork(); logger->stopWork();
to << (path ? store->printStorePath(*path) : ""); conn.to << (path ? store->printStorePath(*path) : "");
break; break;
} }
case WorkerProto::Op::AddToStore: { case WorkerProto::Op::AddToStore: {
if (GET_PROTOCOL_MINOR(clientVersion) >= 25) { if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 25) {
auto name = readString(from); auto name = readString(conn.from);
auto camStr = readString(from); auto camStr = readString(conn.from);
auto refs = WorkerProto::Serialise<StorePathSet>::read(*store, rconn); auto refs = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
bool repairBool; bool repairBool;
from >> repairBool; conn.from >> repairBool;
auto repair = RepairFlag{repairBool}; auto repair = RepairFlag{repairBool};
logger->startWork(); logger->startWork();
auto pathInfo = [&]() { auto pathInfo = [&]() {
// NB: FramedSource must be out of scope before logger->stopWork(); // NB: FramedSource must be out of scope before logger->stopWork();
auto [contentAddressMethod, hashAlgo] = ContentAddressMethod::parseWithAlgo(camStr); auto [contentAddressMethod, hashAlgo] = ContentAddressMethod::parseWithAlgo(camStr);
FramedSource source(from); FramedSource source(conn.from);
FileSerialisationMethod dumpMethod; FileSerialisationMethod dumpMethod;
switch (contentAddressMethod.getFileIngestionMethod()) { switch (contentAddressMethod.getFileIngestionMethod()) {
case FileIngestionMethod::Flat: case FileIngestionMethod::Flat:
@ -439,7 +434,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
bool fixed; bool fixed;
uint8_t recursive; uint8_t recursive;
std::string hashAlgoRaw; std::string hashAlgoRaw;
from >> baseName >> fixed /* obsolete */ >> recursive >> hashAlgoRaw; conn.from >> baseName >> fixed /* obsolete */ >> recursive >> hashAlgoRaw;
if (recursive > true) if (recursive > true)
throw Error("unsupported FileIngestionMethod with value of %i; you may need to upgrade nix-daemon", recursive); throw Error("unsupported FileIngestionMethod with value of %i; you may need to upgrade nix-daemon", recursive);
method = recursive method = recursive
@ -459,11 +454,11 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
so why all this extra work? We still parse the NAR so so why all this extra work? We still parse the NAR so
that we aren't sending arbitrary data to `saved` that we aren't sending arbitrary data to `saved`
unwittingly`, and we know when the NAR ends so we don't unwittingly`, and we know when the NAR ends so we don't
consume the rest of `from` and can't parse another consume the rest of `conn.from` and can't parse another
command. (We don't trust `addToStoreFromDump` to not command. (We don't trust `addToStoreFromDump` to not
eagerly consume the entire stream it's given, past the eagerly consume the entire stream it's given, past the
length of the Nar. */ length of the Nar. */
TeeSource savedNARSource(from, saved); TeeSource savedNARSource(conn.from, saved);
NullFileSystemObjectSink sink; /* just parse the NAR */ NullFileSystemObjectSink sink; /* just parse the NAR */
parseDump(sink, savedNARSource); parseDump(sink, savedNARSource);
}); });
@ -472,20 +467,20 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
*dumpSource, baseName, FileSerialisationMethod::NixArchive, method, hashAlgo); *dumpSource, baseName, FileSerialisationMethod::NixArchive, method, hashAlgo);
logger->stopWork(); logger->stopWork();
to << store->printStorePath(path); conn.to << store->printStorePath(path);
} }
break; break;
} }
case WorkerProto::Op::AddMultipleToStore: { case WorkerProto::Op::AddMultipleToStore: {
bool repair, dontCheckSigs; bool repair, dontCheckSigs;
from >> repair >> dontCheckSigs; conn.from >> repair >> dontCheckSigs;
if (!trusted && dontCheckSigs) if (!trusted && dontCheckSigs)
dontCheckSigs = false; dontCheckSigs = false;
logger->startWork(); logger->startWork();
{ {
FramedSource source(from); FramedSource source(conn.from);
store->addMultipleToStore(source, store->addMultipleToStore(source,
RepairFlag{repair}, RepairFlag{repair},
dontCheckSigs ? NoCheckSigs : CheckSigs); dontCheckSigs ? NoCheckSigs : CheckSigs);
@ -495,8 +490,8 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
} }
case WorkerProto::Op::AddTextToStore: { case WorkerProto::Op::AddTextToStore: {
std::string suffix = readString(from); std::string suffix = readString(conn.from);
std::string s = readString(from); std::string s = readString(conn.from);
auto refs = WorkerProto::Serialise<StorePathSet>::read(*store, rconn); auto refs = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
logger->startWork(); logger->startWork();
auto path = ({ auto path = ({
@ -504,37 +499,37 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
store->addToStoreFromDump(source, suffix, FileSerialisationMethod::Flat, ContentAddressMethod::Raw::Text, HashAlgorithm::SHA256, refs, NoRepair); store->addToStoreFromDump(source, suffix, FileSerialisationMethod::Flat, ContentAddressMethod::Raw::Text, HashAlgorithm::SHA256, refs, NoRepair);
}); });
logger->stopWork(); logger->stopWork();
to << store->printStorePath(path); conn.to << store->printStorePath(path);
break; break;
} }
case WorkerProto::Op::ExportPath: { case WorkerProto::Op::ExportPath: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(conn.from));
readInt(from); // obsolete readInt(conn.from); // obsolete
logger->startWork(); logger->startWork();
TunnelSink sink(to); TunnelSink sink(conn.to);
store->exportPath(path, sink); store->exportPath(path, sink);
logger->stopWork(); logger->stopWork();
to << 1; conn.to << 1;
break; break;
} }
case WorkerProto::Op::ImportPaths: { case WorkerProto::Op::ImportPaths: {
logger->startWork(); logger->startWork();
TunnelSource source(from, to); TunnelSource source(conn.from, conn.to);
auto paths = store->importPaths(source, auto paths = store->importPaths(source,
trusted ? NoCheckSigs : CheckSigs); trusted ? NoCheckSigs : CheckSigs);
logger->stopWork(); logger->stopWork();
Strings paths2; Strings paths2;
for (auto & i : paths) paths2.push_back(store->printStorePath(i)); for (auto & i : paths) paths2.push_back(store->printStorePath(i));
to << paths2; conn.to << paths2;
break; break;
} }
case WorkerProto::Op::BuildPaths: { case WorkerProto::Op::BuildPaths: {
auto drvs = WorkerProto::Serialise<DerivedPaths>::read(*store, rconn); auto drvs = WorkerProto::Serialise<DerivedPaths>::read(*store, rconn);
BuildMode mode = bmNormal; BuildMode mode = bmNormal;
if (GET_PROTOCOL_MINOR(clientVersion) >= 15) { if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 15) {
mode = WorkerProto::Serialise<BuildMode>::read(*store, rconn); mode = WorkerProto::Serialise<BuildMode>::read(*store, rconn);
/* Repairing is not atomic, so disallowed for "untrusted" /* Repairing is not atomic, so disallowed for "untrusted"
@ -552,7 +547,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
logger->startWork(); logger->startWork();
store->buildPaths(drvs, mode); store->buildPaths(drvs, mode);
logger->stopWork(); logger->stopWork();
to << 1; conn.to << 1;
break; break;
} }
@ -578,7 +573,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
} }
case WorkerProto::Op::BuildDerivation: { case WorkerProto::Op::BuildDerivation: {
auto drvPath = store->parseStorePath(readString(from)); auto drvPath = store->parseStorePath(readString(conn.from));
BasicDerivation drv; BasicDerivation drv;
/* /*
* Note: unlike wopEnsurePath, this operation reads a * Note: unlike wopEnsurePath, this operation reads a
@ -589,7 +584,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
* it cannot be trusted that its outPath was calculated * it cannot be trusted that its outPath was calculated
* correctly. * correctly.
*/ */
readDerivation(from, *store, drv, Derivation::nameFromPath(drvPath)); readDerivation(conn.from, *store, drv, Derivation::nameFromPath(drvPath));
auto buildMode = WorkerProto::Serialise<BuildMode>::read(*store, rconn); auto buildMode = WorkerProto::Serialise<BuildMode>::read(*store, rconn);
logger->startWork(); logger->startWork();
@ -655,20 +650,20 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
} }
case WorkerProto::Op::EnsurePath: { case WorkerProto::Op::EnsurePath: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(conn.from));
logger->startWork(); logger->startWork();
store->ensurePath(path); store->ensurePath(path);
logger->stopWork(); logger->stopWork();
to << 1; conn.to << 1;
break; break;
} }
case WorkerProto::Op::AddTempRoot: { case WorkerProto::Op::AddTempRoot: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(conn.from));
logger->startWork(); logger->startWork();
store->addTempRoot(path); store->addTempRoot(path);
logger->stopWork(); logger->stopWork();
to << 1; conn.to << 1;
break; break;
} }
@ -678,24 +673,24 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
"you are not privileged to create perm roots\n\n" "you are not privileged to create perm roots\n\n"
"hint: you can just do this client-side without special privileges, and probably want to do that instead."); "hint: you can just do this client-side without special privileges, and probably want to do that instead.");
auto storePath = WorkerProto::Serialise<StorePath>::read(*store, rconn); auto storePath = WorkerProto::Serialise<StorePath>::read(*store, rconn);
Path gcRoot = absPath(readString(from)); Path gcRoot = absPath(readString(conn.from));
logger->startWork(); logger->startWork();
auto & localFSStore = require<LocalFSStore>(*store); auto & localFSStore = require<LocalFSStore>(*store);
localFSStore.addPermRoot(storePath, gcRoot); localFSStore.addPermRoot(storePath, gcRoot);
logger->stopWork(); logger->stopWork();
to << gcRoot; conn.to << gcRoot;
break; break;
} }
case WorkerProto::Op::AddIndirectRoot: { case WorkerProto::Op::AddIndirectRoot: {
Path path = absPath(readString(from)); Path path = absPath(readString(conn.from));
logger->startWork(); logger->startWork();
auto & indirectRootStore = require<IndirectRootStore>(*store); auto & indirectRootStore = require<IndirectRootStore>(*store);
indirectRootStore.addIndirectRoot(path); indirectRootStore.addIndirectRoot(path);
logger->stopWork(); logger->stopWork();
to << 1; conn.to << 1;
break; break;
} }
@ -703,7 +698,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
case WorkerProto::Op::SyncWithGC: { case WorkerProto::Op::SyncWithGC: {
logger->startWork(); logger->startWork();
logger->stopWork(); logger->stopWork();
to << 1; conn.to << 1;
break; break;
} }
@ -717,24 +712,24 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
for (auto & i : roots) for (auto & i : roots)
size += i.second.size(); size += i.second.size();
to << size; conn.to << size;
for (auto & [target, links] : roots) for (auto & [target, links] : roots)
for (auto & link : links) for (auto & link : links)
to << link << store->printStorePath(target); conn.to << link << store->printStorePath(target);
break; break;
} }
case WorkerProto::Op::CollectGarbage: { case WorkerProto::Op::CollectGarbage: {
GCOptions options; GCOptions options;
options.action = (GCOptions::GCAction) readInt(from); options.action = (GCOptions::GCAction) readInt(conn.from);
options.pathsToDelete = WorkerProto::Serialise<StorePathSet>::read(*store, rconn); options.pathsToDelete = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
from >> options.ignoreLiveness >> options.maxFreed; conn.from >> options.ignoreLiveness >> options.maxFreed;
// obsolete fields // obsolete fields
readInt(from); readInt(conn.from);
readInt(from); readInt(conn.from);
readInt(from); readInt(conn.from);
GCResults results; GCResults results;
@ -745,7 +740,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
gcStore.collectGarbage(options, results); gcStore.collectGarbage(options, results);
logger->stopWork(); logger->stopWork();
to << results.paths << results.bytesFreed << 0 /* obsolete */; conn.to << results.paths << results.bytesFreed << 0 /* obsolete */;
break; break;
} }
@ -754,24 +749,24 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
ClientSettings clientSettings; ClientSettings clientSettings;
clientSettings.keepFailed = readInt(from); clientSettings.keepFailed = readInt(conn.from);
clientSettings.keepGoing = readInt(from); clientSettings.keepGoing = readInt(conn.from);
clientSettings.tryFallback = readInt(from); clientSettings.tryFallback = readInt(conn.from);
clientSettings.verbosity = (Verbosity) readInt(from); clientSettings.verbosity = (Verbosity) readInt(conn.from);
clientSettings.maxBuildJobs = readInt(from); clientSettings.maxBuildJobs = readInt(conn.from);
clientSettings.maxSilentTime = readInt(from); clientSettings.maxSilentTime = readInt(conn.from);
readInt(from); // obsolete useBuildHook readInt(conn.from); // obsolete useBuildHook
clientSettings.verboseBuild = lvlError == (Verbosity) readInt(from); clientSettings.verboseBuild = lvlError == (Verbosity) readInt(conn.from);
readInt(from); // obsolete logType readInt(conn.from); // obsolete logType
readInt(from); // obsolete printBuildTrace readInt(conn.from); // obsolete printBuildTrace
clientSettings.buildCores = readInt(from); clientSettings.buildCores = readInt(conn.from);
clientSettings.useSubstitutes = readInt(from); clientSettings.useSubstitutes = readInt(conn.from);
if (GET_PROTOCOL_MINOR(clientVersion) >= 12) { if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 12) {
unsigned int n = readInt(from); unsigned int n = readInt(conn.from);
for (unsigned int i = 0; i < n; i++) { for (unsigned int i = 0; i < n; i++) {
auto name = readString(from); auto name = readString(conn.from);
auto value = readString(from); auto value = readString(conn.from);
clientSettings.overrides.emplace(name, value); clientSettings.overrides.emplace(name, value);
} }
} }
@ -788,19 +783,19 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
} }
case WorkerProto::Op::QuerySubstitutablePathInfo: { case WorkerProto::Op::QuerySubstitutablePathInfo: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(conn.from));
logger->startWork(); logger->startWork();
SubstitutablePathInfos infos; SubstitutablePathInfos infos;
store->querySubstitutablePathInfos({{path, std::nullopt}}, infos); store->querySubstitutablePathInfos({{path, std::nullopt}}, infos);
logger->stopWork(); logger->stopWork();
auto i = infos.find(path); auto i = infos.find(path);
if (i == infos.end()) if (i == infos.end())
to << 0; conn.to << 0;
else { else {
to << 1 conn.to << 1
<< (i->second.deriver ? store->printStorePath(*i->second.deriver) : ""); << (i->second.deriver ? store->printStorePath(*i->second.deriver) : "");
WorkerProto::write(*store, wconn, i->second.references); WorkerProto::write(*store, wconn, i->second.references);
to << i->second.downloadSize conn.to << i->second.downloadSize
<< i->second.narSize; << i->second.narSize;
} }
break; break;
@ -809,7 +804,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
case WorkerProto::Op::QuerySubstitutablePathInfos: { case WorkerProto::Op::QuerySubstitutablePathInfos: {
SubstitutablePathInfos infos; SubstitutablePathInfos infos;
StorePathCAMap pathsMap = {}; StorePathCAMap pathsMap = {};
if (GET_PROTOCOL_MINOR(clientVersion) < 22) { if (GET_PROTOCOL_MINOR(conn.protoVersion) < 22) {
auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn); auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
for (auto & path : paths) for (auto & path : paths)
pathsMap.emplace(path, std::nullopt); pathsMap.emplace(path, std::nullopt);
@ -818,12 +813,12 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
logger->startWork(); logger->startWork();
store->querySubstitutablePathInfos(pathsMap, infos); store->querySubstitutablePathInfos(pathsMap, infos);
logger->stopWork(); logger->stopWork();
to << infos.size(); conn.to << infos.size();
for (auto & i : infos) { for (auto & i : infos) {
to << store->printStorePath(i.first) conn.to << store->printStorePath(i.first)
<< (i.second.deriver ? store->printStorePath(*i.second.deriver) : ""); << (i.second.deriver ? store->printStorePath(*i.second.deriver) : "");
WorkerProto::write(*store, wconn, i.second.references); WorkerProto::write(*store, wconn, i.second.references);
to << i.second.downloadSize << i.second.narSize; conn.to << i.second.downloadSize << i.second.narSize;
} }
break; break;
} }
@ -837,22 +832,22 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
} }
case WorkerProto::Op::QueryPathInfo: { case WorkerProto::Op::QueryPathInfo: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(conn.from));
std::shared_ptr<const ValidPathInfo> info; std::shared_ptr<const ValidPathInfo> info;
logger->startWork(); logger->startWork();
try { try {
info = store->queryPathInfo(path); info = store->queryPathInfo(path);
} catch (InvalidPath &) { } catch (InvalidPath &) {
if (GET_PROTOCOL_MINOR(clientVersion) < 17) throw; if (GET_PROTOCOL_MINOR(conn.protoVersion) < 17) throw;
} }
logger->stopWork(); logger->stopWork();
if (info) { if (info) {
if (GET_PROTOCOL_MINOR(clientVersion) >= 17) if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 17)
to << 1; conn.to << 1;
WorkerProto::write(*store, wconn, static_cast<const UnkeyedValidPathInfo &>(*info)); WorkerProto::write(*store, wconn, static_cast<const UnkeyedValidPathInfo &>(*info));
} else { } else {
assert(GET_PROTOCOL_MINOR(clientVersion) >= 17); assert(GET_PROTOCOL_MINOR(conn.protoVersion) >= 17);
to << 0; conn.to << 0;
} }
break; break;
} }
@ -861,61 +856,61 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
logger->startWork(); logger->startWork();
store->optimiseStore(); store->optimiseStore();
logger->stopWork(); logger->stopWork();
to << 1; conn.to << 1;
break; break;
case WorkerProto::Op::VerifyStore: { case WorkerProto::Op::VerifyStore: {
bool checkContents, repair; bool checkContents, repair;
from >> checkContents >> repair; conn.from >> checkContents >> repair;
logger->startWork(); logger->startWork();
if (repair && !trusted) if (repair && !trusted)
throw Error("you are not privileged to repair paths"); throw Error("you are not privileged to repair paths");
bool errors = store->verifyStore(checkContents, (RepairFlag) repair); bool errors = store->verifyStore(checkContents, (RepairFlag) repair);
logger->stopWork(); logger->stopWork();
to << errors; conn.to << errors;
break; break;
} }
case WorkerProto::Op::AddSignatures: { case WorkerProto::Op::AddSignatures: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(conn.from));
StringSet sigs = readStrings<StringSet>(from); StringSet sigs = readStrings<StringSet>(conn.from);
logger->startWork(); logger->startWork();
store->addSignatures(path, sigs); store->addSignatures(path, sigs);
logger->stopWork(); logger->stopWork();
to << 1; conn.to << 1;
break; break;
} }
case WorkerProto::Op::NarFromPath: { case WorkerProto::Op::NarFromPath: {
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(conn.from));
logger->startWork(); logger->startWork();
logger->stopWork(); logger->stopWork();
dumpPath(store->toRealPath(path), to); dumpPath(store->toRealPath(path), conn.to);
break; break;
} }
case WorkerProto::Op::AddToStoreNar: { case WorkerProto::Op::AddToStoreNar: {
bool repair, dontCheckSigs; bool repair, dontCheckSigs;
auto path = store->parseStorePath(readString(from)); auto path = store->parseStorePath(readString(conn.from));
auto deriver = readString(from); auto deriver = readString(conn.from);
auto narHash = Hash::parseAny(readString(from), HashAlgorithm::SHA256); auto narHash = Hash::parseAny(readString(conn.from), HashAlgorithm::SHA256);
ValidPathInfo info { path, narHash }; ValidPathInfo info { path, narHash };
if (deriver != "") if (deriver != "")
info.deriver = store->parseStorePath(deriver); info.deriver = store->parseStorePath(deriver);
info.references = WorkerProto::Serialise<StorePathSet>::read(*store, rconn); info.references = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
from >> info.registrationTime >> info.narSize >> info.ultimate; conn.from >> info.registrationTime >> info.narSize >> info.ultimate;
info.sigs = readStrings<StringSet>(from); info.sigs = readStrings<StringSet>(conn.from);
info.ca = ContentAddress::parseOpt(readString(from)); info.ca = ContentAddress::parseOpt(readString(conn.from));
from >> repair >> dontCheckSigs; conn.from >> repair >> dontCheckSigs;
if (!trusted && dontCheckSigs) if (!trusted && dontCheckSigs)
dontCheckSigs = false; dontCheckSigs = false;
if (!trusted) if (!trusted)
info.ultimate = false; info.ultimate = false;
if (GET_PROTOCOL_MINOR(clientVersion) >= 23) { if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 23) {
logger->startWork(); logger->startWork();
{ {
FramedSource source(from); FramedSource source(conn.from);
store->addToStore(info, source, (RepairFlag) repair, store->addToStore(info, source, (RepairFlag) repair,
dontCheckSigs ? NoCheckSigs : CheckSigs); dontCheckSigs ? NoCheckSigs : CheckSigs);
} }
@ -925,10 +920,10 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
else { else {
std::unique_ptr<Source> source; std::unique_ptr<Source> source;
StringSink saved; StringSink saved;
if (GET_PROTOCOL_MINOR(clientVersion) >= 21) if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 21)
source = std::make_unique<TunnelSource>(from, to); source = std::make_unique<TunnelSource>(conn.from, conn.to);
else { else {
TeeSource tee { from, saved }; TeeSource tee { conn.from, saved };
NullFileSystemObjectSink ether; NullFileSystemObjectSink ether;
parseDump(ether, tee); parseDump(ether, tee);
source = std::make_unique<StringSource>(saved.s); source = std::make_unique<StringSource>(saved.s);
@ -956,15 +951,15 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
WorkerProto::write(*store, wconn, willBuild); WorkerProto::write(*store, wconn, willBuild);
WorkerProto::write(*store, wconn, willSubstitute); WorkerProto::write(*store, wconn, willSubstitute);
WorkerProto::write(*store, wconn, unknown); WorkerProto::write(*store, wconn, unknown);
to << downloadSize << narSize; conn.to << downloadSize << narSize;
break; break;
} }
case WorkerProto::Op::RegisterDrvOutput: { case WorkerProto::Op::RegisterDrvOutput: {
logger->startWork(); logger->startWork();
if (GET_PROTOCOL_MINOR(clientVersion) < 31) { if (GET_PROTOCOL_MINOR(conn.protoVersion) < 31) {
auto outputId = DrvOutput::parse(readString(from)); auto outputId = DrvOutput::parse(readString(conn.from));
auto outputPath = StorePath(readString(from)); auto outputPath = StorePath(readString(conn.from));
store->registerDrvOutput(Realisation{ store->registerDrvOutput(Realisation{
.id = outputId, .outPath = outputPath}); .id = outputId, .outPath = outputPath});
} else { } else {
@ -977,10 +972,10 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
case WorkerProto::Op::QueryRealisation: { case WorkerProto::Op::QueryRealisation: {
logger->startWork(); logger->startWork();
auto outputId = DrvOutput::parse(readString(from)); auto outputId = DrvOutput::parse(readString(conn.from));
auto info = store->queryRealisation(outputId); auto info = store->queryRealisation(outputId);
logger->stopWork(); logger->stopWork();
if (GET_PROTOCOL_MINOR(clientVersion) < 31) { if (GET_PROTOCOL_MINOR(conn.protoVersion) < 31) {
std::set<StorePath> outPaths; std::set<StorePath> outPaths;
if (info) outPaths.insert(info->outPath); if (info) outPaths.insert(info->outPath);
WorkerProto::write(*store, wconn, outPaths); WorkerProto::write(*store, wconn, outPaths);
@ -993,19 +988,19 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
} }
case WorkerProto::Op::AddBuildLog: { case WorkerProto::Op::AddBuildLog: {
StorePath path{readString(from)}; StorePath path{readString(conn.from)};
logger->startWork(); logger->startWork();
if (!trusted) if (!trusted)
throw Error("you are not privileged to add logs"); throw Error("you are not privileged to add logs");
auto & logStore = require<LogStore>(*store); auto & logStore = require<LogStore>(*store);
{ {
FramedSource source(from); FramedSource source(conn.from);
StringSink sink; StringSink sink;
source.drainInto(sink); source.drainInto(sink);
logStore.addBuildLog(path, sink.s); logStore.addBuildLog(path, sink.s);
} }
logger->stopWork(); logger->stopWork();
to << 1; conn.to << 1;
break; break;
} }
@ -1020,8 +1015,8 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
void processConnection( void processConnection(
ref<Store> store, ref<Store> store,
FdSource & from, FdSource && from,
FdSink & to, FdSink && to,
TrustedFlag trusted, TrustedFlag trusted,
RecursiveFlag recursive) RecursiveFlag recursive)
{ {
@ -1037,7 +1032,12 @@ void processConnection(
if (clientVersion < 0x10a) if (clientVersion < 0x10a)
throw Error("the Nix client version is too old"); throw Error("the Nix client version is too old");
auto tunnelLogger = new TunnelLogger(to, clientVersion); WorkerProto::BasicServerConnection conn;
conn.to = std::move(to);
conn.from = std::move(from);
conn.protoVersion = clientVersion;
auto tunnelLogger = new TunnelLogger(conn.to, clientVersion);
auto prevLogger = nix::logger; auto prevLogger = nix::logger;
// FIXME // FIXME
if (!recursive) if (!recursive)
@ -1050,12 +1050,6 @@ void processConnection(
printMsgUsing(prevLogger, lvlDebug, "%d operations", opCount); printMsgUsing(prevLogger, lvlDebug, "%d operations", opCount);
}); });
WorkerProto::BasicServerConnection conn {
.to = to,
.from = from,
.clientVersion = clientVersion,
};
conn.postHandshake(*store, { conn.postHandshake(*store, {
.daemonNixVersion = nixVersion, .daemonNixVersion = nixVersion,
// We and the underlying store both need to trust the client for // We and the underlying store both need to trust the client for
@ -1071,13 +1065,13 @@ void processConnection(
try { try {
tunnelLogger->stopWork(); tunnelLogger->stopWork();
to.flush(); conn.to.flush();
/* Process client requests. */ /* Process client requests. */
while (true) { while (true) {
WorkerProto::Op op; WorkerProto::Op op;
try { try {
op = (enum WorkerProto::Op) readInt(from); op = (enum WorkerProto::Op) readInt(conn.from);
} catch (Interrupted & e) { } catch (Interrupted & e) {
break; break;
} catch (EndOfFile & e) { } catch (EndOfFile & e) {
@ -1091,7 +1085,7 @@ void processConnection(
debug("performing daemon worker op: %d", op); debug("performing daemon worker op: %d", op);
try { try {
performOp(tunnelLogger, store, trusted, recursive, clientVersion, from, to, op); performOp(tunnelLogger, store, trusted, recursive, conn, op);
} catch (Error & e) { } catch (Error & e) {
/* If we're not in a state where we can send replies, then /* If we're not in a state where we can send replies, then
something went wrong processing the input of the something went wrong processing the input of the
@ -1107,19 +1101,19 @@ void processConnection(
throw; throw;
} }
to.flush(); conn.to.flush();
assert(!tunnelLogger->state_.lock()->canSendStderr); assert(!tunnelLogger->state_.lock()->canSendStderr);
}; };
} catch (Error & e) { } catch (Error & e) {
tunnelLogger->stopWork(&e); tunnelLogger->stopWork(&e);
to.flush(); conn.to.flush();
return; return;
} catch (std::exception & e) { } catch (std::exception & e) {
auto ex = Error(e.what()); auto ex = Error(e.what());
tunnelLogger->stopWork(&ex); tunnelLogger->stopWork(&ex);
to.flush(); conn.to.flush();
return; return;
} }
} }

View file

@ -10,8 +10,8 @@ enum RecursiveFlag : bool { NotRecursive = false, Recursive = true };
void processConnection( void processConnection(
ref<Store> store, ref<Store> store,
FdSource & from, FdSource && from,
FdSink & to, FdSink && to,
TrustedFlag trusted, TrustedFlag trusted,
RecursiveFlag recursive); RecursiveFlag recursive);

View file

@ -73,7 +73,7 @@ void RemoteStore::initConnection(Connection & conn)
StringSink saved; StringSink saved;
TeeSource tee(conn.from, saved); TeeSource tee(conn.from, saved);
try { try {
conn.daemonVersion = WorkerProto::BasicClientConnection::handshake( conn.protoVersion = WorkerProto::BasicClientConnection::handshake(
conn.to, tee, PROTOCOL_VERSION); conn.to, tee, PROTOCOL_VERSION);
} catch (SerialisationError & e) { } catch (SerialisationError & e) {
/* In case the other side is waiting for our input, close /* In case the other side is waiting for our input, close
@ -115,7 +115,7 @@ void RemoteStore::setOptions(Connection & conn)
<< settings.buildCores << settings.buildCores
<< settings.useSubstitutes; << settings.useSubstitutes;
if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 12) { if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 12) {
std::map<std::string, Config::SettingInfo> overrides; std::map<std::string, Config::SettingInfo> overrides;
settings.getSettings(overrides, true); // libstore settings settings.getSettings(overrides, true); // libstore settings
fileTransferSettings.getSettings(overrides, true); fileTransferSettings.getSettings(overrides, true);
@ -175,7 +175,7 @@ bool RemoteStore::isValidPathUncached(const StorePath & path)
StorePathSet RemoteStore::queryValidPaths(const StorePathSet & paths, SubstituteFlag maybeSubstitute) StorePathSet RemoteStore::queryValidPaths(const StorePathSet & paths, SubstituteFlag maybeSubstitute)
{ {
auto conn(getConnection()); auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) { if (GET_PROTOCOL_MINOR(conn->protoVersion) < 12) {
StorePathSet res; StorePathSet res;
for (auto & i : paths) for (auto & i : paths)
if (isValidPath(i)) res.insert(i); if (isValidPath(i)) res.insert(i);
@ -198,7 +198,7 @@ StorePathSet RemoteStore::queryAllValidPaths()
StorePathSet RemoteStore::querySubstitutablePaths(const StorePathSet & paths) StorePathSet RemoteStore::querySubstitutablePaths(const StorePathSet & paths)
{ {
auto conn(getConnection()); auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) { if (GET_PROTOCOL_MINOR(conn->protoVersion) < 12) {
StorePathSet res; StorePathSet res;
for (auto & i : paths) { for (auto & i : paths) {
conn->to << WorkerProto::Op::HasSubstitutes << printStorePath(i); conn->to << WorkerProto::Op::HasSubstitutes << printStorePath(i);
@ -221,7 +221,7 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S
auto conn(getConnection()); auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) { if (GET_PROTOCOL_MINOR(conn->protoVersion) < 12) {
for (auto & i : pathsMap) { for (auto & i : pathsMap) {
SubstitutablePathInfo info; SubstitutablePathInfo info;
@ -241,7 +241,7 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S
} else { } else {
conn->to << WorkerProto::Op::QuerySubstitutablePathInfos; conn->to << WorkerProto::Op::QuerySubstitutablePathInfos;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 22) { if (GET_PROTOCOL_MINOR(conn->protoVersion) < 22) {
StorePathSet paths; StorePathSet paths;
for (auto & path : pathsMap) for (auto & path : pathsMap)
paths.insert(path.first); paths.insert(path.first);
@ -368,7 +368,7 @@ ref<const ValidPathInfo> RemoteStore::addCAToStore(
std::optional<ConnectionHandle> conn_(getConnection()); std::optional<ConnectionHandle> conn_(getConnection());
auto & conn = *conn_; auto & conn = *conn_;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 25) { if (GET_PROTOCOL_MINOR(conn->protoVersion) >= 25) {
conn->to conn->to
<< WorkerProto::Op::AddToStore << WorkerProto::Op::AddToStore
@ -485,7 +485,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
{ {
auto conn(getConnection()); auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 18) { if (GET_PROTOCOL_MINOR(conn->protoVersion) < 18) {
auto source2 = sinkToSource([&](Sink & sink) { auto source2 = sinkToSource([&](Sink & sink) {
sink << 1 // == path follows sink << 1 // == path follows
; ;
@ -513,11 +513,11 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
<< info.ultimate << info.sigs << renderContentAddress(info.ca) << info.ultimate << info.sigs << renderContentAddress(info.ca)
<< repair << !checkSigs; << repair << !checkSigs;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) { if (GET_PROTOCOL_MINOR(conn->protoVersion) >= 23) {
conn.withFramedSink([&](Sink & sink) { conn.withFramedSink([&](Sink & sink) {
copyNAR(source, sink); copyNAR(source, sink);
}); });
} else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) { } else if (GET_PROTOCOL_MINOR(conn->protoVersion) >= 21) {
conn.processStderr(0, &source); conn.processStderr(0, &source);
} else { } else {
copyNAR(source, conn->to); copyNAR(source, conn->to);
@ -554,7 +554,7 @@ void RemoteStore::addMultipleToStore(
RepairFlag repair, RepairFlag repair,
CheckSigsFlag checkSigs) CheckSigsFlag checkSigs)
{ {
if (GET_PROTOCOL_MINOR(getConnection()->daemonVersion) >= 32) { if (GET_PROTOCOL_MINOR(getConnection()->protoVersion) >= 32) {
auto conn(getConnection()); auto conn(getConnection());
conn->to conn->to
<< WorkerProto::Op::AddMultipleToStore << WorkerProto::Op::AddMultipleToStore
@ -572,7 +572,7 @@ void RemoteStore::registerDrvOutput(const Realisation & info)
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << WorkerProto::Op::RegisterDrvOutput; conn->to << WorkerProto::Op::RegisterDrvOutput;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) { if (GET_PROTOCOL_MINOR(conn->protoVersion) < 31) {
conn->to << info.id.to_string(); conn->to << info.id.to_string();
conn->to << std::string(info.outPath.to_string()); conn->to << std::string(info.outPath.to_string());
} else { } else {
@ -587,7 +587,7 @@ void RemoteStore::queryRealisationUncached(const DrvOutput & id,
try { try {
auto conn(getConnection()); auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 27) { if (GET_PROTOCOL_MINOR(conn->protoVersion) < 27) {
warn("the daemon is too old to support content-addressed derivations, please upgrade it to 2.4"); warn("the daemon is too old to support content-addressed derivations, please upgrade it to 2.4");
return callback(nullptr); return callback(nullptr);
} }
@ -597,7 +597,7 @@ void RemoteStore::queryRealisationUncached(const DrvOutput & id,
conn.processStderr(); conn.processStderr();
auto real = [&]() -> std::shared_ptr<const Realisation> { auto real = [&]() -> std::shared_ptr<const Realisation> {
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) { if (GET_PROTOCOL_MINOR(conn->protoVersion) < 31) {
auto outPaths = WorkerProto::Serialise<std::set<StorePath>>::read( auto outPaths = WorkerProto::Serialise<std::set<StorePath>>::read(
*this, *conn); *this, *conn);
if (outPaths.empty()) if (outPaths.empty())
@ -644,9 +644,9 @@ void RemoteStore::buildPaths(const std::vector<DerivedPath> & drvPaths, BuildMod
auto conn(getConnection()); auto conn(getConnection());
conn->to << WorkerProto::Op::BuildPaths; conn->to << WorkerProto::Op::BuildPaths;
assert(GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13); assert(GET_PROTOCOL_MINOR(conn->protoVersion) >= 13);
WorkerProto::write(*this, *conn, drvPaths); WorkerProto::write(*this, *conn, drvPaths);
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 15) if (GET_PROTOCOL_MINOR(conn->protoVersion) >= 15)
conn->to << buildMode; conn->to << buildMode;
else else
/* Old daemons did not take a 'buildMode' parameter, so we /* Old daemons did not take a 'buildMode' parameter, so we
@ -667,7 +667,7 @@ std::vector<KeyedBuildResult> RemoteStore::buildPathsWithResults(
std::optional<ConnectionHandle> conn_(getConnection()); std::optional<ConnectionHandle> conn_(getConnection());
auto & conn = *conn_; auto & conn = *conn_;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 34) { if (GET_PROTOCOL_MINOR(conn->protoVersion) >= 34) {
conn->to << WorkerProto::Op::BuildPathsWithResults; conn->to << WorkerProto::Op::BuildPathsWithResults;
WorkerProto::write(*this, *conn, paths); WorkerProto::write(*this, *conn, paths);
conn->to << buildMode; conn->to << buildMode;
@ -841,7 +841,7 @@ void RemoteStore::queryMissing(const std::vector<DerivedPath> & targets,
{ {
{ {
auto conn(getConnection()); auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 19) if (GET_PROTOCOL_MINOR(conn->protoVersion) < 19)
// Don't hold the connection handle in the fallback case // Don't hold the connection handle in the fallback case
// to prevent a deadlock. // to prevent a deadlock.
goto fallback; goto fallback;
@ -889,7 +889,7 @@ void RemoteStore::connect()
unsigned int RemoteStore::getProtocol() unsigned int RemoteStore::getProtocol()
{ {
auto conn(connections->get()); auto conn(connections->get());
return conn->daemonVersion; return conn->protoVersion;
} }
std::optional<TrustedFlag> RemoteStore::isTrustedClient() std::optional<TrustedFlag> RemoteStore::isTrustedClient()

View file

@ -1526,10 +1526,11 @@ void LocalDerivationGoal::startDaemon()
debug("received daemon connection"); debug("received daemon connection");
auto workerThread = std::thread([store, remote{std::move(remote)}]() { auto workerThread = std::thread([store, remote{std::move(remote)}]() {
FdSource from(remote.get());
FdSink to(remote.get());
try { try {
daemon::processConnection(store, from, to, daemon::processConnection(
store,
FdSource(remote.get()),
FdSink(remote.get()),
NotTrusted, daemon::Recursive); NotTrusted, daemon::Recursive);
debug("terminated daemon connection"); debug("terminated daemon connection");
} catch (SystemError &) { } catch (SystemError &) {

View file

@ -58,7 +58,7 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink
} }
else if (msg == STDERR_ERROR) { else if (msg == STDERR_ERROR) {
if (GET_PROTOCOL_MINOR(daemonVersion) >= 26) { if (GET_PROTOCOL_MINOR(protoVersion) >= 26) {
ex = std::make_exception_ptr(readError(from)); ex = std::make_exception_ptr(readError(from));
} else { } else {
auto error = readString(from); auto error = readString(from);
@ -114,7 +114,7 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink
// explain to users what's going on when their daemon is // explain to users what's going on when their daemon is
// older than #4628 (2023). // older than #4628 (2023).
if (experimentalFeatureSettings.isEnabled(Xp::DynamicDerivations) if (experimentalFeatureSettings.isEnabled(Xp::DynamicDerivations)
&& GET_PROTOCOL_MINOR(daemonVersion) <= 35) { && GET_PROTOCOL_MINOR(protoVersion) <= 35) {
auto m = e.msg(); auto m = e.msg();
if (m.find("parsing derivation") != std::string::npos && m.find("expected string") != std::string::npos if (m.find("parsing derivation") != std::string::npos && m.find("expected string") != std::string::npos
&& m.find("Derive([") != std::string::npos) && m.find("Derive([") != std::string::npos)
@ -172,15 +172,15 @@ WorkerProto::ClientHandshakeInfo WorkerProto::BasicClientConnection::postHandsha
{ {
WorkerProto::ClientHandshakeInfo res; WorkerProto::ClientHandshakeInfo res;
if (GET_PROTOCOL_MINOR(daemonVersion) >= 14) { if (GET_PROTOCOL_MINOR(protoVersion) >= 14) {
// Obsolete CPU affinity. // Obsolete CPU affinity.
to << 0; to << 0;
} }
if (GET_PROTOCOL_MINOR(daemonVersion) >= 11) if (GET_PROTOCOL_MINOR(protoVersion) >= 11)
to << false; // obsolete reserveSpace to << false; // obsolete reserveSpace
if (GET_PROTOCOL_MINOR(daemonVersion) >= 33) if (GET_PROTOCOL_MINOR(protoVersion) >= 33)
to.flush(); to.flush();
return WorkerProto::Serialise<ClientHandshakeInfo>::read(store, *this); return WorkerProto::Serialise<ClientHandshakeInfo>::read(store, *this);
@ -188,12 +188,12 @@ WorkerProto::ClientHandshakeInfo WorkerProto::BasicClientConnection::postHandsha
void WorkerProto::BasicServerConnection::postHandshake(const StoreDirConfig & store, const ClientHandshakeInfo & info) void WorkerProto::BasicServerConnection::postHandshake(const StoreDirConfig & store, const ClientHandshakeInfo & info)
{ {
if (GET_PROTOCOL_MINOR(clientVersion) >= 14 && readInt(from)) { if (GET_PROTOCOL_MINOR(protoVersion) >= 14 && readInt(from)) {
// Obsolete CPU affinity. // Obsolete CPU affinity.
readInt(from); readInt(from);
} }
if (GET_PROTOCOL_MINOR(clientVersion) >= 11) if (GET_PROTOCOL_MINOR(protoVersion) >= 11)
readInt(from); // obsolete reserveSpace readInt(from); // obsolete reserveSpace
WorkerProto::write(store, *this, info); WorkerProto::write(store, *this, info);
@ -211,7 +211,7 @@ UnkeyedValidPathInfo WorkerProto::BasicClientConnection::queryPathInfo(
throw InvalidPath(std::move(e.info())); throw InvalidPath(std::move(e.info()));
throw; throw;
} }
if (GET_PROTOCOL_MINOR(daemonVersion) >= 17) { if (GET_PROTOCOL_MINOR(protoVersion) >= 17) {
bool valid; bool valid;
from >> valid; from >> valid;
if (!valid) if (!valid)
@ -223,10 +223,10 @@ UnkeyedValidPathInfo WorkerProto::BasicClientConnection::queryPathInfo(
StorePathSet WorkerProto::BasicClientConnection::queryValidPaths( StorePathSet WorkerProto::BasicClientConnection::queryValidPaths(
const StoreDirConfig & store, bool * daemonException, const StorePathSet & paths, SubstituteFlag maybeSubstitute) const StoreDirConfig & store, bool * daemonException, const StorePathSet & paths, SubstituteFlag maybeSubstitute)
{ {
assert(GET_PROTOCOL_MINOR(daemonVersion) >= 12); assert(GET_PROTOCOL_MINOR(protoVersion) >= 12);
to << WorkerProto::Op::QueryValidPaths; to << WorkerProto::Op::QueryValidPaths;
WorkerProto::write(store, *this, paths); WorkerProto::write(store, *this, paths);
if (GET_PROTOCOL_MINOR(daemonVersion) >= 27) { if (GET_PROTOCOL_MINOR(protoVersion) >= 27) {
to << maybeSubstitute; to << maybeSubstitute;
} }
processStderr(daemonException); processStderr(daemonException);

View file

@ -6,7 +6,7 @@
namespace nix { namespace nix {
struct WorkerProto::BasicClientConnection struct WorkerProto::BasicConnection
{ {
/** /**
* Send with this. * Send with this.
@ -19,14 +19,45 @@ struct WorkerProto::BasicClientConnection
FdSource from; FdSource from;
/** /**
* Worker protocol version used for the connection. * The protocol version agreed by both sides.
*
* Despite its name, it is actually the maximum version both
* sides support. (If the maximum doesn't exist, we would fail to
* establish a connection and produce a value of this type.)
*/ */
WorkerProto::Version daemonVersion; WorkerProto::Version protoVersion;
/**
* Coercion to `WorkerProto::ReadConn`. This makes it easy to use the
* factored out serve protocol serializers with a
* `LegacySSHStore::Connection`.
*
* The serve protocol connection types are unidirectional, unlike
* this type.
*/
operator WorkerProto::ReadConn()
{
return WorkerProto::ReadConn{
.from = from,
.version = protoVersion,
};
}
/**
* Coercion to `WorkerProto::WriteConn`. This makes it easy to use the
* factored out serve protocol serializers with a
* `LegacySSHStore::Connection`.
*
* The serve protocol connection types are unidirectional, unlike
* this type.
*/
operator WorkerProto::WriteConn()
{
return WorkerProto::WriteConn{
.to = to,
.version = protoVersion,
};
}
};
struct WorkerProto::BasicClientConnection : WorkerProto::BasicConnection
{
/** /**
* Flush to direction * Flush to direction
*/ */
@ -60,38 +91,6 @@ struct WorkerProto::BasicClientConnection
*/ */
ClientHandshakeInfo postHandshake(const StoreDirConfig & store); ClientHandshakeInfo postHandshake(const StoreDirConfig & store);
/**
* Coercion to `WorkerProto::ReadConn`. This makes it easy to use the
* factored out serve protocol serializers with a
* `LegacySSHStore::Connection`.
*
* The serve protocol connection types are unidirectional, unlike
* this type.
*/
operator WorkerProto::ReadConn()
{
return WorkerProto::ReadConn{
.from = from,
.version = daemonVersion,
};
}
/**
* Coercion to `WorkerProto::WriteConn`. This makes it easy to use the
* factored out serve protocol serializers with a
* `LegacySSHStore::Connection`.
*
* The serve protocol connection types are unidirectional, unlike
* this type.
*/
operator WorkerProto::WriteConn()
{
return WorkerProto::WriteConn{
.to = to,
.version = daemonVersion,
};
}
void addTempRoot(const StoreDirConfig & remoteStore, bool * daemonException, const StorePath & path); void addTempRoot(const StoreDirConfig & remoteStore, bool * daemonException, const StorePath & path);
StorePathSet queryValidPaths( StorePathSet queryValidPaths(
@ -124,43 +123,8 @@ struct WorkerProto::BasicClientConnection
void importPaths(const StoreDirConfig & store, bool * daemonException, Source & source); void importPaths(const StoreDirConfig & store, bool * daemonException, Source & source);
}; };
struct WorkerProto::BasicServerConnection struct WorkerProto::BasicServerConnection : WorkerProto::BasicConnection
{ {
/**
* Send with this.
*/
FdSink & to;
/**
* Receive with this.
*/
FdSource & from;
/**
* Worker protocol version used for the connection.
*
* Despite its name, it is actually the maximum version both
* sides support. (If the maximum doesn't exist, we would fail to
* establish a connection and produce a value of this type.)
*/
WorkerProto::Version clientVersion;
operator WorkerProto::ReadConn()
{
return WorkerProto::ReadConn{
.from = from,
.version = clientVersion,
};
}
operator WorkerProto::WriteConn()
{
return WorkerProto::WriteConn{
.to = to,
.version = clientVersion,
};
}
/** /**
* Establishes connection, negotiating version. * Establishes connection, negotiating version.
* *

View file

@ -82,6 +82,7 @@ struct WorkerProto
* *
* @todo remove once Hydra uses Store abstraction consistently. * @todo remove once Hydra uses Store abstraction consistently.
*/ */
struct BasicConnection;
struct BasicClientConnection; struct BasicClientConnection;
struct BasicServerConnection; struct BasicServerConnection;
@ -183,8 +184,7 @@ enum struct WorkerProto::Op : uint64_t
struct WorkerProto::ClientHandshakeInfo struct WorkerProto::ClientHandshakeInfo
{ {
/** /**
* The version of the Nix daemon that is processing our requests * The version of the Nix daemon that is processing our requests.
.
* *
* Do note, it may or may not communicating with another daemon, * Do note, it may or may not communicating with another daemon,
* rather than being an "end" `LocalStore` or similar. * rather than being an "end" `LocalStore` or similar.

View file

@ -159,13 +159,7 @@ struct FdSource : BufferedSource
FdSource(Descriptor fd) : fd(fd) { } FdSource(Descriptor fd) : fd(fd) { }
FdSource(FdSource &&) = default; FdSource(FdSource &&) = default;
FdSource & operator=(FdSource && s) FdSource & operator=(FdSource && s) = default;
{
fd = s.fd;
s.fd = INVALID_DESCRIPTOR;
read = s.read;
return *this;
}
bool good() override; bool good() override;
protected: protected:

View file

@ -370,9 +370,12 @@ static void daemonLoop(std::optional<TrustedFlag> forceTrustClientOpt)
} }
// Handle the connection. // Handle the connection.
FdSource from(remote.get()); processConnection(
FdSink to(remote.get()); openUncachedStore(),
processConnection(openUncachedStore(), from, to, trusted, NotRecursive); FdSource(remote.get()),
FdSink(remote.get()),
trusted,
NotRecursive);
exit(0); exit(0);
}, options); }, options);
@ -437,9 +440,11 @@ static void forwardStdioConnection(RemoteStore & store) {
*/ */
static void processStdioConnection(ref<Store> store, TrustedFlag trustClient) static void processStdioConnection(ref<Store> store, TrustedFlag trustClient)
{ {
FdSource from(STDIN_FILENO); processConnection(
FdSink to(STDOUT_FILENO); store,
processConnection(store, from, to, trustClient, NotRecursive); FdSource(STDIN_FILENO),
FdSink(STDOUT_FILENO),
trustClient, NotRecursive);
} }
/** /**