packages/hyprspace: use RoutedHost with combined PeX + DHT routing

This commit is contained in:
Max Headroom 2023-01-28 01:23:31 +01:00
parent bb54aff319
commit 65c5c2e58f
5 changed files with 149 additions and 55 deletions

View file

@ -107,6 +107,7 @@ func UpRun(r *cmd.Root, c *cmd.Sub) {
port, port,
streamHandler, streamHandler,
p2p.NewClosedCircuitRelayFilter(cfg.Peers), p2p.NewClosedCircuitRelayFilter(cfg.Peers),
cfg.Peers,
) )
checkErr(err) checkErr(err)
host.SetStreamHandler(p2p.PeXProtocol, p2p.NewPeXStreamHandler(host, cfg)) host.SetStreamHandler(p2p.PeXProtocol, p2p.NewPeXStreamHandler(host, cfg))

View file

@ -11,6 +11,7 @@ import (
"os" "os"
"time" "time"
"github.com/hyprspace/hyprspace/config"
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht" dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/crypto"
@ -20,6 +21,7 @@ import (
"github.com/libp2p/go-libp2p/core/pnet" "github.com/libp2p/go-libp2p/core/pnet"
"github.com/libp2p/go-libp2p/p2p/discovery/backoff" "github.com/libp2p/go-libp2p/p2p/discovery/backoff"
"github.com/libp2p/go-libp2p/p2p/host/autorelay" "github.com/libp2p/go-libp2p/p2p/host/autorelay"
routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic" libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/tcp" "github.com/libp2p/go-libp2p/p2p/transport/tcp"
@ -89,7 +91,7 @@ func getExtraBootstrapNodes(addr ma.Multiaddr) (nodesList []string) {
} }
// CreateNode creates an internal Libp2p nodes and returns it and it's DHT Discovery service. // CreateNode creates an internal Libp2p nodes and returns it and it's DHT Discovery service.
func CreateNode(ctx context.Context, inputKey []byte, port int, handler network.StreamHandler, acl relay.ACLFilter) (node host.Host, dhtOut *dht.IpfsDHT, err error) { func CreateNode(ctx context.Context, inputKey []byte, port int, handler network.StreamHandler, acl relay.ACLFilter, vpnPeers []config.Peer) (node host.Host, dhtOut *dht.IpfsDHT, err error) {
// Unmarshal Private Key // Unmarshal Private Key
privateKey, err := crypto.UnmarshalPrivateKey(inputKey) privateKey, err := crypto.UnmarshalPrivateKey(inputKey)
if err != nil { if err != nil {
@ -119,7 +121,7 @@ func CreateNode(ctx context.Context, inputKey []byte, port int, handler network.
peerChan := make(chan peer.AddrInfo) peerChan := make(chan peer.AddrInfo)
// Create libp2p node // Create libp2p node
node, err = libp2p.New( basicHost, err := libp2p.New(
maybePrivateNet, maybePrivateNet,
libp2p.ListenAddrStrings(ip6tcp, ip4tcp, ip4quic, ip6quic), libp2p.ListenAddrStrings(ip6tcp, ip4tcp, ip4quic, ip6quic),
libp2p.Identity(privateKey), libp2p.Identity(privateKey),
@ -164,9 +166,6 @@ func CreateNode(ctx context.Context, inputKey []byte, port int, handler network.
return return
} }
// Setup Hyprspace Stream Handler
node.SetStreamHandler(Protocol, handler)
// Define Bootstrap Nodes. // Define Bootstrap Nodes.
peers := []string{ peers := []string{
"/ip4/168.235.67.108/tcp/4001/p2p/QmRMA5pWXtfuW1y5w2t9gYxrDDD6bPRLKdWAYnHTeCxZMm", "/ip4/168.235.67.108/tcp/4001/p2p/QmRMA5pWXtfuW1y5w2t9gYxrDDD6bPRLKdWAYnHTeCxZMm",
@ -190,7 +189,7 @@ func CreateNode(ctx context.Context, inputKey []byte, port int, handler network.
if err == nil { if err == nil {
fmt.Printf("[+] %d additional addresses\n", len(extraPeers)) fmt.Printf("[+] %d additional addresses\n", len(extraPeers))
for _, p := range extraPeers { for _, p := range extraPeers {
node.Peerstore().AddAddrs(p.ID, p.Addrs, 5*time.Minute) basicHost.Peerstore().AddAddrs(p.ID, p.Addrs, 5*time.Minute)
} }
} }
} }
@ -199,7 +198,7 @@ func CreateNode(ctx context.Context, inputKey []byte, port int, handler network.
// Create DHT Subsystem // Create DHT Subsystem
dhtOut, err = dht.New( dhtOut, err = dht.New(
ctx, ctx,
node, basicHost,
dht.Mode(dht.ModeClient), dht.Mode(dht.ModeClient),
dht.BootstrapPeers(staticBootstrapPeers...), dht.BootstrapPeers(staticBootstrapPeers...),
dht.BootstrapPeersFunc(func() []peer.AddrInfo { dht.BootstrapPeersFunc(func() []peer.AddrInfo {
@ -222,6 +221,13 @@ func CreateNode(ctx context.Context, inputKey []byte, port int, handler network.
}), }),
) )
pexr := PeXRouting{basicHost, vpnPeers}
pr := ParallelRouting{[]routedhost.Routing{pexr, dhtOut}}
node = routedhost.Wrap(basicHost, pr)
// Setup Hyprspace Stream Handler
node.SetStreamHandler(Protocol, handler)
if err != nil { if err != nil {
return node, nil, err return node, nil, err
} }

View file

@ -14,9 +14,15 @@ import (
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
) )
type PeXRouting struct {
host host.Host
vpnPeers []config.Peer
}
const PeXProtocol = "/hyprspace/pex/0.0.1" const PeXProtocol = "/hyprspace/pex/0.0.1"
func checkErrPeX(err error, stream network.Stream) bool { func checkErrPeX(err error, stream network.Stream) bool {
@ -64,11 +70,11 @@ func NewPeXStreamHandler(host host.Host, cfg *config.Config) func(network.Stream
} }
} }
func RequestPeX(ctx context.Context, host host.Host, peers []peer.ID) error { func RequestPeX(ctx context.Context, host host.Host, peers []peer.ID) (addrInfos []peer.AddrInfo, e error) {
for _, p := range peers { for _, p := range peers {
s, err := host.NewStream(ctx, p, PeXProtocol) s, err := host.NewStream(ctx, p, PeXProtocol)
if err != nil { if err != nil {
return err return nil, err
} }
s.Write([]byte("r\n")) s.Write([]byte("r\n"))
s.SetDeadline(time.Now().Add(10 * time.Second)) s.SetDeadline(time.Now().Add(10 * time.Second))
@ -76,9 +82,9 @@ func RequestPeX(ctx context.Context, host host.Host, peers []peer.ID) error {
for { for {
str, err := buf.ReadString('\n') str, err := buf.ReadString('\n')
if err == io.EOF { if err == io.EOF {
return nil return nil, err
} else if checkErrPeX(err, s) { } else if checkErrPeX(err, s) {
return err return nil, err
} }
str = strings.TrimSuffix(str, "\n") str = strings.TrimSuffix(str, "\n")
splits := strings.Split(str, "|") splits := strings.Split(str, "|")
@ -86,18 +92,20 @@ func RequestPeX(ctx context.Context, host host.Host, peers []peer.ID) error {
addrStr := splits[1] addrStr := splits[1]
peerId, err := peer.Decode(idStr) peerId, err := peer.Decode(idStr)
if checkErrPeX(err, s) { if checkErrPeX(err, s) {
return err return nil, err
} }
ma, err := multiaddr.NewMultiaddr(addrStr) ma, err := multiaddr.NewMultiaddr(addrStr)
if checkErrPeX(err, s) { if checkErrPeX(err, s) {
return err return nil, err
} }
fmt.Printf("[-] Got PeX peer: %s/p2p/%s\n", addrStr, idStr) fmt.Printf("[-] Got PeX peer: %s/p2p/%s\n", addrStr, idStr)
host.Peerstore().AddAddr(peerId, ma, 24*time.Hour) addrInfos = append(addrInfos, peer.AddrInfo{
host.Network().DialPeer(ctx, peerId) ID: peerId,
Addrs: []multiaddr.Multiaddr{ma},
})
} }
} }
return nil return addrInfos, nil
} }
func PeXService(ctx context.Context, host host.Host, cfg *config.Config) { func PeXService(ctx context.Context, host host.Host, cfg *config.Config) {
@ -115,13 +123,27 @@ func PeXService(ctx context.Context, host host.Host, cfg *config.Config) {
for _, vpnPeer := range cfg.Peers { for _, vpnPeer := range cfg.Peers {
if vpnPeer.ID == evt.Peer { if vpnPeer.ID == evt.Peer {
if evt.Connectedness == network.Connected { if evt.Connectedness == network.Connected {
go RequestPeX(ctx, host, []peer.ID{evt.Peer}) go func() {
addrInfos, err := RequestPeX(ctx, host, []peer.ID{evt.Peer})
if err != nil {
for _, addrInfo := range addrInfos {
host.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, 30*time.Second)
}
}
}()
} else if evt.Connectedness == network.NotConnected { } else if evt.Connectedness == network.NotConnected {
peers := []peer.ID{} peers := []peer.ID{}
for _, p := range cfg.Peers { for _, p := range cfg.Peers {
peers = append(peers, p.ID) peers = append(peers, p.ID)
} }
go RequestPeX(ctx, host, peers) go func() {
addrInfos, err := RequestPeX(ctx, host, peers)
if err != nil {
for _, addrInfo := range addrInfos {
host.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, 30*time.Second)
}
}
}()
} }
break break
} }
@ -129,3 +151,31 @@ func PeXService(ctx context.Context, host host.Host, cfg *config.Config) {
} }
} }
} }
func (pexr PeXRouting) FindPeer(ctx context.Context, targetPeer peer.ID) (peer.AddrInfo, error) {
found := false
peers := []peer.ID{}
addrInfo := peer.AddrInfo{
ID: targetPeer,
}
for _, p := range pexr.vpnPeers {
peers = append(peers, p.ID)
if p.ID == targetPeer {
found = true
}
}
// PeX routing only returns VPN node addresses
if !found {
return addrInfo, routing.ErrNotFound
}
addrInfos, err := RequestPeX(ctx, pexr.host, peers)
if err != nil {
return addrInfo, err
}
for _, ai := range addrInfos {
if ai.ID == targetPeer {
addrInfo.Addrs = append(addrInfo.Addrs, ai.Addrs...)
}
}
return addrInfo, nil
}

View file

@ -0,0 +1,52 @@
package p2p
import (
"net"
"sync"
"github.com/libp2p/go-libp2p/core/peer"
)
type Reroute struct {
Network net.IPNet
To peer.ID
}
var (
reroutes []Reroute
mut sync.Mutex
)
func findReroute(network net.IPNet, doDelete bool) (int, *Reroute, bool) {
for i, r := range reroutes {
bits1, _ := r.Network.Mask.Size()
bits2, _ := network.Mask.Size()
if r.Network.IP.Equal(network.IP) && bits1 == bits2 {
if doDelete {
reroutes = append(reroutes[:i], reroutes[i+1:]...)
}
return i, &r, true
}
}
return 0, nil, false
}
func FindReroute(network net.IPNet, doDelete bool) (*Reroute, bool) {
mut.Lock()
defer mut.Unlock()
_, i, r := findReroute(network, doDelete)
return i, r
}
func AddReroute(network net.IPNet, peerID peer.ID) {
mut.Lock()
defer mut.Unlock()
if i, _, found := findReroute(network, false); found {
reroutes[i].To = peerID
} else {
reroutes = append(reroutes, Reroute{
Network: network,
To: peerID,
})
}
}

View file

@ -1,52 +1,37 @@
package p2p package p2p
import ( import (
"net" "context"
"sync" "sync"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
) )
type Reroute struct { type ParallelRouting struct {
Network net.IPNet routings []routedhost.Routing
To peer.ID
} }
var ( func (pr ParallelRouting) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) {
reroutes []Reroute var wg sync.WaitGroup
mut sync.Mutex var mutex sync.Mutex
)
func findReroute(network net.IPNet, doDelete bool) (int, *Reroute, bool) { var info peer.AddrInfo
for i, r := range reroutes { info.ID = p
bits1, _ := r.Network.Mask.Size() for _, r := range pr.routings {
bits2, _ := network.Mask.Size() wg.Add(1)
if r.Network.IP.Equal(network.IP) && bits1 == bits2 { r2 := r
if doDelete { go func() {
reroutes = append(reroutes[:i], reroutes[i+1:]...) defer wg.Done()
ai, err := r2.FindPeer(ctx, p)
if err == nil {
mutex.Lock()
defer mutex.Unlock()
info.Addrs = append(info.Addrs, ai.Addrs...)
} }
return i, &r, true }()
}
} }
return 0, nil, false
}
func FindReroute(network net.IPNet, doDelete bool) (*Reroute, bool) { wg.Wait()
mut.Lock() return info, nil
defer mut.Unlock()
_, i, r := findReroute(network, doDelete)
return i, r
}
func AddReroute(network net.IPNet, peerID peer.ID) {
mut.Lock()
defer mut.Unlock()
if i, _, found := findReroute(network, false); found {
reroutes[i].To = peerID
} else {
reroutes = append(reroutes, Reroute{
Network: network,
To: peerID,
})
}
} }