packages/reflex-cache: send redirect to upstream cache on first fetch

This commit is contained in:
Max Headroom 2022-12-18 23:05:29 +01:00
parent 1754841832
commit 226fc6da9a
3 changed files with 36 additions and 26 deletions

View file

@ -10,9 +10,9 @@ class IPFSController:
self.__nix = nixCache
self.__db = db
def ipfs_fetch_task(self, nar):
def ipfs_fetch_task(self, callback, nar, hint=None):
print(f"Downloading NAR: {nar}")
code, content = self.__nix.try_all("get", nar)
code, _, content = self.__nix.try_all("get", nar, hint)
if code == 200:
upload = {"file": ("FILE", content, "application/octet-stream")}
try:
@ -22,9 +22,12 @@ class IPFSController:
hash = rIpfs.json()["Hash"]
print(f"Mapped: {nar} -> /ipfs/{hash}")
self.__db.set_path(nar, hash)
callback()
return (nar, 200, hash)
except requests.ConnectionError as e:
print(e)
callback()
return (nar, 502, False)
else:
callback()
return (nar, code, False)

View file

@ -10,7 +10,7 @@ class NixCacheFetcher:
self.__caches = caches
@lru_cache(maxsize=32768)
def __try_all_cached(self, method, path):
def __try_all_cached(self, method, path, hint):
fn = (
requests.get
if method == "get"
@ -22,7 +22,13 @@ class NixCacheFetcher:
bestState = 404
print(f" fetching [{method}] from any cache {path}")
for cache in self.__caches:
if hint != None:
caches = []
caches.append(hint)
caches.extend(self.__caches)
else:
caches = self.__caches
for cache in caches:
try:
rCache = fn(f"{cache}{path}")
if rCache.status_code < bestState:
@ -30,7 +36,7 @@ class NixCacheFetcher:
print(f" {rCache.status_code} - [{method}] {cache}{path}")
if bestState == 200:
r = (bestState, rCache.content if method != "head" else False)
r = (bestState, cache, rCache.content if method != "head" else False)
if path.endswith(".narinfo"):
return r
else:
@ -40,10 +46,10 @@ class NixCacheFetcher:
# HACK: lru_cache does not cache results if an exception occurs
# since we don't want to cache empty query results, we make use of this behavior
raise Uncached((bestState, False))
raise Uncached((bestState, None, False))
def try_all(self, method, path):
def try_all(self, method, path, hint=None):
try:
return self.__try_all_cached(method, path)
return self.__try_all_cached(method, path, hint)
except Uncached as r:
return r.args[0]

View file

@ -27,7 +27,7 @@ class ReflexHTTPServiceHandler(BaseHTTPRequestHandler):
def do_HEAD(self):
if self.path.endswith(".narinfo"):
print(f"NAR info request: {self.path}")
code, content = self._nix.try_all("head", self.path)
code, _, content = self._nix.try_all("head", self.path)
self.send_response(code)
self.end_headers()
else:
@ -45,6 +45,15 @@ class ReflexHTTPServiceHandler(BaseHTTPRequestHandler):
resultHash = self._db.get_path(self.path)
if resultHash == None:
code, cache, _ = self._nix.try_all("head", self.path)
if code == 200:
self.send_response(302)
self.send_header("Location", f"{cache}{self.path}")
self.end_headers()
else:
self.send_response(404)
self.end_headers()
return
with self._workSetLock:
found = False
for (itemNar, itemFuture) in self._workSet:
@ -58,25 +67,17 @@ class ReflexHTTPServiceHandler(BaseHTTPRequestHandler):
if not found:
print(f"Creating new IPFS fetch task for {self.path}")
f = self._executor_nar.submit(
self._ipfs.ipfs_fetch_task, self.path
)
self._workSet.add((self.path, f))
resultNar, code, resultHash = f.result()
def cb():
with self._workSetLock:
try:
self._workSet.remove((self.path, f))
except KeyError:
# already removed
pass
else:
code = 200
if code != 200:
self.send_response(code)
self.end_headers()
f = self._executor_nar.submit(
self._ipfs.ipfs_fetch_task, cb, self.path, cache
)
self._workSet.add((self.path, f))
return
self.send_response(302)
@ -104,7 +105,7 @@ class ReflexHTTPServiceHandler(BaseHTTPRequestHandler):
elif self.path.endswith(".narinfo"):
print(f"NAR info request: {self.path}")
code, content = self._nix.try_all("get", self.path)
code, _, content = self._nix.try_all("get", self.path)
self.send_response(code)
self.end_headers()
if code == 200: