Merge pull request #11343 from DeterminateSystems/no-framedsink-threads

withFramedSink(): Don't use a thread to monitor the other side
This commit is contained in:
Eelco Dolstra 2024-08-22 14:23:19 +02:00 committed by GitHub
commit 915db74dbf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 69 additions and 52 deletions

View file

@ -402,6 +402,9 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
logger->startWork(); logger->startWork();
auto pathInfo = [&]() { auto pathInfo = [&]() {
// NB: FramedSource must be out of scope before logger->stopWork(); // NB: FramedSource must be out of scope before logger->stopWork();
// FIXME: this means that if there is an error
// half-way through, the client will keep sending
// data, since we haven't sent it the error yet.
auto [contentAddressMethod, hashAlgo] = ContentAddressMethod::parseWithAlgo(camStr); auto [contentAddressMethod, hashAlgo] = ContentAddressMethod::parseWithAlgo(camStr);
FramedSource source(conn.from); FramedSource source(conn.from);
FileSerialisationMethod dumpMethod; FileSerialisationMethod dumpMethod;

View file

@ -49,7 +49,7 @@ struct RemoteStore::ConnectionHandle
RemoteStore::Connection & operator * () { return *handle; } RemoteStore::Connection & operator * () { return *handle; }
RemoteStore::Connection * operator -> () { return &*handle; } RemoteStore::Connection * operator -> () { return &*handle; }
void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true); void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true);
void withFramedSink(std::function<void(Sink & sink)> fun); void withFramedSink(std::function<void(Sink & sink)> fun);
}; };

View file

@ -153,9 +153,9 @@ RemoteStore::ConnectionHandle::~ConnectionHandle()
} }
} }
void RemoteStore::ConnectionHandle::processStderr(Sink * sink, Source * source, bool flush) void RemoteStore::ConnectionHandle::processStderr(Sink * sink, Source * source, bool flush, bool block)
{ {
handle->processStderr(&daemonException, sink, source, flush); handle->processStderr(&daemonException, sink, source, flush, block);
} }
@ -926,43 +926,17 @@ void RemoteStore::ConnectionHandle::withFramedSink(std::function<void(Sink & sin
{ {
(*this)->to.flush(); (*this)->to.flush();
std::exception_ptr ex;
/* Handle log messages / exceptions from the remote on a separate
thread. */
std::thread stderrThread([&]()
{ {
try { FramedSink sink((*this)->to, [&]() {
ReceiveInterrupts receiveInterrupts; /* Periodically process stderr messages and exceptions
processStderr(nullptr, nullptr, false); from the daemon. */
} catch (...) { processStderr(nullptr, nullptr, false, false);
ex = std::current_exception(); });
}
});
Finally joinStderrThread([&]()
{
if (stderrThread.joinable()) {
stderrThread.join();
if (ex) {
try {
std::rethrow_exception(ex);
} catch (...) {
ignoreException();
}
}
}
});
{
FramedSink sink((*this)->to, ex);
fun(sink); fun(sink);
sink.flush(); sink.flush();
} }
stderrThread.join(); processStderr(nullptr, nullptr, false);
if (ex)
std::rethrow_exception(ex);
} }
} }

View file

@ -32,7 +32,8 @@ static Logger::Fields readFields(Source & from)
return fields; return fields;
} }
std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink * sink, Source * source, bool flush) std::exception_ptr
WorkerProto::BasicClientConnection::processStderrReturn(Sink * sink, Source * source, bool flush, bool block)
{ {
if (flush) if (flush)
to.flush(); to.flush();
@ -41,6 +42,9 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink
while (true) { while (true) {
if (!block && !from.hasData())
break;
auto msg = readNum<uint64_t>(from); auto msg = readNum<uint64_t>(from);
if (msg == STDERR_WRITE) { if (msg == STDERR_WRITE) {
@ -95,8 +99,10 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink
logger->result(act, type, fields); logger->result(act, type, fields);
} }
else if (msg == STDERR_LAST) else if (msg == STDERR_LAST) {
assert(block);
break; break;
}
else else
throw Error("got unknown message type %x from Nix daemon", msg); throw Error("got unknown message type %x from Nix daemon", msg);
@ -130,9 +136,10 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink
} }
} }
void WorkerProto::BasicClientConnection::processStderr(bool * daemonException, Sink * sink, Source * source, bool flush) void WorkerProto::BasicClientConnection::processStderr(
bool * daemonException, Sink * sink, Source * source, bool flush, bool block)
{ {
auto ex = processStderrReturn(sink, source, flush); auto ex = processStderrReturn(sink, source, flush, block);
if (ex) { if (ex) {
*daemonException = true; *daemonException = true;
std::rethrow_exception(ex); std::rethrow_exception(ex);

View file

@ -70,9 +70,10 @@ struct WorkerProto::BasicClientConnection : WorkerProto::BasicConnection
virtual void closeWrite() = 0; virtual void closeWrite() = 0;
std::exception_ptr processStderrReturn(Sink * sink = 0, Source * source = 0, bool flush = true); std::exception_ptr processStderrReturn(Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true);
void processStderr(bool * daemonException, Sink * sink = 0, Source * source = 0, bool flush = true); void
processStderr(bool * daemonException, Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true);
/** /**
* Establishes connection, negotiating version. * Establishes connection, negotiating version.

View file

@ -10,6 +10,8 @@
#ifdef _WIN32 #ifdef _WIN32
# include <fileapi.h> # include <fileapi.h>
# include "windows-error.hh" # include "windows-error.hh"
#else
# include <poll.h>
#endif #endif
@ -158,6 +160,29 @@ bool FdSource::good()
} }
bool FdSource::hasData()
{
if (BufferedSource::hasData()) return true;
while (true) {
fd_set fds;
FD_ZERO(&fds);
FD_SET(fd, &fds);
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 0;
auto n = select(fd + 1, &fds, nullptr, nullptr, &timeout);
if (n < 0) {
if (errno == EINTR) continue;
throw SysError("polling file descriptor");
}
return FD_ISSET(fd, &fds);
}
}
size_t StringSource::read(char * data, size_t len) size_t StringSource::read(char * data, size_t len)
{ {
if (pos == s.size()) throw EndOfFile("end of string reached"); if (pos == s.size()) throw EndOfFile("end of string reached");

View file

@ -104,6 +104,9 @@ struct BufferedSource : Source
size_t read(char * data, size_t len) override; size_t read(char * data, size_t len) override;
/**
* Return true if the buffer is not empty.
*/
bool hasData(); bool hasData();
protected: protected:
@ -162,6 +165,13 @@ struct FdSource : BufferedSource
FdSource & operator=(FdSource && s) = default; FdSource & operator=(FdSource && s) = default;
bool good() override; bool good() override;
/**
* Return true if the buffer is not empty after a non-blocking
* read.
*/
bool hasData();
protected: protected:
size_t readUnbuffered(char * data, size_t len) override; size_t readUnbuffered(char * data, size_t len) override;
private: private:
@ -522,15 +532,16 @@ struct FramedSource : Source
/** /**
* Write as chunks in the format expected by FramedSource. * Write as chunks in the format expected by FramedSource.
* *
* The exception_ptr reference can be used to terminate the stream when you * The `checkError` function can be used to terminate the stream when you
* detect that an error has occurred on the remote end. * detect that an error has occurred. It does so by throwing an exception.
*/ */
struct FramedSink : nix::BufferedSink struct FramedSink : nix::BufferedSink
{ {
BufferedSink & to; BufferedSink & to;
std::exception_ptr & ex; std::function<void()> checkError;
FramedSink(BufferedSink & to, std::exception_ptr & ex) : to(to), ex(ex) FramedSink(BufferedSink & to, std::function<void()> && checkError)
: to(to), checkError(checkError)
{ } { }
~FramedSink() ~FramedSink()
@ -545,13 +556,9 @@ struct FramedSink : nix::BufferedSink
void writeUnbuffered(std::string_view data) override void writeUnbuffered(std::string_view data) override
{ {
/* Don't send more data if the remote has /* Don't send more data if an error has occured. */
encountered an error. */ checkError();
if (ex) {
auto ex2 = ex;
ex = nullptr;
std::rethrow_exception(ex2);
}
to << data.size(); to << data.size();
to(data); to(data);
}; };