depot/packages/servers/reflex-cache/reflex_cache/service_handler.py

160 lines
6 KiB
Python
Raw Permalink Normal View History

2022-03-02 00:59:31 +02:00
import base64
2022-02-26 01:48:16 +02:00
import re
from concurrent.futures import ThreadPoolExecutor
from http.server import BaseHTTPRequestHandler
from threading import Lock
import multibase
from multiaddr import Multiaddr
from reflex_cache import db, ipfs, nix_cache, util
class ReflexHTTPServiceHandler(BaseHTTPRequestHandler):
_workSet = set()
_workSetLock = Lock()
_executor_nar = ThreadPoolExecutor(8)
2022-03-02 00:59:31 +02:00
# _executor_narinfo = ThreadPoolExecutor(16) # for narinfo uploads - TODO
2022-02-26 01:48:16 +02:00
_db = db.ReflexDB(util.envOr("CACHE_DIRECTORY", "/var/tmp"))
2022-03-02 00:59:31 +02:00
_nix = nix_cache.NixCacheFetcher(
util.envOr("NIX_CACHES", "https://cache.nixos.org").split(" ")
)
2022-02-26 01:48:16 +02:00
_ipfs = ipfs.IPFSController(
Multiaddr(util.envOrRaise("IPFS_API")),
Multiaddr(util.envOrRaise("IPFS_CLUSTER_API")),
_nix,
_db)
2022-03-02 00:59:31 +02:00
2022-02-26 01:48:16 +02:00
def do_HEAD(self):
if self.path.endswith(".narinfo"):
print(f"NAR info request: {self.path}")
code, _, content = self._nix.try_all("head", self.path)
2022-02-26 01:48:16 +02:00
self.send_response(code)
self.end_headers()
else:
self.send_response(404)
self.end_headers()
def do_GET(self):
if self.path.startswith("/nix-cache-info"):
self.send_response(200)
self.end_headers()
self.wfile.write(b"StoreDir: /nix/store\n")
return
elif self.path.startswith("/nar/"):
resultHash = self._db.get_path(self.path)
if resultHash == None:
code, cache, content = self._nix.try_all("get", self.path)
2023-11-10 01:31:41 +02:00
if code != 200:
self.send_response(404)
self.end_headers()
return
2022-02-26 01:48:16 +02:00
with self._workSetLock:
found = False
for (itemNar, itemFuture) in self._workSet:
if itemNar == self.path:
f = itemFuture
2022-03-02 00:59:31 +02:00
print(
f"IPFS fetch task for {self.path} already being processed"
)
2022-02-26 01:48:16 +02:00
found = True
break
if not found:
print(f"Creating new IPFS fetch task for {self.path}")
def cb():
with self._workSetLock:
try:
self._workSet.remove((self.path, f))
except KeyError:
# already removed
pass
2022-03-02 00:59:31 +02:00
f = self._executor_nar.submit(
self._ipfs.ipfs_fetch_task, cb, self.path, cache, content
2022-03-02 00:59:31 +02:00
)
2022-02-26 01:48:16 +02:00
self._workSet.add((self.path, f))
2023-11-10 01:31:41 +02:00
_, code, resultHash = f.result()
else:
code = 200
if code != 200:
self.send_response(code)
self.end_headers()
2022-02-26 01:48:16 +02:00
return
self.send_response(302)
# not used for auth, but for defining a redirect target
2022-03-02 00:59:31 +02:00
auth = self.headers.get("Authorization")
if auth:
2022-02-26 01:48:16 +02:00
try:
2022-03-02 00:59:31 +02:00
decoded1 = base64.b64decode(
auth.removeprefix("Basic ")
).removesuffix(b":")
2022-02-26 01:48:16 +02:00
if decoded1.isdigit():
redirect = f"http://127.0.0.1:{decoded1.decode('utf-8')}"
else:
redirect = multibase.decode(decoded1).decode("utf-8")
except Exception:
redirect = "http://127.0.0.1:8080"
else:
redirect = "http://127.0.0.1:8080"
2022-03-02 00:59:31 +02:00
self.send_header("Location", f"{redirect}/ipfs/{resultHash}")
self.send_header("X-Ipfs-Path", f"/ipfs/{resultHash}")
2022-02-26 01:48:16 +02:00
self.end_headers()
return
elif self.path.endswith(".narinfo"):
print(f"NAR info request: {self.path}")
code, _, content = self._nix.try_all("get", self.path)
2022-02-26 01:48:16 +02:00
self.send_response(code)
self.end_headers()
if code == 200:
2022-03-02 00:59:31 +02:00
if match := re.search(
"URL: (nar/[a-z0-9]*\\.nar.*)", content.decode("utf-8")
):
2022-02-26 01:48:16 +02:00
nar = f"/{match.group(1)}"
2023-11-10 01:31:41 +02:00
# we decompress xz, so tell Nix to except an uncompressed NAR
if nar.endswith(".xz"):
self.wfile.write(content.replace(b"Compression: xz\n", b"Compression: none\n"))
else:
self.wfile.write(content)
2022-03-02 00:59:31 +02:00
if not self._db.get_path(nar):
2022-02-26 01:48:16 +02:00
with self._workSetLock:
found = False
for (itemNar, itemFuture) in self._workSet:
if itemNar == nar:
found = True
break
if not found and len(self._workSet) < 8:
print(f"Pre-flight: creating IPFS fetch task for {nar}")
def cb():
with self._workSetLock:
try:
self._workSet.remove((nar, f))
except KeyError:
# already removed
pass
2022-03-02 00:59:31 +02:00
f = self._executor_nar.submit(
self._ipfs.ipfs_fetch_task, cb, nar
2022-03-02 00:59:31 +02:00
)
2022-02-26 01:48:16 +02:00
self._workSet.add((nar, f))
2023-11-10 01:31:41 +02:00
else:
self.wfile.write(content)
2022-02-26 01:48:16 +02:00
return
else:
code = 404
if code > 299:
self.send_response(code)
self.end_headers()
return