Introduce separate Serve protocol serialisers

To start, it is just a clone of the common protocol. But now that we
have the separate protocol implementations, we can add versioning
information without the versions of one protocol leaking into another.

Using the infrastructure from the previous commit, we don't have to
duplicate code for shared behavior.

Motivation: No more perverse incentives. [0] did some awkward things
because the serialisers did not store the version. I don't want anyone
making changes to be pushed towards keeping the serialization logic with
the core data types just because it's easier or the alternative is
tedious.

The actual versioning of the Worker and Serve protocol serialisers
(Common remains unversioned as the underlying mini-protocols are not
versioned) will happen in subsequent commits / PRs.

[0]: fe1f34fa60
This commit is contained in:
John Ericson 2023-05-26 14:11:08 -04:00
parent c7f1d86b80
commit f7b8f8aff6
15 changed files with 344 additions and 39 deletions

View file

@ -3,11 +3,10 @@
#include "pool.hh" #include "pool.hh"
#include "remote-store.hh" #include "remote-store.hh"
#include "serve-protocol.hh" #include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "build-result.hh" #include "build-result.hh"
#include "store-api.hh" #include "store-api.hh"
#include "path-with-outputs.hh" #include "path-with-outputs.hh"
#include "common-protocol.hh"
#include "common-protocol-impl.hh"
#include "ssh.hh" #include "ssh.hh"
#include "derivations.hh" #include "derivations.hh"
#include "callback.hh" #include "callback.hh"
@ -50,37 +49,31 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
bool good = true; bool good = true;
/** /**
* Coercion to `CommonProto::ReadConn`. This makes it easy to use the * Coercion to `ServeProto::ReadConn`. This makes it easy to use the
* factored out common protocol serialisers with a * factored out serve protocol searlizers with a
* `LegacySSHStore::Connection`. * `LegacySSHStore::Connection`.
* *
* The common protocol connection types are unidirectional, unlike * The serve protocol connection types are unidirectional, unlike
* this type. * this type.
*
* @todo Use server protocol serializers, not common protocol
* serializers, once we have made that distiction.
*/ */
operator CommonProto::ReadConn () operator ServeProto::ReadConn ()
{ {
return CommonProto::ReadConn { return ServeProto::ReadConn {
.from = from, .from = from,
}; };
} }
/* /*
* Coercion to `CommonProto::WriteConn`. This makes it easy to use the * Coercion to `ServeProto::WriteConn`. This makes it easy to use the
* factored out common protocol searlizers with a * factored out serve protocol searlizers with a
* `LegacySSHStore::Connection`. * `LegacySSHStore::Connection`.
* *
* The common protocol connection types are unidirectional, unlike * The serve protocol connection types are unidirectional, unlike
* this type. * this type.
*
* @todo Use server protocol serializers, not common protocol
* serializers, once we have made that distiction.
*/ */
operator CommonProto::WriteConn () operator ServeProto::WriteConn ()
{ {
return CommonProto::WriteConn { return ServeProto::WriteConn {
.to = to, .to = to,
}; };
} }
@ -183,7 +176,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
auto deriver = readString(conn->from); auto deriver = readString(conn->from);
if (deriver != "") if (deriver != "")
info->deriver = parseStorePath(deriver); info->deriver = parseStorePath(deriver);
info->references = CommonProto::Serialise<StorePathSet>::read(*this, *conn); info->references = ServeProto::Serialise<StorePathSet>::read(*this, *conn);
readLongLong(conn->from); // download size readLongLong(conn->from); // download size
info->narSize = readLongLong(conn->from); info->narSize = readLongLong(conn->from);
@ -217,7 +210,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
<< printStorePath(info.path) << printStorePath(info.path)
<< (info.deriver ? printStorePath(*info.deriver) : "") << (info.deriver ? printStorePath(*info.deriver) : "")
<< info.narHash.to_string(Base16, false); << info.narHash.to_string(Base16, false);
CommonProto::write(*this, *conn, info.references); ServeProto::write(*this, *conn, info.references);
conn->to conn->to
<< info.registrationTime << info.registrationTime
<< info.narSize << info.narSize
@ -246,7 +239,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
conn->to conn->to
<< exportMagic << exportMagic
<< printStorePath(info.path); << printStorePath(info.path);
CommonProto::write(*this, *conn, info.references); ServeProto::write(*this, *conn, info.references);
conn->to conn->to
<< (info.deriver ? printStorePath(*info.deriver) : "") << (info.deriver ? printStorePath(*info.deriver) : "")
<< 0 << 0
@ -331,7 +324,7 @@ public:
if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 3) if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 3)
conn->from >> status.timesBuilt >> status.isNonDeterministic >> status.startTime >> status.stopTime; conn->from >> status.timesBuilt >> status.isNonDeterministic >> status.startTime >> status.stopTime;
if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 6) { if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 6) {
auto builtOutputs = CommonProto::Serialise<DrvOutputs>::read(*this, *conn); auto builtOutputs = ServeProto::Serialise<DrvOutputs>::read(*this, *conn);
for (auto && [output, realisation] : builtOutputs) for (auto && [output, realisation] : builtOutputs)
status.builtOutputs.insert_or_assign( status.builtOutputs.insert_or_assign(
std::move(output.outputName), std::move(output.outputName),
@ -409,10 +402,10 @@ public:
conn->to conn->to
<< ServeProto::Command::QueryClosure << ServeProto::Command::QueryClosure
<< includeOutputs; << includeOutputs;
CommonProto::write(*this, *conn, paths); ServeProto::write(*this, *conn, paths);
conn->to.flush(); conn->to.flush();
for (auto & i : CommonProto::Serialise<StorePathSet>::read(*this, *conn)) for (auto & i : ServeProto::Serialise<StorePathSet>::read(*this, *conn))
out.insert(i); out.insert(i);
} }
@ -425,10 +418,10 @@ public:
<< ServeProto::Command::QueryValidPaths << ServeProto::Command::QueryValidPaths
<< false // lock << false // lock
<< maybeSubstitute; << maybeSubstitute;
CommonProto::write(*this, *conn, paths); ServeProto::write(*this, *conn, paths);
conn->to.flush(); conn->to.flush();
return CommonProto::Serialise<StorePathSet>::read(*this, *conn); return ServeProto::Serialise<StorePathSet>::read(*this, *conn);
} }
void connect() override void connect() override

View file

@ -0,0 +1,59 @@
#pragma once
/**
* @file
*
* Template implementations (as opposed to mere declarations).
*
* This file is an exmample of the "impl.hh" pattern. See the
* contributing guide.
*/
#include "serve-protocol.hh"
#include "length-prefixed-protocol-helper.hh"
namespace nix {
/* protocol-agnostic templates */
#define SERVE_USE_LENGTH_PREFIX_SERIALISER(TEMPLATE, T) \
TEMPLATE T ServeProto::Serialise< T >::read(const Store & store, ServeProto::ReadConn conn) \
{ \
return LengthPrefixedProtoHelper<ServeProto, T >::read(store, conn); \
} \
TEMPLATE void ServeProto::Serialise< T >::write(const Store & store, ServeProto::WriteConn conn, const T & t) \
{ \
LengthPrefixedProtoHelper<ServeProto, T >::write(store, conn, t); \
}
SERVE_USE_LENGTH_PREFIX_SERIALISER(template<typename T>, std::vector<T>)
SERVE_USE_LENGTH_PREFIX_SERIALISER(template<typename T>, std::set<T>)
SERVE_USE_LENGTH_PREFIX_SERIALISER(template<typename... Ts>, std::tuple<Ts...>)
#define COMMA_ ,
SERVE_USE_LENGTH_PREFIX_SERIALISER(
template<typename K COMMA_ typename V>,
std::map<K COMMA_ V>)
#undef COMMA_
/**
* Use `CommonProto` where possible.
*/
template<typename T>
struct ServeProto::Serialise
{
static T read(const Store & store, ServeProto::ReadConn conn)
{
return CommonProto::Serialise<T>::read(store,
CommonProto::ReadConn { .from = conn.from });
}
static void write(const Store & store, ServeProto::WriteConn conn, const T & t)
{
CommonProto::Serialise<T>::write(store,
CommonProto::WriteConn { .to = conn.to },
t);
}
};
/* protocol-specific templates */
}

View file

@ -0,0 +1,15 @@
#include "serialise.hh"
#include "util.hh"
#include "path-with-outputs.hh"
#include "store-api.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "archive.hh"
#include <nlohmann/json.hpp>
namespace nix {
/* protocol-specific definitions */
}

View file

@ -1,6 +1,8 @@
#pragma once #pragma once
///@file ///@file
#include "common-protocol.hh"
namespace nix { namespace nix {
#define SERVE_MAGIC_1 0x390c9deb #define SERVE_MAGIC_1 0x390c9deb
@ -10,6 +12,11 @@ namespace nix {
#define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00) #define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
#define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff) #define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)
class Store;
struct Source;
/** /**
* The "serve protocol", used by ssh:// stores. * The "serve protocol", used by ssh:// stores.
* *
@ -22,6 +29,57 @@ struct ServeProto
* Enumeration of all the request types for the protocol. * Enumeration of all the request types for the protocol.
*/ */
enum struct Command : uint64_t; enum struct Command : uint64_t;
/**
* A unidirectional read connection, to be used by the read half of the
* canonical serializers below.
*
* This currently is just a `Source &`, but more fields will be added
* later.
*/
struct ReadConn {
Source & from;
};
/**
* A unidirectional write connection, to be used by the write half of the
* canonical serializers below.
*
* This currently is just a `Sink &`, but more fields will be added
* later.
*/
struct WriteConn {
Sink & to;
};
/**
* Data type for canonical pairs of serialisers for the serve protocol.
*
* See https://en.cppreference.com/w/cpp/language/adl for the broader
* concept of what is going on here.
*/
template<typename T>
struct Serialise;
// This is the definition of `Serialise` we *want* to put here, but
// do not do so.
//
// See `worker-protocol.hh` for a longer explanation.
#if 0
{
static T read(const Store & store, ReadConn conn);
static void write(const Store & store, WriteConn conn, const T & t);
};
#endif
/**
* Wrapper function around `ServeProto::Serialise<T>::write` that allows us to
* infer the type instead of having to write it down explicitly.
*/
template<typename T>
static void write(const Store & store, WriteConn conn, const T & t)
{
ServeProto::Serialise<T>::write(store, conn, t);
}
}; };
enum struct ServeProto::Command : uint64_t enum struct ServeProto::Command : uint64_t
@ -58,4 +116,33 @@ inline std::ostream & operator << (std::ostream & s, ServeProto::Command op)
return s << (uint64_t) op; return s << (uint64_t) op;
} }
/**
* Declare a canonical serialiser pair for the worker protocol.
*
* We specialise the struct merely to indicate that we are implementing
* the function for the given type.
*
* Some sort of `template<...>` must be used with the caller for this to
* be legal specialization syntax. See below for what that looks like in
* practice.
*/
#define DECLARE_SERVE_SERIALISER(T) \
struct ServeProto::Serialise< T > \
{ \
static T read(const Store & store, ServeProto::ReadConn conn); \
static void write(const Store & store, ServeProto::WriteConn conn, const T & t); \
};
template<typename T>
DECLARE_SERVE_SERIALISER(std::vector<T>);
template<typename T>
DECLARE_SERVE_SERIALISER(std::set<T>);
template<typename... Ts>
DECLARE_SERVE_SERIALISER(std::tuple<Ts...>);
#define COMMA_ ,
template<typename K, typename V>
DECLARE_SERVE_SERIALISER(std::map<K COMMA_ V>);
#undef COMMA_
} }

View file

@ -0,0 +1,152 @@
#include <regex>
#include <nlohmann/json.hpp>
#include <gtest/gtest.h>
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "build-result.hh"
#include "tests/protocol.hh"
#include "tests/characterization.hh"
namespace nix {
const char commonProtoDir[] = "serve-protocol";
using ServeProtoTest = ProtoTest<ServeProto, commonProtoDir>;
CHARACTERIZATION_TEST(
ServeProtoTest,
string,
"string",
(std::tuple<std::string, std::string, std::string, std::string, std::string> {
"",
"hi",
"white rabbit",
"大白兔",
"oh no \0\0\0 what was that!",
}))
CHARACTERIZATION_TEST(
ServeProtoTest,
storePath,
"store-path",
(std::tuple<StorePath, StorePath> {
StorePath { "g1w7hy3qg1w7hy3qg1w7hy3qg1w7hy3q-foo" },
StorePath { "g1w7hy3qg1w7hy3qg1w7hy3qg1w7hy3q-foo-bar" },
}))
CHARACTERIZATION_TEST(
ServeProtoTest,
contentAddress,
"content-address",
(std::tuple<ContentAddress, ContentAddress, ContentAddress> {
ContentAddress {
.method = TextIngestionMethod {},
.hash = hashString(HashType::htSHA256, "Derive(...)"),
},
ContentAddress {
.method = FileIngestionMethod::Flat,
.hash = hashString(HashType::htSHA1, "blob blob..."),
},
ContentAddress {
.method = FileIngestionMethod::Recursive,
.hash = hashString(HashType::htSHA256, "(...)"),
},
}))
CHARACTERIZATION_TEST(
ServeProtoTest,
drvOutput,
"drv-output",
(std::tuple<DrvOutput, DrvOutput> {
{
.drvHash = Hash::parseSRI("sha256-FePFYIlMuycIXPZbWi7LGEiMmZSX9FMbaQenWBzm1Sc="),
.outputName = "baz",
},
DrvOutput {
.drvHash = Hash::parseSRI("sha256-b4afnqKCO9oWXgYHb9DeQ2berSwOjS27rSd9TxXDc/U="),
.outputName = "quux",
},
}))
CHARACTERIZATION_TEST(
ServeProtoTest,
realisation,
"realisation",
(std::tuple<Realisation, Realisation> {
Realisation {
.id = DrvOutput {
.drvHash = Hash::parseSRI("sha256-FePFYIlMuycIXPZbWi7LGEiMmZSX9FMbaQenWBzm1Sc="),
.outputName = "baz",
},
.outPath = StorePath { "g1w7hy3qg1w7hy3qg1w7hy3qg1w7hy3q-foo" },
.signatures = { "asdf", "qwer" },
},
Realisation {
.id = {
.drvHash = Hash::parseSRI("sha256-FePFYIlMuycIXPZbWi7LGEiMmZSX9FMbaQenWBzm1Sc="),
.outputName = "baz",
},
.outPath = StorePath { "g1w7hy3qg1w7hy3qg1w7hy3qg1w7hy3q-foo" },
.signatures = { "asdf", "qwer" },
.dependentRealisations = {
{
DrvOutput {
.drvHash = Hash::parseSRI("sha256-b4afnqKCO9oWXgYHb9DeQ2berSwOjS27rSd9TxXDc/U="),
.outputName = "quux",
},
StorePath { "g1w7hy3qg1w7hy3qg1w7hy3qg1w7hy3q-foo" },
},
},
},
}))
CHARACTERIZATION_TEST(
ServeProtoTest,
vector,
"vector",
(std::tuple<std::vector<std::string>, std::vector<std::string>, std::vector<std::string>, std::vector<std::vector<std::string>>> {
{ },
{ "" },
{ "", "foo", "bar" },
{ {}, { "" }, { "", "1", "2" } },
}))
CHARACTERIZATION_TEST(
ServeProtoTest,
set,
"set",
(std::tuple<std::set<std::string>, std::set<std::string>, std::set<std::string>, std::set<std::set<std::string>>> {
{ },
{ "" },
{ "", "foo", "bar" },
{ {}, { "" }, { "", "1", "2" } },
}))
CHARACTERIZATION_TEST(
ServeProtoTest,
optionalStorePath,
"optional-store-path",
(std::tuple<std::optional<StorePath>, std::optional<StorePath>> {
std::nullopt,
std::optional {
StorePath { "g1w7hy3qg1w7hy3qg1w7hy3qg1w7hy3q-foo-bar" },
},
}))
CHARACTERIZATION_TEST(
ServeProtoTest,
optionalContentAddress,
"optional-content-address",
(std::tuple<std::optional<ContentAddress>, std::optional<ContentAddress>> {
std::nullopt,
std::optional {
ContentAddress {
.method = FileIngestionMethod::Flat,
.hash = hashString(HashType::htSHA1, "blob blob..."),
},
},
}))
}

View file

@ -9,10 +9,9 @@
#include "local-store.hh" #include "local-store.hh"
#include "monitor-fd.hh" #include "monitor-fd.hh"
#include "serve-protocol.hh" #include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "shared.hh" #include "shared.hh"
#include "util.hh" #include "util.hh"
#include "common-protocol.hh"
#include "common-protocol-impl.hh"
#include "graphml.hh" #include "graphml.hh"
#include "legacy.hh" #include "legacy.hh"
#include "path-with-outputs.hh" #include "path-with-outputs.hh"
@ -821,8 +820,8 @@ static void opServe(Strings opFlags, Strings opArgs)
out.flush(); out.flush();
unsigned int clientVersion = readInt(in); unsigned int clientVersion = readInt(in);
CommonProto::ReadConn rconn { .from = in }; ServeProto::ReadConn rconn { .from = in };
CommonProto::WriteConn wconn { .to = out }; ServeProto::WriteConn wconn { .to = out };
auto getBuildSettings = [&]() { auto getBuildSettings = [&]() {
// FIXME: changing options here doesn't work if we're // FIXME: changing options here doesn't work if we're
@ -867,7 +866,7 @@ static void opServe(Strings opFlags, Strings opArgs)
case ServeProto::Command::QueryValidPaths: { case ServeProto::Command::QueryValidPaths: {
bool lock = readInt(in); bool lock = readInt(in);
bool substitute = readInt(in); bool substitute = readInt(in);
auto paths = CommonProto::Serialise<StorePathSet>::read(*store, rconn); auto paths = ServeProto::Serialise<StorePathSet>::read(*store, rconn);
if (lock && writeAllowed) if (lock && writeAllowed)
for (auto & path : paths) for (auto & path : paths)
store->addTempRoot(path); store->addTempRoot(path);
@ -876,19 +875,19 @@ static void opServe(Strings opFlags, Strings opArgs)
store->substitutePaths(paths); store->substitutePaths(paths);
} }
CommonProto::write(*store, wconn, store->queryValidPaths(paths)); ServeProto::write(*store, wconn, store->queryValidPaths(paths));
break; break;
} }
case ServeProto::Command::QueryPathInfos: { case ServeProto::Command::QueryPathInfos: {
auto paths = CommonProto::Serialise<StorePathSet>::read(*store, rconn); auto paths = ServeProto::Serialise<StorePathSet>::read(*store, rconn);
// !!! Maybe we want a queryPathInfos? // !!! Maybe we want a queryPathInfos?
for (auto & i : paths) { for (auto & i : paths) {
try { try {
auto info = store->queryPathInfo(i); auto info = store->queryPathInfo(i);
out << store->printStorePath(info->path) out << store->printStorePath(info->path)
<< (info->deriver ? store->printStorePath(*info->deriver) : ""); << (info->deriver ? store->printStorePath(*info->deriver) : "");
CommonProto::write(*store, wconn, info->references); ServeProto::write(*store, wconn, info->references);
// !!! Maybe we want compression? // !!! Maybe we want compression?
out << info->narSize // downloadSize out << info->narSize // downloadSize
<< info->narSize; << info->narSize;
@ -916,7 +915,7 @@ static void opServe(Strings opFlags, Strings opArgs)
case ServeProto::Command::ExportPaths: { case ServeProto::Command::ExportPaths: {
readInt(in); // obsolete readInt(in); // obsolete
store->exportPaths(CommonProto::Serialise<StorePathSet>::read(*store, rconn), out); store->exportPaths(ServeProto::Serialise<StorePathSet>::read(*store, rconn), out);
break; break;
} }
@ -962,7 +961,7 @@ static void opServe(Strings opFlags, Strings opArgs)
DrvOutputs builtOutputs; DrvOutputs builtOutputs;
for (auto & [output, realisation] : status.builtOutputs) for (auto & [output, realisation] : status.builtOutputs)
builtOutputs.insert_or_assign(realisation.id, realisation); builtOutputs.insert_or_assign(realisation.id, realisation);
CommonProto::write(*store, wconn, builtOutputs); ServeProto::write(*store, wconn, builtOutputs);
} }
break; break;
@ -971,9 +970,9 @@ static void opServe(Strings opFlags, Strings opArgs)
case ServeProto::Command::QueryClosure: { case ServeProto::Command::QueryClosure: {
bool includeOutputs = readInt(in); bool includeOutputs = readInt(in);
StorePathSet closure; StorePathSet closure;
store->computeFSClosure(CommonProto::Serialise<StorePathSet>::read(*store, rconn), store->computeFSClosure(ServeProto::Serialise<StorePathSet>::read(*store, rconn),
closure, false, includeOutputs); closure, false, includeOutputs);
CommonProto::write(*store, wconn, closure); ServeProto::write(*store, wconn, closure);
break; break;
} }
@ -988,7 +987,7 @@ static void opServe(Strings opFlags, Strings opArgs)
}; };
if (deriver != "") if (deriver != "")
info.deriver = store->parseStorePath(deriver); info.deriver = store->parseStorePath(deriver);
info.references = CommonProto::Serialise<StorePathSet>::read(*store, rconn); info.references = ServeProto::Serialise<StorePathSet>::read(*store, rconn);
in >> info.registrationTime >> info.narSize >> info.ultimate; in >> info.registrationTime >> info.narSize >> info.ultimate;
info.sigs = readStrings<StringSet>(in); info.sigs = readStrings<StringSet>(in);
info.ca = ContentAddress::parseOpt(readString(in)); info.ca = ContentAddress::parseOpt(readString(in));

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.