diff --git a/packages/servers/reflex-cache/pyproject.toml b/packages/servers/reflex-cache/pyproject.toml index 0430a27..79a0233 100644 --- a/packages/servers/reflex-cache/pyproject.toml +++ b/packages/servers/reflex-cache/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "reflex-cache" -version = "0.1.0" +version = "0.2.0" description = "Controller for Nix binary caches on IPFS" authors = ["Max "] license = "AGPL-3.0" diff --git a/packages/servers/reflex-cache/reflex_cache/ipfs.py b/packages/servers/reflex-cache/reflex_cache/ipfs.py index af439cb..9e68a7e 100644 --- a/packages/servers/reflex-cache/reflex_cache/ipfs.py +++ b/packages/servers/reflex-cache/reflex_cache/ipfs.py @@ -1,3 +1,4 @@ +from datetime import datetime, timedelta, timezone from urllib.parse import quote_plus import requests @@ -5,8 +6,9 @@ import requests_unixsocket class IPFSController: - def __init__(self, apiAddress, nixCache, db): - self.__addr = f'http+unix://{quote_plus(apiAddress.get("unix"))}' + def __init__(self, nodeApiAddress, clusterApiAddress, nixCache, db): + self.__nodeAddr = f'http+unix://{quote_plus(nodeApiAddress.get("unix"))}' + self.__clusterAddr = f'http+unix://{quote_plus(clusterApiAddress.get("unix"))}' self.__nix = nixCache self.__db = db @@ -17,11 +19,17 @@ class IPFSController: upload = {"file": ("FILE", content, "application/octet-stream")} try: rIpfs = requests_unixsocket.post( - f"{self.__addr}/api/v0/add?pin=false&quieter=true", files=upload + f"{self.__nodeAddr}/api/v0/add?pin=false&quieter=true", files=upload ) hash = rIpfs.json()["Hash"] print(f"Mapped: {nar} -> /ipfs/{hash}") self.__db.set_path(nar, hash) + expireAt = datetime.now(timezone.utc) + timedelta(hours=24) + rClusterPin = requests_unixsocket.post( + f"{self.__clusterAddr}/pins/ipfs/{hash}?expire-at={quote_plus(expireAt.isoformat())}&mode=recursive&name=reflex-{quote_plus(nar)}&replication-max=2&replication-min=1", files=upload + ) + if rClusterPin.status_code != 200: + print(f"Warning: failed to pin {hash} on IPFS cluster") callback() return (nar, 200, hash) except requests.ConnectionError as e: diff --git a/packages/servers/reflex-cache/reflex_cache/service_handler.py b/packages/servers/reflex-cache/reflex_cache/service_handler.py index 1a848e5..1a24179 100644 --- a/packages/servers/reflex-cache/reflex_cache/service_handler.py +++ b/packages/servers/reflex-cache/reflex_cache/service_handler.py @@ -22,7 +22,11 @@ class ReflexHTTPServiceHandler(BaseHTTPRequestHandler): util.envOr("NIX_CACHES", "https://cache.nixos.org").split(" ") ) - _ipfs = ipfs.IPFSController(Multiaddr(util.envOrRaise("IPFS_API")), _nix, _db) + _ipfs = ipfs.IPFSController( + Multiaddr(util.envOrRaise("IPFS_API")), + Multiaddr(util.envOrRaise("IPFS_CLUSTER_API")), + _nix, + _db) def do_HEAD(self): if self.path.endswith(".narinfo"): @@ -124,8 +128,15 @@ class ReflexHTTPServiceHandler(BaseHTTPRequestHandler): if not found and len(self._workSet) < 8: print(f"Pre-flight: creating IPFS fetch task for {nar}") + def cb(): + with self._workSetLock: + try: + self._workSet.remove((nar, f)) + except KeyError: + # already removed + pass f = self._executor_nar.submit( - self._ipfs.ipfs_fetch_task, nar + self._ipfs.ipfs_fetch_task, cb, nar ) self._workSet.add((nar, f)) return