diff --git a/packages/servers/reflex-cache/reflex_cache/ipfs.py b/packages/servers/reflex-cache/reflex_cache/ipfs.py index 20ce2ba..af439cb 100644 --- a/packages/servers/reflex-cache/reflex_cache/ipfs.py +++ b/packages/servers/reflex-cache/reflex_cache/ipfs.py @@ -10,9 +10,9 @@ class IPFSController: self.__nix = nixCache self.__db = db - def ipfs_fetch_task(self, nar): + def ipfs_fetch_task(self, callback, nar, hint=None): print(f"Downloading NAR: {nar}") - code, content = self.__nix.try_all("get", nar) + code, _, content = self.__nix.try_all("get", nar, hint) if code == 200: upload = {"file": ("FILE", content, "application/octet-stream")} try: @@ -22,9 +22,12 @@ class IPFSController: hash = rIpfs.json()["Hash"] print(f"Mapped: {nar} -> /ipfs/{hash}") self.__db.set_path(nar, hash) + callback() return (nar, 200, hash) except requests.ConnectionError as e: print(e) + callback() return (nar, 502, False) else: + callback() return (nar, code, False) diff --git a/packages/servers/reflex-cache/reflex_cache/nix_cache.py b/packages/servers/reflex-cache/reflex_cache/nix_cache.py index 92f7934..7be6d62 100644 --- a/packages/servers/reflex-cache/reflex_cache/nix_cache.py +++ b/packages/servers/reflex-cache/reflex_cache/nix_cache.py @@ -10,7 +10,7 @@ class NixCacheFetcher: self.__caches = caches @lru_cache(maxsize=32768) - def __try_all_cached(self, method, path): + def __try_all_cached(self, method, path, hint): fn = ( requests.get if method == "get" @@ -22,7 +22,13 @@ class NixCacheFetcher: bestState = 404 print(f" fetching [{method}] from any cache {path}") - for cache in self.__caches: + if hint != None: + caches = [] + caches.append(hint) + caches.extend(self.__caches) + else: + caches = self.__caches + for cache in caches: try: rCache = fn(f"{cache}{path}") if rCache.status_code < bestState: @@ -30,7 +36,7 @@ class NixCacheFetcher: print(f" {rCache.status_code} - [{method}] {cache}{path}") if bestState == 200: - r = (bestState, rCache.content if method != "head" else False) + r = (bestState, cache, rCache.content if method != "head" else False) if path.endswith(".narinfo"): return r else: @@ -40,10 +46,10 @@ class NixCacheFetcher: # HACK: lru_cache does not cache results if an exception occurs # since we don't want to cache empty query results, we make use of this behavior - raise Uncached((bestState, False)) + raise Uncached((bestState, None, False)) - def try_all(self, method, path): + def try_all(self, method, path, hint=None): try: - return self.__try_all_cached(method, path) + return self.__try_all_cached(method, path, hint) except Uncached as r: return r.args[0] diff --git a/packages/servers/reflex-cache/reflex_cache/service_handler.py b/packages/servers/reflex-cache/reflex_cache/service_handler.py index 8d81113..1a848e5 100644 --- a/packages/servers/reflex-cache/reflex_cache/service_handler.py +++ b/packages/servers/reflex-cache/reflex_cache/service_handler.py @@ -27,7 +27,7 @@ class ReflexHTTPServiceHandler(BaseHTTPRequestHandler): def do_HEAD(self): if self.path.endswith(".narinfo"): print(f"NAR info request: {self.path}") - code, content = self._nix.try_all("head", self.path) + code, _, content = self._nix.try_all("head", self.path) self.send_response(code) self.end_headers() else: @@ -45,6 +45,15 @@ class ReflexHTTPServiceHandler(BaseHTTPRequestHandler): resultHash = self._db.get_path(self.path) if resultHash == None: + code, cache, _ = self._nix.try_all("head", self.path) + if code == 200: + self.send_response(302) + self.send_header("Location", f"{cache}{self.path}") + self.end_headers() + else: + self.send_response(404) + self.end_headers() + return with self._workSetLock: found = False for (itemNar, itemFuture) in self._workSet: @@ -58,25 +67,17 @@ class ReflexHTTPServiceHandler(BaseHTTPRequestHandler): if not found: print(f"Creating new IPFS fetch task for {self.path}") + def cb(): + with self._workSetLock: + try: + self._workSet.remove((self.path, f)) + except KeyError: + # already removed + pass f = self._executor_nar.submit( - self._ipfs.ipfs_fetch_task, self.path + self._ipfs.ipfs_fetch_task, cb, self.path, cache ) self._workSet.add((self.path, f)) - - resultNar, code, resultHash = f.result() - - with self._workSetLock: - try: - self._workSet.remove((self.path, f)) - except KeyError: - # already removed - pass - else: - code = 200 - - if code != 200: - self.send_response(code) - self.end_headers() return self.send_response(302) @@ -104,7 +105,7 @@ class ReflexHTTPServiceHandler(BaseHTTPRequestHandler): elif self.path.endswith(".narinfo"): print(f"NAR info request: {self.path}") - code, content = self._nix.try_all("get", self.path) + code, _, content = self._nix.try_all("get", self.path) self.send_response(code) self.end_headers() if code == 200: