diff --git a/hosts/prophet/services/reflex/reflex.py b/hosts/prophet/services/reflex/reflex.py deleted file mode 100644 index ce208cf..0000000 --- a/hosts/prophet/services/reflex/reflex.py +++ /dev/null @@ -1,229 +0,0 @@ -from concurrent.futures import ThreadPoolExecutor -from functools import lru_cache -from http.server import HTTPServer, BaseHTTPRequestHandler -from os import environ -from random import randint -from socketserver import ThreadingMixIn -from sys import argv -from threading import Thread, Lock -import base64 -import json -import queue -import re -import requests -import requests_unixsocket -import sqlite3 -import time - -CACHES = [ - "https://cache.privatevoid.net", - "https://cache.nixos.org", - "https://max.cachix.org" -] - -workSet = set() -workSetLock = Lock() -dbLock = Lock() - -class Uncached(Exception): - pass - -class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): - pass - -def db_getcon(): - cacheDir = environ["CACHE_DIRECTORY"] - con = sqlite3.connect(f"{cacheDir}/nix-ipfs-cache.db") - cur = con.cursor() - return con, cur - -def db_close(con): - con.close() - -@lru_cache(maxsize=65536) -def db_get_path_cached(narPath): - with dbLock: - con, cur = db_getcon() - for (nar, ipfs) in cur.execute("SELECT nar, ipfs FROM NarToIpfs WHERE nar=:nar", {"nar": narPath}): - db_close(con) - return ipfs - raise Uncached - -def db_get_path(narPath): - try: - return db_get_path_cached(narPath) - except Uncached: - return None - -def db_set_path(narPath, ipfsPath): - with dbLock: - con, cur = db_getcon() - cur.execute("INSERT INTO NarToIpfs VALUES (:nar, :ipfs)", {"nar": narPath, "ipfs": ipfsPath}) - con.commit() - db_close(con) - -@lru_cache(maxsize=32768) -def try_all_cached(method, path): - fn = requests.get if method == "get" else requests.head if method == "head" else Error("invalid method") - - bestState = 502 - - print(f" fetching [{method}] from any cache {path}") - for cache in CACHES: - try: - rCache = fn(f"{cache}{path}") - if rCache.status_code < bestState: - bestState = rCache.status_code - - print(f" {rCache.status_code} - [{method}] {cache}{path}") - if bestState == 200: - r = (bestState,rCache.content if method != "head" else False) - if path.endswith(".narinfo"): - return r - else: - raise Uncached(r) - except requests.ConnectionError as e: - print(e) - - raise Uncached((bestState,False)) - -def try_all(method, path): - try: - return try_all_cached(method, path) - except Uncached as r: - return r.args[0] - -def ipfs_fetch_task(nar): - print(f"Downloading NAR: {nar}") - code, content = try_all("get",nar) - if code == 200: - upload = {'file': ('FILE',content,'application/octet-stream')} - try: - rIpfs = requests_unixsocket.post('http+unix://%2Frun%2Fipfs%2Fipfs-api.sock/api/v0/add?pin=false&quieter=true', files=upload) - hash = rIpfs.json()["Hash"] - print(f"Mapped: {nar} -> /ipfs/{hash}") - db_set_path(nar, hash) - return (nar, 200, hash) - except requests.ConnectionError as e: - print(e) - return (nar, 502, False) - else: - return (nar, code, False) - -class ServiceHandler(BaseHTTPRequestHandler): - _executor_nar_pre = ThreadPoolExecutor(4) - _executor_nar_main = ThreadPoolExecutor(8) - #_executor_narinfo = ThreadPoolExecutor(16) # for narinfo uploads - TODO - - def do_HEAD(self): - if self.path.endswith(".narinfo"): - print(f"NAR info request: {self.path}") - code, content = try_all("head",self.path) - self.send_response(code) - self.end_headers() - else: - self.send_response(404) - self.end_headers() - - def do_GET(self): - if self.path.startswith("/nix-cache-info"): - self.send_response(200) - self.end_headers() - self.wfile.write(b"StoreDir: /nix/store\n") - return - - elif self.path.startswith("/nar/"): - resultHash = db_get_path(self.path) - - if resultHash == None: - with workSetLock: - found = False - for (itemNar, itemFuture) in workSet: - if itemNar == self.path: - f = itemFuture - print(f"IPFS fetch task for {self.path} already being processed") - found = True - break - - if not found: - print(f"Creating new IPFS fetch task for {self.path}") - f = self._executor_nar_main.submit(ipfs_fetch_task, self.path) - workSet.add((self.path, f)) - - resultNar, code, resultHash = f.result() - - with workSetLock: - try: - workSet.remove((self.path, f)) - except KeyError: - # already removed - pass - else: - code = 200 - - if code != 200: - self.send_response(code) - self.end_headers() - return - - self.send_response(302) - - # not used for auth, but for defining a redirect target - auth = self.headers.get('Authorization') - if auth != None: - try: - decoded1 = base64.b64decode(auth.removeprefix("Basic ")).removesuffix(b":") - if decoded1.isdigit(): - redirect = f"http://127.0.0.1:{decoded1.decode('utf-8')}" - else: - decoded2 = base64.b32decode(decoded1.upper() + b"=" * (-len(decoded1) % 8)) - redirect = decoded2.decode("utf-8") - except Exception: - redirect = "http://127.0.0.1:8080" - else: - redirect = "http://127.0.0.1:8080" - - self.send_header('Location', f'{redirect}/ipfs/{resultHash}') - self.send_header('X-Ipfs-Path', f'/ipfs/{resultHash}') - self.end_headers() - return - - elif self.path.endswith(".narinfo"): - print(f"NAR info request: {self.path}") - code, content = try_all("get",self.path) - self.send_response(code) - self.end_headers() - if code == 200: - self.wfile.write(content) - if match := re.search('URL: (nar/[a-z0-9]*\.nar.*)', content.decode("utf-8")): - nar = f"/{match.group(1)}" - if db_get_path(nar) == None: - with workSetLock: - found = False - for (itemNar, itemFuture) in workSet: - if itemNar == nar: - found = True - break - - if not found: - print(f"Pre-flight: creating IPFS fetch task for {nar}") - f = self._executor_nar_pre.submit(ipfs_fetch_task, nar) - workSet.add((nar, f)) - return - - else: - code = 404 - - if code > 299: - self.send_response(code) - self.end_headers() - return - - -con, cur = db_getcon() -cur.execute("CREATE TABLE IF NOT EXISTS NarToIpfs (nar text primary key, ipfs text)") -con.commit() -db_close(con) - -server = ThreadingHTTPServer(('127.0.0.1',int(argv[1])), ServiceHandler) -server.serve_forever()