nex-go/packet_resend_manager.go
2023-09-26 14:48:13 -04:00

114 lines
3.1 KiB
Go

package nex
import (
"time"
)
// PendingPacket represents a packet which the server has sent but has not yet received an ACK for.
// It handles its own retransmission on a per-packet timer.
type PendingPacket struct {
// ticking is true while the timer is active; StopTimeoutTimer sets it to false so later stops are no-ops
ticking bool
// ticker fires each time the packet is due to be resent
ticker *time.Ticker
// quit is closed by StopTimeoutTimer to end the goroutine started by BeginTimeoutTimer
quit chan struct{}
// packet is the packet being retransmitted
packet PacketInterface
// iterations is incremented on every tick handled by the timer goroutine
iterations *Counter
// timeout is the current ticker interval
timeout time.Duration
// timeoutInc is added to timeout on every resend; when zero the interval is left unchanged
timeoutInc time.Duration
// maxIterations is the limit the value returned by iterations.Increment() is compared against;
// once exceeded, the sender is kicked instead of being sent the packet again
maxIterations int
}
// BeginTimeoutTimer starts the goroutine which drives the pending packet's
// retransmission. On every tick of the timer the packet is resent to its
// sender, until either StopTimeoutTimer is called or the iteration count
// exceeds maxIterations, at which point the sender is kicked from the server
// and the timer is stopped.
func (p *PendingPacket) BeginTimeoutTimer() {
	go func() {
		for {
			select {
			case <-p.quit:
				return
			case <-p.ticker.C:
			}

			// * select chooses at random when several cases are ready, so a
			// * tick which was already buffered when the timer was stopped may
			// * be picked over quit. Re-check quit so a stopped packet is not
			// * resent, and its sender is not kicked, because of a stale tick
			select {
			case <-p.quit:
				return
			default:
			}

			client := p.packet.Sender()
			server := client.Server()

			if int(p.iterations.Increment()) > p.maxIterations {
				// * Max iterations hit. Assume client is dead
				server.TimeoutKick(client)
				p.StopTimeoutTimer()
				return
			}

			if p.timeoutInc != 0 {
				// * Adjust the interval before the next resend
				p.timeout += p.timeoutInc
				p.ticker.Reset(p.timeout)
			}

			// * Resend the packet
			server.SendRaw(client.Address(), p.packet.Bytes())
		}
	}()
}
// StopTimeoutTimer halts retransmission of the packet. The first call closes
// the quit channel, which ends the goroutine started by BeginTimeoutTimer, and
// stops the ticker. Calls made after the timer has stopped do nothing.
//
// NOTE(review): ticking is read and written here without any synchronization,
// and this method is called from the timer goroutine as well as from the
// resend manager - confirm callers cannot race on it.
func (p *PendingPacket) StopTimeoutTimer() {
	// * Already stopped; quit must not be closed a second time
	if !p.ticking {
		return
	}

	close(p.quit)
	p.ticker.Stop()
	p.ticking = false
}
// NewPendingPacket returns a new PendingPacket wrapping the given packet.
// timeoutTime is the initial interval of the resend ticker, timeoutIncrement
// is the amount added to that interval on each resend, and maxIterations is
// the iteration limit after which the sender is considered timed out.
//
// The ticker is created here, so it is already running when this function
// returns. Ticks are only acted on once BeginTimeoutTimer has been called.
func NewPendingPacket(packet PacketInterface, timeoutTime time.Duration, timeoutIncrement time.Duration, maxIterations int) *PendingPacket {
	pending := new(PendingPacket)

	pending.ticking = true
	pending.ticker = time.NewTicker(timeoutTime)
	pending.quit = make(chan struct{})
	pending.packet = packet
	pending.iterations = NewCounter(0)
	pending.timeout = timeoutTime
	pending.timeoutInc = timeoutIncrement
	pending.maxIterations = maxIterations

	return pending
}
// PacketResendManager manages all the pending packets which have been sent to the client and are waiting to be ACKed
type PacketResendManager struct {
// pending holds the packets awaiting an ACK, keyed by the packet's sequence ID
pending *MutexMap[uint16, *PendingPacket]
// timeoutTime is the initial resend interval passed to every new PendingPacket
timeoutTime time.Duration
// timeoutInc is the interval increment passed to every new PendingPacket
timeoutInc time.Duration
// maxIterations is the iteration limit passed to every new PendingPacket
maxIterations int
}
// Add creates a PendingPacket for the given packet, stores it in the pool
// under the packet's sequence ID, and begins its timeout timer.
//
// If a packet with the same sequence ID is already pending, its timer is
// stopped before the entry is replaced.
func (p *PacketResendManager) Add(packet PacketInterface) {
	sequenceID := packet.SequenceID()

	// * Replacing an entry without stopping it would leave its goroutine and
	// * ticker running with no way to reach them through Remove or Clear
	if existing, ok := p.pending.Get(sequenceID); ok {
		existing.StopTimeoutTimer()
	}

	cached := NewPendingPacket(packet, p.timeoutTime, p.timeoutInc, p.maxIterations)
	p.pending.Set(sequenceID, cached)
	cached.BeginTimeoutTimer()
}
// Remove stops the timer of the pending packet stored under sequenceID and
// deletes it from the pool. It does nothing when no such packet is pending.
func (p *PacketResendManager) Remove(sequenceID uint16) {
	pending, found := p.pending.Get(sequenceID)
	if !found {
		return
	}

	pending.StopTimeoutTimer()
	p.pending.Delete(sequenceID)
}
// Clear empties the pool, stopping the timer of each pending packet through
// the callback handed to the underlying map's Clear.
func (p *PacketResendManager) Clear() {
	stopTimer := func(_ uint16, pending *PendingPacket) {
		pending.StopTimeoutTimer()
	}

	p.pending.Clear(stopTimer)
}
// NewPacketResendManager returns a new PacketResendManager with an empty pool.
// timeoutTime, timeoutIncrement and maxIterations are stored and passed on to
// every PendingPacket the manager creates.
func NewPacketResendManager(timeoutTime time.Duration, timeoutIncrement time.Duration, maxIterations int) *PacketResendManager {
	manager := new(PacketResendManager)

	manager.pending = NewMutexMap[uint16, *PendingPacket]()
	manager.timeoutTime = timeoutTime
	manager.timeoutInc = timeoutIncrement
	manager.maxIterations = maxIterations

	return manager
}