From fbe2b91e75eb431582dea8e7fb36de8163430088 Mon Sep 17 00:00:00 2001 From: Ash Logan Date: Fri, 3 Jan 2025 18:27:20 +1100 Subject: [PATCH 1/2] feat(endpoint): Route connection closures via new cleanupConnection method Centralising connection closures helps ensure that everything gets done (.cleanup()) and there's One True Way to find them in the Connections map. It's worth noting that every callsite has a PRUDPConnection*, so maybe a different data structure would serve us better here? --- prudp_connection.go | 4 +--- prudp_endpoint.go | 26 ++++++++++++++++++-------- timeout_manager.go | 8 +++----- websocket_server.go | 23 ++++++++++++----------- 4 files changed, 34 insertions(+), 27 deletions(-) diff --git a/prudp_connection.go b/prudp_connection.go index 11630aa..c205595 100644 --- a/prudp_connection.go +++ b/prudp_connection.go @@ -261,9 +261,7 @@ func (pc *PRUDPConnection) startHeartbeat() { // * If the heartbeat still did not restart, assume the // * connection is dead and clean up pc.pingKickTimer = time.AfterFunc(maxSilenceTime, func() { - pc.cleanup() // * "removed" event is dispatched here - - endpoint.deleteConnectionByID(pc.ID) + endpoint.cleanupConnection(pc) }) }) } diff --git a/prudp_endpoint.go b/prudp_endpoint.go index f4f17a1..b198d19 100644 --- a/prudp_endpoint.go +++ b/prudp_endpoint.go @@ -105,11 +105,24 @@ func (pep *PRUDPEndPoint) EmitError(err *Error) { } } -// deleteConnectionByID deletes the connection with the specified ID -func (pep *PRUDPEndPoint) deleteConnectionByID(cid uint32) { - pep.Connections.DeleteIf(func(key string, value *PRUDPConnection) bool { - return value.ID == cid +// cleanupConnection cleans up and deletes a connection from this endpoint. 
Will lock the Connections mutex - make sure +// you don't hold it during a call, or this will deadlock +func (pep *PRUDPEndPoint) cleanupConnection(connection *PRUDPConnection) { + discriminator := fmt.Sprintf("%s-%d-%d", connection.Socket.Address.String(), connection.StreamType, connection.StreamID) + + found := false + pep.Connections.RunAndDelete(discriminator, func(key string, conn *PRUDPConnection) { + found = true }) + + // * Probably this connection is on a different PRUDPEndPoint + if !found { + logger.Warningf("Tried to delete connection %v (ID %v) but it doesn't exist!", discriminator, connection.ID) + } + + // * We can't do this during RunAndDelete, since we hold the Connections mutex then + // * This way we avoid any recursive locking + connection.cleanup() } func (pep *PRUDPEndPoint) processPacket(packet PRUDPPacketInterface, socket *SocketConnection) { @@ -417,10 +430,7 @@ func (pep *PRUDPEndPoint) handleDisconnect(packet PRUDPPacketInterface) { streamID := packet.SourceVirtualPortStreamID() discriminator := fmt.Sprintf("%s-%d-%d", packet.Sender().Address().String(), streamType, streamID) if connection, ok := pep.Connections.Get(discriminator); ok { - // * We make sure to update the connection state here because we could still be attempting to - // * resend packets. 
- connection.cleanup() - pep.Connections.Delete(discriminator) + pep.cleanupConnection(connection) } pep.emit("disconnect", packet) diff --git a/timeout_manager.go b/timeout_manager.go index 2cacb9e..22fb943 100644 --- a/timeout_manager.go +++ b/timeout_manager.go @@ -53,10 +53,10 @@ func (tm *TimeoutManager) start(packet PRUDPPacketInterface) { } if tm.packets.Has(packet.SequenceID()) { + endpoint := packet.Sender().Endpoint().(*PRUDPEndPoint) + // * This is `<` instead of `<=` for accuracy with observed behavior, even though we're comparing send count vs _resend_ max if packet.SendCount() < tm.streamSettings.MaxPacketRetransmissions { - endpoint := packet.Sender().Endpoint().(*PRUDPEndPoint) - packet.incrementSendCount() packet.setSentAt(time.Now()) rto := endpoint.ComputeRetransmitTimeout(packet) @@ -76,9 +76,7 @@ func (tm *TimeoutManager) start(packet PRUDPPacketInterface) { server.sendRaw(connection.Socket, data) } else { // * Packet has been retried too many times, consider the connection dead - connection.Lock() - defer connection.Unlock() - connection.cleanup() + endpoint.cleanupConnection(connection) } } } diff --git a/websocket_server.go b/websocket_server.go index a25c644..ece53e5 100644 --- a/websocket_server.go +++ b/websocket_server.go @@ -22,25 +22,26 @@ func (wseh *wsEventHandler) OnOpen(socket *gws.Conn) { } func (wseh *wsEventHandler) OnClose(wsConn *gws.Conn, _ error) { - connections := make([]*PRUDPConnection, 0) - // * Loop over all connections on all endpoints wseh.prudpServer.Endpoints.Each(func(streamid uint8, pep *PRUDPEndPoint) bool { - return pep.Connections.Each(func(discriminator string, pc *PRUDPConnection) bool { + connections := make([]*PRUDPConnection, 0) + + pep.Connections.Each(func(discriminator string, pc *PRUDPConnection) bool { if pc.Socket.Address == wsConn.RemoteAddr() { connections = append(connections, pc) } return false }) - }) - // * We cannot modify a MutexMap while looping over it - // * since the mutex is locked. 
We first need to grab - // * the entries we want to delete, and then loop over - // * them here to actually clean them up - for _, connection := range connections { - connection.cleanup() // * "removed" event is dispatched here - } + // * We cannot modify a MutexMap while looping over it + // * since the mutex is locked. We first need to grab + // * the entries we want to delete, and then loop over + // * them here to actually clean them up + for _, connection := range connections { + pep.cleanupConnection(connection) // * "removed" event is dispatched here + } + return false + }) } func (wseh *wsEventHandler) OnPing(socket *gws.Conn, payload []byte) { From 9e5e0332b50ea93a6c6830575a0a29e6c80df0c6 Mon Sep 17 00:00:00 2001 From: Ash Logan Date: Fri, 3 Jan 2025 18:30:58 +1100 Subject: [PATCH 2/2] fix(endpoint): Check PID invariant in FindConnectionByPID An expected invariant of this system is that StateConnected connections also have a PID, and other states do *not* have a PID. While any code that breaks this invariant is buggy, we can still make FindConnectionByPID resilient to this to prevent that type of bug from taking the whole server down. That type of bug can still be detected through explicit testing of the Connections map. --- prudp_endpoint.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prudp_endpoint.go b/prudp_endpoint.go index b198d19..c1a2a0f 100644 --- a/prudp_endpoint.go +++ b/prudp_endpoint.go @@ -708,7 +708,7 @@ func (pep *PRUDPEndPoint) FindConnectionByPID(pid uint64) *PRUDPConnection { var connection *PRUDPConnection pep.Connections.Each(func(discriminator string, pc *PRUDPConnection) bool { - if pc.pid.Value() == pid { + if pc.pid.Value() == pid && pc.ConnectionState == StateConnected { connection = pc return true }