Factor out bits of the worker protocol to use elsewhere

This introduces some shared infrastructure for our notion of protocols.
We can then define multiple protocols in terms of that notion.
We an also express how particular protocols depend on each other.

For example, we can define a common protocol and a worker protocol,
where the second depends on the first in terms of the data types it can
read and write.

The "serve" protocol can just use the common one for now, but will
eventually need its own machinary just like the worker protocol for
version-aware serialisers
This commit is contained in:
John Ericson 2022-03-25 04:39:57 +00:00
parent aaef47a08e
commit be81764320
13 changed files with 542 additions and 251 deletions

View file

@ -54,6 +54,23 @@ INPUT = \
src/nix-env \ src/nix-env \
src/nix-store src/nix-store
# If the MACRO_EXPANSION tag is set to YES, doxygen will expand all macro names
# in the source code. If set to NO, only conditional compilation will be
# performed. Macro expansion can be done in a controlled way by setting
# EXPAND_ONLY_PREDEF to YES.
# The default value is: NO.
# This tag requires that the tag ENABLE_PREPROCESSING is set to YES.
MACRO_EXPANSION = YES
# If the EXPAND_ONLY_PREDEF and MACRO_EXPANSION tags are both set to YES then
# the macro expansion is limited to the macros specified with the PREDEFINED and
# EXPAND_AS_DEFINED tags.
# The default value is: NO.
# This tag requires that the tag ENABLE_PREPROCESSING is set to YES.
EXPAND_ONLY_PREDEF = YES
# The INCLUDE_PATH tag can be used to specify one or more directories that # The INCLUDE_PATH tag can be used to specify one or more directories that
# contain include files that are not input files but should be processed by the # contain include files that are not input files but should be processed by the
# preprocessor. Note that the INCLUDE_PATH is not recursive, so the setting of # preprocessor. Note that the INCLUDE_PATH is not recursive, so the setting of
@ -61,3 +78,16 @@ INPUT = \
# This tag requires that the tag SEARCH_INCLUDES is set to YES. # This tag requires that the tag SEARCH_INCLUDES is set to YES.
INCLUDE_PATH = @RAPIDCHECK_HEADERS@ INCLUDE_PATH = @RAPIDCHECK_HEADERS@
# If the MACRO_EXPANSION and EXPAND_ONLY_PREDEF tags are set to YES then this
# tag can be used to specify a list of macro names that should be expanded. The
# macro definition that is found in the sources will be used. Use the PREDEFINED
# tag if you want to use a different macro definition that overrules the
# definition found in the source code.
# This tag requires that the tag ENABLE_PREPROCESSING is set to YES.
EXPAND_AS_DEFINED = \
DECLARE_COMMON_SERIALISER \
DECLARE_WORKER_SERIALISER \
DECLARE_SERVE_SERIALISER \
LENGTH_PREFIXED_PROTO_HELPER

View file

@ -8,8 +8,8 @@
#include "util.hh" #include "util.hh"
#include "archive.hh" #include "archive.hh"
#include "compression.hh" #include "compression.hh"
#include "worker-protocol.hh" #include "common-protocol.hh"
#include "worker-protocol-impl.hh" #include "common-protocol-impl.hh"
#include "topo-sort.hh" #include "topo-sort.hh"
#include "callback.hh" #include "callback.hh"
#include "local-store.hh" // TODO remove, along with remaining downcasts #include "local-store.hh" // TODO remove, along with remaining downcasts
@ -1185,11 +1185,11 @@ HookReply DerivationGoal::tryBuildHook()
throw; throw;
} }
WorkerProto::WriteConn conn { hook->sink }; CommonProto::WriteConn conn { hook->sink };
/* Tell the hook all the inputs that have to be copied to the /* Tell the hook all the inputs that have to be copied to the
remote system. */ remote system. */
WorkerProto::write(worker.store, conn, inputPaths); CommonProto::write(worker.store, conn, inputPaths);
/* Tell the hooks the missing outputs that have to be copied back /* Tell the hooks the missing outputs that have to be copied back
from the remote system. */ from the remote system. */
@ -1200,7 +1200,7 @@ HookReply DerivationGoal::tryBuildHook()
if (buildMode != bmCheck && status.known && status.known->isValid()) continue; if (buildMode != bmCheck && status.known && status.known->isValid()) continue;
missingOutputs.insert(outputName); missingOutputs.insert(outputName);
} }
WorkerProto::write(worker.store, conn, missingOutputs); CommonProto::write(worker.store, conn, missingOutputs);
} }
hook->sink = FdSink(); hook->sink = FdSink();

View file

@ -0,0 +1,41 @@
#pragma once
/**
* @file
*
* Template implementations (as opposed to mere declarations).
*
* This file is an exmample of the "impl.hh" pattern. See the
* contributing guide.
*/
#include "common-protocol.hh"
#include "length-prefixed-protocol-helper.hh"
namespace nix {
/* protocol-agnostic templates */
#define COMMON_USE_LENGTH_PREFIX_SERIALISER(TEMPLATE, T) \
TEMPLATE T CommonProto::Serialise< T >::read(const Store & store, CommonProto::ReadConn conn) \
{ \
return LengthPrefixedProtoHelper<CommonProto, T >::read(store, conn); \
} \
TEMPLATE void CommonProto::Serialise< T >::write(const Store & store, CommonProto::WriteConn conn, const T & t) \
{ \
LengthPrefixedProtoHelper<CommonProto, T >::write(store, conn, t); \
}
COMMON_USE_LENGTH_PREFIX_SERIALISER(template<typename T>, std::vector<T>)
COMMON_USE_LENGTH_PREFIX_SERIALISER(template<typename T>, std::set<T>)
COMMON_USE_LENGTH_PREFIX_SERIALISER(template<typename... Ts>, std::tuple<Ts...>)
#define COMMA_ ,
COMMON_USE_LENGTH_PREFIX_SERIALISER(
template<typename K COMMA_ typename V>,
std::map<K COMMA_ V>)
#undef COMMA_
/* protocol-specific templates */
}

View file

@ -0,0 +1,98 @@
#include "serialise.hh"
#include "util.hh"
#include "path-with-outputs.hh"
#include "store-api.hh"
#include "build-result.hh"
#include "common-protocol.hh"
#include "common-protocol-impl.hh"
#include "archive.hh"
#include "derivations.hh"
#include <nlohmann/json.hpp>
namespace nix {
/* protocol-agnostic definitions */
std::string CommonProto::Serialise<std::string>::read(const Store & store, CommonProto::ReadConn conn)
{
return readString(conn.from);
}
void CommonProto::Serialise<std::string>::write(const Store & store, CommonProto::WriteConn conn, const std::string & str)
{
conn.to << str;
}
StorePath CommonProto::Serialise<StorePath>::read(const Store & store, CommonProto::ReadConn conn)
{
return store.parseStorePath(readString(conn.from));
}
void CommonProto::Serialise<StorePath>::write(const Store & store, CommonProto::WriteConn conn, const StorePath & storePath)
{
conn.to << store.printStorePath(storePath);
}
ContentAddress CommonProto::Serialise<ContentAddress>::read(const Store & store, CommonProto::ReadConn conn)
{
return ContentAddress::parse(readString(conn.from));
}
void CommonProto::Serialise<ContentAddress>::write(const Store & store, CommonProto::WriteConn conn, const ContentAddress & ca)
{
conn.to << renderContentAddress(ca);
}
Realisation CommonProto::Serialise<Realisation>::read(const Store & store, CommonProto::ReadConn conn)
{
std::string rawInput = readString(conn.from);
return Realisation::fromJSON(
nlohmann::json::parse(rawInput),
"remote-protocol"
);
}
void CommonProto::Serialise<Realisation>::write(const Store & store, CommonProto::WriteConn conn, const Realisation & realisation)
{
conn.to << realisation.toJSON().dump();
}
DrvOutput CommonProto::Serialise<DrvOutput>::read(const Store & store, CommonProto::ReadConn conn)
{
return DrvOutput::parse(readString(conn.from));
}
void CommonProto::Serialise<DrvOutput>::write(const Store & store, CommonProto::WriteConn conn, const DrvOutput & drvOutput)
{
conn.to << drvOutput.to_string();
}
std::optional<StorePath> CommonProto::Serialise<std::optional<StorePath>>::read(const Store & store, CommonProto::ReadConn conn)
{
auto s = readString(conn.from);
return s == "" ? std::optional<StorePath> {} : store.parseStorePath(s);
}
void CommonProto::Serialise<std::optional<StorePath>>::write(const Store & store, CommonProto::WriteConn conn, const std::optional<StorePath> & storePathOpt)
{
conn.to << (storePathOpt ? store.printStorePath(*storePathOpt) : "");
}
std::optional<ContentAddress> CommonProto::Serialise<std::optional<ContentAddress>>::read(const Store & store, CommonProto::ReadConn conn)
{
return ContentAddress::parseOpt(readString(conn.from));
}
void CommonProto::Serialise<std::optional<ContentAddress>>::write(const Store & store, CommonProto::WriteConn conn, const std::optional<ContentAddress> & caOpt)
{
conn.to << (caOpt ? renderContentAddress(*caOpt) : "");
}
}

View file

@ -0,0 +1,106 @@
#pragma once
///@file
#include "serialise.hh"
namespace nix {
class Store;
struct Source;
// items being serialized
class StorePath;
struct ContentAddress;
struct DrvOutput;
struct Realisation;
/**
* Shared serializers between the worker protocol, serve protocol, and a
* few others.
*
* This `struct` is basically just a `namespace`; We use a type rather
* than a namespace just so we can use it as a template argument.
*/
struct CommonProto
{
/**
* A unidirectional read connection, to be used by the read half of the
* canonical serializers below.
*/
struct ReadConn {
Source & from;
};
/**
* A unidirectional write connection, to be used by the write half of the
* canonical serializers below.
*/
struct WriteConn {
Sink & to;
};
template<typename T>
struct Serialise;
/**
* Wrapper function around `CommonProto::Serialise<T>::write` that allows us to
* infer the type instead of having to write it down explicitly.
*/
template<typename T>
static void write(const Store & store, WriteConn conn, const T & t)
{
CommonProto::Serialise<T>::write(store, conn, t);
}
};
#define DECLARE_COMMON_SERIALISER(T) \
struct CommonProto::Serialise< T > \
{ \
static T read(const Store & store, CommonProto::ReadConn conn); \
static void write(const Store & store, CommonProto::WriteConn conn, const T & str); \
}
template<>
DECLARE_COMMON_SERIALISER(std::string);
template<>
DECLARE_COMMON_SERIALISER(StorePath);
template<>
DECLARE_COMMON_SERIALISER(ContentAddress);
template<>
DECLARE_COMMON_SERIALISER(DrvOutput);
template<>
DECLARE_COMMON_SERIALISER(Realisation);
template<typename T>
DECLARE_COMMON_SERIALISER(std::vector<T>);
template<typename T>
DECLARE_COMMON_SERIALISER(std::set<T>);
template<typename... Ts>
DECLARE_COMMON_SERIALISER(std::tuple<Ts...>);
#define COMMA_ ,
template<typename K, typename V>
DECLARE_COMMON_SERIALISER(std::map<K COMMA_ V>);
#undef COMMA_
/**
* These use the empty string for the null case, relying on the fact
* that the underlying types never serialize to the empty string.
*
* We do this instead of a generic std::optional<T> instance because
* ordinal tags (0 or 1, here) are a bit of a compatability hazard. For
* the same reason, we don't have a std::variant<T..> instances (ordinal
* tags 0...n).
*
* We could the generic instances and then these as specializations for
* compatability, but that's proven a bit finnicky, and also makes the
* worker protocol harder to implement in other languages where such
* specializations may not be allowed.
*/
template<>
DECLARE_COMMON_SERIALISER(std::optional<StorePath>);
template<>
DECLARE_COMMON_SERIALISER(std::optional<ContentAddress>);
}

View file

@ -4,8 +4,8 @@
#include "globals.hh" #include "globals.hh"
#include "util.hh" #include "util.hh"
#include "split.hh" #include "split.hh"
#include "worker-protocol.hh" #include "common-protocol.hh"
#include "worker-protocol-impl.hh" #include "common-protocol-impl.hh"
#include "fs-accessor.hh" #include "fs-accessor.hh"
#include <boost/container/small_vector.hpp> #include <boost/container/small_vector.hpp>
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
@ -895,8 +895,8 @@ Source & readDerivation(Source & in, const Store & store, BasicDerivation & drv,
drv.outputs.emplace(std::move(name), std::move(output)); drv.outputs.emplace(std::move(name), std::move(output));
} }
drv.inputSrcs = WorkerProto::Serialise<StorePathSet>::read(store, drv.inputSrcs = CommonProto::Serialise<StorePathSet>::read(store,
WorkerProto::ReadConn { .from = in }); CommonProto::ReadConn { .from = in });
in >> drv.platform >> drv.builder; in >> drv.platform >> drv.builder;
drv.args = readStrings<Strings>(in); drv.args = readStrings<Strings>(in);
@ -944,8 +944,8 @@ void writeDerivation(Sink & out, const Store & store, const BasicDerivation & dr
}, },
}, i.second.raw); }, i.second.raw);
} }
WorkerProto::write(store, CommonProto::write(store,
WorkerProto::WriteConn { .to = out }, CommonProto::WriteConn { .to = out },
drv.inputSrcs); drv.inputSrcs);
out << drv.platform << drv.builder << drv.args; out << drv.platform << drv.builder << drv.args;
out << drv.env.size(); out << drv.env.size();

View file

@ -1,8 +1,8 @@
#include "serialise.hh" #include "serialise.hh"
#include "store-api.hh" #include "store-api.hh"
#include "archive.hh" #include "archive.hh"
#include "worker-protocol.hh" #include "common-protocol.hh"
#include "worker-protocol-impl.hh" #include "common-protocol-impl.hh"
#include <algorithm> #include <algorithm>
@ -46,8 +46,8 @@ void Store::exportPath(const StorePath & path, Sink & sink)
teeSink teeSink
<< exportMagic << exportMagic
<< printStorePath(path); << printStorePath(path);
WorkerProto::write(*this, CommonProto::write(*this,
WorkerProto::WriteConn { .to = teeSink }, CommonProto::WriteConn { .to = teeSink },
info->references); info->references);
teeSink teeSink
<< (info->deriver ? printStorePath(*info->deriver) : "") << (info->deriver ? printStorePath(*info->deriver) : "")
@ -76,8 +76,8 @@ StorePaths Store::importPaths(Source & source, CheckSigsFlag checkSigs)
//Activity act(*logger, lvlInfo, "importing path '%s'", info.path); //Activity act(*logger, lvlInfo, "importing path '%s'", info.path);
auto references = WorkerProto::Serialise<StorePathSet>::read(*this, auto references = CommonProto::Serialise<StorePathSet>::read(*this,
WorkerProto::ReadConn { .from = source }); CommonProto::ReadConn { .from = source });
auto deriver = readString(source); auto deriver = readString(source);
auto narHash = hashString(htSHA256, saved.s); auto narHash = hashString(htSHA256, saved.s);

View file

@ -6,8 +6,8 @@
#include "build-result.hh" #include "build-result.hh"
#include "store-api.hh" #include "store-api.hh"
#include "path-with-outputs.hh" #include "path-with-outputs.hh"
#include "worker-protocol.hh" #include "common-protocol.hh"
#include "worker-protocol-impl.hh" #include "common-protocol-impl.hh"
#include "ssh.hh" #include "ssh.hh"
#include "derivations.hh" #include "derivations.hh"
#include "callback.hh" #include "callback.hh"
@ -50,37 +50,37 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
bool good = true; bool good = true;
/** /**
* Coercion to `WorkerProto::ReadConn`. This makes it easy to use the * Coercion to `CommonProto::ReadConn`. This makes it easy to use the
* factored out worker protocol searlizers with a * factored out common protocol serialisers with a
* `LegacySSHStore::Connection`. * `LegacySSHStore::Connection`.
* *
* The worker protocol connection types are unidirectional, unlike * The common protocol connection types are unidirectional, unlike
* this type. * this type.
* *
* @todo Use server protocol serializers, not worker protocol * @todo Use server protocol serializers, not common protocol
* serializers, once we have made that distiction. * serializers, once we have made that distiction.
*/ */
operator WorkerProto::ReadConn () operator CommonProto::ReadConn ()
{ {
return WorkerProto::ReadConn { return CommonProto::ReadConn {
.from = from, .from = from,
}; };
} }
/* /*
* Coercion to `WorkerProto::WriteConn`. This makes it easy to use the * Coercion to `CommonProto::WriteConn`. This makes it easy to use the
* factored out worker protocol searlizers with a * factored out common protocol searlizers with a
* `LegacySSHStore::Connection`. * `LegacySSHStore::Connection`.
* *
* The worker protocol connection types are unidirectional, unlike * The common protocol connection types are unidirectional, unlike
* this type. * this type.
* *
* @todo Use server protocol serializers, not worker protocol * @todo Use server protocol serializers, not common protocol
* serializers, once we have made that distiction. * serializers, once we have made that distiction.
*/ */
operator WorkerProto::WriteConn () operator CommonProto::WriteConn ()
{ {
return WorkerProto::WriteConn { return CommonProto::WriteConn {
.to = to, .to = to,
}; };
} }
@ -183,7 +183,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
auto deriver = readString(conn->from); auto deriver = readString(conn->from);
if (deriver != "") if (deriver != "")
info->deriver = parseStorePath(deriver); info->deriver = parseStorePath(deriver);
info->references = WorkerProto::Serialise<StorePathSet>::read(*this, *conn); info->references = CommonProto::Serialise<StorePathSet>::read(*this, *conn);
readLongLong(conn->from); // download size readLongLong(conn->from); // download size
info->narSize = readLongLong(conn->from); info->narSize = readLongLong(conn->from);
@ -217,7 +217,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
<< printStorePath(info.path) << printStorePath(info.path)
<< (info.deriver ? printStorePath(*info.deriver) : "") << (info.deriver ? printStorePath(*info.deriver) : "")
<< info.narHash.to_string(Base16, false); << info.narHash.to_string(Base16, false);
WorkerProto::write(*this, *conn, info.references); CommonProto::write(*this, *conn, info.references);
conn->to conn->to
<< info.registrationTime << info.registrationTime
<< info.narSize << info.narSize
@ -246,7 +246,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
conn->to conn->to
<< exportMagic << exportMagic
<< printStorePath(info.path); << printStorePath(info.path);
WorkerProto::write(*this, *conn, info.references); CommonProto::write(*this, *conn, info.references);
conn->to conn->to
<< (info.deriver ? printStorePath(*info.deriver) : "") << (info.deriver ? printStorePath(*info.deriver) : "")
<< 0 << 0
@ -331,7 +331,7 @@ public:
if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 3) if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 3)
conn->from >> status.timesBuilt >> status.isNonDeterministic >> status.startTime >> status.stopTime; conn->from >> status.timesBuilt >> status.isNonDeterministic >> status.startTime >> status.stopTime;
if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 6) { if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 6) {
auto builtOutputs = WorkerProto::Serialise<DrvOutputs>::read(*this, *conn); auto builtOutputs = CommonProto::Serialise<DrvOutputs>::read(*this, *conn);
for (auto && [output, realisation] : builtOutputs) for (auto && [output, realisation] : builtOutputs)
status.builtOutputs.insert_or_assign( status.builtOutputs.insert_or_assign(
std::move(output.outputName), std::move(output.outputName),
@ -409,10 +409,10 @@ public:
conn->to conn->to
<< ServeProto::Command::QueryClosure << ServeProto::Command::QueryClosure
<< includeOutputs; << includeOutputs;
WorkerProto::write(*this, *conn, paths); CommonProto::write(*this, *conn, paths);
conn->to.flush(); conn->to.flush();
for (auto & i : WorkerProto::Serialise<StorePathSet>::read(*this, *conn)) for (auto & i : CommonProto::Serialise<StorePathSet>::read(*this, *conn))
out.insert(i); out.insert(i);
} }
@ -425,10 +425,10 @@ public:
<< ServeProto::Command::QueryValidPaths << ServeProto::Command::QueryValidPaths
<< false // lock << false // lock
<< maybeSubstitute; << maybeSubstitute;
WorkerProto::write(*this, *conn, paths); CommonProto::write(*this, *conn, paths);
conn->to.flush(); conn->to.flush();
return WorkerProto::Serialise<StorePathSet>::read(*this, *conn); return CommonProto::Serialise<StorePathSet>::read(*this, *conn);
} }
void connect() override void connect() override

View file

@ -0,0 +1,162 @@
#pragma once
/**
* @file Reusable serialisers for serialization container types in a
* length-prefixed manner.
*
* Used by both the Worker and Serve protocols.
*/
#include "types.hh"
namespace nix {
class Store;
/**
* Reusable serialisers for serialization container types in a
* length-prefixed manner.
*
* @param T The type of the collection being serialised
*
* @param Inner This the most important parameter; this is the "inner"
* protocol. The user of this will substitute `MyProtocol` or similar
* when making a `MyProtocol::Serialiser<Collection<T>>`. Note that the
* inside is allowed to call to call `Inner::Serialiser` on different
* types. This is especially important for `std::map` which doesn't have
* a single `T` but one `K` and one `V`.
*/
template<class Inner, typename T>
struct LengthPrefixedProtoHelper;
/*!
* \typedef LengthPrefixedProtoHelper::S
*
* Read this as simply `using S = Inner::Serialise;`.
*
* It would be nice to use that directly, but C++ doesn't seem to allow
* it. The `typename` keyword needed to refer to `Inner` seems to greedy
* (low precedence), and then C++ complains that `Serialise` is not a
* type parameter but a real type.
*
* Making this `S` alias seems to be the only way to avoid these issues.
*/
#define LENGTH_PREFIXED_PROTO_HELPER(Inner, T) \
struct LengthPrefixedProtoHelper< Inner, T > \
{ \
static T read(const Store & store, typename Inner::ReadConn conn); \
static void write(const Store & store, typename Inner::WriteConn conn, const T & str); \
private: \
template<typename U> using S = typename Inner::template Serialise<U>; \
}
template<class Inner, typename T>
LENGTH_PREFIXED_PROTO_HELPER(Inner, std::vector<T>);
template<class Inner, typename T>
LENGTH_PREFIXED_PROTO_HELPER(Inner, std::set<T>);
template<class Inner, typename... Ts>
LENGTH_PREFIXED_PROTO_HELPER(Inner, std::tuple<Ts...>);
template<class Inner, typename K, typename V>
#define _X std::map<K, V>
LENGTH_PREFIXED_PROTO_HELPER(Inner, _X);
#undef _X
template<class Inner, typename T>
std::vector<T>
LengthPrefixedProtoHelper<Inner, std::vector<T>>::read(
const Store & store, typename Inner::ReadConn conn)
{
std::vector<T> resSet;
auto size = readNum<size_t>(conn.from);
while (size--) {
resSet.push_back(S<T>::read(store, conn));
}
return resSet;
}
template<class Inner, typename T>
void
LengthPrefixedProtoHelper<Inner, std::vector<T>>::write(
const Store & store, typename Inner::WriteConn conn, const std::vector<T> & resSet)
{
conn.to << resSet.size();
for (auto & key : resSet) {
S<T>::write(store, conn, key);
}
}
template<class Inner, typename T>
std::set<T>
LengthPrefixedProtoHelper<Inner, std::set<T>>::read(
const Store & store, typename Inner::ReadConn conn)
{
std::set<T> resSet;
auto size = readNum<size_t>(conn.from);
while (size--) {
resSet.insert(S<T>::read(store, conn));
}
return resSet;
}
template<class Inner, typename T>
void
LengthPrefixedProtoHelper<Inner, std::set<T>>::write(
const Store & store, typename Inner::WriteConn conn, const std::set<T> & resSet)
{
conn.to << resSet.size();
for (auto & key : resSet) {
S<T>::write(store, conn, key);
}
}
template<class Inner, typename K, typename V>
std::map<K, V>
LengthPrefixedProtoHelper<Inner, std::map<K, V>>::read(
const Store & store, typename Inner::ReadConn conn)
{
std::map<K, V> resMap;
auto size = readNum<size_t>(conn.from);
while (size--) {
auto k = S<K>::read(store, conn);
auto v = S<V>::read(store, conn);
resMap.insert_or_assign(std::move(k), std::move(v));
}
return resMap;
}
template<class Inner, typename K, typename V>
void
LengthPrefixedProtoHelper<Inner, std::map<K, V>>::write(
const Store & store, typename Inner::WriteConn conn, const std::map<K, V> & resMap)
{
conn.to << resMap.size();
for (auto & i : resMap) {
S<K>::write(store, conn, i.first);
S<V>::write(store, conn, i.second);
}
}
template<class Inner, typename... Ts>
std::tuple<Ts...>
LengthPrefixedProtoHelper<Inner, std::tuple<Ts...>>::read(
const Store & store, typename Inner::ReadConn conn)
{
return std::tuple<Ts...> {
S<Ts>::read(store, conn)...,
};
}
template<class Inner, typename... Ts>
void
LengthPrefixedProtoHelper<Inner, std::tuple<Ts...>>::write(
const Store & store, typename Inner::WriteConn conn, const std::tuple<Ts...> & res)
{
std::apply([&]<typename... Us>(const Us &... args) {
(S<Us>::write(store, conn, args), ...);
}, res);
}
}

View file

@ -9,86 +9,51 @@
*/ */
#include "worker-protocol.hh" #include "worker-protocol.hh"
#include "length-prefixed-protocol-helper.hh"
namespace nix { namespace nix {
/* protocol-agnostic templates */
#define WORKER_USE_LENGTH_PREFIX_SERIALISER(TEMPLATE, T) \
TEMPLATE T WorkerProto::Serialise< T >::read(const Store & store, WorkerProto::ReadConn conn) \
{ \
return LengthPrefixedProtoHelper<WorkerProto, T >::read(store, conn); \
} \
TEMPLATE void WorkerProto::Serialise< T >::write(const Store & store, WorkerProto::WriteConn conn, const T & t) \
{ \
LengthPrefixedProtoHelper<WorkerProto, T >::write(store, conn, t); \
}
WORKER_USE_LENGTH_PREFIX_SERIALISER(template<typename T>, std::vector<T>)
WORKER_USE_LENGTH_PREFIX_SERIALISER(template<typename T>, std::set<T>)
WORKER_USE_LENGTH_PREFIX_SERIALISER(template<typename... Ts>, std::tuple<Ts...>)
#define COMMA_ ,
WORKER_USE_LENGTH_PREFIX_SERIALISER(
template<typename K COMMA_ typename V>,
std::map<K COMMA_ V>)
#undef COMMA_
/**
* Use `CommonProto` where possible.
*/
template<typename T> template<typename T>
std::vector<T> WorkerProto::Serialise<std::vector<T>>::read(const Store & store, WorkerProto::ReadConn conn) struct WorkerProto::Serialise
{ {
std::vector<T> resSet; static T read(const Store & store, WorkerProto::ReadConn conn)
auto size = readNum<size_t>(conn.from); {
while (size--) { return CommonProto::Serialise<T>::read(store,
resSet.push_back(WorkerProto::Serialise<T>::read(store, conn)); CommonProto::ReadConn { .from = conn.from });
} }
return resSet; static void write(const Store & store, WorkerProto::WriteConn conn, const T & t)
} {
CommonProto::Serialise<T>::write(store,
template<typename T> CommonProto::WriteConn { .to = conn.to },
void WorkerProto::Serialise<std::vector<T>>::write(const Store & store, WorkerProto::WriteConn conn, const std::vector<T> & resSet) t);
{
conn.to << resSet.size();
for (auto & key : resSet) {
WorkerProto::Serialise<T>::write(store, conn, key);
} }
} };
template<typename T> /* protocol-specific templates */
std::set<T> WorkerProto::Serialise<std::set<T>>::read(const Store & store, WorkerProto::ReadConn conn)
{
std::set<T> resSet;
auto size = readNum<size_t>(conn.from);
while (size--) {
resSet.insert(WorkerProto::Serialise<T>::read(store, conn));
}
return resSet;
}
template<typename T>
void WorkerProto::Serialise<std::set<T>>::write(const Store & store, WorkerProto::WriteConn conn, const std::set<T> & resSet)
{
conn.to << resSet.size();
for (auto & key : resSet) {
WorkerProto::Serialise<T>::write(store, conn, key);
}
}
template<typename K, typename V>
std::map<K, V> WorkerProto::Serialise<std::map<K, V>>::read(const Store & store, WorkerProto::ReadConn conn)
{
std::map<K, V> resMap;
auto size = readNum<size_t>(conn.from);
while (size--) {
auto k = WorkerProto::Serialise<K>::read(store, conn);
auto v = WorkerProto::Serialise<V>::read(store, conn);
resMap.insert_or_assign(std::move(k), std::move(v));
}
return resMap;
}
template<typename K, typename V>
void WorkerProto::Serialise<std::map<K, V>>::write(const Store & store, WorkerProto::WriteConn conn, const std::map<K, V> & resMap)
{
conn.to << resMap.size();
for (auto & i : resMap) {
WorkerProto::Serialise<K>::write(store, conn, i.first);
WorkerProto::Serialise<V>::write(store, conn, i.second);
}
}
template<typename... Ts>
std::tuple<Ts...> WorkerProto::Serialise<std::tuple<Ts...>>::read(const Store & store, WorkerProto::ReadConn conn)
{
return std::tuple<Ts...> {
WorkerProto::Serialise<Ts>::read(store, conn)...,
};
}
template<typename... Ts>
void WorkerProto::Serialise<std::tuple<Ts...>>::write(const Store & store, WorkerProto::WriteConn conn, const std::tuple<Ts...> & res)
{
std::apply([&]<typename... Us>(const Us &... args) {
(WorkerProto::Serialise<Us>::write(store, conn, args), ...);
}, res);
}
} }

View file

@ -12,27 +12,7 @@
namespace nix { namespace nix {
std::string WorkerProto::Serialise<std::string>::read(const Store & store, WorkerProto::ReadConn conn) /* protocol-specific definitions */
{
return readString(conn.from);
}
void WorkerProto::Serialise<std::string>::write(const Store & store, WorkerProto::WriteConn conn, const std::string & str)
{
conn.to << str;
}
StorePath WorkerProto::Serialise<StorePath>::read(const Store & store, WorkerProto::ReadConn conn)
{
return store.parseStorePath(readString(conn.from));
}
void WorkerProto::Serialise<StorePath>::write(const Store & store, WorkerProto::WriteConn conn, const StorePath & storePath)
{
conn.to << store.printStorePath(storePath);
}
std::optional<TrustedFlag> WorkerProto::Serialise<std::optional<TrustedFlag>>::read(const Store & store, WorkerProto::ReadConn conn) std::optional<TrustedFlag> WorkerProto::Serialise<std::optional<TrustedFlag>>::read(const Store & store, WorkerProto::ReadConn conn)
{ {
@ -68,17 +48,6 @@ void WorkerProto::Serialise<std::optional<TrustedFlag>>::write(const Store & sto
} }
ContentAddress WorkerProto::Serialise<ContentAddress>::read(const Store & store, WorkerProto::ReadConn conn)
{
return ContentAddress::parse(readString(conn.from));
}
void WorkerProto::Serialise<ContentAddress>::write(const Store & store, WorkerProto::WriteConn conn, const ContentAddress & ca)
{
conn.to << renderContentAddress(ca);
}
DerivedPath WorkerProto::Serialise<DerivedPath>::read(const Store & store, WorkerProto::ReadConn conn) DerivedPath WorkerProto::Serialise<DerivedPath>::read(const Store & store, WorkerProto::ReadConn conn)
{ {
auto s = readString(conn.from); auto s = readString(conn.from);
@ -91,32 +60,6 @@ void WorkerProto::Serialise<DerivedPath>::write(const Store & store, WorkerProto
} }
Realisation WorkerProto::Serialise<Realisation>::read(const Store & store, WorkerProto::ReadConn conn)
{
std::string rawInput = readString(conn.from);
return Realisation::fromJSON(
nlohmann::json::parse(rawInput),
"remote-protocol"
);
}
void WorkerProto::Serialise<Realisation>::write(const Store & store, WorkerProto::WriteConn conn, const Realisation & realisation)
{
conn.to << realisation.toJSON().dump();
}
DrvOutput WorkerProto::Serialise<DrvOutput>::read(const Store & store, WorkerProto::ReadConn conn)
{
return DrvOutput::parse(readString(conn.from));
}
void WorkerProto::Serialise<DrvOutput>::write(const Store & store, WorkerProto::WriteConn conn, const DrvOutput & drvOutput)
{
conn.to << drvOutput.to_string();
}
KeyedBuildResult WorkerProto::Serialise<KeyedBuildResult>::read(const Store & store, WorkerProto::ReadConn conn) KeyedBuildResult WorkerProto::Serialise<KeyedBuildResult>::read(const Store & store, WorkerProto::ReadConn conn)
{ {
auto path = WorkerProto::Serialise<DerivedPath>::read(store, conn); auto path = WorkerProto::Serialise<DerivedPath>::read(store, conn);
@ -168,26 +111,4 @@ void WorkerProto::Serialise<BuildResult>::write(const Store & store, WorkerProto
} }
std::optional<StorePath> WorkerProto::Serialise<std::optional<StorePath>>::read(const Store & store, WorkerProto::ReadConn conn)
{
auto s = readString(conn.from);
return s == "" ? std::optional<StorePath> {} : store.parseStorePath(s);
}
void WorkerProto::Serialise<std::optional<StorePath>>::write(const Store & store, WorkerProto::WriteConn conn, const std::optional<StorePath> & storePathOpt)
{
conn.to << (storePathOpt ? store.printStorePath(*storePathOpt) : "");
}
std::optional<ContentAddress> WorkerProto::Serialise<std::optional<ContentAddress>>::read(const Store & store, WorkerProto::ReadConn conn)
{
return ContentAddress::parseOpt(readString(conn.from));
}
void WorkerProto::Serialise<std::optional<ContentAddress>>::write(const Store & store, WorkerProto::WriteConn conn, const std::optional<ContentAddress> & caOpt)
{
conn.to << (caOpt ? renderContentAddress(*caOpt) : "");
}
} }

View file

@ -1,7 +1,7 @@
#pragma once #pragma once
///@file ///@file
#include "serialise.hh" #include "common-protocol.hh"
namespace nix { namespace nix {
@ -28,11 +28,7 @@ class Store;
struct Source; struct Source;
// items being serialised // items being serialised
class StorePath;
struct ContentAddress;
struct DerivedPath; struct DerivedPath;
struct DrvOutput;
struct Realisation;
struct BuildResult; struct BuildResult;
struct KeyedBuildResult; struct KeyedBuildResult;
enum TrustedFlag : bool; enum TrustedFlag : bool;
@ -193,60 +189,32 @@ inline std::ostream & operator << (std::ostream & s, WorkerProto::Op op)
* be legal specialization syntax. See below for what that looks like in * be legal specialization syntax. See below for what that looks like in
* practice. * practice.
*/ */
#define MAKE_WORKER_PROTO(T) \ #define DECLARE_WORKER_SERIALISER(T) \
struct WorkerProto::Serialise< T > { \ struct WorkerProto::Serialise< T > \
{ \
static T read(const Store & store, WorkerProto::ReadConn conn); \ static T read(const Store & store, WorkerProto::ReadConn conn); \
static void write(const Store & store, WorkerProto::WriteConn conn, const T & t); \ static void write(const Store & store, WorkerProto::WriteConn conn, const T & t); \
}; };
template<> template<>
MAKE_WORKER_PROTO(std::string); DECLARE_WORKER_SERIALISER(DerivedPath);
template<> template<>
MAKE_WORKER_PROTO(StorePath); DECLARE_WORKER_SERIALISER(BuildResult);
template<> template<>
MAKE_WORKER_PROTO(ContentAddress); DECLARE_WORKER_SERIALISER(KeyedBuildResult);
template<> template<>
MAKE_WORKER_PROTO(DerivedPath); DECLARE_WORKER_SERIALISER(std::optional<TrustedFlag>);
template<>
MAKE_WORKER_PROTO(DrvOutput);
template<>
MAKE_WORKER_PROTO(Realisation);
template<>
MAKE_WORKER_PROTO(BuildResult);
template<>
MAKE_WORKER_PROTO(KeyedBuildResult);
template<>
MAKE_WORKER_PROTO(std::optional<TrustedFlag>);
template<typename T> template<typename T>
MAKE_WORKER_PROTO(std::vector<T>); DECLARE_WORKER_SERIALISER(std::vector<T>);
template<typename T> template<typename T>
MAKE_WORKER_PROTO(std::set<T>); DECLARE_WORKER_SERIALISER(std::set<T>);
template<typename... Ts> template<typename... Ts>
MAKE_WORKER_PROTO(std::tuple<Ts...>); DECLARE_WORKER_SERIALISER(std::tuple<Ts...>);
#define COMMA_ ,
template<typename K, typename V> template<typename K, typename V>
#define X_ std::map<K, V> DECLARE_WORKER_SERIALISER(std::map<K COMMA_ V>);
MAKE_WORKER_PROTO(X_); #undef COMMA_
#undef X_
/**
* These use the empty string for the null case, relying on the fact
* that the underlying types never serialise to the empty string.
*
* We do this instead of a generic std::optional<T> instance because
* ordinal tags (0 or 1, here) are a bit of a compatability hazard. For
* the same reason, we don't have a std::variant<T..> instances (ordinal
* tags 0...n).
*
* We could the generic instances and then these as specializations for
* compatability, but that's proven a bit finnicky, and also makes the
* worker protocol harder to implement in other languages where such
* specializations may not be allowed.
*/
template<>
MAKE_WORKER_PROTO(std::optional<StorePath>);
template<>
MAKE_WORKER_PROTO(std::optional<ContentAddress>);
} }

View file

@ -11,8 +11,8 @@
#include "serve-protocol.hh" #include "serve-protocol.hh"
#include "shared.hh" #include "shared.hh"
#include "util.hh" #include "util.hh"
#include "worker-protocol.hh" #include "common-protocol.hh"
#include "worker-protocol-impl.hh" #include "common-protocol-impl.hh"
#include "graphml.hh" #include "graphml.hh"
#include "legacy.hh" #include "legacy.hh"
#include "path-with-outputs.hh" #include "path-with-outputs.hh"
@ -821,8 +821,8 @@ static void opServe(Strings opFlags, Strings opArgs)
out.flush(); out.flush();
unsigned int clientVersion = readInt(in); unsigned int clientVersion = readInt(in);
WorkerProto::ReadConn rconn { .from = in }; CommonProto::ReadConn rconn { .from = in };
WorkerProto::WriteConn wconn { .to = out }; CommonProto::WriteConn wconn { .to = out };
auto getBuildSettings = [&]() { auto getBuildSettings = [&]() {
// FIXME: changing options here doesn't work if we're // FIXME: changing options here doesn't work if we're
@ -867,7 +867,7 @@ static void opServe(Strings opFlags, Strings opArgs)
case ServeProto::Command::QueryValidPaths: { case ServeProto::Command::QueryValidPaths: {
bool lock = readInt(in); bool lock = readInt(in);
bool substitute = readInt(in); bool substitute = readInt(in);
auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn); auto paths = CommonProto::Serialise<StorePathSet>::read(*store, rconn);
if (lock && writeAllowed) if (lock && writeAllowed)
for (auto & path : paths) for (auto & path : paths)
store->addTempRoot(path); store->addTempRoot(path);
@ -876,19 +876,19 @@ static void opServe(Strings opFlags, Strings opArgs)
store->substitutePaths(paths); store->substitutePaths(paths);
} }
WorkerProto::write(*store, wconn, store->queryValidPaths(paths)); CommonProto::write(*store, wconn, store->queryValidPaths(paths));
break; break;
} }
case ServeProto::Command::QueryPathInfos: { case ServeProto::Command::QueryPathInfos: {
auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn); auto paths = CommonProto::Serialise<StorePathSet>::read(*store, rconn);
// !!! Maybe we want a queryPathInfos? // !!! Maybe we want a queryPathInfos?
for (auto & i : paths) { for (auto & i : paths) {
try { try {
auto info = store->queryPathInfo(i); auto info = store->queryPathInfo(i);
out << store->printStorePath(info->path) out << store->printStorePath(info->path)
<< (info->deriver ? store->printStorePath(*info->deriver) : ""); << (info->deriver ? store->printStorePath(*info->deriver) : "");
WorkerProto::write(*store, wconn, info->references); CommonProto::write(*store, wconn, info->references);
// !!! Maybe we want compression? // !!! Maybe we want compression?
out << info->narSize // downloadSize out << info->narSize // downloadSize
<< info->narSize; << info->narSize;
@ -916,7 +916,7 @@ static void opServe(Strings opFlags, Strings opArgs)
case ServeProto::Command::ExportPaths: { case ServeProto::Command::ExportPaths: {
readInt(in); // obsolete readInt(in); // obsolete
store->exportPaths(WorkerProto::Serialise<StorePathSet>::read(*store, rconn), out); store->exportPaths(CommonProto::Serialise<StorePathSet>::read(*store, rconn), out);
break; break;
} }
@ -962,7 +962,7 @@ static void opServe(Strings opFlags, Strings opArgs)
DrvOutputs builtOutputs; DrvOutputs builtOutputs;
for (auto & [output, realisation] : status.builtOutputs) for (auto & [output, realisation] : status.builtOutputs)
builtOutputs.insert_or_assign(realisation.id, realisation); builtOutputs.insert_or_assign(realisation.id, realisation);
WorkerProto::write(*store, wconn, builtOutputs); CommonProto::write(*store, wconn, builtOutputs);
} }
break; break;
@ -971,9 +971,9 @@ static void opServe(Strings opFlags, Strings opArgs)
case ServeProto::Command::QueryClosure: { case ServeProto::Command::QueryClosure: {
bool includeOutputs = readInt(in); bool includeOutputs = readInt(in);
StorePathSet closure; StorePathSet closure;
store->computeFSClosure(WorkerProto::Serialise<StorePathSet>::read(*store, rconn), store->computeFSClosure(CommonProto::Serialise<StorePathSet>::read(*store, rconn),
closure, false, includeOutputs); closure, false, includeOutputs);
WorkerProto::write(*store, wconn, closure); CommonProto::write(*store, wconn, closure);
break; break;
} }
@ -988,7 +988,7 @@ static void opServe(Strings opFlags, Strings opArgs)
}; };
if (deriver != "") if (deriver != "")
info.deriver = store->parseStorePath(deriver); info.deriver = store->parseStorePath(deriver);
info.references = WorkerProto::Serialise<StorePathSet>::read(*store, rconn); info.references = CommonProto::Serialise<StorePathSet>::read(*store, rconn);
in >> info.registrationTime >> info.narSize >> info.ultimate; in >> info.registrationTime >> info.narSize >> info.ultimate;
info.sigs = readStrings<StringSet>(in); info.sigs = readStrings<StringSet>(in);
info.ca = ContentAddress::parseOpt(readString(in)); info.ca = ContentAddress::parseOpt(readString(in));