packages/hyprspace: implement RPC server

This commit is contained in:
Max Headroom 2023-01-20 23:36:31 +01:00
parent 9584c5f2f8
commit 88f402dbaa
4 changed files with 128 additions and 10 deletions

View file

@ -5,7 +5,7 @@ import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
"log" "io/fs"
"net" "net"
"os" "os"
"os/signal" "os/signal"
@ -18,12 +18,14 @@ import (
"github.com/DataDrake/cli-ng/v2/cmd" "github.com/DataDrake/cli-ng/v2/cmd"
"github.com/hyprspace/hyprspace/config" "github.com/hyprspace/hyprspace/config"
"github.com/hyprspace/hyprspace/p2p" "github.com/hyprspace/hyprspace/p2p"
hsrpc "github.com/hyprspace/hyprspace/rpc"
"github.com/hyprspace/hyprspace/tun" "github.com/hyprspace/hyprspace/tun"
dht "github.com/libp2p/go-libp2p-kad-dht" dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multibase" "github.com/multiformats/go-multibase"
) )
@ -36,6 +38,10 @@ var (
RevLookup map[string]string RevLookup map[string]string
// activeStreams is a map of active streams to a peer // activeStreams is a map of active streams to a peer
activeStreams map[string]network.Stream activeStreams map[string]network.Stream
// context
ctx context.Context
// context cancel function
ctxCancel func()
) )
// Up creates and brings up a Hyprspace Interface. // Up creates and brings up a Hyprspace Interface.
@ -112,7 +118,7 @@ func UpRun(r *cmd.Root, c *cmd.Sub) {
} }
// Setup System Context // Setup System Context
ctx := context.Background() ctx, ctxCancel = context.WithCancel(context.Background())
fmt.Println("[+] Creating LibP2P Node") fmt.Println("[+] Creating LibP2P Node")
@ -152,10 +158,13 @@ func UpRun(r *cmd.Root, c *cmd.Sub) {
lockPath := filepath.Join(filepath.Dir(cfg.Path), cfg.Interface.Name+".lock") lockPath := filepath.Join(filepath.Dir(cfg.Path), cfg.Interface.Name+".lock")
// Register the application to listen for signals // Register the application to listen for signals
go signalHandler(host, lockPath, dht) go signalHandler(ctx, host, lockPath, dht)
// Log about various events // Log about various events
go eventLogger(host, cfg) go eventLogger(ctx, host, cfg)
// RPC server
go hsrpc.RpcServer(ctx, multiaddr.StringCast(fmt.Sprintf("/unix/run/hyprspace-rpc.%s.sock", cfg.Interface.Name)), host, *cfg)
// Write lock to filesystem to indicate an existing running daemon. // Write lock to filesystem to indicate an existing running daemon.
err = os.WriteFile(lockPath, []byte(fmt.Sprint(os.Getpid())), os.ModePerm) err = os.WriteFile(lockPath, []byte(fmt.Sprint(os.Getpid())), os.ModePerm)
@ -183,8 +192,13 @@ func UpRun(r *cmd.Root, c *cmd.Sub) {
for { for {
// Read in a packet from the tun device. // Read in a packet from the tun device.
plen, err := tunDev.Iface.Read(packet) plen, err := tunDev.Iface.Read(packet)
if err != nil { if errors.Is(err, fs.ErrClosed) {
log.Println(err) fmt.Println("[-] Interface closed")
<-ctx.Done()
time.Sleep(1 * time.Second)
return
} else if err != nil {
fmt.Println(err)
continue continue
} }
@ -255,7 +269,7 @@ func UpRun(r *cmd.Root, c *cmd.Sub) {
} }
} }
func signalHandler(host host.Host, lockPath string, dht *dht.IpfsDHT) { func signalHandler(ctx context.Context, host host.Host, lockPath string, dht *dht.IpfsDHT) {
exitCh := make(chan os.Signal, 1) exitCh := make(chan os.Signal, 1)
rebootstrapCh := make(chan os.Signal, 1) rebootstrapCh := make(chan os.Signal, 1)
signal.Notify(exitCh, syscall.SIGINT, syscall.SIGTERM) signal.Notify(exitCh, syscall.SIGINT, syscall.SIGTERM)
@ -263,6 +277,8 @@ func signalHandler(host host.Host, lockPath string, dht *dht.IpfsDHT) {
for { for {
select { select {
case <-ctx.Done():
return
case <-rebootstrapCh: case <-rebootstrapCh:
fmt.Println("[-] Rebootstrapping on SIGUSR1") fmt.Println("[-] Rebootstrapping on SIGUSR1")
host.ConnManager().TrimOpenConns(context.Background()) host.ConnManager().TrimOpenConns(context.Background())
@ -279,17 +295,20 @@ func signalHandler(host host.Host, lockPath string, dht *dht.IpfsDHT) {
fmt.Println("Received signal, shutting down...") fmt.Println("Received signal, shutting down...")
// Exit the application. tunDev.Iface.Close()
os.Exit(0) tunDev.Down()
ctxCancel()
} }
} }
} }
func eventLogger(host host.Host, cfg *config.Config) { func eventLogger(ctx context.Context, host host.Host, cfg *config.Config) {
subCon, err := host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged)) subCon, err := host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
checkErr(err) checkErr(err)
for { for {
select { select {
case <-ctx.Done():
return
case ev := <-subCon.Out(): case ev := <-subCon.Out():
evt := ev.(event.EvtPeerConnectednessChanged) evt := ev.(event.EvtPeerConnectednessChanged)
for vpnIp, vpnPeer := range cfg.Peers { for vpnIp, vpnPeer := range cfg.Peers {

View file

@ -24,6 +24,7 @@
"cli" "cli"
"config" "config"
"p2p" "p2p"
"rpc"
"tun" "tun"
]); ]);
}; };

View file

@ -0,0 +1,81 @@
package rpc
import (
"context"
"fmt"
"log"
"net"
"net/rpc"
"os"
"github.com/hyprspace/hyprspace/config"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)
type HyprspaceRPC struct {
host host.Host
config config.Config
}
func (hsr *HyprspaceRPC) Status(args *Args, reply *StatusReply) error {
netPeersCurrent := 0
var netPeerAddrsCurrent []string
for _, id := range hsr.config.Peers {
peerId, err := peer.Decode(id.ID)
if err != nil {
return err
}
if hsr.host.Network().Connectedness(peerId) == network.Connected {
netPeersCurrent = netPeersCurrent + 1
for _, c := range hsr.host.Network().ConnsToPeer(peerId) {
netPeerAddrsCurrent = append(netPeerAddrsCurrent, fmt.Sprintf("%s/p2p/%s", c.RemoteMultiaddr().String(), peerId.String()))
}
}
}
var addrStrings []string
for _, ma := range hsr.host.Addrs() {
addrStrings = append(addrStrings, ma.String())
}
*reply = StatusReply{
hsr.host.ID().String(),
len(hsr.host.Network().Conns()),
netPeersCurrent,
netPeerAddrsCurrent,
len(hsr.config.Peers),
addrStrings,
}
return nil
}
func (hsr *HyprspaceRPC) Peers(args *Args, reply *PeersReply) error {
var peerAddrs []string
for _, c := range hsr.host.Network().Conns() {
peerAddrs = append(peerAddrs, fmt.Sprintf("%s/p2p/%s", c.RemoteMultiaddr().String(), c.RemotePeer().String()))
}
*reply = PeersReply{peerAddrs}
return nil
}
func RpcServer(ctx context.Context, ma multiaddr.Multiaddr, host host.Host, config config.Config) {
hsr := HyprspaceRPC{host, config}
rpc.Register(&hsr)
addr, err := ma.ValueForProtocol(multiaddr.P_UNIX)
if err != nil {
log.Fatal("[!] Failed to parse multiaddr: ", err)
}
var lc net.ListenConfig
l, err := lc.Listen(ctx, "unix", addr)
os.Chmod(addr, 0o0770)
if err != nil {
log.Fatal("[!] Failed to launch RPC server: ", err)
}
fmt.Println("[-] RPC server ready")
go rpc.Accept(l)
<-ctx.Done()
fmt.Println("[-] Closing RPC server")
l.Close()
}

View file

@ -0,0 +1,17 @@
package rpc
type Args struct {
}
type StatusReply struct {
PeerID string
SwarmPeersCurrent int
NetPeersCurrent int
NetPeerAddrsCurrent []string
NetPeersMax int
ListenAddrs []string
}
type PeersReply struct {
PeerAddrs []string
}