diff --git a/modules/patroni/default.nix b/modules/patroni/default.nix index 0a2f364..9ed7da8 100644 --- a/modules/patroni/default.nix +++ b/modules/patroni/default.nix @@ -183,6 +183,13 @@ in }; description = mdDoc "Environment variables made available to Patroni as files content, useful for providing secrets from files."; }; + + migrations = { + enable = mkEnableOption "automatic migrations"; + + + + }; }; config = mkIf cfg.enable { @@ -192,6 +199,10 @@ in inherit (cfg) name; inherit (cfg) namespace; + bootstrap = mkIf cfg.migrations.enable { + dcs.postgresql.parameters.wal_level = "logical"; + }; + restapi = { listen = "${cfg.nodeIp}:${toString cfg.restApiPort}"; connect_address = "${cfg.nodeIp}:${toString cfg.restApiPort}"; @@ -238,6 +249,187 @@ in wantedBy = [ "multi-user.target" ]; after = [ "network.target" ]; + preStart = let + upgradeScript = pkgs.writers.writePython3 "patroni-migration-self-upgrade" { + libraries = [ (pkgs.python3Packages.toPythonModule patroni) ]; + flakeIgnore = [ + "E501" + ]; + } '' + import os + import subprocess + from datetime import datetime + from patroni.__main__ import Patroni + from patroni.config import Config + from patroni.utils import polling_loop + + if __name__ == "__main__": + print("creating patroni control object") + ctl = Patroni(Config(os.getenv("PATRONICTL_CONFIG_FILE"))) + pg = ctl.postgresql + + print("running initdb") + pg.bootstrap._initdb(ctl.config.get("initdb")) + + configuration = pg.config.effective_configuration + + print("configuring postgres") + pg.config.check_directories() + pg.config.write_postgresql_conf(configuration) + pg.config.resolve_connection_addresses() + pg.config.replace_pg_hba() + pg.config.replace_pg_ident() + + print("starting postgres") + pg.start() + + auth = pg.config.get("authentication") + listen = pg.config.get("listen").split(":") + leader_host = os.getenv("PATRONIMIGRATOR_LEADER_HOST") + leader_port = os.getenv("PATRONIMIGRATOR_LEADER_PORT") + + def psql(host=listen[0], port=listen[1], user=auth["superuser"]["username"]): + return lambda query: subprocess.run([ + pg.pgcommand("psql"), + "-h", host, + "-p", port, + "-U", user, + "-f", "-" + ], input=(query if type(query) is bytes else query.encode("utf8"))) + + print("dumping schema") + args = [ + pg.pgcommand("pg_dumpall"), + "-h", leader_host, + "-p", leader_port, + "-U", auth["superuser"]["username"], + "-s" + ] + print("running with args:") + print(args) + dump = subprocess.run(args, capture_output=True) + + psql_self = psql() + psql_leader = psql( + host=leader_host, + port=leader_port + ) + print("applying schema") + psql_self(dump.stdout) + ctime = int((datetime.utcnow() - datetime(1970, 1, 1)).total_seconds()) + pub = f"pub_live_upgrade_{ctime}" + sub = f"sub_live_upgrade_{ctime}" + replication_user = auth["superuser"]["username"] + all_databases = pg.query("SELECT datname FROM pg_database WHERE datistemplate = false;").fetchall() + for (db,) in all_databases: + print(f"creating pub/sub for database {db}") + psql_leader(f""" + \\connect {db} + CREATE PUBLICATION {pub}_{db} FOR ALL TABLES; + """) + psql_self(f""" + \\connect {db} + CREATE SUBSCRIPTION {sub}_{db} CONNECTION 'host={leader_host} port={leader_port} dbname={db} user={replication_user}' PUBLICATION {pub}_{db}; + """) + + # TODO: should probably wait longer + for _ in polling_loop(300): + print("waiting for synchronization to complete") + laststate = "?" + for (state,) in pg.query(""" + SELECT srsubstate FROM pg_subscription_rel; + """): + laststate = state + if state != "r": + print(f"sync state={state}") + break + if laststate == "r": + break + print("synchronized!") + for (db,) in all_databases: + print(f"dropping pub/sub for database {db}") + psql_self(f""" + \\connect {db} + DROP SUBSCRIPTION {sub}_{db}; + """) + psql_leader(f""" + \\connect {db} + DROP PUBLICATION {pub}_{db}; + """) + + [(sysid,)] = pg.query("SELECT system_identifier FROM pg_control_system();").fetchall() + print(f"setting system identifier to {sysid}") + ctl.dcs.initialize(create_new=False, sysid=str(sysid)) + pg.stop() + ''; + migrationScript = pkgs.writeShellScript "patroni-migration-replicate-or-self-upgrade" '' + if [[ "$(consul catalog nodes --service='${cfg.scope}' 2>/dev/null | wc -l)" -gt 0 ]]; then + # check if there's an active leader + leader="$(patronictl list -f json | jq -r 'map(select(.Role == "Leader" and .State == "running") | .Member) | .[0]')" + if [[ -n "$leader" ]]; then + leaderVersion="$(patronictl version '${cfg.scope}' "$leader" | grep -o 'PostgreSQL [0-9]*' | cut -d' ' -f2)" + if [[ "$leaderVersion" == '${postgresql.psqlSchema}' ]]; then + # leader is the same version as our target + echo leader is at target version, preparing for reinit + # TODO: need to wipe data dir, or will patroni do it for us? + rm -rf '${cfg.postgresqlDataDir}' + exit 0 + else + echo leader version $leaderVersion differs from target version ${postgresql.psqlSchema}, trying to find an upgraded replica + for replica in $(patronictl list -f json | jq -r 'map(select(.Role == "Replica" and .State == "running") | .Member) | .[]'); do + replicaVersion="$(patronictl version '${cfg.scope}' "$replica" | grep -o 'PostgreSQL [0-9]*' | cut -d' ' -f2)" + if [[ "$replicaVersion" == '${postgresql.psqlSchema}' ]]; then + # another replica is the same version as us, make leader + echo found a replica with the same target version, attempting to promote it to leader + # TODO: do we need to force it to become the leader or is there another way? + if ! patronictl switchover '${cfg.scope}' --master "$leader" --candidate "$replica" --force --scheduled now; then + echo switchover failed! attempting failover + patronictl failover '${cfg.scope}' --candidate "$replica" --force + fi + while [[ "$(patronictl list -f json | jq -r 'map(select(.Role == "Leader" and .State == "running") | .Member) | .[0]')" != "$replica" ]]; do + echo waiting for "$replica" to become the leader + patronictl list + sleep 1 + done + echo preparing for reinit after leader promotion + # TODO: need to wipe data dir, or will patroni do it for us? + rm -rf '${cfg.postgresqlDataDir}' + exit 0 + fi + done + echo no other nodes are at the target version, performing self-upgrade + leaderHost="$(patronictl list -f json | jq -r 'map(select(.Role == "Leader" and .State == "running") | .Host) | .[0]')" + # this is where it gets spicy + rm -rf '${cfg.postgresqlDataDir}' + install -dm700 '${cfg.postgresqlDataDir}' + # give the migration script 1800 seconds + systemd-notify EXTEND_TIMEOUT_USEC=1800000000 + export PATRONIMIGRATOR_LEADER_HOST="$leaderHost" + # HACK: find a way to get the port + export PATRONIMIGRATOR_LEADER_PORT="5432" + exec ${upgradeScript} + fi + fi + fi + echo consul returned no nodes, proceeding with cluster bootstrap + # no other nodes around, nothing we can do + ''; + in mkIf cfg.migrations.enable /*bash*/ '' + export PATH=${makeBinPath [ pkgs.jq pkgs.gnugrep config.services.consul.package patroni ]}:$PATH + export PATRONICTL_CONFIG_FILE=${configFile} + set -e + pgVersion='${cfg.postgresqlDataDir}/PG_VERSION' + # don't do anything if already at the target version + if [[ -e "$pgVersion" && "$(<"$pgVersion")" == '${postgresql.psqlSchema}' ]]; then + echo data directory version is target, no migrations to run + exit 0 + fi + # HACK: + export CONSUL_HTTP_ADDR=192.168.1.4:8500 + # ask consul if there are any other nodes around + exec consul lock --verbose --child-exit-code --shell=false '/patroni-migrator-upgrade/${cfg.scope}' ${migrationScript} + ''; + script = '' ${concatStringsSep "\n" (attrValues (mapAttrs (name: path: "export ${name}=\"$(<'${path}')\"") cfg.environmentFiles))} exec ${patroni}/bin/patroni ${configFile} @@ -288,10 +480,10 @@ in StateDirectoryMode = "0750"; }) (mkIf cfg.softwareWatchdog { - ExecStartPre = "+" + pkgs.writeShellScript "configure-software-watchdog.sh" '' + ExecStartPre = [("+" + pkgs.writeShellScript "configure-software-watchdog.sh" '' ${pkgs.kmod}/bin/modprobe softdog ${pkgs.coreutils}/bin/chown ${cfg.user} /dev/watchdog - ''; + '')]; })]; }; }; @@ -300,6 +492,7 @@ in patroni postgresql (mkIf cfg.raft pkgs.python310Packages.pysyncobj) + (pkgs.python3.withPackages (_: [ (pkgs.python3Packages.toPythonModule patroni) ])) ]; environment.sessionVariables = {