From 26005dd03ae44dffcea83a07632f9be1554ff929 Mon Sep 17 00:00:00 2001 From: Max Date: Sun, 30 Jan 2022 20:28:16 +0100 Subject: [PATCH] VEGAS: add Nix IPFS cache service --- hosts/VEGAS/services/nix/ipfs-cache.nix | 24 +++ hosts/VEGAS/services/nix/nix-ipfs-cache.py | 229 +++++++++++++++++++++ hosts/VEGAS/system.nix | 1 + 3 files changed, 254 insertions(+) create mode 100644 hosts/VEGAS/services/nix/ipfs-cache.nix create mode 100644 hosts/VEGAS/services/nix/nix-ipfs-cache.py diff --git a/hosts/VEGAS/services/nix/ipfs-cache.nix b/hosts/VEGAS/services/nix/ipfs-cache.nix new file mode 100644 index 0000000..2764a4b --- /dev/null +++ b/hosts/VEGAS/services/nix/ipfs-cache.nix @@ -0,0 +1,24 @@ +{ config, pkgs, tools, ... }: + +let + py3 = pkgs.python3.withPackages (ps: with ps; [ + requests + requests-unixsocket + ]); + + port = config.portsStr.nixIpfs; +in { + reservePortsFor = [ "nixIpfs" ]; + + systemd.services.nix-ipfs-cache = { + wantedBy = [ "multi-user.target" ]; + serviceConfig = { + ExecStart = "${py3}/bin/python3 -u ${./nix-ipfs-cache.py} ${port}"; + DynamicUser = true; + SupplementaryGroups = [ "ipfs" ]; + CacheDirectory = "nix-ipfs-cache"; + }; + }; + + services.nginx.virtualHosts."reflex.${tools.meta.domain}" = tools.nginx.vhosts.proxy "http://127.0.0.1:${port}"; +} diff --git a/hosts/VEGAS/services/nix/nix-ipfs-cache.py b/hosts/VEGAS/services/nix/nix-ipfs-cache.py new file mode 100644 index 0000000..ce208cf --- /dev/null +++ b/hosts/VEGAS/services/nix/nix-ipfs-cache.py @@ -0,0 +1,229 @@ +from concurrent.futures import ThreadPoolExecutor +from functools import lru_cache +from http.server import HTTPServer, BaseHTTPRequestHandler +from os import environ +from random import randint +from socketserver import ThreadingMixIn +from sys import argv +from threading import Thread, Lock +import base64 +import json +import queue +import re +import requests +import requests_unixsocket +import sqlite3 +import time + +CACHES = [ + "https://cache.privatevoid.net", + "https://cache.nixos.org", + "https://max.cachix.org" +] + +workSet = set() +workSetLock = Lock() +dbLock = Lock() + +class Uncached(Exception): + pass + +class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): + pass + +def db_getcon(): + cacheDir = environ["CACHE_DIRECTORY"] + con = sqlite3.connect(f"{cacheDir}/nix-ipfs-cache.db") + cur = con.cursor() + return con, cur + +def db_close(con): + con.close() + +@lru_cache(maxsize=65536) +def db_get_path_cached(narPath): + with dbLock: + con, cur = db_getcon() + for (nar, ipfs) in cur.execute("SELECT nar, ipfs FROM NarToIpfs WHERE nar=:nar", {"nar": narPath}): + db_close(con) + return ipfs + raise Uncached + +def db_get_path(narPath): + try: + return db_get_path_cached(narPath) + except Uncached: + return None + +def db_set_path(narPath, ipfsPath): + with dbLock: + con, cur = db_getcon() + cur.execute("INSERT INTO NarToIpfs VALUES (:nar, :ipfs)", {"nar": narPath, "ipfs": ipfsPath}) + con.commit() + db_close(con) + +@lru_cache(maxsize=32768) +def try_all_cached(method, path): + fn = requests.get if method == "get" else requests.head if method == "head" else Error("invalid method") + + bestState = 502 + + print(f" fetching [{method}] from any cache {path}") + for cache in CACHES: + try: + rCache = fn(f"{cache}{path}") + if rCache.status_code < bestState: + bestState = rCache.status_code + + print(f" {rCache.status_code} - [{method}] {cache}{path}") + if bestState == 200: + r = (bestState,rCache.content if method != "head" else False) + if path.endswith(".narinfo"): + return r + else: + raise Uncached(r) + except requests.ConnectionError as e: + print(e) + + raise Uncached((bestState,False)) + +def try_all(method, path): + try: + return try_all_cached(method, path) + except Uncached as r: + return r.args[0] + +def ipfs_fetch_task(nar): + print(f"Downloading NAR: {nar}") + code, content = try_all("get",nar) + if code == 200: + upload = {'file': ('FILE',content,'application/octet-stream')} + try: + rIpfs = requests_unixsocket.post('http+unix://%2Frun%2Fipfs%2Fipfs-api.sock/api/v0/add?pin=false&quieter=true', files=upload) + hash = rIpfs.json()["Hash"] + print(f"Mapped: {nar} -> /ipfs/{hash}") + db_set_path(nar, hash) + return (nar, 200, hash) + except requests.ConnectionError as e: + print(e) + return (nar, 502, False) + else: + return (nar, code, False) + +class ServiceHandler(BaseHTTPRequestHandler): + _executor_nar_pre = ThreadPoolExecutor(4) + _executor_nar_main = ThreadPoolExecutor(8) + #_executor_narinfo = ThreadPoolExecutor(16) # for narinfo uploads - TODO + + def do_HEAD(self): + if self.path.endswith(".narinfo"): + print(f"NAR info request: {self.path}") + code, content = try_all("head",self.path) + self.send_response(code) + self.end_headers() + else: + self.send_response(404) + self.end_headers() + + def do_GET(self): + if self.path.startswith("/nix-cache-info"): + self.send_response(200) + self.end_headers() + self.wfile.write(b"StoreDir: /nix/store\n") + return + + elif self.path.startswith("/nar/"): + resultHash = db_get_path(self.path) + + if resultHash == None: + with workSetLock: + found = False + for (itemNar, itemFuture) in workSet: + if itemNar == self.path: + f = itemFuture + print(f"IPFS fetch task for {self.path} already being processed") + found = True + break + + if not found: + print(f"Creating new IPFS fetch task for {self.path}") + f = self._executor_nar_main.submit(ipfs_fetch_task, self.path) + workSet.add((self.path, f)) + + resultNar, code, resultHash = f.result() + + with workSetLock: + try: + workSet.remove((self.path, f)) + except KeyError: + # already removed + pass + else: + code = 200 + + if code != 200: + self.send_response(code) + self.end_headers() + return + + self.send_response(302) + + # not used for auth, but for defining a redirect target + auth = self.headers.get('Authorization') + if auth != None: + try: + decoded1 = base64.b64decode(auth.removeprefix("Basic ")).removesuffix(b":") + if decoded1.isdigit(): + redirect = f"http://127.0.0.1:{decoded1.decode('utf-8')}" + else: + decoded2 = base64.b32decode(decoded1.upper() + b"=" * (-len(decoded1) % 8)) + redirect = decoded2.decode("utf-8") + except Exception: + redirect = "http://127.0.0.1:8080" + else: + redirect = "http://127.0.0.1:8080" + + self.send_header('Location', f'{redirect}/ipfs/{resultHash}') + self.send_header('X-Ipfs-Path', f'/ipfs/{resultHash}') + self.end_headers() + return + + elif self.path.endswith(".narinfo"): + print(f"NAR info request: {self.path}") + code, content = try_all("get",self.path) + self.send_response(code) + self.end_headers() + if code == 200: + self.wfile.write(content) + if match := re.search('URL: (nar/[a-z0-9]*\.nar.*)', content.decode("utf-8")): + nar = f"/{match.group(1)}" + if db_get_path(nar) == None: + with workSetLock: + found = False + for (itemNar, itemFuture) in workSet: + if itemNar == nar: + found = True + break + + if not found: + print(f"Pre-flight: creating IPFS fetch task for {nar}") + f = self._executor_nar_pre.submit(ipfs_fetch_task, nar) + workSet.add((nar, f)) + return + + else: + code = 404 + + if code > 299: + self.send_response(code) + self.end_headers() + return + + +con, cur = db_getcon() +cur.execute("CREATE TABLE IF NOT EXISTS NarToIpfs (nar text primary key, ipfs text)") +con.commit() +db_close(con) + +server = ThreadingHTTPServer(('127.0.0.1',int(argv[1])), ServiceHandler) +server.serve_forever() diff --git a/hosts/VEGAS/system.nix b/hosts/VEGAS/system.nix index 664689b..e109f42 100644 --- a/hosts/VEGAS/system.nix +++ b/hosts/VEGAS/system.nix @@ -33,6 +33,7 @@ ./services/mail ./services/matrix ./services/nix/binary-cache.nix + ./services/nix/ipfs-cache.nix ./services/nix/nar-serve.nix ./services/object-storage ./services/openvpn