// Mirror of https://github.com/slackhq/nebula.git (Go source, 226 lines, 5.6 KiB).

package nebula
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// timerCacheMax is the upper bound on how many timer objects are kept cached.
const timerCacheMax = 50000
|
|
|
|
type TimerWheel[T any] struct {
|
|
// Current tick
|
|
current int
|
|
|
|
// Cheat on finding the length of the wheel
|
|
wheelLen int
|
|
|
|
// Last time we ticked, since we are lazy ticking
|
|
lastTick *time.Time
|
|
|
|
// Durations of a tick and the entire wheel
|
|
tickDuration time.Duration
|
|
wheelDuration time.Duration
|
|
|
|
// The actual wheel which is just a set of singly linked lists, head/tail pointers
|
|
wheel []*TimeoutList[T]
|
|
|
|
// Singly linked list of items that have timed out of the wheel
|
|
expired *TimeoutList[T]
|
|
|
|
// Item cache to avoid garbage collect
|
|
itemCache *TimeoutItem[T]
|
|
itemsCached int
|
|
}
|
|
|
|
type LockingTimerWheel[T any] struct {
|
|
m sync.Mutex
|
|
t *TimerWheel[T]
|
|
}
|
|
|
|
// TimeoutList Represents a tick in the wheel
|
|
type TimeoutList[T any] struct {
|
|
Head *TimeoutItem[T]
|
|
Tail *TimeoutItem[T]
|
|
}
|
|
|
|
// TimeoutItem is a single node of a TimeoutList. It carries the caller's value
// and a link to the node that follows it.
type TimeoutItem[T any] struct {
	Item T               // value stored by the caller
	Next *TimeoutItem[T] // following node, nil at the end of a list
}
|
|
|
|
// NewTimerWheel Builds a timer wheel and identifies the tick duration and wheel duration from the provided values
|
|
// Purge must be called once per entry to actually remove anything
|
|
// The TimerWheel does not handle concurrency on its own.
|
|
// Locks around access to it must be used if multiple routines are manipulating it.
|
|
func NewTimerWheel[T any](min, max time.Duration) *TimerWheel[T] {
|
|
//TODO provide an error
|
|
//if min >= max {
|
|
// return nil
|
|
//}
|
|
|
|
// Round down and add 2 so we can have the smallest # of ticks in the wheel and still account for a full
|
|
// max duration, even if our current tick is at the maximum position and the next item to be added is at maximum
|
|
// timeout
|
|
wLen := int((max / min) + 2)
|
|
|
|
tw := TimerWheel[T]{
|
|
wheelLen: wLen,
|
|
wheel: make([]*TimeoutList[T], wLen),
|
|
tickDuration: min,
|
|
wheelDuration: max,
|
|
expired: &TimeoutList[T]{},
|
|
}
|
|
|
|
for i := range tw.wheel {
|
|
tw.wheel[i] = &TimeoutList[T]{}
|
|
}
|
|
|
|
return &tw
|
|
}
|
|
|
|
// NewLockingTimerWheel is version of TimerWheel that is safe for concurrent use with a small performance penalty
|
|
func NewLockingTimerWheel[T any](min, max time.Duration) *LockingTimerWheel[T] {
|
|
return &LockingTimerWheel[T]{
|
|
t: NewTimerWheel[T](min, max),
|
|
}
|
|
}
|
|
|
|
// Add will add an item to the wheel in its proper timeout.
|
|
// Caller should Advance the wheel prior to ensure the proper slot is used.
|
|
func (tw *TimerWheel[T]) Add(v T, timeout time.Duration) *TimeoutItem[T] {
|
|
i := tw.findWheel(timeout)
|
|
|
|
// Try to fetch off the cache
|
|
ti := tw.itemCache
|
|
if ti != nil {
|
|
tw.itemCache = ti.Next
|
|
tw.itemsCached--
|
|
ti.Next = nil
|
|
} else {
|
|
ti = &TimeoutItem[T]{}
|
|
}
|
|
|
|
// Relink and return
|
|
ti.Item = v
|
|
if tw.wheel[i].Tail == nil {
|
|
tw.wheel[i].Head = ti
|
|
tw.wheel[i].Tail = ti
|
|
} else {
|
|
tw.wheel[i].Tail.Next = ti
|
|
tw.wheel[i].Tail = ti
|
|
}
|
|
|
|
return ti
|
|
}
|
|
|
|
// Purge removes and returns the first available expired item from the wheel and the 2nd argument is true.
|
|
// If no item is available then an empty T is returned and the 2nd argument is false.
|
|
func (tw *TimerWheel[T]) Purge() (T, bool) {
|
|
if tw.expired.Head == nil {
|
|
var na T
|
|
return na, false
|
|
}
|
|
|
|
ti := tw.expired.Head
|
|
tw.expired.Head = ti.Next
|
|
|
|
if tw.expired.Head == nil {
|
|
tw.expired.Tail = nil
|
|
}
|
|
|
|
// Clear out the items references
|
|
ti.Next = nil
|
|
|
|
// Maybe cache it for later
|
|
if tw.itemsCached < timerCacheMax {
|
|
ti.Next = tw.itemCache
|
|
tw.itemCache = ti
|
|
tw.itemsCached++
|
|
}
|
|
|
|
return ti.Item, true
|
|
}
|
|
|
|
// findWheel find the next position in the wheel for the provided timeout given the current tick
|
|
func (tw *TimerWheel[T]) findWheel(timeout time.Duration) (i int) {
|
|
if timeout < tw.tickDuration {
|
|
// Can't track anything below the set resolution
|
|
timeout = tw.tickDuration
|
|
} else if timeout > tw.wheelDuration {
|
|
// We aren't handling timeouts greater than the wheels duration
|
|
timeout = tw.wheelDuration
|
|
}
|
|
|
|
// Find the next highest, rounding up
|
|
tick := int(((timeout - 1) / tw.tickDuration) + 1)
|
|
|
|
// Add another tick since the current tick may almost be over then map it to the wheel from our
|
|
// current position
|
|
tick += tw.current + 1
|
|
if tick >= tw.wheelLen {
|
|
tick -= tw.wheelLen
|
|
}
|
|
|
|
return tick
|
|
}
|
|
|
|
// Advance will move the wheel forward by the appropriate number of ticks for the provided time and all items
|
|
// passed over will be moved to the expired list. Calling Purge is necessary to remove them entirely.
|
|
func (tw *TimerWheel[T]) Advance(now time.Time) {
|
|
if tw.lastTick == nil {
|
|
tw.lastTick = &now
|
|
}
|
|
|
|
// We want to round down
|
|
ticks := int(now.Sub(*tw.lastTick) / tw.tickDuration)
|
|
adv := ticks
|
|
if ticks > tw.wheelLen {
|
|
ticks = tw.wheelLen
|
|
}
|
|
|
|
for i := 0; i < ticks; i++ {
|
|
tw.current++
|
|
if tw.current >= tw.wheelLen {
|
|
tw.current = 0
|
|
}
|
|
|
|
if tw.wheel[tw.current].Head != nil {
|
|
// We need to append the expired items as to not starve evicting the oldest ones
|
|
if tw.expired.Tail == nil {
|
|
tw.expired.Head = tw.wheel[tw.current].Head
|
|
tw.expired.Tail = tw.wheel[tw.current].Tail
|
|
} else {
|
|
tw.expired.Tail.Next = tw.wheel[tw.current].Head
|
|
tw.expired.Tail = tw.wheel[tw.current].Tail
|
|
}
|
|
|
|
tw.wheel[tw.current].Head = nil
|
|
tw.wheel[tw.current].Tail = nil
|
|
}
|
|
}
|
|
|
|
// Advance the tick based on duration to avoid losing some accuracy
|
|
newTick := tw.lastTick.Add(tw.tickDuration * time.Duration(adv))
|
|
tw.lastTick = &newTick
|
|
}
|
|
|
|
func (lw *LockingTimerWheel[T]) Add(v T, timeout time.Duration) *TimeoutItem[T] {
|
|
lw.m.Lock()
|
|
defer lw.m.Unlock()
|
|
return lw.t.Add(v, timeout)
|
|
}
|
|
|
|
func (lw *LockingTimerWheel[T]) Purge() (T, bool) {
|
|
lw.m.Lock()
|
|
defer lw.m.Unlock()
|
|
return lw.t.Purge()
|
|
}
|
|
|
|
func (lw *LockingTimerWheel[T]) Advance(now time.Time) {
|
|
lw.m.Lock()
|
|
defer lw.m.Unlock()
|
|
lw.t.Advance(now)
|
|
}
|