From 90d81f6eb9fa7d6cee4440aa560fbe184a5af46d Mon Sep 17 00:00:00 2001 From: PabloMK7 Date: Thu, 18 Apr 2024 14:47:40 +0200 Subject: [PATCH] Implement and use deleteConnectionByID --- mutex_map.go | 17 +++++++++++++++++ prudp_connection.go | 5 +---- prudp_endpoint.go | 7 +++++++ resend_scheduler.go | 7 +------ 4 files changed, 26 insertions(+), 10 deletions(-) diff --git a/mutex_map.go b/mutex_map.go index b2d076f..372a31c 100644 --- a/mutex_map.go +++ b/mutex_map.go @@ -43,6 +43,23 @@ func (m *MutexMap[K, V]) Delete(key K) { delete(m.real, key) } +// DeleteIf deletes every element if the callback returns true. +// Returns the amount of elements deleted. +func (m *MutexMap[K, V]) DeleteIf(callback func(key K, value V) bool) int { + m.Lock() + defer m.Unlock() + + amout := 0 + for key, value := range m.real { + if callback(key, value) { + delete(m.real, key) + amout++ + } + } + + return amout +} + // RunAndDelete runs a callback and removes the key afterwards func (m *MutexMap[K, V]) RunAndDelete(key K, callback func(key K, value V)) { m.Lock() diff --git a/prudp_connection.go b/prudp_connection.go index 5bdea13..db5180e 100644 --- a/prudp_connection.go +++ b/prudp_connection.go @@ -2,7 +2,6 @@ package nex import ( "crypto/md5" - "fmt" "net" "time" @@ -195,9 +194,7 @@ func (pc *PRUDPConnection) startHeartbeat() { pc.pingKickTimer = time.AfterFunc(maxSilenceTime, func() { pc.cleanup() // * "removed" event is dispatched here - discriminator := fmt.Sprintf("%s-%d-%d", pc.Socket.Address.String(), pc.StreamType, pc.StreamID) - - endpoint.Connections.Delete(discriminator) + endpoint.deleteConnectionByID(pc.ID) }) }) } diff --git a/prudp_endpoint.go b/prudp_endpoint.go index a8c2e00..32f56ed 100644 --- a/prudp_endpoint.go +++ b/prudp_endpoint.go @@ -100,6 +100,13 @@ func (pep *PRUDPEndPoint) EmitError(err *Error) { } } +// deleteConnectionByID deletes the connection with the specified ID +func (pep *PRUDPEndPoint) deleteConnectionByID(cid uint32) { + pep.Connections.DeleteIf(func(key string, value *PRUDPConnection) bool { + return value.ID == cid + }) +} + func (pep *PRUDPEndPoint) processPacket(packet PRUDPPacketInterface, socket *SocketConnection) { streamType := packet.SourceVirtualPortStreamType() streamID := packet.SourceVirtualPortStreamID() diff --git a/resend_scheduler.go b/resend_scheduler.go index 66cce22..812f701 100644 --- a/resend_scheduler.go +++ b/resend_scheduler.go @@ -1,7 +1,6 @@ package nex import ( - "fmt" "time" ) @@ -105,11 +104,7 @@ func (rs *ResendScheduler) resendPacket(pendingPacket *PendingPacket) { rs.packets.Delete(packet.SequenceID()) connection.cleanup() // * "removed" event is dispatched here - streamType := packet.SourceVirtualPortStreamType() - streamID := packet.SourceVirtualPortStreamID() - discriminator := fmt.Sprintf("%s-%d-%d", packet.Sender().Address().String(), streamType, streamID) - - connection.endpoint.Connections.Delete(discriminator) + connection.endpoint.deleteConnectionByID(connection.ID) return }