From 65c5c2e58f27453618770aca5cdc24a130d07853 Mon Sep 17 00:00:00 2001 From: Max Date: Sat, 28 Jan 2023 01:23:31 +0100 Subject: [PATCH] packages/hyprspace: use RoutedHost with combined PeX + DHT routing --- packages/networking/hyprspace/cli/up.go | 1 + packages/networking/hyprspace/p2p/node.go | 20 ++++-- packages/networking/hyprspace/p2p/pex.go | 72 ++++++++++++++++--- .../networking/hyprspace/p2p/rerouting.go | 52 ++++++++++++++ packages/networking/hyprspace/p2p/routing.go | 59 ++++++--------- 5 files changed, 149 insertions(+), 55 deletions(-) create mode 100644 packages/networking/hyprspace/p2p/rerouting.go diff --git a/packages/networking/hyprspace/cli/up.go b/packages/networking/hyprspace/cli/up.go index 7339623..be7a91f 100644 --- a/packages/networking/hyprspace/cli/up.go +++ b/packages/networking/hyprspace/cli/up.go @@ -107,6 +107,7 @@ func UpRun(r *cmd.Root, c *cmd.Sub) { port, streamHandler, p2p.NewClosedCircuitRelayFilter(cfg.Peers), + cfg.Peers, ) checkErr(err) host.SetStreamHandler(p2p.PeXProtocol, p2p.NewPeXStreamHandler(host, cfg)) diff --git a/packages/networking/hyprspace/p2p/node.go b/packages/networking/hyprspace/p2p/node.go index f5a4113..618ae80 100644 --- a/packages/networking/hyprspace/p2p/node.go +++ b/packages/networking/hyprspace/p2p/node.go @@ -11,6 +11,7 @@ import ( "os" "time" + "github.com/hyprspace/hyprspace/config" "github.com/libp2p/go-libp2p" dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/crypto" @@ -20,6 +21,7 @@ import ( "github.com/libp2p/go-libp2p/core/pnet" "github.com/libp2p/go-libp2p/p2p/discovery/backoff" "github.com/libp2p/go-libp2p/p2p/host/autorelay" + routedhost "github.com/libp2p/go-libp2p/p2p/host/routed" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic" "github.com/libp2p/go-libp2p/p2p/transport/tcp" @@ -89,7 +91,7 @@ func getExtraBootstrapNodes(addr ma.Multiaddr) (nodesList []string) { } // CreateNode creates an internal Libp2p nodes and returns it and it's DHT Discovery service. -func CreateNode(ctx context.Context, inputKey []byte, port int, handler network.StreamHandler, acl relay.ACLFilter) (node host.Host, dhtOut *dht.IpfsDHT, err error) { +func CreateNode(ctx context.Context, inputKey []byte, port int, handler network.StreamHandler, acl relay.ACLFilter, vpnPeers []config.Peer) (node host.Host, dhtOut *dht.IpfsDHT, err error) { // Unmarshal Private Key privateKey, err := crypto.UnmarshalPrivateKey(inputKey) if err != nil { @@ -119,7 +121,7 @@ func CreateNode(ctx context.Context, inputKey []byte, port int, handler network. peerChan := make(chan peer.AddrInfo) // Create libp2p node - node, err = libp2p.New( + basicHost, err := libp2p.New( maybePrivateNet, libp2p.ListenAddrStrings(ip6tcp, ip4tcp, ip4quic, ip6quic), libp2p.Identity(privateKey), @@ -164,9 +166,6 @@ func CreateNode(ctx context.Context, inputKey []byte, port int, handler network. return } - // Setup Hyprspace Stream Handler - node.SetStreamHandler(Protocol, handler) - // Define Bootstrap Nodes. peers := []string{ "/ip4/168.235.67.108/tcp/4001/p2p/QmRMA5pWXtfuW1y5w2t9gYxrDDD6bPRLKdWAYnHTeCxZMm", @@ -190,7 +189,7 @@ func CreateNode(ctx context.Context, inputKey []byte, port int, handler network. if err == nil { fmt.Printf("[+] %d additional addresses\n", len(extraPeers)) for _, p := range extraPeers { - node.Peerstore().AddAddrs(p.ID, p.Addrs, 5*time.Minute) + basicHost.Peerstore().AddAddrs(p.ID, p.Addrs, 5*time.Minute) } } } @@ -199,7 +198,7 @@ func CreateNode(ctx context.Context, inputKey []byte, port int, handler network. // Create DHT Subsystem dhtOut, err = dht.New( ctx, - node, + basicHost, dht.Mode(dht.ModeClient), dht.BootstrapPeers(staticBootstrapPeers...), dht.BootstrapPeersFunc(func() []peer.AddrInfo { @@ -222,6 +221,13 @@ func CreateNode(ctx context.Context, inputKey []byte, port int, handler network. }), ) + pexr := PeXRouting{basicHost, vpnPeers} + pr := ParallelRouting{[]routedhost.Routing{pexr, dhtOut}} + node = routedhost.Wrap(basicHost, pr) + + // Setup Hyprspace Stream Handler + node.SetStreamHandler(Protocol, handler) + if err != nil { return node, nil, err } diff --git a/packages/networking/hyprspace/p2p/pex.go b/packages/networking/hyprspace/p2p/pex.go index 6b0fc33..a4275fe 100644 --- a/packages/networking/hyprspace/p2p/pex.go +++ b/packages/networking/hyprspace/p2p/pex.go @@ -14,9 +14,15 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/routing" "github.com/multiformats/go-multiaddr" ) +type PeXRouting struct { + host host.Host + vpnPeers []config.Peer +} + const PeXProtocol = "/hyprspace/pex/0.0.1" func checkErrPeX(err error, stream network.Stream) bool { @@ -64,11 +70,11 @@ func NewPeXStreamHandler(host host.Host, cfg *config.Config) func(network.Stream } } -func RequestPeX(ctx context.Context, host host.Host, peers []peer.ID) error { +func RequestPeX(ctx context.Context, host host.Host, peers []peer.ID) (addrInfos []peer.AddrInfo, e error) { for _, p := range peers { s, err := host.NewStream(ctx, p, PeXProtocol) if err != nil { - return err + return nil, err } s.Write([]byte("r\n")) s.SetDeadline(time.Now().Add(10 * time.Second)) @@ -76,9 +82,9 @@ func RequestPeX(ctx context.Context, host host.Host, peers []peer.ID) error { for { str, err := buf.ReadString('\n') if err == io.EOF { - return nil + return nil, err } else if checkErrPeX(err, s) { - return err + return nil, err } str = strings.TrimSuffix(str, "\n") splits := strings.Split(str, "|") @@ -86,18 +92,20 @@ func RequestPeX(ctx context.Context, host host.Host, peers []peer.ID) error { addrStr := splits[1] peerId, err := peer.Decode(idStr) if checkErrPeX(err, s) { - return err + return nil, err } ma, err := multiaddr.NewMultiaddr(addrStr) if checkErrPeX(err, s) { - return err + return nil, err } fmt.Printf("[-] Got PeX peer: %s/p2p/%s\n", addrStr, idStr) - host.Peerstore().AddAddr(peerId, ma, 24*time.Hour) - host.Network().DialPeer(ctx, peerId) + addrInfos = append(addrInfos, peer.AddrInfo{ + ID: peerId, + Addrs: []multiaddr.Multiaddr{ma}, + }) } } - return nil + return addrInfos, nil } func PeXService(ctx context.Context, host host.Host, cfg *config.Config) { @@ -115,13 +123,27 @@ func PeXService(ctx context.Context, host host.Host, cfg *config.Config) { for _, vpnPeer := range cfg.Peers { if vpnPeer.ID == evt.Peer { if evt.Connectedness == network.Connected { - go RequestPeX(ctx, host, []peer.ID{evt.Peer}) + go func() { + addrInfos, err := RequestPeX(ctx, host, []peer.ID{evt.Peer}) + if err != nil { + for _, addrInfo := range addrInfos { + host.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, 30*time.Second) + } + } + }() } else if evt.Connectedness == network.NotConnected { peers := []peer.ID{} for _, p := range cfg.Peers { peers = append(peers, p.ID) } - go RequestPeX(ctx, host, peers) + go func() { + addrInfos, err := RequestPeX(ctx, host, peers) + if err != nil { + for _, addrInfo := range addrInfos { + host.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, 30*time.Second) + } + } + }() } break } @@ -129,3 +151,31 @@ func PeXService(ctx context.Context, host host.Host, cfg *config.Config) { } } } + +func (pexr PeXRouting) FindPeer(ctx context.Context, targetPeer peer.ID) (peer.AddrInfo, error) { + found := false + peers := []peer.ID{} + addrInfo := peer.AddrInfo{ + ID: targetPeer, + } + for _, p := range pexr.vpnPeers { + peers = append(peers, p.ID) + if p.ID == targetPeer { + found = true + } + } + // PeX routing only returns VPN node addresses + if !found { + return addrInfo, routing.ErrNotFound + } + addrInfos, err := RequestPeX(ctx, pexr.host, peers) + if err != nil { + return addrInfo, err + } + for _, ai := range addrInfos { + if ai.ID == targetPeer { + addrInfo.Addrs = append(addrInfo.Addrs, ai.Addrs...) + } + } + return addrInfo, nil +} diff --git a/packages/networking/hyprspace/p2p/rerouting.go b/packages/networking/hyprspace/p2p/rerouting.go new file mode 100644 index 0000000..bbd75ec --- /dev/null +++ b/packages/networking/hyprspace/p2p/rerouting.go @@ -0,0 +1,52 @@ +package p2p + +import ( + "net" + "sync" + + "github.com/libp2p/go-libp2p/core/peer" +) + +type Reroute struct { + Network net.IPNet + To peer.ID +} + +var ( + reroutes []Reroute + mut sync.Mutex +) + +func findReroute(network net.IPNet, doDelete bool) (int, *Reroute, bool) { + for i, r := range reroutes { + bits1, _ := r.Network.Mask.Size() + bits2, _ := network.Mask.Size() + if r.Network.IP.Equal(network.IP) && bits1 == bits2 { + if doDelete { + reroutes = append(reroutes[:i], reroutes[i+1:]...) + } + return i, &r, true + } + } + return 0, nil, false +} + +func FindReroute(network net.IPNet, doDelete bool) (*Reroute, bool) { + mut.Lock() + defer mut.Unlock() + _, i, r := findReroute(network, doDelete) + return i, r +} + +func AddReroute(network net.IPNet, peerID peer.ID) { + mut.Lock() + defer mut.Unlock() + if i, _, found := findReroute(network, false); found { + reroutes[i].To = peerID + } else { + reroutes = append(reroutes, Reroute{ + Network: network, + To: peerID, + }) + } +} diff --git a/packages/networking/hyprspace/p2p/routing.go b/packages/networking/hyprspace/p2p/routing.go index bbd75ec..7e3cea7 100644 --- a/packages/networking/hyprspace/p2p/routing.go +++ b/packages/networking/hyprspace/p2p/routing.go @@ -1,52 +1,37 @@ package p2p import ( - "net" + "context" "sync" "github.com/libp2p/go-libp2p/core/peer" + routedhost "github.com/libp2p/go-libp2p/p2p/host/routed" ) -type Reroute struct { - Network net.IPNet - To peer.ID +type ParallelRouting struct { + routings []routedhost.Routing } -var ( - reroutes []Reroute - mut sync.Mutex -) +func (pr ParallelRouting) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) { + var wg sync.WaitGroup + var mutex sync.Mutex -func findReroute(network net.IPNet, doDelete bool) (int, *Reroute, bool) { - for i, r := range reroutes { - bits1, _ := r.Network.Mask.Size() - bits2, _ := network.Mask.Size() - if r.Network.IP.Equal(network.IP) && bits1 == bits2 { - if doDelete { - reroutes = append(reroutes[:i], reroutes[i+1:]...) + var info peer.AddrInfo + info.ID = p + for _, r := range pr.routings { + wg.Add(1) + r2 := r + go func() { + defer wg.Done() + ai, err := r2.FindPeer(ctx, p) + if err == nil { + mutex.Lock() + defer mutex.Unlock() + info.Addrs = append(info.Addrs, ai.Addrs...) } - return i, &r, true - } + }() } - return 0, nil, false -} -func FindReroute(network net.IPNet, doDelete bool) (*Reroute, bool) { - mut.Lock() - defer mut.Unlock() - _, i, r := findReroute(network, doDelete) - return i, r -} - -func AddReroute(network net.IPNet, peerID peer.ID) { - mut.Lock() - defer mut.Unlock() - if i, _, found := findReroute(network, false); found { - reroutes[i].To = peerID - } else { - reroutes = append(reroutes, Reroute{ - Network: network, - To: peerID, - }) - } + wg.Wait() + return info, nil }