packages/hyprspace: use libp2p built-in DHT bootstrap logic

This commit is contained in:
Max Headroom 2022-11-09 23:52:56 +01:00
parent 649fb307fd
commit c3435bf9b8
3 changed files with 69 additions and 100 deletions

View file

@ -23,6 +23,7 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/multiformats/go-multibase"
"github.com/nxadm/tail"
)
@ -158,8 +159,8 @@ func UpRun(r *cmd.Root, c *cmd.Sub) {
// Configure path for lock
lockPath := filepath.Join(filepath.Dir(cfg.Path), cfg.Interface.Name+".lock")
// Register the application to listen for SIGINT/SIGTERM
go signalExit(host, lockPath)
// Register the application to listen for signals
go signalHandler(host, lockPath, dht)
// Write lock to filesystem to indicate an existing running daemon.
err = os.WriteFile(lockPath, []byte(fmt.Sprint(os.Getpid())), os.ModePerm)
@ -259,27 +260,33 @@ func UpRun(r *cmd.Root, c *cmd.Sub) {
}
}
// singalExit registers two syscall handlers on the system so that if
// an SIGINT or SIGTERM occur on the system hyprspace can gracefully
// shutdown and remove the filesystem lock file.
func signalExit(host host.Host, lockPath string) {
// Wait for a SIGINT or SIGTERM signal
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
<-ch
func signalHandler(host host.Host, lockPath string, dht *dht.IpfsDHT) {
exitCh := make(chan os.Signal, 1)
rebootstrapCh := make(chan os.Signal, 1)
signal.Notify(exitCh, syscall.SIGINT, syscall.SIGTERM)
signal.Notify(rebootstrapCh, syscall.SIGUSR1)
// Shut the node down
err := host.Close()
checkErr(err)
for {
select {
case <-rebootstrapCh:
fmt.Println("[-] Rebootstrapping on SIGUSR1")
<-dht.ForceRefresh()
p2p.Rediscover()
case <-exitCh:
// Shut the node down
err := host.Close()
checkErr(err)
// Remove daemon lock from file system.
err = os.Remove(lockPath)
checkErr(err)
// Remove daemon lock from file system.
err = os.Remove(lockPath)
checkErr(err)
fmt.Println("Received signal, shutting down...")
fmt.Println("Received signal, shutting down...")
// Exit the application.
os.Exit(0)
// Exit the application.
os.Exit(0)
}
}
}
// createDaemon handles creating an independent background process for a

View file

@ -47,7 +47,8 @@ func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT, peerTable map[
}
if !connectedToAny {
fmt.Println("[!] Not connected to any peers, attempting to bootstrap again")
Rebootstrap()
dht.Bootstrap(ctx)
dht.RefreshRoutingTable()
dur = time.Second * 1
ticker.Reset(dur)
} else {

View file

@ -8,11 +8,7 @@ import (
"net/http"
"net/url"
"os"
"os/signal"
"sync"
"syscall"
"github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
@ -79,16 +75,6 @@ func CreateNode(ctx context.Context, inputKey []byte, port int, handler network.
}
defer swarmKey.Close()
}
extraBootstrapNodes := []string{}
ipfsApiStr, ok := os.LookupEnv("HYPRSPACE_IPFS_API")
if ok {
ipfsApiAddr, err := ma.NewMultiaddr(ipfsApiStr)
if err == nil {
fmt.Println("[+] Getting additional peers from IPFS API")
extraBootstrapNodes = getExtraBootstrapNodes(ipfsApiAddr)
fmt.Printf("[+] %d additional addresses\n", len(extraBootstrapNodes))
}
}
ip6tcp := fmt.Sprintf("/ip6/::/tcp/%d", port)
ip4tcp := fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port)
@ -116,9 +102,6 @@ func CreateNode(ctx context.Context, inputKey []byte, port int, handler network.
// Setup Hyprspace Stream Handler
node.SetStreamHandler(Protocol, handler)
// Create DHT Subsystem
dhtOut = dht.NewDHTClient(ctx, node, datastore.NewMapDatastore())
// Define Bootstrap Nodes.
peers := []string{
"/ip4/168.235.67.108/tcp/4001/p2p/QmRMA5pWXtfuW1y5w2t9gYxrDDD6bPRLKdWAYnHTeCxZMm",
@ -128,77 +111,55 @@ func CreateNode(ctx context.Context, inputKey []byte, port int, handler network.
}
// Convert Bootstap Nodes into usable addresses.
BootstrapPeers := make(map[peer.ID]*peer.AddrInfo, len(peers))
for _, addrStr := range append(peers, extraBootstrapNodes...) {
addr, err := ma.NewMultiaddr(addrStr)
if err != nil {
return node, dhtOut, err
}
pii, err := peer.AddrInfoFromP2pAddr(addr)
if err != nil {
return node, dhtOut, err
}
pi, ok := BootstrapPeers[pii.ID]
if !ok {
pi = &peer.AddrInfo{ID: pii.ID}
BootstrapPeers[pi.ID] = pi
}
pi.Addrs = append(pi.Addrs, pii.Addrs...)
staticBootstrapPeers, err := parsePeerAddrs(peers)
if err != nil {
return node, nil, err
}
// Let's connect to the bootstrap nodes first. They will tell us about the
// other nodes in the network.
count := bootstrap(ctx, node, BootstrapPeers)
// Create DHT Subsystem
dhtOut, err = dht.New(
ctx,
node,
dht.Mode(dht.ModeAuto),
dht.BootstrapPeers(staticBootstrapPeers...),
dht.BootstrapPeersFunc(func() []peer.AddrInfo {
extraBootstrapNodes := []string{}
ipfsApiStr, ok := os.LookupEnv("HYPRSPACE_IPFS_API")
if ok {
ipfsApiAddr, err := ma.NewMultiaddr(ipfsApiStr)
if err == nil {
fmt.Println("[+] Getting additional peers from IPFS API")
extraBootstrapNodes = getExtraBootstrapNodes(ipfsApiAddr)
fmt.Printf("[+] %d additional addresses\n", len(extraBootstrapNodes))
}
}
dynamicBootstrapPeers, err := parsePeerAddrs(extraBootstrapNodes)
if err != nil {
return staticBootstrapPeers
} else {
return append(staticBootstrapPeers, dynamicBootstrapPeers...)
}
}),
)
if count < 1 {
fmt.Println("[!] Initial bootstrap failed")
if err != nil {
return node, nil, err
}
go rebootstrap(ctx, node, BootstrapPeers)
return node, dhtOut, nil
}
func bootstrap(ctx context.Context, node host.Host, bootstrapPeers map[peer.ID]*peer.AddrInfo) int {
var wg sync.WaitGroup
lock := sync.Mutex{}
count := 0
wg.Add(len(bootstrapPeers))
for _, peerInfo := range bootstrapPeers {
go func(peerInfo *peer.AddrInfo) {
defer wg.Done()
node.Network().ClosePeer(peerInfo.ID)
err := node.Connect(ctx, *peerInfo)
if err == nil {
lock.Lock()
count++
lock.Unlock()
}
}(peerInfo)
}
wg.Wait()
fmt.Printf("[+] Connected to %d bootstrap peers\n", count)
return count
}
func rebootstrap(ctx context.Context, node host.Host, bootstrapPeers map[peer.ID]*peer.AddrInfo) {
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, syscall.SIGUSR1)
for {
select {
case <-ctx.Done():
return
case <-bootstrapTriggerChan:
bootstrap(ctx, node, bootstrapPeers)
case <-signalCh:
fmt.Println("[-] Rebootstrapping on SIGUSR1")
bootstrap(ctx, node, bootstrapPeers)
Rediscover()
func parsePeerAddrs(peers []string) (addrs []peer.AddrInfo, err error) {
for _, addrStr := range peers {
addr, err := ma.NewMultiaddr(addrStr)
if err != nil {
return nil, err
}
pii, err := peer.AddrInfoFromP2pAddr(addr)
if err != nil {
return nil, err
}
addrs = append(addrs, *pii)
}
}
func Rebootstrap() {
bootstrapTriggerChan <- true
return addrs, nil
}