Handle SIGINT etc. via a sigwait() signal handler thread

This allows other threads to install callbacks that run in a regular,
non-signal context. In particular, we can use this to signal the
downloader thread to quit.

Closes #1183.
This commit is contained in:
Eelco Dolstra 2017-01-17 18:21:02 +01:00
parent c0d55f9183
commit cc3b93c991
No known key found for this signature in database
GPG key ID: 8170B4726D7198DE
4 changed files with 101 additions and 28 deletions

View file

@ -24,12 +24,6 @@
namespace nix { namespace nix {
static void sigintHandler(int signo)
{
_isInterrupted = 1;
}
static bool gcWarning = true; static bool gcWarning = true;
void printGCWarning() void printGCWarning()
@ -120,19 +114,11 @@ void initNix()
settings.processEnvironment(); settings.processEnvironment();
settings.loadConfFile(); settings.loadConfFile();
/* Catch SIGINT. */ startSignalHandlerThread();
struct sigaction act;
act.sa_handler = sigintHandler;
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
if (sigaction(SIGINT, &act, 0))
throw SysError("installing handler for SIGINT");
if (sigaction(SIGTERM, &act, 0))
throw SysError("installing handler for SIGTERM");
if (sigaction(SIGHUP, &act, 0))
throw SysError("installing handler for SIGHUP");
/* Ignore SIGPIPE. */ /* Ignore SIGPIPE. */
struct sigaction act;
sigemptyset(&act.sa_mask);
act.sa_handler = SIG_IGN; act.sa_handler = SIG_IGN;
act.sa_flags = 0; act.sa_flags = 0;
if (sigaction(SIGPIPE, &act, 0)) if (sigaction(SIGPIPE, &act, 0))

View file

@ -324,20 +324,30 @@ struct CurlDownloader : public Downloader
~CurlDownloader() ~CurlDownloader()
{ {
/* Signal the worker thread to exit. */ stopWorkerThread();
{
auto state(state_.lock());
state->quit = true;
}
writeFull(wakeupPipe.writeSide.get(), " ");
workerThread.join(); workerThread.join();
if (curlm) curl_multi_cleanup(curlm); if (curlm) curl_multi_cleanup(curlm);
} }
void stopWorkerThread()
{
/* Signal the worker thread to exit. */
{
auto state(state_.lock());
state->quit = true;
}
writeFull(wakeupPipe.writeSide.get(), " ", false);
}
void workerThreadMain() void workerThreadMain()
{ {
/* Cause this thread to be notified on SIGINT. */
auto callback = createInterruptCallback([&]() {
stopWorkerThread();
});
std::map<CURL *, std::shared_ptr<DownloadItem>> items; std::map<CURL *, std::shared_ptr<DownloadItem>> items;
bool quit = false; bool quit = false;

View file

@ -2,14 +2,16 @@
#include "util.hh" #include "util.hh"
#include "affinity.hh" #include "affinity.hh"
#include "sync.hh"
#include <iostream> #include <cctype>
#include <cerrno> #include <cerrno>
#include <cstdio> #include <cstdio>
#include <cstdlib> #include <cstdlib>
#include <sstream>
#include <cstring> #include <cstring>
#include <cctype> #include <iostream>
#include <sstream>
#include <thread>
#include <sys/wait.h> #include <sys/wait.h>
#include <unistd.h> #include <unistd.h>
@ -933,7 +935,7 @@ void restoreSIGPIPE()
////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////
volatile sig_atomic_t _isInterrupted = 0; bool _isInterrupted = false;
thread_local bool interruptThrown = false; thread_local bool interruptThrown = false;
@ -1200,4 +1202,64 @@ void callFailure(const std::function<void(std::exception_ptr exc)> & failure, st
} }
static Sync<std::list<std::function<void()>>> _interruptCallbacks;
static void signalHandlerThread(sigset_t set)
{
while (true) {
int signal = 0;
sigwait(&set, &signal);
if (signal == SIGINT || signal == SIGTERM || signal == SIGHUP) {
_isInterrupted = 1;
{
auto interruptCallbacks(_interruptCallbacks.lock());
for (auto & callback : *interruptCallbacks) {
try {
callback();
} catch (...) {
ignoreException();
}
}
}
}
}
}
void startSignalHandlerThread()
{
sigset_t set;
sigemptyset(&set);
sigaddset(&set, SIGINT);
sigaddset(&set, SIGTERM);
sigaddset(&set, SIGHUP);
if (pthread_sigmask(SIG_BLOCK, &set, nullptr))
throw SysError("blocking signals");
std::thread(signalHandlerThread, set).detach();
}
/* RAII helper to automatically deregister a callback. */
struct InterruptCallbackImpl : InterruptCallback
{
std::list<std::function<void()>>::iterator it;
~InterruptCallbackImpl() override
{
_interruptCallbacks.lock()->erase(it);
}
};
std::unique_ptr<InterruptCallback> createInterruptCallback(std::function<void()> callback)
{
auto interruptCallbacks(_interruptCallbacks.lock());
interruptCallbacks->push_back(callback);
auto res = std::make_unique<InterruptCallbackImpl>();
res->it = interruptCallbacks->end();
res->it--;
return res;
}
} }

View file

@ -263,7 +263,7 @@ void restoreSIGPIPE();
/* User interruption. */ /* User interruption. */
extern volatile sig_atomic_t _isInterrupted; extern bool _isInterrupted;
extern thread_local bool interruptThrown; extern thread_local bool interruptThrown;
@ -416,4 +416,19 @@ void callSuccess(
} }
/* Start a thread that handles various signals. Also block those signals
on the current thread (and thus any threads created by it). */
void startSignalHandlerThread();
struct InterruptCallback
{
virtual ~InterruptCallback() { };
};
/* Register a function that gets called on SIGINT (in a non-signal
context). */
std::unique_ptr<InterruptCallback> createInterruptCallback(
std::function<void()> callback);
} }