diff --git a/packages/networking/hyprspace/cli/up.go b/packages/networking/hyprspace/cli/up.go index 5910132..803ad01 100644 --- a/packages/networking/hyprspace/cli/up.go +++ b/packages/networking/hyprspace/cli/up.go @@ -11,6 +11,7 @@ import ( "os" "os/signal" "path/filepath" + "sync" "syscall" "time" @@ -30,6 +31,11 @@ import ( "github.com/yl2chen/cidranger" ) +type MuxStream struct { + Stream *network.Stream + Lock *sync.Mutex +} + var ( cfg *config.Config node host.Host @@ -37,7 +43,7 @@ var ( // Hyprspace and the user's machine. tunDev *tun.TUN // activeStreams is a map of active streams to a peer - activeStreams map[peer.ID]network.Stream + activeStreams map[peer.ID]MuxStream // context ctx context.Context // context cancel function @@ -170,7 +176,7 @@ func UpRun(r *cmd.Root, c *cmd.Sub) { // + ----------------------------------------+ // Initialize active streams map and packet byte array. - activeStreams = make(map[peer.ID]network.Stream) + activeStreams = make(map[peer.ID]MuxStream) var packet = make([]byte, 1420) for { // Read in a packet from the tun device. @@ -203,24 +209,31 @@ func UpRun(r *cmd.Root, c *cmd.Sub) { func sendPacket(dst peer.ID, packet []byte, plen int) { // Check if we already have an open connection to the destination peer. - stream, ok := activeStreams[dst] + ms, ok := activeStreams[dst] if ok { - // Write out the packet's length to the libp2p stream to ensure - // we know the full size of the packet at the other end. - err := binary.Write(stream, binary.LittleEndian, uint16(plen)) - if err == nil { - // Write the packet out to the libp2p stream. - // If everyting succeeds continue on to the next packet. - _, err = stream.Write(packet[:plen]) + if func() bool { + ms.Lock.Lock() + defer ms.Lock.Unlock() + // Write out the packet's length to the libp2p stream to ensure + // we know the full size of the packet at the other end. + err := binary.Write(*ms.Stream, binary.LittleEndian, uint16(plen)) if err == nil { - stream.SetWriteDeadline(time.Now().Add(25 * time.Second)) - return + // Write the packet out to the libp2p stream. + // If everyting succeeds continue on to the next packet. + _, err = (*ms.Stream).Write(packet[:plen]) + if err == nil { + (*ms.Stream).SetWriteDeadline(time.Now().Add(25 * time.Second)) + return true + } } + // If we encounter an error when writing to a stream we should + // close that stream and delete it from the active stream map. + (*ms.Stream).Close() + delete(activeStreams, dst) + return false + }() { + return } - // If we encounter an error when writing to a stream we should - // close that stream and delete it from the active stream map. - stream.Close() - delete(activeStreams, dst) } stream, err := node.NewStream(ctx, dst, p2p.Protocol) @@ -245,7 +258,10 @@ func sendPacket(dst peer.ID, packet []byte, plen int) { // If all succeeds when writing the packet to the stream // we should reuse this stream by adding it active streams map. - activeStreams[dst] = stream + activeStreams[dst] = MuxStream{ + Stream: &stream, + Lock: &sync.Mutex{}, + } } func signalHandler(ctx context.Context, host host.Host, lockPath string, dht *dht.IpfsDHT) { diff --git a/packages/networking/hyprspace/project.nix b/packages/networking/hyprspace/project.nix index a95eb84..9462be0 100644 --- a/packages/networking/hyprspace/project.nix +++ b/packages/networking/hyprspace/project.nix @@ -10,7 +10,7 @@ }; packages.hyprspace = with pkgs; buildGo120Module rec { pname = "hyprspace"; - version = "0.8.4"; + version = "0.8.5"; src = with inputs.nix-filter.lib; let dirs = map inDirectory;