packages/reflex-cache: black + cleanup

Max Headroom 2022-03-01 23:59:31 +01:00
parent e7ac9bb9b1
commit 6fdfd183d4
6 changed files with 67 additions and 35 deletions

View file

@@ -10,13 +10,14 @@ class ReflexDB:
def __init__(self, cacheDir):
self.__cacheDir = cacheDir
self.__lock = Lock()
# initialize DB schema
with self.getcon() as (con, cur):
cur.execute("CREATE TABLE IF NOT EXISTS NarToIpfs (nar text primary key, ipfs text)")
cur.execute(
"CREATE TABLE IF NOT EXISTS NarToIpfs (nar text primary key, ipfs text)"
)
con.commit()
@contextlib.contextmanager
def getcon(self):
with self.__lock:
@@ -30,7 +31,9 @@ class ReflexDB:
@lru_cache(maxsize=65536)
def __get_path_cached(self, narPath):
with self.getcon() as (con, cur):
-for (nar, ipfs) in cur.execute("SELECT nar, ipfs FROM NarToIpfs WHERE nar=:nar", {"nar": narPath}):
+for (nar, ipfs) in cur.execute(
+    "SELECT nar, ipfs FROM NarToIpfs WHERE nar=:nar", {"nar": narPath}
+):
return ipfs
# HACK: lru_cache does not cache results if an exception occurs
# since we don't want to cache empty query results, we make use of this behavior
@@ -44,5 +47,8 @@ class ReflexDB:
def set_path(self, narPath, ipfsPath):
with self.getcon() as (con, cur):
cur.execute("INSERT INTO NarToIpfs VALUES (:nar, :ipfs)", {"nar": narPath, "ipfs": ipfsPath})
cur.execute(
"INSERT INTO NarToIpfs VALUES (:nar, :ipfs)",
{"nar": narPath, "ipfs": ipfsPath},
)
con.commit()
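
The HACK comment in __get_path_cached relies on a documented property of functools.lru_cache: a call that raises an exception is not memoized, so raising on a database miss keeps negative lookups out of the cache while successful hits stay cached. A minimal standalone sketch of the pattern, with hypothetical names standing in for the SQLite lookup:

    from functools import lru_cache

    class _DoNotCache(Exception):
        """Signals a result that must not be memoized."""

    _table = {}  # stand-in for the NarToIpfs table

    @lru_cache(maxsize=65536)
    def _lookup_cached(nar):
        if nar in _table:
            return _table[nar]
        # raising keeps the miss out of lru_cache's memo
        raise _DoNotCache()

    def lookup(nar):
        try:
            return _lookup_cached(nar)
        except _DoNotCache:
            return None  # miss: re-queried on the next call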

View file

@@ -3,8 +3,6 @@ from urllib.parse import quote_plus
import requests
import requests_unixsocket
-from reflex_cache.util import envOrRaise
class IPFSController:
def __init__(self, apiAddress, nixCache, db):
@@ -14,11 +12,13 @@ class IPFSController:
def ipfs_fetch_task(self, nar):
print(f"Downloading NAR: {nar}")
code, content = self.__nix.try_all("get",nar)
code, content = self.__nix.try_all("get", nar)
if code == 200:
-upload = {'file': ('FILE',content,'application/octet-stream')}
+upload = {"file": ("FILE", content, "application/octet-stream")}
try:
-rIpfs = requests_unixsocket.post(f'{self.__addr}/api/v0/add?pin=false&quieter=true', files=upload)
+rIpfs = requests_unixsocket.post(
+    f"{self.__addr}/api/v0/add?pin=false&quieter=true", files=upload
+)
hash = rIpfs.json()["Hash"]
print(f"Mapped: {nar} -> /ipfs/{hash}")
self.__db.set_path(nar, hash)
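
The upload above talks to the IPFS HTTP API over a unix socket. A hedged sketch of the same call in isolation: the socket path here is an assumption, while the /api/v0/add endpoint, its pin=false&quieter=true flags, and the "Hash" field of the JSON response are taken from ipfs_fetch_task:

    import requests_unixsocket

    # percent-encoded unix socket path; the real location comes from the
    # IPFS_API multiaddr and is assumed here
    API = "http+unix://%2Frun%2Fipfs%2Fipfs-api.sock"

    def add_bytes(content: bytes) -> str:
        upload = {"file": ("FILE", content, "application/octet-stream")}
        r = requests_unixsocket.post(f"{API}/api/v0/add?pin=false&quieter=true", files=upload)
        r.raise_for_status()
        return r.json()["Hash"]  # CID under which the bytes are now reachable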

View file

@@ -1,16 +1,19 @@
-from reflex_cache import db, service_handler, util
+from reflex_cache import service_handler, util
CACHES = [
"https://cache.privatevoid.net",
"https://cache.nixos.org",
"https://max.cachix.org"
"https://max.cachix.org",
]
def main():
-server = util.ThreadingHTTPServer(('127.0.0.1',int(util.envOr("REFLEX_PORT", "8002"))), service_handler.ReflexHTTPServiceHandler)
+server = util.ThreadingHTTPServer(
+    ("127.0.0.1", int(util.envOr("REFLEX_PORT", "8002"))),
+    service_handler.ReflexHTTPServiceHandler,
+)
server.serve_forever()
if __name__ == "__main__":
main()
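
Together with service_handler below, the entry point is configured entirely through environment variables. A hedged usage sketch: the variable names (REFLEX_PORT, CACHE_DIRECTORY, NIX_CACHES, IPFS_API) appear in the sources, but the values, in particular the IPFS_API multiaddr, are illustrative assumptions:

    import os

    # defaults mirror the fallbacks visible in the code; IPFS_API has no fallback
    os.environ.setdefault("REFLEX_PORT", "8002")
    os.environ.setdefault("CACHE_DIRECTORY", "/var/tmp")
    os.environ.setdefault("NIX_CACHES", "https://cache.nixos.org https://cache.privatevoid.net")
    os.environ["IPFS_API"] = "/unix/run/ipfs/ipfs-api.sock"  # assumed multiaddr

    from reflex_cache import __main__ as reflex_main

    reflex_main.main()  # blocks, serving on 127.0.0.1:$REFLEX_PORT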

View file

@@ -11,7 +11,13 @@ class NixCacheFetcher:
@lru_cache(maxsize=32768)
def __try_all_cached(self, method, path):
-fn = requests.get if method == "get" else requests.head if method == "head" else Error("invalid method")
+fn = (
+    requests.get
+    if method == "get"
+    else requests.head
+    if method == "head"
+    else Exception("invalid method")
+)
bestState = 404
@@ -24,7 +30,7 @@ class NixCacheFetcher:
print(f" {rCache.status_code} - [{method}] {cache}{path}")
if bestState == 200:
-r = (bestState,rCache.content if method != "head" else False)
+r = (bestState, rCache.content if method != "head" else False)
if path.endswith(".narinfo"):
return r
else:
@@ -34,7 +40,7 @@ class NixCacheFetcher:
# HACK: lru_cache does not cache results if an exception occurs
# since we don't want to cache empty query results, we make use of this behavior
-raise Uncached((bestState,False))
+raise Uncached((bestState, False))
def try_all(self, method, path):
try:
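
The body of try_all is cut off in this view. A plausible completion sketch, assuming it simply unwraps the (status, content) tuple that __try_all_cached packs into Uncached (the same lru_cache trick as in db.py, but carrying a payload):

    def try_all(self, method, path):
        try:
            return self.__try_all_cached(method, path)
        except Uncached as e:
            # the tuple passed to Uncached(...) is available as e.args[0]
            return e.args[0]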

View file

@@ -1,3 +1,4 @@
import base64
import re
from concurrent.futures import ThreadPoolExecutor
from http.server import BaseHTTPRequestHandler
@@ -13,18 +14,20 @@ class ReflexHTTPServiceHandler(BaseHTTPRequestHandler):
_workSet = set()
_workSetLock = Lock()
_executor_nar = ThreadPoolExecutor(8)
-#_executor_narinfo = ThreadPoolExecutor(16) # for narinfo uploads - TODO
+# _executor_narinfo = ThreadPoolExecutor(16) # for narinfo uploads - TODO
_db = db.ReflexDB(util.envOr("CACHE_DIRECTORY", "/var/tmp"))
-_nix = nix_cache.NixCacheFetcher(util.envOr("NIX_CACHES","https://cache.nixos.org").split(" "))
+_nix = nix_cache.NixCacheFetcher(
+    util.envOr("NIX_CACHES", "https://cache.nixos.org").split(" ")
+)
_ipfs = ipfs.IPFSController(Multiaddr(util.envOrRaise("IPFS_API")), _nix, _db)
def do_HEAD(self):
if self.path.endswith(".narinfo"):
print(f"NAR info request: {self.path}")
code, content = self._nix.try_all("head",self.path)
code, content = self._nix.try_all("head", self.path)
self.send_response(code)
self.end_headers()
else:
@@ -47,13 +50,17 @@ class ReflexHTTPServiceHandler(BaseHTTPRequestHandler):
for (itemNar, itemFuture) in self._workSet:
if itemNar == self.path:
f = itemFuture
print(f"IPFS fetch task for {self.path} already being processed")
print(
f"IPFS fetch task for {self.path} already being processed"
)
found = True
break
if not found:
print(f"Creating new IPFS fetch task for {self.path}")
-f = self._executor_nar.submit(self._ipfs.ipfs_fetch_task, self.path)
+f = self._executor_nar.submit(
+    self._ipfs.ipfs_fetch_task, self.path
+)
self._workSet.add((self.path, f))
resultNar, code, resultHash = f.result()
@@ -75,10 +82,12 @@ class ReflexHTTPServiceHandler(BaseHTTPRequestHandler):
self.send_response(302)
# not used for auth, but for defining a redirect target
-auth = self.headers.get('Authorization')
-if auth != None:
+auth = self.headers.get("Authorization")
+if auth:
try:
-decoded1 = base64.b64decode(auth.removeprefix("Basic ")).removesuffix(b":")
+decoded1 = base64.b64decode(
+    auth.removeprefix("Basic ")
+).removesuffix(b":")
if decoded1.isdigit():
redirect = f"http://127.0.0.1:{decoded1.decode('utf-8')}"
else:
@@ -88,21 +97,23 @@ class ReflexHTTPServiceHandler(BaseHTTPRequestHandler):
else:
redirect = "http://127.0.0.1:8080"
-self.send_header('Location', f'{redirect}/ipfs/{resultHash}')
-self.send_header('X-Ipfs-Path', f'/ipfs/{resultHash}')
+self.send_header("Location", f"{redirect}/ipfs/{resultHash}")
+self.send_header("X-Ipfs-Path", f"/ipfs/{resultHash}")
self.end_headers()
return
elif self.path.endswith(".narinfo"):
print(f"NAR info request: {self.path}")
code, content = self._nix.try_all("get",self.path)
code, content = self._nix.try_all("get", self.path)
self.send_response(code)
self.end_headers()
if code == 200:
self.wfile.write(content)
-if match := re.search('URL: (nar/[a-z0-9]*\.nar.*)', content.decode("utf-8")):
+if match := re.search(
+    "URL: (nar/[a-z0-9]*\\.nar.*)", content.decode("utf-8")
+):
nar = f"/{match.group(1)}"
-if self._db.get_path(nar) == None:
+if not self._db.get_path(nar):
with self._workSetLock:
found = False
for (itemNar, itemFuture) in self._workSet:
@@ -112,7 +123,9 @@ class ReflexHTTPServiceHandler(BaseHTTPRequestHandler):
if not found and len(self._workSet) < 8:
print(f"Pre-flight: creating IPFS fetch task for {nar}")
-f = self._executor_nar.submit(self._ipfs.ipfs_fetch_task, nar)
+f = self._executor_nar.submit(
+    self._ipfs.ipfs_fetch_task, nar
+)
self._workSet.add((nar, f))
return
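
One non-obvious detail in do_GET above: the Authorization header is not used for authentication. A client may send Basic credentials whose username is a bare port number, and the handler decodes it to pick which local IPFS gateway to redirect to, falling back to http://127.0.0.1:8080. A hedged sketch of just that decoding step (the non-digit branch is elided in the diff, so this sketch simply falls back to the default there):

    import base64

    def redirect_base(auth_header, default="http://127.0.0.1:8080"):
        if not auth_header:
            return default
        try:
            # "Basic <base64(user:)>" -> b"<user>" once the trailing colon is stripped
            decoded = base64.b64decode(auth_header.removeprefix("Basic ")).removesuffix(b":")
        except Exception:
            return default
        if decoded.isdigit():
            return f"http://127.0.0.1:{decoded.decode('utf-8')}"
        return default

    # e.g. curl -u 8081: ...  ->  "Authorization: Basic ODA4MTo="  ->  http://127.0.0.1:8081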

View file

@@ -6,18 +6,22 @@ from socketserver import ThreadingMixIn
class Uncached(Exception):
pass
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
pass
class MissingEnvironmentVariableError(Exception):
pass
def envOr(key, default):
if key in environ:
return environ[key]
else:
return default
def envOrRaise(key):
if key in environ:
return environ[key]
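
The diff view is truncated here, cutting envOrRaise short. Based on the names shown in the same hunk, the missing branch presumably raises the MissingEnvironmentVariableError defined just above it; a hedged completion sketch, an assumption rather than the verbatim source:

    def envOrRaise(key):
        if key in environ:
            return environ[key]
        else:
            # assumed: surface the missing key instead of returning a default
            raise MissingEnvironmentVariableError(key)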