From 6fdfd183d476411080221a863be5f36e54ad4964 Mon Sep 17 00:00:00 2001 From: Max Date: Tue, 1 Mar 2022 23:59:31 +0100 Subject: [PATCH] packages/reflex-cache: black + cleanup --- .../servers/reflex-cache/reflex_cache/db.py | 16 ++++--- .../servers/reflex-cache/reflex_cache/ipfs.py | 10 ++--- .../servers/reflex-cache/reflex_cache/main.py | 13 +++--- .../reflex-cache/reflex_cache/nix_cache.py | 12 +++-- .../reflex_cache/service_handler.py | 45 ++++++++++++------- .../servers/reflex-cache/reflex_cache/util.py | 6 ++- 6 files changed, 67 insertions(+), 35 deletions(-) diff --git a/packages/servers/reflex-cache/reflex_cache/db.py b/packages/servers/reflex-cache/reflex_cache/db.py index 6911e0b..22a3718 100644 --- a/packages/servers/reflex-cache/reflex_cache/db.py +++ b/packages/servers/reflex-cache/reflex_cache/db.py @@ -10,13 +10,14 @@ class ReflexDB: def __init__(self, cacheDir): self.__cacheDir = cacheDir self.__lock = Lock() - + # initialize DB schema with self.getcon() as (con, cur): - cur.execute("CREATE TABLE IF NOT EXISTS NarToIpfs (nar text primary key, ipfs text)") + cur.execute( + "CREATE TABLE IF NOT EXISTS NarToIpfs (nar text primary key, ipfs text)" + ) con.commit() - @contextlib.contextmanager def getcon(self): with self.__lock: @@ -30,7 +31,9 @@ class ReflexDB: @lru_cache(maxsize=65536) def __get_path_cached(self, narPath): with self.getcon() as (con, cur): - for (nar, ipfs) in cur.execute("SELECT nar, ipfs FROM NarToIpfs WHERE nar=:nar", {"nar": narPath}): + for (nar, ipfs) in cur.execute( + "SELECT nar, ipfs FROM NarToIpfs WHERE nar=:nar", {"nar": narPath} + ): return ipfs # HACK: lru_cache does not cache results if an exception occurs # since we don't want to cache empty query results, we make use of this behavior @@ -44,5 +47,8 @@ class ReflexDB: def set_path(self, narPath, ipfsPath): with self.getcon() as (con, cur): - cur.execute("INSERT INTO NarToIpfs VALUES (:nar, :ipfs)", {"nar": narPath, "ipfs": ipfsPath}) + cur.execute( + "INSERT INTO NarToIpfs VALUES (:nar, :ipfs)", + {"nar": narPath, "ipfs": ipfsPath}, + ) con.commit() diff --git a/packages/servers/reflex-cache/reflex_cache/ipfs.py b/packages/servers/reflex-cache/reflex_cache/ipfs.py index 48f5a3e..20ce2ba 100644 --- a/packages/servers/reflex-cache/reflex_cache/ipfs.py +++ b/packages/servers/reflex-cache/reflex_cache/ipfs.py @@ -3,8 +3,6 @@ from urllib.parse import quote_plus import requests import requests_unixsocket -from reflex_cache.util import envOrRaise - class IPFSController: def __init__(self, apiAddress, nixCache, db): @@ -14,11 +12,13 @@ class IPFSController: def ipfs_fetch_task(self, nar): print(f"Downloading NAR: {nar}") - code, content = self.__nix.try_all("get",nar) + code, content = self.__nix.try_all("get", nar) if code == 200: - upload = {'file': ('FILE',content,'application/octet-stream')} + upload = {"file": ("FILE", content, "application/octet-stream")} try: - rIpfs = requests_unixsocket.post(f'{self.__addr}/api/v0/add?pin=false&quieter=true', files=upload) + rIpfs = requests_unixsocket.post( + f"{self.__addr}/api/v0/add?pin=false&quieter=true", files=upload + ) hash = rIpfs.json()["Hash"] print(f"Mapped: {nar} -> /ipfs/{hash}") self.__db.set_path(nar, hash) diff --git a/packages/servers/reflex-cache/reflex_cache/main.py b/packages/servers/reflex-cache/reflex_cache/main.py index 36973b5..1faa249 100644 --- a/packages/servers/reflex-cache/reflex_cache/main.py +++ b/packages/servers/reflex-cache/reflex_cache/main.py @@ -1,16 +1,19 @@ -from reflex_cache import db, service_handler, util +from reflex_cache import service_handler, util CACHES = [ "https://cache.privatevoid.net", "https://cache.nixos.org", - "https://max.cachix.org" + "https://max.cachix.org", ] - def main(): - server = util.ThreadingHTTPServer(('127.0.0.1',int(util.envOr("REFLEX_PORT", "8002"))), service_handler.ReflexHTTPServiceHandler) + server = util.ThreadingHTTPServer( + ("127.0.0.1", int(util.envOr("REFLEX_PORT", "8002"))), + service_handler.ReflexHTTPServiceHandler, + ) server.serve_forever() - + + if __name__ == "__main__": main() diff --git a/packages/servers/reflex-cache/reflex_cache/nix_cache.py b/packages/servers/reflex-cache/reflex_cache/nix_cache.py index 2a2420d..92f7934 100644 --- a/packages/servers/reflex-cache/reflex_cache/nix_cache.py +++ b/packages/servers/reflex-cache/reflex_cache/nix_cache.py @@ -11,7 +11,13 @@ class NixCacheFetcher: @lru_cache(maxsize=32768) def __try_all_cached(self, method, path): - fn = requests.get if method == "get" else requests.head if method == "head" else Error("invalid method") + fn = ( + requests.get + if method == "get" + else requests.head + if method == "head" + else Exception("invalid method") + ) bestState = 404 @@ -24,7 +30,7 @@ class NixCacheFetcher: print(f" {rCache.status_code} - [{method}] {cache}{path}") if bestState == 200: - r = (bestState,rCache.content if method != "head" else False) + r = (bestState, rCache.content if method != "head" else False) if path.endswith(".narinfo"): return r else: @@ -34,7 +40,7 @@ class NixCacheFetcher: # HACK: lru_cache does not cache results if an exception occurs # since we don't want to cache empty query results, we make use of this behavior - raise Uncached((bestState,False)) + raise Uncached((bestState, False)) def try_all(self, method, path): try: diff --git a/packages/servers/reflex-cache/reflex_cache/service_handler.py b/packages/servers/reflex-cache/reflex_cache/service_handler.py index 80e0241..8d81113 100644 --- a/packages/servers/reflex-cache/reflex_cache/service_handler.py +++ b/packages/servers/reflex-cache/reflex_cache/service_handler.py @@ -1,3 +1,4 @@ +import base64 import re from concurrent.futures import ThreadPoolExecutor from http.server import BaseHTTPRequestHandler @@ -13,18 +14,20 @@ class ReflexHTTPServiceHandler(BaseHTTPRequestHandler): _workSet = set() _workSetLock = Lock() _executor_nar = ThreadPoolExecutor(8) - #_executor_narinfo = ThreadPoolExecutor(16) # for narinfo uploads - TODO - + # _executor_narinfo = ThreadPoolExecutor(16) # for narinfo uploads - TODO + _db = db.ReflexDB(util.envOr("CACHE_DIRECTORY", "/var/tmp")) - _nix = nix_cache.NixCacheFetcher(util.envOr("NIX_CACHES","https://cache.nixos.org").split(" ")) + _nix = nix_cache.NixCacheFetcher( + util.envOr("NIX_CACHES", "https://cache.nixos.org").split(" ") + ) _ipfs = ipfs.IPFSController(Multiaddr(util.envOrRaise("IPFS_API")), _nix, _db) - + def do_HEAD(self): if self.path.endswith(".narinfo"): print(f"NAR info request: {self.path}") - code, content = self._nix.try_all("head",self.path) + code, content = self._nix.try_all("head", self.path) self.send_response(code) self.end_headers() else: @@ -47,13 +50,17 @@ class ReflexHTTPServiceHandler(BaseHTTPRequestHandler): for (itemNar, itemFuture) in self._workSet: if itemNar == self.path: f = itemFuture - print(f"IPFS fetch task for {self.path} already being processed") + print( + f"IPFS fetch task for {self.path} already being processed" + ) found = True break if not found: print(f"Creating new IPFS fetch task for {self.path}") - f = self._executor_nar.submit(self._ipfs.ipfs_fetch_task, self.path) + f = self._executor_nar.submit( + self._ipfs.ipfs_fetch_task, self.path + ) self._workSet.add((self.path, f)) resultNar, code, resultHash = f.result() @@ -75,10 +82,12 @@ class ReflexHTTPServiceHandler(BaseHTTPRequestHandler): self.send_response(302) # not used for auth, but for defining a redirect target - auth = self.headers.get('Authorization') - if auth != None: + auth = self.headers.get("Authorization") + if auth: try: - decoded1 = base64.b64decode(auth.removeprefix("Basic ")).removesuffix(b":") + decoded1 = base64.b64decode( + auth.removeprefix("Basic ") + ).removesuffix(b":") if decoded1.isdigit(): redirect = f"http://127.0.0.1:{decoded1.decode('utf-8')}" else: @@ -88,21 +97,23 @@ class ReflexHTTPServiceHandler(BaseHTTPRequestHandler): else: redirect = "http://127.0.0.1:8080" - self.send_header('Location', f'{redirect}/ipfs/{resultHash}') - self.send_header('X-Ipfs-Path', f'/ipfs/{resultHash}') + self.send_header("Location", f"{redirect}/ipfs/{resultHash}") + self.send_header("X-Ipfs-Path", f"/ipfs/{resultHash}") self.end_headers() return elif self.path.endswith(".narinfo"): print(f"NAR info request: {self.path}") - code, content = self._nix.try_all("get",self.path) + code, content = self._nix.try_all("get", self.path) self.send_response(code) self.end_headers() if code == 200: self.wfile.write(content) - if match := re.search('URL: (nar/[a-z0-9]*\.nar.*)', content.decode("utf-8")): + if match := re.search( + "URL: (nar/[a-z0-9]*\\.nar.*)", content.decode("utf-8") + ): nar = f"/{match.group(1)}" - if self._db.get_path(nar) == None: + if not self._db.get_path(nar): with self._workSetLock: found = False for (itemNar, itemFuture) in self._workSet: @@ -112,7 +123,9 @@ class ReflexHTTPServiceHandler(BaseHTTPRequestHandler): if not found and len(self._workSet) < 8: print(f"Pre-flight: creating IPFS fetch task for {nar}") - f = self._executor_nar.submit(self._ipfs.ipfs_fetch_task, nar) + f = self._executor_nar.submit( + self._ipfs.ipfs_fetch_task, nar + ) self._workSet.add((nar, f)) return diff --git a/packages/servers/reflex-cache/reflex_cache/util.py b/packages/servers/reflex-cache/reflex_cache/util.py index 07cfa65..be2628d 100644 --- a/packages/servers/reflex-cache/reflex_cache/util.py +++ b/packages/servers/reflex-cache/reflex_cache/util.py @@ -6,18 +6,22 @@ from socketserver import ThreadingMixIn class Uncached(Exception): pass + class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): pass + class MissingEnvironmentVariableError(Exception): pass + def envOr(key, default): if key in environ: return environ[key] else: return default - + + def envOrRaise(key): if key in environ: return environ[key]