diff --git a/packages/packages.nix b/packages/packages.nix index 3cae71a..64b42a9 100644 --- a/packages/packages.nix +++ b/packages/packages.nix @@ -30,6 +30,11 @@ in minio-console = pkgs.callPackage ./servers/minio-console { }; privatevoid-smart-card-ca-bundle = pkgs.callPackage ./data/privatevoid-smart-card-certificate-authority-bundle.nix { }; + + reflex-cache = poetry2nix.mkPoetryApplication { + projectDir = ./servers/reflex-cache; + meta.mainProgram = "reflex"; + }; sips = pkgs.callPackage ./servers/sips { }; } diff --git a/packages/servers/reflex-cache/.gitignore b/packages/servers/reflex-cache/.gitignore new file mode 100644 index 0000000..940ad72 --- /dev/null +++ b/packages/servers/reflex-cache/.gitignore @@ -0,0 +1,3 @@ +__pycache__ +dist/ +result/ \ No newline at end of file diff --git a/packages/servers/reflex-cache/poetry.lock b/packages/servers/reflex-cache/poetry.lock new file mode 100644 index 0000000..2877137 --- /dev/null +++ b/packages/servers/reflex-cache/poetry.lock @@ -0,0 +1,206 @@ +[[package]] +name = "base58" +version = "2.1.1" +description = "Base58 and Base58Check implementation." +category = "main" +optional = false +python-versions = ">=3.5" + +[package.extras] +tests = ["mypy", "PyHamcrest (>=2.0.2)", "pytest (>=4.6)", "pytest-benchmark", "pytest-cov", "pytest-flake8"] + +[[package]] +name = "certifi" +version = "2021.10.8" +description = "Python package for providing Mozilla's CA Bundle." +category = "main" +optional = false +python-versions = "*" + +[[package]] +name = "charset-normalizer" +version = "2.0.12" +description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet." +category = "main" +optional = false +python-versions = ">=3.5.0" + +[package.extras] +unicode_backport = ["unicodedata2"] + +[[package]] +name = "idna" +version = "3.3" +description = "Internationalized Domain Names in Applications (IDNA)" +category = "main" +optional = false +python-versions = ">=3.5" + +[[package]] +name = "morphys" +version = "1.0" +description = "Smart conversions between unicode and bytes types for common cases" +category = "main" +optional = false +python-versions = "*" + +[[package]] +name = "multiaddr" +version = "0.0.9" +description = "Python implementation of jbenet's multiaddr" +category = "main" +optional = false +python-versions = ">=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*" + +[package.dependencies] +base58 = "*" +netaddr = "*" +six = "*" +varint = "*" + +[[package]] +name = "netaddr" +version = "0.8.0" +description = "A network address manipulation library for Python" +category = "main" +optional = false +python-versions = "*" + +[[package]] +name = "py-multibase" +version = "1.0.3" +description = "Multibase implementation for Python" +category = "main" +optional = false +python-versions = "*" + +[package.dependencies] +morphys = ">=1.0,<2.0" +python-baseconv = ">=1.2.0,<2.0" +six = ">=1.10.0,<2.0" + +[[package]] +name = "python-baseconv" +version = "1.2.2" +description = "Convert numbers from base 10 integers to base X strings and back again." +category = "main" +optional = false +python-versions = "*" + +[[package]] +name = "requests" +version = "2.27.1" +description = "Python HTTP for Humans." +category = "main" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*" + +[package.dependencies] +certifi = ">=2017.4.17" +charset-normalizer = {version = ">=2.0.0,<2.1.0", markers = "python_version >= \"3\""} +idna = {version = ">=2.5,<4", markers = "python_version >= \"3\""} +urllib3 = ">=1.21.1,<1.27" + +[package.extras] +socks = ["PySocks (>=1.5.6,!=1.5.7)", "win-inet-pton"] +use_chardet_on_py3 = ["chardet (>=3.0.2,<5)"] + +[[package]] +name = "requests-unixsocket" +version = "0.3.0" +description = "Use requests to talk HTTP via a UNIX domain socket" +category = "main" +optional = false +python-versions = "*" + +[package.dependencies] +requests = ">=1.1" + +[[package]] +name = "six" +version = "1.16.0" +description = "Python 2 and 3 compatibility utilities" +category = "main" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*" + +[[package]] +name = "urllib3" +version = "1.26.8" +description = "HTTP library with thread-safe connection pooling, file post, and more." +category = "main" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4" + +[package.extras] +brotli = ["brotlipy (>=0.6.0)"] +secure = ["pyOpenSSL (>=0.14)", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "certifi", "ipaddress"] +socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"] + +[[package]] +name = "varint" +version = "1.0.2" +description = "Simple python varint implementation" +category = "main" +optional = false +python-versions = "*" + +[metadata] +lock-version = "1.1" +python-versions = "^3.9" +content-hash = "a824fa8d090c6650d61de20c861bbab58ba3c4b47f8853f44daa778c6bc480c9" + +[metadata.files] +base58 = [ + {file = "base58-2.1.1-py3-none-any.whl", hash = "sha256:11a36f4d3ce51dfc1043f3218591ac4eb1ceb172919cebe05b52a5bcc8d245c2"}, + {file = "base58-2.1.1.tar.gz", hash = "sha256:c5d0cb3f5b6e81e8e35da5754388ddcc6d0d14b6c6a132cb93d69ed580a7278c"}, +] +certifi = [ + {file = "certifi-2021.10.8-py2.py3-none-any.whl", hash = "sha256:d62a0163eb4c2344ac042ab2bdf75399a71a2d8c7d47eac2e2ee91b9d6339569"}, + {file = "certifi-2021.10.8.tar.gz", hash = "sha256:78884e7c1d4b00ce3cea67b44566851c4343c120abd683433ce934a68ea58872"}, +] +charset-normalizer = [ + {file = "charset-normalizer-2.0.12.tar.gz", hash = "sha256:2857e29ff0d34db842cd7ca3230549d1a697f96ee6d3fb071cfa6c7393832597"}, + {file = "charset_normalizer-2.0.12-py3-none-any.whl", hash = "sha256:6881edbebdb17b39b4eaaa821b438bf6eddffb4468cf344f09f89def34a8b1df"}, +] +idna = [ + {file = "idna-3.3-py3-none-any.whl", hash = "sha256:84d9dd047ffa80596e0f246e2eab0b391788b0503584e8945f2368256d2735ff"}, + {file = "idna-3.3.tar.gz", hash = "sha256:9d643ff0a55b762d5cdb124b8eaa99c66322e2157b69160bc32796e824360e6d"}, +] +morphys = [ + {file = "morphys-1.0-py2.py3-none-any.whl", hash = "sha256:76d6dbaa4d65f597e59d332c81da786d83e4669387b9b2a750cfec74e7beec20"}, +] +multiaddr = [ + {file = "multiaddr-0.0.9-py2.py3-none-any.whl", hash = "sha256:5c0f862cbcf19aada2a899f80ef896ddb2e85614e0c8f04dd287c06c69dac95b"}, + {file = "multiaddr-0.0.9.tar.gz", hash = "sha256:30b2695189edc3d5b90f1c303abb8f02d963a3a4edf2e7178b975eb417ab0ecf"}, +] +netaddr = [ + {file = "netaddr-0.8.0-py2.py3-none-any.whl", hash = "sha256:9666d0232c32d2656e5e5f8d735f58fd6c7457ce52fc21c98d45f2af78f990ac"}, + {file = "netaddr-0.8.0.tar.gz", hash = "sha256:d6cc57c7a07b1d9d2e917aa8b36ae8ce61c35ba3fcd1b83ca31c5a0ee2b5a243"}, +] +py-multibase = [ + {file = "py-multibase-1.0.3.tar.gz", hash = "sha256:d28a20efcbb61eec28f55827a0bf329c7cea80fffd933aecaea6ae8431267fe4"}, + {file = "py_multibase-1.0.3-py2.py3-none-any.whl", hash = "sha256:2677c1fafcc0ae15ddb9c7f444c5becc2530b3889124fd4fa2959ddfefb8c15b"}, +] +python-baseconv = [ + {file = "python-baseconv-1.2.2.tar.gz", hash = "sha256:0539f8bd0464013b05ad62e0a1673f0ac9086c76b43ebf9f833053527cd9931b"}, +] +requests = [ + {file = "requests-2.27.1-py2.py3-none-any.whl", hash = "sha256:f22fa1e554c9ddfd16e6e41ac79759e17be9e492b3587efa038054674760e72d"}, + {file = "requests-2.27.1.tar.gz", hash = "sha256:68d7c56fd5a8999887728ef304a6d12edc7be74f1cfa47714fc8b414525c9a61"}, +] +requests-unixsocket = [ + {file = "requests-unixsocket-0.3.0.tar.gz", hash = "sha256:28304283ea9357d45fff58ad5b11e47708cfbf5806817aa59b2a363228ee971e"}, + {file = "requests_unixsocket-0.3.0-py2.py3-none-any.whl", hash = "sha256:c685c680f0809e1b2955339b1e5afc3c0022b3066f4f7eb343f43a6065fc0e5d"}, +] +six = [ + {file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"}, + {file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"}, +] +urllib3 = [ + {file = "urllib3-1.26.8-py2.py3-none-any.whl", hash = "sha256:000ca7f471a233c2251c6c7023ee85305721bfdf18621ebff4fd17a8653427ed"}, + {file = "urllib3-1.26.8.tar.gz", hash = "sha256:0e7c33d9a63e7ddfcb86780aac87befc2fbddf46c58dbb487e0855f7ceec283c"}, +] +varint = [ + {file = "varint-1.0.2.tar.gz", hash = "sha256:a6ecc02377ac5ee9d65a6a8ad45c9ff1dac8ccee19400a5950fb51d594214ca5"}, +] diff --git a/packages/servers/reflex-cache/pyproject.toml b/packages/servers/reflex-cache/pyproject.toml new file mode 100644 index 0000000..f2e72fc --- /dev/null +++ b/packages/servers/reflex-cache/pyproject.toml @@ -0,0 +1,22 @@ +[tool.poetry] +name = "reflex-cache" +version = "0.1.0" +description = "Controller for Nix binary caches on IPFS" +authors = ["Max "] +license = "AGPL-3.0" + +[tool.poetry.dependencies] +python = "^3.9" +requests = "^2.27.1" +requests-unixsocket = "^0.3.0" +multiaddr = "^0.0.9" +py-multibase = "^1.0.3" + +[tool.poetry.dev-dependencies] + +[build-system] +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api" + +[tool.poetry.scripts] +reflex = "reflex_cache.main:main" diff --git a/packages/servers/reflex-cache/reflex_cache/db.py b/packages/servers/reflex-cache/reflex_cache/db.py new file mode 100644 index 0000000..6911e0b --- /dev/null +++ b/packages/servers/reflex-cache/reflex_cache/db.py @@ -0,0 +1,48 @@ +import contextlib +import sqlite3 +from functools import lru_cache +from threading import Lock + +from reflex_cache import util + + +class ReflexDB: + def __init__(self, cacheDir): + self.__cacheDir = cacheDir + self.__lock = Lock() + + # initialize DB schema + with self.getcon() as (con, cur): + cur.execute("CREATE TABLE IF NOT EXISTS NarToIpfs (nar text primary key, ipfs text)") + con.commit() + + + @contextlib.contextmanager + def getcon(self): + with self.__lock: + con = sqlite3.connect(f"{self.__cacheDir}/nix-ipfs-cache.db") + cur = con.cursor() + try: + yield con, cur + finally: + con.close() + + @lru_cache(maxsize=65536) + def __get_path_cached(self, narPath): + with self.getcon() as (con, cur): + for (nar, ipfs) in cur.execute("SELECT nar, ipfs FROM NarToIpfs WHERE nar=:nar", {"nar": narPath}): + return ipfs + # HACK: lru_cache does not cache results if an exception occurs + # since we don't want to cache empty query results, we make use of this behavior + raise util.Uncached + + def get_path(self, narPath): + try: + return self.__get_path_cached(narPath) + except util.Uncached: + return None + + def set_path(self, narPath, ipfsPath): + with self.getcon() as (con, cur): + cur.execute("INSERT INTO NarToIpfs VALUES (:nar, :ipfs)", {"nar": narPath, "ipfs": ipfsPath}) + con.commit() diff --git a/packages/servers/reflex-cache/reflex_cache/ipfs.py b/packages/servers/reflex-cache/reflex_cache/ipfs.py new file mode 100644 index 0000000..48f5a3e --- /dev/null +++ b/packages/servers/reflex-cache/reflex_cache/ipfs.py @@ -0,0 +1,30 @@ +from urllib.parse import quote_plus + +import requests +import requests_unixsocket + +from reflex_cache.util import envOrRaise + + +class IPFSController: + def __init__(self, apiAddress, nixCache, db): + self.__addr = f'http+unix://{quote_plus(apiAddress.get("unix"))}' + self.__nix = nixCache + self.__db = db + + def ipfs_fetch_task(self, nar): + print(f"Downloading NAR: {nar}") + code, content = self.__nix.try_all("get",nar) + if code == 200: + upload = {'file': ('FILE',content,'application/octet-stream')} + try: + rIpfs = requests_unixsocket.post(f'{self.__addr}/api/v0/add?pin=false&quieter=true', files=upload) + hash = rIpfs.json()["Hash"] + print(f"Mapped: {nar} -> /ipfs/{hash}") + self.__db.set_path(nar, hash) + return (nar, 200, hash) + except requests.ConnectionError as e: + print(e) + return (nar, 502, False) + else: + return (nar, code, False) diff --git a/packages/servers/reflex-cache/reflex_cache/main.py b/packages/servers/reflex-cache/reflex_cache/main.py new file mode 100644 index 0000000..36973b5 --- /dev/null +++ b/packages/servers/reflex-cache/reflex_cache/main.py @@ -0,0 +1,16 @@ +from reflex_cache import db, service_handler, util + +CACHES = [ + "https://cache.privatevoid.net", + "https://cache.nixos.org", + "https://max.cachix.org" +] + + + +def main(): + server = util.ThreadingHTTPServer(('127.0.0.1',int(util.envOr("REFLEX_PORT", "8002"))), service_handler.ReflexHTTPServiceHandler) + server.serve_forever() + +if __name__ == "__main__": + main() diff --git a/packages/servers/reflex-cache/reflex_cache/nix_cache.py b/packages/servers/reflex-cache/reflex_cache/nix_cache.py new file mode 100644 index 0000000..2a2420d --- /dev/null +++ b/packages/servers/reflex-cache/reflex_cache/nix_cache.py @@ -0,0 +1,43 @@ +from functools import lru_cache + +import requests + +from reflex_cache.util import Uncached + + +class NixCacheFetcher: + def __init__(self, caches): + self.__caches = caches + + @lru_cache(maxsize=32768) + def __try_all_cached(self, method, path): + fn = requests.get if method == "get" else requests.head if method == "head" else Error("invalid method") + + bestState = 404 + + print(f" fetching [{method}] from any cache {path}") + for cache in self.__caches: + try: + rCache = fn(f"{cache}{path}") + if rCache.status_code < bestState: + bestState = rCache.status_code + + print(f" {rCache.status_code} - [{method}] {cache}{path}") + if bestState == 200: + r = (bestState,rCache.content if method != "head" else False) + if path.endswith(".narinfo"): + return r + else: + raise Uncached(r) + except requests.ConnectionError as e: + print(e) + + # HACK: lru_cache does not cache results if an exception occurs + # since we don't want to cache empty query results, we make use of this behavior + raise Uncached((bestState,False)) + + def try_all(self, method, path): + try: + return self.__try_all_cached(method, path) + except Uncached as r: + return r.args[0] diff --git a/packages/servers/reflex-cache/reflex_cache/service_handler.py b/packages/servers/reflex-cache/reflex_cache/service_handler.py new file mode 100644 index 0000000..80e0241 --- /dev/null +++ b/packages/servers/reflex-cache/reflex_cache/service_handler.py @@ -0,0 +1,125 @@ +import re +from concurrent.futures import ThreadPoolExecutor +from http.server import BaseHTTPRequestHandler +from threading import Lock + +import multibase +from multiaddr import Multiaddr + +from reflex_cache import db, ipfs, nix_cache, util + + +class ReflexHTTPServiceHandler(BaseHTTPRequestHandler): + _workSet = set() + _workSetLock = Lock() + _executor_nar = ThreadPoolExecutor(8) + #_executor_narinfo = ThreadPoolExecutor(16) # for narinfo uploads - TODO + + _db = db.ReflexDB(util.envOr("CACHE_DIRECTORY", "/var/tmp")) + + _nix = nix_cache.NixCacheFetcher(util.envOr("NIX_CACHES","https://cache.nixos.org").split(" ")) + + _ipfs = ipfs.IPFSController(Multiaddr(util.envOrRaise("IPFS_API")), _nix, _db) + + def do_HEAD(self): + if self.path.endswith(".narinfo"): + print(f"NAR info request: {self.path}") + code, content = self._nix.try_all("head",self.path) + self.send_response(code) + self.end_headers() + else: + self.send_response(404) + self.end_headers() + + def do_GET(self): + if self.path.startswith("/nix-cache-info"): + self.send_response(200) + self.end_headers() + self.wfile.write(b"StoreDir: /nix/store\n") + return + + elif self.path.startswith("/nar/"): + resultHash = self._db.get_path(self.path) + + if resultHash == None: + with self._workSetLock: + found = False + for (itemNar, itemFuture) in self._workSet: + if itemNar == self.path: + f = itemFuture + print(f"IPFS fetch task for {self.path} already being processed") + found = True + break + + if not found: + print(f"Creating new IPFS fetch task for {self.path}") + f = self._executor_nar.submit(self._ipfs.ipfs_fetch_task, self.path) + self._workSet.add((self.path, f)) + + resultNar, code, resultHash = f.result() + + with self._workSetLock: + try: + self._workSet.remove((self.path, f)) + except KeyError: + # already removed + pass + else: + code = 200 + + if code != 200: + self.send_response(code) + self.end_headers() + return + + self.send_response(302) + + # not used for auth, but for defining a redirect target + auth = self.headers.get('Authorization') + if auth != None: + try: + decoded1 = base64.b64decode(auth.removeprefix("Basic ")).removesuffix(b":") + if decoded1.isdigit(): + redirect = f"http://127.0.0.1:{decoded1.decode('utf-8')}" + else: + redirect = multibase.decode(decoded1).decode("utf-8") + except Exception: + redirect = "http://127.0.0.1:8080" + else: + redirect = "http://127.0.0.1:8080" + + self.send_header('Location', f'{redirect}/ipfs/{resultHash}') + self.send_header('X-Ipfs-Path', f'/ipfs/{resultHash}') + self.end_headers() + return + + elif self.path.endswith(".narinfo"): + print(f"NAR info request: {self.path}") + code, content = self._nix.try_all("get",self.path) + self.send_response(code) + self.end_headers() + if code == 200: + self.wfile.write(content) + if match := re.search('URL: (nar/[a-z0-9]*\.nar.*)', content.decode("utf-8")): + nar = f"/{match.group(1)}" + if self._db.get_path(nar) == None: + with self._workSetLock: + found = False + for (itemNar, itemFuture) in self._workSet: + if itemNar == nar: + found = True + break + + if not found and len(self._workSet) < 8: + print(f"Pre-flight: creating IPFS fetch task for {nar}") + f = self._executor_nar.submit(self._ipfs.ipfs_fetch_task, nar) + self._workSet.add((nar, f)) + return + + else: + code = 404 + + if code > 299: + self.send_response(code) + self.end_headers() + return diff --git a/packages/servers/reflex-cache/reflex_cache/util.py b/packages/servers/reflex-cache/reflex_cache/util.py new file mode 100644 index 0000000..07cfa65 --- /dev/null +++ b/packages/servers/reflex-cache/reflex_cache/util.py @@ -0,0 +1,25 @@ +from http.server import HTTPServer +from os import environ +from socketserver import ThreadingMixIn + + +class Uncached(Exception): + pass + +class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): + pass + +class MissingEnvironmentVariableError(Exception): + pass + +def envOr(key, default): + if key in environ: + return environ[key] + else: + return default + +def envOrRaise(key): + if key in environ: + return environ[key] + else: + raise MissingEnvironmentVariableError(key)