nix-super/src/libutil/serialise.hh

565 lines
12 KiB
C++
Raw Normal View History

#pragma once
///@file
2016-07-13 13:03:37 +03:00
#include <memory>
#include "types.hh"
2015-07-20 02:16:16 +03:00
#include "util.hh"
namespace boost::context { struct stack_context; }
namespace nix {
/**
* Abstract destination of binary data.
*/
2015-07-20 02:16:16 +03:00
struct Sink
{
virtual ~Sink() { }
2020-12-02 15:00:43 +02:00
virtual void operator () (std::string_view data) = 0;
virtual bool good() { return true; }
};
/**
* Just throws away data.
*/
2020-08-07 22:09:26 +03:00
struct NullSink : Sink
{
2020-12-02 15:00:43 +02:00
void operator () (std::string_view data) override
2020-08-07 22:09:26 +03:00
{ }
};
2019-12-10 10:47:38 +02:00
struct FinishSink : virtual Sink
{
virtual void finish() = 0;
};
/**
* A buffered abstract sink. Warning: a BufferedSink should not be
* used from multiple threads concurrently.
*/
Allow content-addressable paths to have references This adds a command 'nix make-content-addressable' that rewrites the specified store paths into content-addressable paths. The advantage of such paths is that 1) they can be imported without signatures; 2) they can enable deduplication in cases where derivation changes do not cause output changes (apart from store path hashes). For example, $ nix make-content-addressable -r nixpkgs.cowsay rewrote '/nix/store/g1g31ah55xdia1jdqabv1imf6mcw0nb1-glibc-2.25-49' to '/nix/store/48jfj7bg78a8n4f2nhg269rgw1936vj4-glibc-2.25-49' ... rewrote '/nix/store/qbi6rzpk0bxjw8lw6azn2mc7ynnn455q-cowsay-3.03+dfsg1-16' to '/nix/store/iq6g2x4q62xp7y7493bibx0qn5w7xz67-cowsay-3.03+dfsg1-16' We can then copy the resulting closure to another store without signatures: $ nix copy --trusted-public-keys '' ---to ~/my-nix /nix/store/iq6g2x4q62xp7y7493bibx0qn5w7xz67-cowsay-3.03+dfsg1-16 In order to support self-references in content-addressable paths, these paths are hashed "modulo" self-references, meaning that self-references are zeroed out during hashing. Somewhat annoyingly, this means that the NAR hash stored in the Nix database is no longer necessarily equal to the output of "nix hash-path"; for content-addressable paths, you need to pass the --modulo flag: $ nix path-info --json /nix/store/iq6g2x4q62xp7y7493bibx0qn5w7xz67-cowsay-3.03+dfsg1-16 | jq -r .[].narHash sha256:0ri611gdilz2c9rsibqhsipbfs9vwcqvs811a52i2bnkhv7w9mgw $ nix hash-path --type sha256 --base32 /nix/store/iq6g2x4q62xp7y7493bibx0qn5w7xz67-cowsay-3.03+dfsg1-16 1ggznh07khq0hz6id09pqws3a8q9pn03ya3c03nwck1kwq8rclzs $ nix hash-path --type sha256 --base32 /nix/store/iq6g2x4q62xp7y7493bibx0qn5w7xz67-cowsay-3.03+dfsg1-16 --modulo iq6g2x4q62xp7y7493bibx0qn5w7xz67 0ri611gdilz2c9rsibqhsipbfs9vwcqvs811a52i2bnkhv7w9mgw
2018-03-30 01:56:13 +03:00
struct BufferedSink : virtual Sink
{
size_t bufSize, bufPos;
2020-12-02 15:00:43 +02:00
std::unique_ptr<char[]> buffer;
BufferedSink(size_t bufSize = 32 * 1024)
2016-07-13 13:03:37 +03:00
: bufSize(bufSize), bufPos(0), buffer(nullptr) { }
2020-12-02 15:00:43 +02:00
void operator () (std::string_view data) override;
2016-05-04 16:46:25 +03:00
void flush();
2015-07-20 02:16:16 +03:00
2020-12-02 15:00:43 +02:00
virtual void write(std::string_view data) = 0;
};
/**
* Abstract source of binary data.
*/
struct Source
{
virtual ~Source() { }
2015-07-20 02:16:16 +03:00
/**
* Store exactly len bytes in the buffer pointed to by data.
* It blocks until all the requested data is available, or throws
* an error if it is not going to be available.
*/
void operator () (char * data, size_t len);
/**
* Store up to len in the buffer pointed to by data, and
* return the number of bytes stored. It blocks until at least
* one byte is available.
*/
virtual size_t read(char * data, size_t len) = 0;
virtual bool good() { return true; }
2020-09-13 15:39:11 +03:00
void drainInto(Sink & sink);
std::string drain();
};
/**
* A buffered abstract source. Warning: a BufferedSource should not be
* used from multiple threads concurrently.
*/
struct BufferedSource : Source
{
size_t bufSize, bufPosIn, bufPosOut;
std::unique_ptr<char[]> buffer;
BufferedSource(size_t bufSize = 32 * 1024)
2016-07-13 13:03:37 +03:00
: bufSize(bufSize), bufPosIn(0), bufPosOut(0), buffer(nullptr) { }
2015-07-20 02:16:16 +03:00
size_t read(char * data, size_t len) override;
2015-07-20 02:16:16 +03:00
bool hasData();
protected:
/**
* Underlying read call, to be overridden.
*/
virtual size_t readUnbuffered(char * data, size_t len) = 0;
};
/**
* A sink that writes data to a file descriptor.
*/
struct FdSink : BufferedSink
{
int fd;
2016-02-26 17:16:08 +02:00
size_t written = 0;
2016-02-26 17:16:08 +02:00
FdSink() : fd(-1) { }
FdSink(int fd) : fd(fd) { }
FdSink(FdSink&&) = default;
FdSink & operator=(FdSink && s)
{
flush();
fd = s.fd;
s.fd = -1;
written = s.written;
return *this;
}
2011-12-16 17:45:42 +02:00
~FdSink();
2015-07-20 02:16:16 +03:00
2020-12-02 15:00:43 +02:00
void write(std::string_view data) override;
bool good() override;
private:
bool _good = true;
};
/**
* A source that reads data from a file descriptor.
*/
struct FdSource : BufferedSource
{
int fd;
2016-02-26 17:16:08 +02:00
size_t read = 0;
FdSource() : fd(-1) { }
FdSource(int fd) : fd(fd) { }
FdSource(FdSource&&) = default;
FdSource& operator=(FdSource && s)
{
fd = s.fd;
s.fd = -1;
read = s.read;
return *this;
}
bool good() override;
protected:
size_t readUnbuffered(char * data, size_t len) override;
private:
bool _good = true;
};
/**
* A sink that writes data to a string.
*/
struct StringSink : Sink
{
std::string s;
StringSink() { }
explicit StringSink(const size_t reservedSize)
{
s.reserve(reservedSize);
};
StringSink(std::string && s) : s(std::move(s)) { };
2020-12-02 15:00:43 +02:00
void operator () (std::string_view data) override;
};
/**
* A source that reads data from a string.
*/
struct StringSource : Source
{
2022-01-17 20:38:17 +02:00
std::string_view s;
size_t pos;
2022-01-17 20:38:17 +02:00
StringSource(std::string_view s) : s(s), pos(0) { }
size_t read(char * data, size_t len) override;
};
/**
* A sink that writes all incoming data to two other sinks.
*/
struct TeeSink : Sink
{
Sink & sink1, & sink2;
TeeSink(Sink & sink1, Sink & sink2) : sink1(sink1), sink2(sink2) { }
2020-12-02 15:00:43 +02:00
virtual void operator () (std::string_view data)
{
2020-12-02 15:00:43 +02:00
sink1(data);
sink2(data);
}
};
/**
* Adapter class of a Source that saves all data read to a sink.
*/
struct TeeSource : Source
{
Source & orig;
2020-07-13 18:30:42 +03:00
Sink & sink;
TeeSource(Source & orig, Sink & sink)
: orig(orig), sink(sink) { }
size_t read(char * data, size_t len)
{
size_t n = orig.read(data, len);
sink({data, n});
return n;
}
};
/**
* A reader that consumes the original Source until 'size'.
*/
struct SizedSource : Source
{
Source & orig;
size_t remain;
SizedSource(Source & orig, size_t size)
: orig(orig), remain(size) { }
size_t read(char * data, size_t len)
{
if (this->remain <= 0) {
throw EndOfFile("sized: unexpected end-of-file");
}
len = std::min(len, this->remain);
size_t n = this->orig.read(data, len);
this->remain -= n;
return n;
}
/**
* Consume the original source until no remain data is left to consume.
*/
size_t drainAll()
{
std::vector<char> buf(8192);
size_t sum = 0;
while (this->remain > 0) {
size_t n = read(buf.data(), buf.size());
sum += n;
}
return sum;
}
};
/**
* A sink that that just counts the number of bytes given to it
*/
struct LengthSink : Sink
{
uint64_t length = 0;
2020-12-02 15:00:43 +02:00
void operator () (std::string_view data) override
{
2020-12-02 15:00:43 +02:00
length += data.size();
}
};
/**
* Convert a function into a sink.
*/
struct LambdaSink : Sink
{
2020-12-02 15:00:43 +02:00
typedef std::function<void(std::string_view data)> lambda_t;
lambda_t lambda;
LambdaSink(const lambda_t & lambda) : lambda(lambda) { }
2020-12-02 15:00:43 +02:00
void operator () (std::string_view data) override
{
2020-12-02 15:00:43 +02:00
lambda(data);
}
};
/**
* Convert a function into a source.
*/
struct LambdaSource : Source
{
typedef std::function<size_t(char *, size_t)> lambda_t;
lambda_t lambda;
LambdaSource(const lambda_t & lambda) : lambda(lambda) { }
size_t read(char * data, size_t len) override
{
return lambda(data, len);
}
};
/**
* Chain two sources together so after the first is exhausted, the second is
* used
*/
struct ChainSource : Source
{
Source & source1, & source2;
bool useSecond = false;
ChainSource(Source & s1, Source & s2)
: source1(s1), source2(s2)
{ }
size_t read(char * data, size_t len) override;
};
2019-12-10 10:47:38 +02:00
std::unique_ptr<FinishSink> sourceToSink(std::function<void(Source &)> fun);
/**
* Convert a function that feeds data into a Sink into a Source. The
* Source executes the function as a coroutine.
*/
std::unique_ptr<Source> sinkToSource(
std::function<void(Sink &)> fun,
std::function<void()> eof = []() {
throw EndOfFile("coroutine has finished");
});
void writePadding(size_t len, Sink & sink);
2020-12-02 15:00:43 +02:00
void writeString(std::string_view s, Sink & sink);
2015-07-20 02:16:16 +03:00
inline Sink & operator << (Sink & sink, uint64_t n)
{
unsigned char buf[8];
buf[0] = n & 0xff;
buf[1] = (n >> 8) & 0xff;
buf[2] = (n >> 16) & 0xff;
buf[3] = (n >> 24) & 0xff;
buf[4] = (n >> 32) & 0xff;
buf[5] = (n >> 40) & 0xff;
buf[6] = (n >> 48) & 0xff;
2018-05-02 14:56:34 +03:00
buf[7] = (unsigned char) (n >> 56) & 0xff;
2020-12-02 15:00:43 +02:00
sink({(char *) buf, sizeof(buf)});
2015-07-20 02:16:16 +03:00
return sink;
}
2022-01-17 20:28:42 +02:00
Sink & operator << (Sink & in, const Error & ex);
Sink & operator << (Sink & sink, std::string_view s);
2015-07-20 02:16:16 +03:00
Sink & operator << (Sink & sink, const Strings & s);
Sink & operator << (Sink & sink, const StringSet & s);
MakeError(SerialisationError, Error);
template<typename T>
T readNum(Source & source)
{
unsigned char buf[8];
source((char *) buf, sizeof(buf));
auto n = readLittleEndian<uint64_t>(buf);
if (n > (uint64_t) std::numeric_limits<T>::max())
throw SerialisationError("serialised integer %d is too large for type '%s'", n, typeid(T).name());
2018-05-02 14:56:34 +03:00
return (T) n;
}
inline unsigned int readInt(Source & source)
{
return readNum<unsigned int>(source);
}
inline uint64_t readLongLong(Source & source)
{
return readNum<uint64_t>(source);
}
void readPadding(size_t len, Source & source);
size_t readString(char * buf, size_t max, Source & source);
std::string readString(Source & source, size_t max = std::numeric_limits<size_t>::max());
template<class T> T readStrings(Source & source);
Source & operator >> (Source & in, std::string & s);
template<typename T>
Source & operator >> (Source & in, T & n)
{
n = readNum<T>(in);
return in;
}
template<typename T>
Source & operator >> (Source & in, bool & b)
{
b = readNum<uint64_t>(in);
return in;
}
Error readError(Source & source);
/**
* An adapter that converts a std::basic_istream into a source.
*/
struct StreamToSourceAdapter : Source
{
std::shared_ptr<std::basic_istream<char>> istream;
StreamToSourceAdapter(std::shared_ptr<std::basic_istream<char>> istream)
: istream(istream)
{ }
size_t read(char * data, size_t len) override
{
if (!istream->read(data, len)) {
if (istream->eof()) {
if (istream->gcount() == 0)
throw EndOfFile("end of file");
} else
throw Error("I/O error in StreamToSourceAdapter");
}
return istream->gcount();
}
};
/**
* A source that reads a distinct format of concatenated chunks back into its
* logical form, in order to guarantee a known state to the original stream,
* even in the event of errors.
*
* Use with FramedSink, which also allows the logical stream to be terminated
* in the event of an exception.
*/
struct FramedSource : Source
{
Source & from;
bool eof = false;
std::vector<char> pending;
size_t pos = 0;
FramedSource(Source & from) : from(from)
{ }
~FramedSource()
{
if (!eof) {
while (true) {
auto n = readInt(from);
if (!n) break;
std::vector<char> data(n);
from(data.data(), n);
}
}
}
size_t read(char * data, size_t len) override
{
if (eof) throw EndOfFile("reached end of FramedSource");
if (pos >= pending.size()) {
size_t len = readInt(from);
if (!len) {
eof = true;
return 0;
}
pending = std::vector<char>(len);
pos = 0;
from(pending.data(), len);
}
auto n = std::min(len, pending.size() - pos);
memcpy(data, pending.data() + pos, n);
pos += n;
return n;
}
};
/**
* Write as chunks in the format expected by FramedSource.
*
* The exception_ptr reference can be used to terminate the stream when you
* detect that an error has occurred on the remote end.
*/
2020-09-17 23:01:35 +03:00
struct FramedSink : nix::BufferedSink
{
BufferedSink & to;
std::exception_ptr & ex;
FramedSink(BufferedSink & to, std::exception_ptr & ex) : to(to), ex(ex)
{ }
~FramedSink()
{
try {
to << 0;
to.flush();
} catch (...) {
ignoreException();
}
}
2020-12-02 15:00:43 +02:00
void write(std::string_view data) override
2020-09-17 23:01:35 +03:00
{
/* Don't send more data if the remote has
encountered an error. */
if (ex) {
auto ex2 = ex;
ex = nullptr;
std::rethrow_exception(ex2);
}
2020-12-02 15:00:43 +02:00
to << data.size();
to(data);
2020-09-17 23:01:35 +03:00
};
};
/**
* Stack allocation strategy for sinkToSource.
* Mutable to avoid a boehm gc dependency in libutil.
*
* boost::context doesn't provide a virtual class, so we define our own.
*/
struct StackAllocator {
virtual boost::context::stack_context allocate() = 0;
virtual void deallocate(boost::context::stack_context sctx) = 0;
/**
* The stack allocator to use in sinkToSource and potentially elsewhere.
* It is reassigned by the initGC() method in libexpr.
*/
static StackAllocator *defaultAllocator;
};
2023-02-03 18:50:01 +02:00
/* Disabling GC when entering a coroutine (on macos).
::create is to avoid boehm gc dependency in libutil.
*/
class DisableGC {
public:
DisableGC() {};
virtual ~DisableGC() {};
static std::shared_ptr<DisableGC> (*create)();
};
}