#pragma once ///@file #include #include "types.hh" #include "util.hh" #include "file-descriptor.hh" namespace boost::context { struct stack_context; } namespace nix { /** * Abstract destination of binary data. */ struct Sink { virtual ~Sink() { } virtual void operator () (std::string_view data) = 0; virtual bool good() { return true; } }; /** * Just throws away data. */ struct NullSink : Sink { void operator () (std::string_view data) override { } }; struct FinishSink : virtual Sink { virtual void finish() = 0; }; /** * A buffered abstract sink. Warning: a BufferedSink should not be * used from multiple threads concurrently. */ struct BufferedSink : virtual Sink { size_t bufSize, bufPos; std::unique_ptr buffer; BufferedSink(size_t bufSize = 32 * 1024) : bufSize(bufSize), bufPos(0), buffer(nullptr) { } void operator () (std::string_view data) override; void flush(); protected: virtual void writeUnbuffered(std::string_view data) = 0; }; /** * Abstract source of binary data. */ struct Source { virtual ~Source() { } /** * Store exactly ‘len’ bytes in the buffer pointed to by ‘data’. * It blocks until all the requested data is available, or throws * an error if it is not going to be available. */ void operator () (char * data, size_t len); void operator () (std::string_view data); /** * Store up to ‘len’ in the buffer pointed to by ‘data’, and * return the number of bytes stored. It blocks until at least * one byte is available. */ virtual size_t read(char * data, size_t len) = 0; virtual bool good() { return true; } void drainInto(Sink & sink); std::string drain(); }; /** * A buffered abstract source. Warning: a BufferedSource should not be * used from multiple threads concurrently. */ struct BufferedSource : Source { size_t bufSize, bufPosIn, bufPosOut; std::unique_ptr buffer; BufferedSource(size_t bufSize = 32 * 1024) : bufSize(bufSize), bufPosIn(0), bufPosOut(0), buffer(nullptr) { } size_t read(char * data, size_t len) override; bool hasData(); protected: /** * Underlying read call, to be overridden. */ virtual size_t readUnbuffered(char * data, size_t len) = 0; }; /** * A sink that writes data to a file descriptor. */ struct FdSink : BufferedSink { int fd; size_t written = 0; FdSink() : fd(-1) { } FdSink(int fd) : fd(fd) { } FdSink(FdSink&&) = default; FdSink & operator=(FdSink && s) { flush(); fd = s.fd; s.fd = -1; written = s.written; return *this; } ~FdSink(); void writeUnbuffered(std::string_view data) override; bool good() override; private: bool _good = true; }; /** * A source that reads data from a file descriptor. */ struct FdSource : BufferedSource { int fd; size_t read = 0; FdSource() : fd(-1) { } FdSource(int fd) : fd(fd) { } FdSource(FdSource&&) = default; FdSource& operator=(FdSource && s) { fd = s.fd; s.fd = -1; read = s.read; return *this; } bool good() override; protected: size_t readUnbuffered(char * data, size_t len) override; private: bool _good = true; }; /** * A sink that writes data to a string. */ struct StringSink : Sink { std::string s; StringSink() { } explicit StringSink(const size_t reservedSize) { s.reserve(reservedSize); }; StringSink(std::string && s) : s(std::move(s)) { }; void operator () (std::string_view data) override; }; /** * A source that reads data from a string. */ struct StringSource : Source { std::string_view s; size_t pos; StringSource(std::string_view s) : s(s), pos(0) { } size_t read(char * data, size_t len) override; }; /** * A sink that writes all incoming data to two other sinks. */ struct TeeSink : Sink { Sink & sink1, & sink2; TeeSink(Sink & sink1, Sink & sink2) : sink1(sink1), sink2(sink2) { } virtual void operator () (std::string_view data) { sink1(data); sink2(data); } }; /** * Adapter class of a Source that saves all data read to a sink. */ struct TeeSource : Source { Source & orig; Sink & sink; TeeSource(Source & orig, Sink & sink) : orig(orig), sink(sink) { } size_t read(char * data, size_t len) { size_t n = orig.read(data, len); sink({data, n}); return n; } }; /** * A reader that consumes the original Source until 'size'. */ struct SizedSource : Source { Source & orig; size_t remain; SizedSource(Source & orig, size_t size) : orig(orig), remain(size) { } size_t read(char * data, size_t len) { if (this->remain <= 0) { throw EndOfFile("sized: unexpected end-of-file"); } len = std::min(len, this->remain); size_t n = this->orig.read(data, len); this->remain -= n; return n; } /** * Consume the original source until no remain data is left to consume. */ size_t drainAll() { std::vector buf(8192); size_t sum = 0; while (this->remain > 0) { size_t n = read(buf.data(), buf.size()); sum += n; } return sum; } }; /** * A sink that that just counts the number of bytes given to it */ struct LengthSink : Sink { uint64_t length = 0; void operator () (std::string_view data) override { length += data.size(); } }; /** * Convert a function into a sink. */ struct LambdaSink : Sink { typedef std::function lambda_t; lambda_t lambda; LambdaSink(const lambda_t & lambda) : lambda(lambda) { } void operator () (std::string_view data) override { lambda(data); } }; /** * Convert a function into a source. */ struct LambdaSource : Source { typedef std::function lambda_t; lambda_t lambda; LambdaSource(const lambda_t & lambda) : lambda(lambda) { } size_t read(char * data, size_t len) override { return lambda(data, len); } }; /** * Chain two sources together so after the first is exhausted, the second is * used */ struct ChainSource : Source { Source & source1, & source2; bool useSecond = false; ChainSource(Source & s1, Source & s2) : source1(s1), source2(s2) { } size_t read(char * data, size_t len) override; }; std::unique_ptr sourceToSink(std::function fun); /** * Convert a function that feeds data into a Sink into a Source. The * Source executes the function as a coroutine. */ std::unique_ptr sinkToSource( std::function fun, std::function eof = []() { throw EndOfFile("coroutine has finished"); }); void writePadding(size_t len, Sink & sink); void writeString(std::string_view s, Sink & sink); inline Sink & operator << (Sink & sink, uint64_t n) { unsigned char buf[8]; buf[0] = n & 0xff; buf[1] = (n >> 8) & 0xff; buf[2] = (n >> 16) & 0xff; buf[3] = (n >> 24) & 0xff; buf[4] = (n >> 32) & 0xff; buf[5] = (n >> 40) & 0xff; buf[6] = (n >> 48) & 0xff; buf[7] = (unsigned char) (n >> 56) & 0xff; sink({(char *) buf, sizeof(buf)}); return sink; } Sink & operator << (Sink & in, const Error & ex); Sink & operator << (Sink & sink, std::string_view s); Sink & operator << (Sink & sink, const Strings & s); Sink & operator << (Sink & sink, const StringSet & s); MakeError(SerialisationError, Error); template T readNum(Source & source) { unsigned char buf[8]; source((char *) buf, sizeof(buf)); auto n = readLittleEndian(buf); if (n > (uint64_t) std::numeric_limits::max()) throw SerialisationError("serialised integer %d is too large for type '%s'", n, typeid(T).name()); return (T) n; } inline unsigned int readInt(Source & source) { return readNum(source); } inline uint64_t readLongLong(Source & source) { return readNum(source); } void readPadding(size_t len, Source & source); size_t readString(char * buf, size_t max, Source & source); std::string readString(Source & source, size_t max = std::numeric_limits::max()); template T readStrings(Source & source); Source & operator >> (Source & in, std::string & s); template Source & operator >> (Source & in, T & n) { n = readNum(in); return in; } template Source & operator >> (Source & in, bool & b) { b = readNum(in); return in; } Error readError(Source & source); /** * An adapter that converts a std::basic_istream into a source. */ struct StreamToSourceAdapter : Source { std::shared_ptr> istream; StreamToSourceAdapter(std::shared_ptr> istream) : istream(istream) { } size_t read(char * data, size_t len) override { if (!istream->read(data, len)) { if (istream->eof()) { if (istream->gcount() == 0) throw EndOfFile("end of file"); } else throw Error("I/O error in StreamToSourceAdapter"); } return istream->gcount(); } }; /** * A source that reads a distinct format of concatenated chunks back into its * logical form, in order to guarantee a known state to the original stream, * even in the event of errors. * * Use with FramedSink, which also allows the logical stream to be terminated * in the event of an exception. */ struct FramedSource : Source { Source & from; bool eof = false; std::vector pending; size_t pos = 0; FramedSource(Source & from) : from(from) { } ~FramedSource() { if (!eof) { while (true) { auto n = readInt(from); if (!n) break; std::vector data(n); from(data.data(), n); } } } size_t read(char * data, size_t len) override { if (eof) throw EndOfFile("reached end of FramedSource"); if (pos >= pending.size()) { size_t len = readInt(from); if (!len) { eof = true; return 0; } pending = std::vector(len); pos = 0; from(pending.data(), len); } auto n = std::min(len, pending.size() - pos); memcpy(data, pending.data() + pos, n); pos += n; return n; } }; /** * Write as chunks in the format expected by FramedSource. * * The exception_ptr reference can be used to terminate the stream when you * detect that an error has occurred on the remote end. */ struct FramedSink : nix::BufferedSink { BufferedSink & to; std::exception_ptr & ex; FramedSink(BufferedSink & to, std::exception_ptr & ex) : to(to), ex(ex) { } ~FramedSink() { try { to << 0; to.flush(); } catch (...) { ignoreException(); } } void writeUnbuffered(std::string_view data) override { /* Don't send more data if the remote has encountered an error. */ if (ex) { auto ex2 = ex; ex = nullptr; std::rethrow_exception(ex2); } to << data.size(); to(data); }; }; /** * Stack allocation strategy for sinkToSource. * Mutable to avoid a boehm gc dependency in libutil. * * boost::context doesn't provide a virtual class, so we define our own. */ struct StackAllocator { virtual boost::context::stack_context allocate() = 0; virtual void deallocate(boost::context::stack_context sctx) = 0; /** * The stack allocator to use in sinkToSource and potentially elsewhere. * It is reassigned by the initGC() method in libexpr. */ static StackAllocator *defaultAllocator; }; /* Disabling GC when entering a coroutine (without the boehm patch). mutable to avoid boehm gc dependency in libutil. */ extern std::shared_ptr (*create_coro_gc_hook)(); }