Merge pull request #6 from privatevoid-net/pr-reflex-cache
"Reflex" Nix IPFS cache
This commit is contained in:
commit
4fbb8994bb
3 changed files with 254 additions and 0 deletions
24
hosts/VEGAS/services/nix/ipfs-cache.nix
Normal file
24
hosts/VEGAS/services/nix/ipfs-cache.nix
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
{ config, pkgs, tools, ... }:
|
||||||
|
|
||||||
|
let
|
||||||
|
py3 = pkgs.python3.withPackages (ps: with ps; [
|
||||||
|
requests
|
||||||
|
requests-unixsocket
|
||||||
|
]);
|
||||||
|
|
||||||
|
port = config.portsStr.nixIpfs;
|
||||||
|
in {
|
||||||
|
reservePortsFor = [ "nixIpfs" ];
|
||||||
|
|
||||||
|
systemd.services.nix-ipfs-cache = {
|
||||||
|
wantedBy = [ "multi-user.target" ];
|
||||||
|
serviceConfig = {
|
||||||
|
ExecStart = "${py3}/bin/python3 -u ${./nix-ipfs-cache.py} ${port}";
|
||||||
|
DynamicUser = true;
|
||||||
|
SupplementaryGroups = [ "ipfs" ];
|
||||||
|
CacheDirectory = "nix-ipfs-cache";
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
services.nginx.virtualHosts."reflex.${tools.meta.domain}" = tools.nginx.vhosts.proxy "http://127.0.0.1:${port}";
|
||||||
|
}
|
229
hosts/VEGAS/services/nix/nix-ipfs-cache.py
Normal file
229
hosts/VEGAS/services/nix/nix-ipfs-cache.py
Normal file
|
@ -0,0 +1,229 @@
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
from functools import lru_cache
|
||||||
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||||
|
from os import environ
|
||||||
|
from random import randint
|
||||||
|
from socketserver import ThreadingMixIn
|
||||||
|
from sys import argv
|
||||||
|
from threading import Thread, Lock
|
||||||
|
import base64
|
||||||
|
import json
|
||||||
|
import queue
|
||||||
|
import re
|
||||||
|
import requests
|
||||||
|
import requests_unixsocket
|
||||||
|
import sqlite3
|
||||||
|
import time
|
||||||
|
|
||||||
|
CACHES = [
|
||||||
|
"https://cache.privatevoid.net",
|
||||||
|
"https://cache.nixos.org",
|
||||||
|
"https://max.cachix.org"
|
||||||
|
]
|
||||||
|
|
||||||
|
workSet = set()
|
||||||
|
workSetLock = Lock()
|
||||||
|
dbLock = Lock()
|
||||||
|
|
||||||
|
class Uncached(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def db_getcon():
|
||||||
|
cacheDir = environ["CACHE_DIRECTORY"]
|
||||||
|
con = sqlite3.connect(f"{cacheDir}/nix-ipfs-cache.db")
|
||||||
|
cur = con.cursor()
|
||||||
|
return con, cur
|
||||||
|
|
||||||
|
def db_close(con):
|
||||||
|
con.close()
|
||||||
|
|
||||||
|
@lru_cache(maxsize=65536)
|
||||||
|
def db_get_path_cached(narPath):
|
||||||
|
with dbLock:
|
||||||
|
con, cur = db_getcon()
|
||||||
|
for (nar, ipfs) in cur.execute("SELECT nar, ipfs FROM NarToIpfs WHERE nar=:nar", {"nar": narPath}):
|
||||||
|
db_close(con)
|
||||||
|
return ipfs
|
||||||
|
raise Uncached
|
||||||
|
|
||||||
|
def db_get_path(narPath):
|
||||||
|
try:
|
||||||
|
return db_get_path_cached(narPath)
|
||||||
|
except Uncached:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def db_set_path(narPath, ipfsPath):
|
||||||
|
with dbLock:
|
||||||
|
con, cur = db_getcon()
|
||||||
|
cur.execute("INSERT INTO NarToIpfs VALUES (:nar, :ipfs)", {"nar": narPath, "ipfs": ipfsPath})
|
||||||
|
con.commit()
|
||||||
|
db_close(con)
|
||||||
|
|
||||||
|
@lru_cache(maxsize=32768)
|
||||||
|
def try_all_cached(method, path):
|
||||||
|
fn = requests.get if method == "get" else requests.head if method == "head" else Error("invalid method")
|
||||||
|
|
||||||
|
bestState = 502
|
||||||
|
|
||||||
|
print(f" fetching [{method}] from any cache {path}")
|
||||||
|
for cache in CACHES:
|
||||||
|
try:
|
||||||
|
rCache = fn(f"{cache}{path}")
|
||||||
|
if rCache.status_code < bestState:
|
||||||
|
bestState = rCache.status_code
|
||||||
|
|
||||||
|
print(f" {rCache.status_code} - [{method}] {cache}{path}")
|
||||||
|
if bestState == 200:
|
||||||
|
r = (bestState,rCache.content if method != "head" else False)
|
||||||
|
if path.endswith(".narinfo"):
|
||||||
|
return r
|
||||||
|
else:
|
||||||
|
raise Uncached(r)
|
||||||
|
except requests.ConnectionError as e:
|
||||||
|
print(e)
|
||||||
|
|
||||||
|
raise Uncached((bestState,False))
|
||||||
|
|
||||||
|
def try_all(method, path):
|
||||||
|
try:
|
||||||
|
return try_all_cached(method, path)
|
||||||
|
except Uncached as r:
|
||||||
|
return r.args[0]
|
||||||
|
|
||||||
|
def ipfs_fetch_task(nar):
|
||||||
|
print(f"Downloading NAR: {nar}")
|
||||||
|
code, content = try_all("get",nar)
|
||||||
|
if code == 200:
|
||||||
|
upload = {'file': ('FILE',content,'application/octet-stream')}
|
||||||
|
try:
|
||||||
|
rIpfs = requests_unixsocket.post('http+unix://%2Frun%2Fipfs%2Fipfs-api.sock/api/v0/add?pin=false&quieter=true', files=upload)
|
||||||
|
hash = rIpfs.json()["Hash"]
|
||||||
|
print(f"Mapped: {nar} -> /ipfs/{hash}")
|
||||||
|
db_set_path(nar, hash)
|
||||||
|
return (nar, 200, hash)
|
||||||
|
except requests.ConnectionError as e:
|
||||||
|
print(e)
|
||||||
|
return (nar, 502, False)
|
||||||
|
else:
|
||||||
|
return (nar, code, False)
|
||||||
|
|
||||||
|
class ServiceHandler(BaseHTTPRequestHandler):
|
||||||
|
_executor_nar_pre = ThreadPoolExecutor(4)
|
||||||
|
_executor_nar_main = ThreadPoolExecutor(8)
|
||||||
|
#_executor_narinfo = ThreadPoolExecutor(16) # for narinfo uploads - TODO
|
||||||
|
|
||||||
|
def do_HEAD(self):
|
||||||
|
if self.path.endswith(".narinfo"):
|
||||||
|
print(f"NAR info request: {self.path}")
|
||||||
|
code, content = try_all("head",self.path)
|
||||||
|
self.send_response(code)
|
||||||
|
self.end_headers()
|
||||||
|
else:
|
||||||
|
self.send_response(404)
|
||||||
|
self.end_headers()
|
||||||
|
|
||||||
|
def do_GET(self):
|
||||||
|
if self.path.startswith("/nix-cache-info"):
|
||||||
|
self.send_response(200)
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(b"StoreDir: /nix/store\n")
|
||||||
|
return
|
||||||
|
|
||||||
|
elif self.path.startswith("/nar/"):
|
||||||
|
resultHash = db_get_path(self.path)
|
||||||
|
|
||||||
|
if resultHash == None:
|
||||||
|
with workSetLock:
|
||||||
|
found = False
|
||||||
|
for (itemNar, itemFuture) in workSet:
|
||||||
|
if itemNar == self.path:
|
||||||
|
f = itemFuture
|
||||||
|
print(f"IPFS fetch task for {self.path} already being processed")
|
||||||
|
found = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if not found:
|
||||||
|
print(f"Creating new IPFS fetch task for {self.path}")
|
||||||
|
f = self._executor_nar_main.submit(ipfs_fetch_task, self.path)
|
||||||
|
workSet.add((self.path, f))
|
||||||
|
|
||||||
|
resultNar, code, resultHash = f.result()
|
||||||
|
|
||||||
|
with workSetLock:
|
||||||
|
try:
|
||||||
|
workSet.remove((self.path, f))
|
||||||
|
except KeyError:
|
||||||
|
# already removed
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
code = 200
|
||||||
|
|
||||||
|
if code != 200:
|
||||||
|
self.send_response(code)
|
||||||
|
self.end_headers()
|
||||||
|
return
|
||||||
|
|
||||||
|
self.send_response(302)
|
||||||
|
|
||||||
|
# not used for auth, but for defining a redirect target
|
||||||
|
auth = self.headers.get('Authorization')
|
||||||
|
if auth != None:
|
||||||
|
try:
|
||||||
|
decoded1 = base64.b64decode(auth.removeprefix("Basic ")).removesuffix(b":")
|
||||||
|
if decoded1.isdigit():
|
||||||
|
redirect = f"http://127.0.0.1:{decoded1.decode('utf-8')}"
|
||||||
|
else:
|
||||||
|
decoded2 = base64.b32decode(decoded1.upper() + b"=" * (-len(decoded1) % 8))
|
||||||
|
redirect = decoded2.decode("utf-8")
|
||||||
|
except Exception:
|
||||||
|
redirect = "http://127.0.0.1:8080"
|
||||||
|
else:
|
||||||
|
redirect = "http://127.0.0.1:8080"
|
||||||
|
|
||||||
|
self.send_header('Location', f'{redirect}/ipfs/{resultHash}')
|
||||||
|
self.send_header('X-Ipfs-Path', f'/ipfs/{resultHash}')
|
||||||
|
self.end_headers()
|
||||||
|
return
|
||||||
|
|
||||||
|
elif self.path.endswith(".narinfo"):
|
||||||
|
print(f"NAR info request: {self.path}")
|
||||||
|
code, content = try_all("get",self.path)
|
||||||
|
self.send_response(code)
|
||||||
|
self.end_headers()
|
||||||
|
if code == 200:
|
||||||
|
self.wfile.write(content)
|
||||||
|
if match := re.search('URL: (nar/[a-z0-9]*\.nar.*)', content.decode("utf-8")):
|
||||||
|
nar = f"/{match.group(1)}"
|
||||||
|
if db_get_path(nar) == None:
|
||||||
|
with workSetLock:
|
||||||
|
found = False
|
||||||
|
for (itemNar, itemFuture) in workSet:
|
||||||
|
if itemNar == nar:
|
||||||
|
found = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if not found:
|
||||||
|
print(f"Pre-flight: creating IPFS fetch task for {nar}")
|
||||||
|
f = self._executor_nar_pre.submit(ipfs_fetch_task, nar)
|
||||||
|
workSet.add((nar, f))
|
||||||
|
return
|
||||||
|
|
||||||
|
else:
|
||||||
|
code = 404
|
||||||
|
|
||||||
|
if code > 299:
|
||||||
|
self.send_response(code)
|
||||||
|
self.end_headers()
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
con, cur = db_getcon()
|
||||||
|
cur.execute("CREATE TABLE IF NOT EXISTS NarToIpfs (nar text primary key, ipfs text)")
|
||||||
|
con.commit()
|
||||||
|
db_close(con)
|
||||||
|
|
||||||
|
server = ThreadingHTTPServer(('127.0.0.1',int(argv[1])), ServiceHandler)
|
||||||
|
server.serve_forever()
|
|
@ -33,6 +33,7 @@
|
||||||
./services/mail
|
./services/mail
|
||||||
./services/matrix
|
./services/matrix
|
||||||
./services/nix/binary-cache.nix
|
./services/nix/binary-cache.nix
|
||||||
|
./services/nix/ipfs-cache.nix
|
||||||
./services/nix/nar-serve.nix
|
./services/nix/nar-serve.nix
|
||||||
./services/object-storage
|
./services/object-storage
|
||||||
./services/openvpn
|
./services/openvpn
|
||||||
|
|
Loading…
Reference in a new issue