From 7520d134b18d7df846b1f1466c5a078108bd6633 Mon Sep 17 00:00:00 2001 From: Max Date: Mon, 23 Jan 2023 01:48:56 +0100 Subject: [PATCH] packages/hyprspace: implement route metrics service --- packages/networking/hyprspace/cli/up.go | 3 ++ packages/networking/hyprspace/p2p/metrics.go | 55 ++++++++++++++++++++ 2 files changed, 58 insertions(+) create mode 100644 packages/networking/hyprspace/p2p/metrics.go diff --git a/packages/networking/hyprspace/cli/up.go b/packages/networking/hyprspace/cli/up.go index 4f7f2b1..07eb54e 100644 --- a/packages/networking/hyprspace/cli/up.go +++ b/packages/networking/hyprspace/cli/up.go @@ -124,6 +124,9 @@ func UpRun(r *cmd.Root, c *cmd.Sub) { // PeX go p2p.PeXService(ctx, host, cfg) + // Route metrics and latency + go p2p.RouteMetricsService(ctx, host, cfg) + // Register the application to listen for signals go signalHandler(ctx, host, lockPath, dht) diff --git a/packages/networking/hyprspace/p2p/metrics.go b/packages/networking/hyprspace/p2p/metrics.go new file mode 100644 index 0000000..bd82da5 --- /dev/null +++ b/packages/networking/hyprspace/p2p/metrics.go @@ -0,0 +1,55 @@ +package p2p + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/hyprspace/hyprspace/config" + "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" +) + +func RouteMetricsService(ctx context.Context, host host.Host, cfg *config.Config) { + subCon, err := host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged)) + if err != nil { + log.Fatal(err) + } + fmt.Println("[-] Route metrics service ready") + for { + select { + case <-ctx.Done(): + return + case ev := <-subCon.Out(): + evt := ev.(event.EvtPeerConnectednessChanged) + _, found := config.FindPeer(cfg.Peers, evt.Peer) + if found { + if evt.Connectedness == network.Connected { + ctx2, cancel := context.WithDeadline(ctx, time.Now().Add(30*time.Second)) + ch := ping.Ping(ctx2, host, evt.Peer) + go func() { + t := time.NewTimer(15 * time.Second) + for { + select { + case <-t.C: + cancel() + case <-ctx2.Done(): + return + case res := <-ch: + if res.Error == nil { + host.Peerstore().RecordLatency(evt.Peer, res.RTT) + } + time.Sleep(5 * time.Second) + } + } + }() + // wait a little before spawning another ping goroutine + time.Sleep(1 * time.Second) + } + } + } + } +}