WIP: PostgreSQL Cluster Auto-Upgrade #92

Draft
max wants to merge 5 commits from pg-auto-migrations into master
4 changed files with 475 additions and 4 deletions

View file

@ -183,6 +183,13 @@ in
};
description = mdDoc "Environment variables made available to Patroni as files content, useful for providing secrets from files.";
};
# Opt-in switch for the automatic migration logic; when enabled, the service
# gets a preStart script and the bootstrap DCS settings are adjusted.
migrations = {
enable = mkEnableOption "automatic migrations";
};
};
config = mkIf cfg.enable {
@ -192,6 +199,10 @@ in
inherit (cfg) name;
inherit (cfg) namespace;
# The self-upgrade script run from preStart issues CREATE PUBLICATION /
# CREATE SUBSCRIPTION, so clusters are bootstrapped with wal_level "logical"
# whenever migrations are enabled.
# NOTE(review): this only sets the bootstrap DCS value - confirm how clusters
# that were bootstrapped before this option existed pick it up.
bootstrap = mkIf cfg.migrations.enable {
dcs.postgresql.parameters.wal_level = "logical";
};
restapi = {
listen = "${cfg.nodeIp}:${toString cfg.restApiPort}";
connect_address = "${cfg.nodeIp}:${toString cfg.restApiPort}";
@ -238,6 +249,190 @@ in
wantedBy = [ "multi-user.target" ];
after = [ "network.target" ];
preStart = let
upgradeScript = pkgs.writers.writePython3 "patroni-migration-self-upgrade" {
libraries = [ (pkgs.python3Packages.toPythonModule patroni) ];
flakeIgnore = [
"E501"
];
} ''
"""Populate a freshly initialised data directory from the running leader.

Steps: initdb and start a local PostgreSQL, copy the schema from the leader
with pg_dumpall, replicate the data with one publication/subscription pair
per database, then store the new system identifier in the DCS and resume
Patroni.

Reads PATRONICTL_CONFIG_FILE, PATRONIMIGRATOR_LEADER_HOST and
PATRONIMIGRATOR_LEADER_PORT from the environment.
"""
import os
import subprocess
import time

from patroni.__main__ import Patroni
from patroni.config import Config
from patroni.utils import polling_loop

# TODO: should probably wait longer
SYNC_TIMEOUT_SECONDS = 300


def make_psql(pg, host, port, user):
    """Return a function that pipes a query (str or bytes) into psql."""
    def run(query):
        payload = query if isinstance(query, bytes) else query.encode("utf8")
        command = [
            pg.pgcommand("psql"),
            "-h", host,
            "-p", port,
            "-U", user,
            "-f", "-",
        ]
        # check=True: a non-zero psql exit status aborts the upgrade instead
        # of being ignored.
        return subprocess.run(command, input=payload, check=True)
    return run


def wait_for_sync(pg, timeout):
    """Poll pg_subscription_rel until every row reports state "r".

    Returns True as soon as at least one row exists and all rows are in
    state "r"; returns False when the timeout elapses first.
    """
    for _ in polling_loop(timeout):
        print("waiting for synchronization to complete")
        # NOTE(review): this query runs on the default connection only and
        # pg_subscription_rel looks like a per-database catalog - confirm
        # that subscriptions in the other databases are covered as well.
        rows = pg.query("SELECT srsubstate FROM pg_subscription_rel;")
        states = [state for (state,) in rows]
        pending = [state for state in states if state != "r"]
        if pending:
            print(f"sync state={pending[0]}")
        elif states:
            return True
    return False


def main():
    """Run the upgrade sequence described in the module docstring."""
    # Fail with a clear KeyError before touching anything if the wrapper
    # script did not tell us where the leader is.
    leader_host = os.environ["PATRONIMIGRATOR_LEADER_HOST"]
    leader_port = os.environ["PATRONIMIGRATOR_LEADER_PORT"]

    print("creating patroni control object")
    ctl = Patroni(Config(os.getenv("PATRONICTL_CONFIG_FILE")))
    pg = ctl.postgresql

    print("running initdb")
    # NOTE(review): the initdb options are usually configured under
    # bootstrap.initdb - confirm this top-level lookup actually returns them.
    pg.bootstrap._initdb(ctl.config.get("initdb"))
    configuration = pg.config.effective_configuration

    print("configuring postgres")
    pg.config.check_directories()
    pg.config.write_postgresql_conf(configuration)
    pg.config.resolve_connection_addresses()
    pg.config.replace_pg_hba()
    pg.config.replace_pg_ident()

    print("starting postgres")
    pg.start()

    auth = pg.config.get("authentication")
    superuser = auth["superuser"]["username"]
    listen = pg.config.get("listen").split(":")

    print("dumping schema")
    args = [
        pg.pgcommand("pg_dumpall"),
        "-h", leader_host,
        "-p", leader_port,
        "-U", superuser,
        "-s",
    ]
    print("running with args:")
    print(args)
    # Only stdout is captured so pg_dumpall errors stay visible in the log;
    # check=True keeps a failed dump from being applied as an empty schema.
    dump = subprocess.run(args, stdout=subprocess.PIPE, check=True)

    psql_self = make_psql(pg, listen[0], listen[1], superuser)
    psql_leader = make_psql(pg, leader_host, leader_port, superuser)

    print("applying schema")
    psql_self(dump.stdout)

    # The epoch timestamp keeps publication/subscription names unique per run.
    ctime = int(time.time())
    pub = f"pub_live_upgrade_{ctime}"
    sub = f"sub_live_upgrade_{ctime}"
    replication_user = superuser

    all_databases = pg.query("SELECT datname FROM pg_database WHERE datistemplate = false;").fetchall()
    for (db,) in all_databases:
        print(f"creating pub/sub for database {db}")
        psql_leader(f"""
            \\connect {db}
            CREATE PUBLICATION {pub}_{db} FOR ALL TABLES;
        """)
        psql_self(f"""
            \\connect {db}
            CREATE SUBSCRIPTION {sub}_{db} CONNECTION 'host={leader_host} port={leader_port} dbname={db} user={replication_user}' PUBLICATION {pub}_{db};
        """)

    if not wait_for_sync(pg, SYNC_TIMEOUT_SECONDS):
        # Dropping the subscriptions and re-keying the cluster now would
        # promote a copy that is missing data, so stop here instead.
        raise SystemExit("timed out waiting for logical replication to catch up")
    print("synchronized!")

    for (db,) in all_databases:
        print(f"dropping pub/sub for database {db}")
        psql_self(f"""
            \\connect {db}
            DROP SUBSCRIPTION {sub}_{db};
        """)
        psql_leader(f"""
            \\connect {db}
            DROP PUBLICATION {pub}_{db};
        """)

    [(sysid,)] = pg.query("SELECT system_identifier FROM pg_control_system();").fetchall()
    print(f"setting system identifier to {sysid}")
    ctl.dcs.initialize(create_new=False, sysid=str(sysid))
    pg.stop()
    # A failed resume would leave the cluster paused, so do not ignore it.
    subprocess.run(["patronictl", "resume"], check=True)


if __name__ == "__main__":
    main()
'';
migrationScript = pkgs.writeShellScript "patroni-migration-replicate-or-self-upgrade" ''
  # Decide how this node reaches the target PostgreSQL major version:
  #   1. the running leader is already at the target version: wipe the local
  #      data directory in preparation for a reinit
  #   2. a running replica is at the target version: make it the leader first,
  #      then proceed as in 1.
  #   3. no member is at the target version: pause the cluster and hand over
  #      to the Python self-upgrade script
  # Without registered nodes or without a running leader nothing is touched.
  if [[ "$(consul catalog nodes --service='${cfg.scope}' 2>/dev/null | wc -l)" -gt 0 ]]; then
    # check if there's an active leader; "// empty" makes jq print nothing
    # instead of the string "null" when no member matches
    leader="$(patronictl list -f json | jq -r 'map(select(.Role == "Leader" and .State == "running") | .Member) | .[0] // empty')"
    if [[ -n "$leader" ]]; then
      leaderVersion="$(patronictl version '${cfg.scope}' "$leader" | grep -o 'PostgreSQL [0-9]*' | cut -d' ' -f2)"
      if [[ "$leaderVersion" == '${postgresql.psqlSchema}' ]]; then
        # leader is the same version as our target
        echo leader is at target version, preparing for reinit
        # TODO: need to wipe data dir, or will patroni do it for us?
        rm -rf '${cfg.postgresqlDataDir}'
        exit 0
      else
        echo "leader version $leaderVersion differs from target version ${postgresql.psqlSchema}, trying to find an upgraded replica"
        for replica in $(patronictl list -f json | jq -r 'map(select(.Role == "Replica" and .State == "running") | .Member) | .[]'); do
          replicaVersion="$(patronictl version '${cfg.scope}' "$replica" | grep -o 'PostgreSQL [0-9]*' | cut -d' ' -f2)"
          if [[ "$replicaVersion" == '${postgresql.psqlSchema}' ]]; then
            # another replica is the same version as us, make leader
            echo found a replica with the same target version, attempting to promote it to leader
            # TODO: do we need to force it to become the leader or is there another way?
            if ! patronictl switchover '${cfg.scope}' --master "$leader" --candidate "$replica" --force --scheduled now; then
              echo switchover failed! attempting failover
              patronictl failover '${cfg.scope}' --candidate "$replica" --force
            fi
            while [[ "$(patronictl list -f json | jq -r 'map(select(.Role == "Leader" and .State == "running") | .Member) | .[0]')" != "$replica" ]]; do
              echo waiting for "$replica" to become the leader
              patronictl list
              sleep 1
            done
            echo preparing for reinit after leader promotion
            # TODO: need to wipe data dir, or will patroni do it for us?
            rm -rf '${cfg.postgresqlDataDir}'
            exit 0
          fi
        done
        echo no other nodes are at the target version, performing self-upgrade
        patronictl pause
        leaderHost="$(patronictl list -f json | jq -r 'map(select(.Role == "Leader" and .State == "running") | .Host) | .[0] // empty')"
        if [[ -z "$leaderHost" ]]; then
          # never wipe the data directory without a leader to copy from
          echo could not determine the host of the running leader, aborting self-upgrade >&2
          patronictl resume
          exit 1
        fi
        # this is where it gets spicy
        rm -rf '${cfg.postgresqlDataDir}'
        install -dm700 '${cfg.postgresqlDataDir}'
        # give the migration script 1800 seconds
        systemd-notify EXTEND_TIMEOUT_USEC=1800000000
        export PATRONIMIGRATOR_LEADER_HOST="$leaderHost"
        # HACK: find a way to get the port
        export PATRONIMIGRATOR_LEADER_PORT="5432"
        export PYTHONUNBUFFERED=1
        exec ${upgradeScript}
      fi
    fi
    echo no running leader found, leaving the data directory untouched
  else
    echo consul returned no nodes, proceeding with cluster bootstrap
  fi
  # no other nodes around, nothing we can do
'';
# preStart body, only defined when migrations are enabled.
# It exits early when PG_VERSION in the data directory already equals the
# target schema version; otherwise it runs migrationScript as the child of
# `consul lock` on a key derived from the cluster scope, passing the child's
# exit code through (--child-exit-code).
# NOTE(review): CONSUL_HTTP_ADDR is hard-coded to the address used by the test
# setup (marked HACK below) - it should come from the module configuration.
in mkIf cfg.migrations.enable /*bash*/ ''
export PATH=${makeBinPath [ pkgs.jq pkgs.gnugrep config.services.consul.package patroni ]}:$PATH
export PATRONICTL_CONFIG_FILE=${configFile}
set -e
pgVersion='${cfg.postgresqlDataDir}/PG_VERSION'
# don't do anything if already at the target version
if [[ -e "$pgVersion" && "$(<"$pgVersion")" == '${postgresql.psqlSchema}' ]]; then
echo data directory version is target, no migrations to run
exit 0
fi
# HACK:
export CONSUL_HTTP_ADDR=192.168.1.4:8500
# ask consul if there are any other nodes around
exec consul lock --verbose --child-exit-code --shell=false '/patroni-migrator-upgrade/${cfg.scope}' ${migrationScript}
'';
script = ''
${concatStringsSep "\n" (attrValues (mapAttrs (name: path: "export ${name}=\"$(<'${path}')\"") cfg.environmentFiles))}
exec ${patroni}/bin/patroni ${configFile}
@ -288,10 +483,10 @@ in
StateDirectoryMode = "0750";
})
(mkIf cfg.softwareWatchdog {
ExecStartPre = "+" + pkgs.writeShellScript "configure-software-watchdog.sh" ''
ExecStartPre = [("+" + pkgs.writeShellScript "configure-software-watchdog.sh" ''
${pkgs.kmod}/bin/modprobe softdog
${pkgs.coreutils}/bin/chown ${cfg.user} /dev/watchdog
'';
'')];
})];
};
};
@ -300,6 +495,7 @@ in
patroni
postgresql
(mkIf cfg.raft pkgs.python310Packages.pysyncobj)
(pkgs.python3.withPackages (_: [ (pkgs.python3Packages.toPythonModule patroni) ]))
];
environment.sessionVariables = {

View file

@ -1,7 +1,7 @@
{ self, ... }:
{
perSystem = { filters, pkgs, self', ... }: {
perSystem = { filters, pkgs, self', timeTravel', ... }: {
checks = filters.doFilter filters.checks {
keycloak = pkgs.callPackage ./keycloak-custom-jre.nix {
jre = self'.packages.jre17_standard;
@ -11,6 +11,16 @@
inherit (self) nixosModules;
inherit (self'.packages) postgresql;
};
# VM test for the automatic PostgreSQL major-version upgrade.
# `previous` supplies the old postgresql package the cluster starts on
# (presumably this flake at the pinned revision, via timeTravel' - confirm);
# `postgresql` is the current package the nodes are upgraded to.
# exampleData is a database dump the test restores before upgrading.
patroni-migration = pkgs.callPackage ./patroni-migration.nix {
previous = timeTravel' "486161b78e45e94a6f314b65bb05080605f0cd01";
inherit (self) nixosModules;
inherit (self'.packages) postgresql;
exampleData = pkgs.fetchurl {
name = "omdb-2022-10-18.dump";
url = "https://github.com/credativ/omdb-postgresql/releases/download/2022-10-18/omdb.dump";
hash = "sha256-7ENUTHrpdrB44AyHT3aB44AFY/vFsKTzt70Fnb9ynq8=";
};
};
searxng = pkgs.callPackage ./searxng.nix {
inherit (self'.packages) searxng;
};

View file

@ -0,0 +1,265 @@
{ nixosTest, nixosModules, postgresql, previous, exampleData }:
nixosTest (
let
pgOld = previous.packages.postgresql;
pgNew = postgresql;
nodesIps = [
"192.168.1.1"
"192.168.1.2"
"192.168.1.3"
];
# Build the NixOS configuration of one Patroni cluster member.
#   index      - position in nodesIps; selects the node's IP and its name
#                ("node<index + 1>")
#   postgresql - PostgreSQL package this node runs (pgOld or pgNew)
createNode = index: postgresql: { pkgs, ... }:
let
ip = builtins.elemAt nodesIps index; # since we already use IPs to identify servers
in
{
imports = [
nixosModules.patroni
nixosModules.systemd-extras
];
networking.interfaces.eth1.ipv4.addresses = pkgs.lib.mkOverride 0 [
{ address = ip; prefixLength = 16; }
];
networking.firewall.allowedTCPPorts = [ 5432 8008 5010 ];
environment.systemPackages = [ pkgs.jq ];
# /data holds both the Patroni and the PostgreSQL data directories below
systemd.tmpfiles.rules = [
"d /data 0700 patroni patroni - -"
];
services.patroni = {
enable = true;
# the feature under test
migrations = {
enable = true;
};
dataDir = "/data/patroni";
postgresqlDataDir = "/data/postgres";
postgresqlPackage = postgresql.withPackages (p: [ p.pg_safeupdate ]);
scope = "cluster1";
name = "node${toString(index + 1)}";
nodeIp = ip;
otherNodesIps = builtins.filter (h: h != ip) nodesIps;
softwareWatchdog = true;
settings = {
bootstrap = {
dcs = {
ttl = 30;
loop_wait = 10;
retry_timeout = 10;
maximum_lag_on_failover = 1048576;
};
initdb = [
{ encoding = "UTF8"; }
"data-checksums"
];
};
postgresql = {
use_pg_rewind = true;
use_slots = true;
authentication = {
replication = {
username = "replicator";
};
superuser = {
username = "postgres";
};
rewind = {
username = "rewind";
};
};
parameters = {
listen_addresses = "${ip}";
wal_level = "replica";
hot_standby_feedback = "on";
unix_socket_directories = "/tmp";
};
pg_hba = [
"host replication replicator 192.168.1.0/24 md5"
# Unsafe, do not use for anything other than tests
"host all all 0.0.0.0/0 trust"
];
};
# DCS backend: the consul test node at 192.168.1.4
consul = {
host = "192.168.1.4:8500";
register_service = true;
};
};
# passwords are handed to Patroni through files, one per variable
environmentFiles = {
PATRONI_REPLICATION_PASSWORD = pkgs.writeText "replication-password" "postgres";
PATRONI_SUPERUSER_PASSWORD = pkgs.writeText "superuser-password" "postgres";
PATRONI_REWIND_PASSWORD = pkgs.writeText "rewind-password" "postgres";
};
};
# We always want to restart so the tests never hang
systemd.services.patroni.serviceConfig.StartLimitIntervalSec = 0;
};
in
{
name = "patroni";
nodes = {
node1 = createNode 0 pgOld;
node2 = createNode 1 pgOld;
node3 = createNode 2 pgOld;
node1new = createNode 0 pgNew;
node2new = createNode 1 pgNew;
node3new = createNode 2 pgNew;
# Single-server consul instance at 192.168.1.4; the Patroni nodes point their
# consul settings at port 8500 of this address.
consul = { pkgs, ... }: {
networking.interfaces.eth1.ipv4.addresses = pkgs.lib.mkOverride 0 [
{ address = "192.168.1.4"; prefixLength = 16; }
];
services.consul = {
enable = true;
extraConfig = {
addresses.http = "192.168.1.4";
server = true;
bind_addr = "192.168.1.4";
# one server only, so it bootstraps on its own
bootstrap_expect = 1;
};
};
networking.firewall.allowedTCPPorts = [ 8500 ];
};
# Client machine: reaches the cluster through a local haproxy on
# 127.0.0.1:5432 and runs a background writer while the upgrade happens.
client = { pkgs, ... }: {
  environment.systemPackages = [ postgresql ];
  # Keeps inserting increasing values into dummy2 and stops once
  # `select version()` reports something different from the first answer.
  systemd.services.db-writer = {
    wantedBy = [ "multi-user.target" ];
    after = [ "haproxy.service" ];
    requires = [ "haproxy.service" ];
    serviceConfig.Type = "oneshot";
    script = ''
      # failing psql calls are expected while the cluster is being upgraded
      set +e
      while ! ${pgNew}/bin/psql -h 127.0.0.1 -U postgres --command='create table dummy2 as select * from generate_series(1, 10) as val;'; do
        sleep 2;
      done
      i=11
      version="$(${pgNew}/bin/psql -h 127.0.0.1 -U postgres --pset='pager=off' --tuples-only --command='select version();')"
      while sleep .5; do
        newVersion=""
        while [[ -z "$newVersion" ]]; do
          newVersion="$(${pgNew}/bin/psql -h 127.0.0.1 -U postgres --pset='pager=off' --tuples-only --command='select version();')"
          sleep .5
        done
        echo $newVersion
        while ! ${pgNew}/bin/psql -h 127.0.0.1 -U postgres --pset='pager=off' --tuples-only --command="insert into dummy2 values($i);"; do
          # the message was missing its echo and got executed as a command
          echo retrying write for value $i
          sleep .5
        done
        echo wrote value $i
        i=$((i+1))
        if [[ "$newVersion" != "$version" ]]; then
          echo new version detected, quitting
          exit 0
        fi
      done
    '';
  };
  networking.interfaces.eth1.ipv4.addresses = pkgs.lib.mkOverride 0 [
    { address = "192.168.2.1"; prefixLength = 16; }
  ];
  # TCP proxy in front of all nodes; health checks go to port 8008 and expect
  # HTTP status 200.
  services.haproxy = {
    enable = true;
    config = ''
      global
      maxconn 100
      defaults
      log global
      mode tcp
      retries 2
      timeout client 30m
      timeout connect 4s
      timeout server 30m
      timeout check 5s
      listen cluster1
      bind 127.0.0.1:5432
      option httpchk
      http-check expect status 200
      default-server inter 3s fall 3 rise 2 on-marked-down shutdown-sessions
      ${builtins.concatStringsSep "\n" (map (ip: "server postgresql_${ip}_5432 ${ip}:5432 maxconn 100 check port 8008") nodesIps)}
    '';
  };
};
};
testScript = /*python*/ ''
nodes = [node1, node2, node3]
nodes_new = [node1new, node2new, node3new]


def wait_for_all_nodes_ready(nodes=nodes, expected_replicas=2):
    """Wait until each booted node lists one running leader and the expected running replicas."""
    # the generator keeps the booted check lazy: it is evaluated right before a node is visited
    for member in (candidate for candidate in nodes if candidate.booted):
        print(member.succeed("patronictl list cluster1"))
        member.wait_until_succeeds(f"[ $(patronictl list -f json cluster1 | jq 'length') == {expected_replicas + 1} ]")
        member.wait_until_succeeds("[ $(patronictl list -f json cluster1 | jq 'map(select(.Role | test(\"^Leader$\"))) | map(select(.State | test(\"^running$\"))) | length') == 1 ]")
        member.wait_until_succeeds(f"[ $(patronictl list -f json cluster1 | jq 'map(select(.Role | test(\"^Replica$\"))) | map(select(.State | test(\"^running$\"))) | length') == {expected_replicas} ]")
        print(member.succeed("patronictl list cluster1"))
    # finally make sure the cluster answers through the client-side proxy
    client.wait_until_succeeds("psql -h 127.0.0.1 -U postgres --command='select 1;'")


def run_dummy_queries():
    """Insert, read back and delete one row of the dummy table via the client."""
    client.succeed("psql -h 127.0.0.1 -U postgres --pset='pager=off' --tuples-only --command='insert into dummy(val) values (101);'")
    client.succeed("test $(psql -h 127.0.0.1 -U postgres --pset='pager=off' --tuples-only --command='select val from dummy where val = 101;') -eq 101")
    client.succeed("psql -h 127.0.0.1 -U postgres --pset='pager=off' --tuples-only --command='delete from dummy where val = 101;'")


consul.start()
client.start()
for machine in nodes:
    machine.start()

with subtest("should bootstrap a new patroni cluster"):
    wait_for_all_nodes_ready()

with subtest("should be able to insert and select"):
    client.succeed("psql -h 127.0.0.1 -U postgres --command='create table dummy as select * from generate_series(1, 100) as val;'")
    client.succeed("test $(psql -h 127.0.0.1 -U postgres --pset='pager=off' --tuples-only --command='select count(distinct val) from dummy;') -eq 100")

with subtest("should be able to load test database from dump"):
    client.succeed("psql -h 127.0.0.1 -U postgres --command='create database example;'")
    client.succeed("pg_restore -h 127.0.0.1 -U postgres -n public -d example ${exampleData}")

with subtest("should upgrade to a new major version"):
    # replace the nodes one by one: archive /data on the old machine, shut it
    # down, then unpack the archive on its successor
    for i, (old, new) in enumerate(zip(nodes, nodes_new), start=1):
        old.succeed("systemctl stop patroni")
        old.succeed(f"tar cf /tmp/shared/data{i}.tar /data")
        old.shutdown()
        new.succeed(f"tar xf /tmp/shared/data{i}.tar -C /")

with subtest("should be able to read and write after upgrade"):
    wait_for_all_nodes_ready(nodes=nodes_new)
    run_dummy_queries()

#with subtest("should not have lost any data"):
#  client.succeed("test $(psql -h 127.0.0.1 -U postgres --pset='pager=off' --tuples-only --command='select count(distinct val) from dummy2;') -eq $(psql -h 127.0.0.1 -U postgres --pset='pager=off' --tuples-only --command='select max(val) from dummy2;')")
'';
})

View file

@ -83,7 +83,7 @@ super: rec {
jre = jre17_standard;
};
postgresql = super.postgresql_14;
postgresql = super.postgresql_15;
powerdns-admin = patch super.powerdns-admin "patches/base/powerdns-admin";