packages/hyprspace: rework discovery code

This commit is contained in:
Max Headroom 2022-09-26 19:23:03 +02:00
parent fc3b1012ac
commit 04766e0a9e
2 changed files with 22 additions and 31 deletions

View file

@ -153,8 +153,8 @@ func UpRun(r *cmd.Root, c *cmd.Sub) {
fmt.Println("[+] Setting Up Node Discovery via DHT") fmt.Println("[+] Setting Up Node Discovery via DHT")
// Setup P2P Discovery // Setup P2P Discovery
go p2p.Discover(ctx, host, dht, peerTable) discoverNow := make(chan bool)
go prettyDiscovery(ctx, host, peerTable) go p2p.Discover(ctx, host, dht, peerTable, discoverNow)
// Configure path for lock // Configure path for lock
lockPath := filepath.Join(filepath.Dir(cfg.Path), cfg.Interface.Name+".lock") lockPath := filepath.Join(filepath.Dir(cfg.Path), cfg.Interface.Name+".lock")
@ -220,7 +220,7 @@ func UpRun(r *cmd.Root, c *cmd.Sub) {
stream, err = host.NewStream(ctx, peer, p2p.Protocol) stream, err = host.NewStream(ctx, peer, p2p.Protocol)
if err != nil { if err != nil {
fmt.Println("[!] Failed to dial peer: " + peer.Pretty()) fmt.Println("[!] Failed to dial peer: " + peer.Pretty())
go p2p.Discover(ctx, host, dht, peerTable) go p2p.Rediscover(discoverNow)
continue continue
} }
stream.SetWriteDeadline(time.Now().Add(25 * time.Second)) stream.SetWriteDeadline(time.Now().Add(25 * time.Second))
@ -367,31 +367,6 @@ func streamHandler(stream network.Stream) {
} }
} }
func prettyDiscovery(ctx context.Context, node host.Host, peerTable map[string]peer.ID) {
// Build a temporary map of peers to limit querying to only those
// not connected.
tempTable := make(map[string]peer.ID, len(peerTable))
for ip, id := range peerTable {
tempTable[ip] = id
}
for len(tempTable) > 0 {
for ip, id := range tempTable {
stream, err := node.NewStream(ctx, id, p2p.Protocol)
if err != nil && (strings.HasPrefix(err.Error(), "failed to dial") ||
strings.HasPrefix(err.Error(), "no addresses")) {
// Attempt to connect to peers slowly when they aren't found.
time.Sleep(1 * time.Second)
continue
}
if err == nil {
fmt.Printf("[+] Connection to %s Successful. Network Ready.\n", ip)
stream.Close()
}
delete(tempTable, ip)
}
}
}
func verifyPort(port int) (int, error) { func verifyPort(port int) (int, error) {
var ln net.Listener var ln net.Listener
var err error var err error

View file

@ -2,6 +2,7 @@ package p2p
import ( import (
"context" "context"
"fmt"
"time" "time"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
@ -11,16 +12,21 @@ import (
) )
// Discover starts up a DHT based discovery system finding and adding nodes with the same rendezvous string. // Discover starts up a DHT based discovery system finding and adding nodes with the same rendezvous string.
func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT, peerTable map[string]peer.ID) { func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT, peerTable map[string]peer.ID, discoverNow chan bool) {
ticker := time.NewTicker(time.Second * 1) dur := time.Second * 1
ticker := time.NewTicker(dur)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-discoverNow:
dur = time.Second * 1
// Immediately trigger discovery
ticker.Reset(time.Millisecond * 1)
case <-ticker.C: case <-ticker.C:
for _, id := range peerTable { for nd, id := range peerTable {
if h.Network().Connectedness(id) != network.Connected { if h.Network().Connectedness(id) != network.Connected {
addrs, err := dht.FindPeer(ctx, id) addrs, err := dht.FindPeer(ctx, id)
if err != nil { if err != nil {
@ -30,8 +36,18 @@ func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT, peerTable map[
if err != nil { if err != nil {
continue continue
} }
fmt.Println("[+] Connected to " + nd)
} }
} }
dur = dur * 2
if dur >= time.Second*60 {
dur = time.Second * 60
}
ticker.Reset(dur)
} }
} }
} }
func Rediscover(discoverNow chan bool) {
discoverNow <- true
}