packages/hyprspace: lock reuseable streams

This commit is contained in:
Max Headroom 2024-04-22 09:17:06 +02:00
parent 56c171960e
commit 48454cc245
2 changed files with 34 additions and 18 deletions

View file

@ -11,6 +11,7 @@ import (
"os" "os"
"os/signal" "os/signal"
"path/filepath" "path/filepath"
"sync"
"syscall" "syscall"
"time" "time"
@ -30,6 +31,11 @@ import (
"github.com/yl2chen/cidranger" "github.com/yl2chen/cidranger"
) )
type MuxStream struct {
Stream *network.Stream
Lock *sync.Mutex
}
var ( var (
cfg *config.Config cfg *config.Config
node host.Host node host.Host
@ -37,7 +43,7 @@ var (
// Hyprspace and the user's machine. // Hyprspace and the user's machine.
tunDev *tun.TUN tunDev *tun.TUN
// activeStreams is a map of active streams to a peer // activeStreams is a map of active streams to a peer
activeStreams map[peer.ID]network.Stream activeStreams map[peer.ID]MuxStream
// context // context
ctx context.Context ctx context.Context
// context cancel function // context cancel function
@ -170,7 +176,7 @@ func UpRun(r *cmd.Root, c *cmd.Sub) {
// + ----------------------------------------+ // + ----------------------------------------+
// Initialize active streams map and packet byte array. // Initialize active streams map and packet byte array.
activeStreams = make(map[peer.ID]network.Stream) activeStreams = make(map[peer.ID]MuxStream)
var packet = make([]byte, 1420) var packet = make([]byte, 1420)
for { for {
// Read in a packet from the tun device. // Read in a packet from the tun device.
@ -203,24 +209,31 @@ func UpRun(r *cmd.Root, c *cmd.Sub) {
func sendPacket(dst peer.ID, packet []byte, plen int) { func sendPacket(dst peer.ID, packet []byte, plen int) {
// Check if we already have an open connection to the destination peer. // Check if we already have an open connection to the destination peer.
stream, ok := activeStreams[dst] ms, ok := activeStreams[dst]
if ok { if ok {
// Write out the packet's length to the libp2p stream to ensure if func() bool {
// we know the full size of the packet at the other end. ms.Lock.Lock()
err := binary.Write(stream, binary.LittleEndian, uint16(plen)) defer ms.Lock.Unlock()
if err == nil { // Write out the packet's length to the libp2p stream to ensure
// Write the packet out to the libp2p stream. // we know the full size of the packet at the other end.
// If everyting succeeds continue on to the next packet. err := binary.Write(*ms.Stream, binary.LittleEndian, uint16(plen))
_, err = stream.Write(packet[:plen])
if err == nil { if err == nil {
stream.SetWriteDeadline(time.Now().Add(25 * time.Second)) // Write the packet out to the libp2p stream.
return // If everyting succeeds continue on to the next packet.
_, err = (*ms.Stream).Write(packet[:plen])
if err == nil {
(*ms.Stream).SetWriteDeadline(time.Now().Add(25 * time.Second))
return true
}
} }
// If we encounter an error when writing to a stream we should
// close that stream and delete it from the active stream map.
(*ms.Stream).Close()
delete(activeStreams, dst)
return false
}() {
return
} }
// If we encounter an error when writing to a stream we should
// close that stream and delete it from the active stream map.
stream.Close()
delete(activeStreams, dst)
} }
stream, err := node.NewStream(ctx, dst, p2p.Protocol) stream, err := node.NewStream(ctx, dst, p2p.Protocol)
@ -245,7 +258,10 @@ func sendPacket(dst peer.ID, packet []byte, plen int) {
// If all succeeds when writing the packet to the stream // If all succeeds when writing the packet to the stream
// we should reuse this stream by adding it active streams map. // we should reuse this stream by adding it active streams map.
activeStreams[dst] = stream activeStreams[dst] = MuxStream{
Stream: &stream,
Lock: &sync.Mutex{},
}
} }
func signalHandler(ctx context.Context, host host.Host, lockPath string, dht *dht.IpfsDHT) { func signalHandler(ctx context.Context, host host.Host, lockPath string, dht *dht.IpfsDHT) {

View file

@ -10,7 +10,7 @@
}; };
packages.hyprspace = with pkgs; buildGo120Module rec { packages.hyprspace = with pkgs; buildGo120Module rec {
pname = "hyprspace"; pname = "hyprspace";
version = "0.8.4"; version = "0.8.5";
src = with inputs.nix-filter.lib; let src = with inputs.nix-filter.lib; let
dirs = map inDirectory; dirs = map inDirectory;