Implement a timing wheel for millions of concurrent tasks.
In systems with lots of delayed in-process tasks, using one timer per task leaves lots of idle goroutines around and consumes lots of memory. Lots of goroutines also cost more CPU time to schedule.
This article introduces the TimingWheel in go-zero, which allows developers to schedule lots of delayed tasks. For delayed tasks, two options are usually available:

1. Timer: timers are used for one-off tasks. A timer represents a single event in the future: you tell the timer how long you want to wait, and it provides a channel that will be notified at that time.
2. TimingWheel: maintains an array of task groups (slots), and each slot holds a linked list of tasks. Once started, the wheel's ticker executes the tasks in one slot at each fixed interval.
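Option 1 in plain Go looks roughly like the sketch below: every delayed task owns a time.Timer, and a goroutine stays parked on the timer's channel until it fires. The helpers scheduleWithTimer and runDemo are hypothetical; the point is only that n pending tasks mean n live timers and n parked goroutines, which is the cost the article wants to avoid.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// scheduleWithTimer is a hypothetical helper illustrating Option 1:
// every delayed task gets its own time.Timer and a goroutine parked on it.
func scheduleWithTimer(delay time.Duration, task func(), wg *sync.WaitGroup) {
	timer := time.NewTimer(delay)
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-timer.C // the channel receives once, when the delay has elapsed
		task()
	}()
}

// runDemo schedules n tasks with staggered delays and returns the IDs that fired.
func runDemo(n int) []int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	fired := make([]int, 0, n)
	for i := 0; i < n; i++ {
		id := i
		scheduleWithTimer(time.Duration(id+1)*10*time.Millisecond, func() {
			mu.Lock()
			fired = append(fired, id)
			mu.Unlock()
		}, &wg)
	}
	wg.Wait() // until then, n goroutines and n timers are alive at once
	return fired
}

func main() {
	fmt.Println(len(runDemo(3))) // 3
}
```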
Option 2 reduces the cost of maintaining tasks from O(log n) per insertion or removal in a priority queue (O(n log n) for n tasks) to O(1) per insertion into a doubly linked list. Executing tasks also only requires scanning the tasks stored in the slot for the current point in time, O(n) in the number of tasks in that slot, without the O(log n) per-element pushes and pops a priority queue would need.
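A minimal sketch of the slot layout behind Option 2, assuming a single-level wheel (no circles) and a manual tick method instead of a real ticker so the behavior is deterministic. The names wheel, newWheel, add and tick are hypothetical and are not go-zero's API. Adding a task is one O(1) PushBack onto a container/list, and each tick only touches the list in the current slot.

```go
package main

import (
	"container/list"
	"fmt"
)

// wheel is a hypothetical single-level timing wheel: one list per slot.
type wheel struct {
	slots     []*list.List
	tickedPos int // slot processed most recently
}

func newWheel(numSlots int) *wheel {
	w := &wheel{slots: make([]*list.List, numSlots), tickedPos: numSlots - 1}
	for i := range w.slots {
		w.slots[i] = list.New()
	}
	return w
}

// add places a task `steps` ticks ahead in O(1).
// Assumes 1 <= steps <= numSlots (no multi-round delays in this sketch).
func (w *wheel) add(steps int, name string) {
	pos := (w.tickedPos + steps) % len(w.slots)
	w.slots[pos].PushBack(name)
}

// tick advances one slot, then removes and returns every task stored there.
func (w *wheel) tick() []string {
	w.tickedPos = (w.tickedPos + 1) % len(w.slots)
	l := w.slots[w.tickedPos]
	var due []string
	for e := l.Front(); e != nil; {
		next := e.Next() // grab before Remove, which clears e's links
		due = append(due, l.Remove(e).(string))
		e = next
	}
	return due
}

func main() {
	w := newWheel(4)
	w.add(1, "a")
	w.add(3, "b")
	w.add(3, "c")
	for i := 1; i <= 3; i++ {
		fmt.Println(i, w.tick()) // 1 [a], then 2 [], then 3 [b c]
	}
}
```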
Let's look at how go-zero itself uses TimingWheel:
TimingWheel in cache
Let's start with how TimingWheel is used in the cache of the collection package.
```go
timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v interface{}) {
	key, ok := k.(string)
	if !ok {
		return
	}

	cache.Del(key)
})
if err != nil {
	return nil, err
}

cache.timingWheel = timingWheel
```
This is part of the cache's initialization, which also creates a TimingWheel to clean up expired keys. The parameters are:

- interval: the time interval for checking tasks
- numSlots: the number of time slots
- execute: the function that processes tasks
In the cache, the execute function deletes the expired key, and TimingWheel controls when those deletions happen.
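Because execute receives untyped key and value arguments, the cache's callback first asserts the key back to a string. The sketch below mirrors that shape against a plain map. The Execute type follows the func(key, value interface{}) signature shown above; memCache, Del and expireFunc are hypothetical stand-ins for go-zero's cache, not its real implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// Execute has the same shape as the callback passed to NewTimingWheel.
type Execute func(key, value interface{})

// memCache is a hypothetical, minimal stand-in for the collection cache.
type memCache struct {
	mu   sync.Mutex
	data map[string]interface{}
}

func (c *memCache) Del(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.data, key)
}

// expireFunc builds the callback a wheel would invoke when a key expires.
func expireFunc(c *memCache) Execute {
	return func(k, _ interface{}) {
		key, ok := k.(string)
		if !ok {
			return // ignore keys of unexpected types, as the cache's callback does
		}
		c.Del(key)
	}
}

func main() {
	c := &memCache{data: map[string]interface{}{"a": 1, "b": 2}}
	expire := expireFunc(c)
	expire("a", 1)  // simulate the wheel expiring "a"
	expire(42, nil) // non-string key: ignored
	fmt.Println(len(c.data)) // 1
}
```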
Initialization
```go
// The initialization of TimingWheel
func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execute, ticker timex.Ticker) (
	*TimingWheel, error) {
	tw := &TimingWheel{
		interval:      interval,                                 // time frame interval
		ticker:        ticker,                                   // the ticker that triggers execution
		slots:         make([]*list.List, numSlots),             // the slots that hold tasks
		timers:        NewSafeMap(),                             // map storing tasks as {key, value}
		tickedPos:     numSlots - 1,                             // the previously ticked position
		execute:       execute,                                  // execute function
		numSlots:      numSlots,                                 // the number of slots
		setChannel:    make(chan timingEntry),                   // the channel to set tasks
		moveChannel:   make(chan baseEntry),                     // the channel to move tasks
		removeChannel: make(chan interface{}),                   // the channel to remove tasks
		drainChannel:  make(chan func(key, value interface{})), // the channel to drain tasks
		stopChannel:   make(chan lang.PlaceholderType),          // the channel to stop TimingWheel
	}

	// prepare the list stored in each slot
	tw.initSlots()
	// start the asynchronous goroutine; tasks are passed to it over channels
	go tw.run()

	return tw, nil
}
```
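The structure above can be pictured as the "time wheel" of TimingWheel: numSlots slots arranged in a ring, each holding a list of tasks, with tickedPos marking the slot processed most recently. The details below are explained around this diagram.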
go tw.run() creates a goroutine that handles the tick notifications.
```go
func (tw *TimingWheel) run() {
	for {
		select {
		// the ticker pushes time forward -> scanAndRunTasks()
		case <-tw.ticker.Chan():
			tw.onTick()
		// adding a task sends it to setChannel
		case task := <-tw.setChannel:
			tw.setTask(&task)
		// ...
		}
	}
}
```
As you can see, the ticker starts at initialization and advances through the time slots one interval at a time; on each tick, the goroutine takes the tasks out of the list in the current slot and hands them to execute.
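Funnelling every operation through channels means only the run goroutine ever touches the wheel's state, so that state needs no locks. A reduced sketch of the pattern follows, assuming a caller-driven tick channel in place of timex.Ticker so it can be stepped by hand. The names loop, setReq and newLoop are hypothetical, and the pending map stands in for the slots only to keep the sketch short (unlike real slots, every tick scans all of it).

```go
package main

import "fmt"

// setReq is a hypothetical request to run key after `steps` ticks.
type setReq struct {
	key   string
	steps int
}

// loop owns all mutable state; other goroutines reach it only via channels.
type loop struct {
	tickCh  chan struct{} // stands in for tw.ticker.Chan()
	setCh   chan setReq   // stands in for tw.setChannel
	stopCh  chan struct{} // stands in for tw.stopChannel
	execute func(key string)
}

func newLoop(execute func(string)) *loop {
	return &loop{
		tickCh:  make(chan struct{}),
		setCh:   make(chan setReq),
		stopCh:  make(chan struct{}),
		execute: execute,
	}
}

func (l *loop) run() {
	pending := map[string]int{} // key -> ticks left; only this goroutine touches it
	for {
		select {
		case <-l.tickCh:
			for key, left := range pending {
				if left <= 1 {
					delete(pending, key) // deleting during range is allowed in Go
					l.execute(key)
				} else {
					pending[key] = left - 1
				}
			}
		case req := <-l.setCh:
			pending[req.key] = req.steps
		case <-l.stopCh:
			return
		}
	}
}

func main() {
	done := make(chan string, 1)
	l := newLoop(func(key string) { done <- key })
	go l.run()
	l.setCh <- setReq{key: "a", steps: 1}
	l.tickCh <- struct{}{} // one manual tick
	fmt.Println(<-done)    // a
	close(l.stopCh)
}
```

Here execute runs on the loop goroutine itself; a production wheel would more likely hand due tasks off to other goroutines so a slow task cannot delay the next tick.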
Task Operation
The next step is setting a cache key.
```go
func (c *Cache) Set(key string, value interface{}) {
	c.lock.Lock()
	_, ok := c.data[key]
	c.data[key] = value
	c.lruCache.add(key)
	c.lock.Unlock()

	expiry := c.unstableExpiry.AroundDuration(c.expiry)
	if ok {
		// the key already has a timer: just move it
		c.timingWheel.MoveTimer(key, expiry)
	} else {
		// new key: create its timer
		c.timingWheel.SetTimer(key, value, expiry)
	}
}
```
- check whether the key exists in the data map
- if it exists, update its expiry by calling MoveTimer()
- otherwise, set the key with its expiry by calling SetTimer()
So the use of TimingWheel is clear: developers can add or update timers according to their needs.
Reading the source code also shows that SetTimer() and MoveTimer() only send the task to a channel; the goroutine created in run() continuously takes these task operations off the channels and performs them.
SetTimer() -> setTask():

- task does not exist: getPosition -> pushBack to list -> setPosition
- task exists: get from timers -> moveTask()

MoveTimer() -> moveTask()
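The set path can be sketched as follows: a positions map (playing the role of tw.timers) remembers which list element holds each key, so setting an existing key becomes a move instead of a duplicate entry. This is a simplified, single-level illustration with hypothetical names (entry, miniWheel, setTask, tick); it always moves by marking the old entry removed and pushing a fresh one, which matches only the last branch of moveTask.

```go
package main

import (
	"container/list"
	"fmt"
)

type entry struct {
	key     string
	removed bool // lazy-deletion flag, honoured when the slot is scanned
}

type miniWheel struct {
	slots     []*list.List
	positions map[string]*list.Element // plays the role of tw.timers
	tickedPos int
}

func newMiniWheel(n int) *miniWheel {
	w := &miniWheel{slots: make([]*list.List, n), positions: map[string]*list.Element{}, tickedPos: n - 1}
	for i := range w.slots {
		w.slots[i] = list.New()
	}
	return w
}

// setTask adds a new task, or "moves" an existing one by marking the old entry
// removed and pushing a fresh entry (assumes 1 <= steps <= number of slots).
func (w *miniWheel) setTask(key string, steps int) {
	if old, ok := w.positions[key]; ok {
		old.Value.(*entry).removed = true
	}
	pos := (w.tickedPos + steps) % len(w.slots)
	w.positions[key] = w.slots[pos].PushBack(&entry{key: key})
}

// tick advances one slot, drops removed entries and returns the due keys.
func (w *miniWheel) tick() []string {
	w.tickedPos = (w.tickedPos + 1) % len(w.slots)
	l := w.slots[w.tickedPos]
	var due []string
	for e := l.Front(); e != nil; {
		next := e.Next()
		ent := l.Remove(e).(*entry)
		if !ent.removed {
			delete(w.positions, ent.key)
			due = append(due, ent.key)
		}
		e = next
	}
	return due
}

func main() {
	w := newMiniWheel(4)
	w.setTask("a", 3)
	w.setTask("a", 1) // rescheduling the same key turns into a move
	for i := 1; i <= 3; i++ {
		fmt.Println(i, w.tick()) // 1 [a], then 2 [], then 3 []
	}
}
```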
In the call chain above, MoveTimer() and the existing-task branch of SetTimer() both end up in the same function, moveTask():
```go
func (tw *TimingWheel) moveTask(task baseEntry) {
	// timers: Map => get [positionEntry "pos, task"] by key
	val, ok := tw.timers.Get(task.key)
	if !ok {
		return
	}

	timer := val.(*positionEntry)
	// {delay < interval} => the delay is shorter than one time frame interval,
	// so the task should be executed immediately
	if task.delay < tw.interval {
		threading.GoSafe(func() {
			tw.execute(timer.item.key, timer.item.value)
		})
		return
	}

	// delay >= interval: calculate the new pos and circle from the delay
	pos, circle := tw.getPositionAndCircle(task.delay)
	if pos >= timer.pos {
		timer.item.circle = circle
		// record the offset of the move
		timer.item.diff = pos - timer.pos
	} else if circle > 0 {
		// move to the level of (circle-1)
		circle--
		timer.item.circle = circle
		// because it's an array, add numSlots [which is the equivalent of going to the next level]
		timer.item.diff = tw.numSlots + pos - timer.pos
	} else {
		// the new position is earlier and the task is still on the first level:
		// mark the old task for deletion and requeue it for execution
		timer.item.removed = true
		newItem := &timingEntry{
			baseEntry: task,
			value:     timer.item.value,
		}
		tw.slots[pos].PushBack(newItem)
		tw.setTimerPosition(pos, newItem)
	}
}
```
The above process has the following cases:

- delay < interval: the delay is shorter than the wheel's precision of one interval, so the task is executed immediately.
- the delay changes: compute <newPos, newCircle, diff>.
  - newPos >= oldPos: record newCircle and diff = newPos - oldPos; the task shifts forward when its old slot is reached.
  - newPos < oldPos and newCircle > 0: drop to the next level (newCircle - 1) and set diff = numSlots + newPos - oldPos.
  - otherwise the delay has simply been shortened: mark the old task as removed, add a new entry to the list at newPos, and wait for the ticker to reach that slot.
Execute
As set up during initialization, the ticker in run() keeps advancing, and each advance mainly passes the tasks in the current slot's list to the execute func. Let's start with what happens on each tick.
```go
// the ticker triggers onTick once every interval
func (tw *TimingWheel) onTick() {
	// update the current tick position on every execution
	tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots
	// get the list of tasks stored at the current tick position
	l := tw.slots[tw.tickedPos]
	tw.scanAndRunTasks(l)
}
```
Next comes how the tasks in that list are handed to execute.
```go
func (tw *TimingWheel) scanAndRunTasks(l *list.List) {
	// collect the tasks {key, value} that need to run now (the arguments later passed to execute)
	var tasks []timingTask

	for e := l.Front(); e != nil; {
		task := e.Value.(*timingEntry)
		// marked for deletion: do the real deletion during the scan, removing it from the map
		if task.removed {
			next := e.Next()
			l.Remove(e)
			tw.timers.Del(task.key)
			e = next
			continue
		} else if task.circle > 0 {
			// the slot has come up, but the task is not on the first level yet,
			// so it drops one level and stays at the same pos
			task.circle--
			e = e.Next()
			continue
		} else if task.diff > 0 {
			// a move was recorded in diff, so the task goes into the queue again at its new position
			next := e.Next()
			l.Remove(e)
			pos := (tw.tickedPos + task.diff) % tw.numSlots
			tw.slots[pos].PushBack(task)
			tw.setTimerPosition(pos, task)
			task.diff = 0
			e = next
			continue
		}

		// the cases above are all non-executable; tasks that reach here are due
		tasks = append(tasks, timingTask{
			key:   task.key,
			value: task.value,
		})
		next := e.Next()
		l.Remove(e)
		tw.timers.Del(task.key)
		e = next
	}

	// run each collected task via execute
	tw.runTasks(tasks)
}
```
The branches are explained in the comments and are best read together with moveTask() above: decrementing circle and computing diff are what tie the two functions together.
The diff calculation in turn depends on how pos and circle are calculated.
```go
// interval: 4min, d: 60min, numSlots: 16, tickedPos = 15
// steps = 15, pos = 14, circle = 0
func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos int, circle int) {
	steps := int(d / tw.interval)
	pos = (tw.tickedPos + steps) % tw.numSlots
	circle = (steps - 1) / tw.numSlots

	return
}
```
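The above process can be summarized as follows (right after initialization tickedPos = numSlots - 1, so for steps >= 1 pos reduces to (steps - 1) % numSlots):

```
steps  = d / interval
pos    = (tickedPos + steps) % numSlots
circle = (steps - 1) / numSlots
```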
Summary
TimingWheel relies on the ticker to drive time forward, taking the tasks out of the doubly linked list in the current time slot and passing them to execute. Because it is driven by a fixed interval step, there can be idle ticks: a 60s task with interval = 1s goes through 59 no-op ticks before it runs. To extend the time range, it uses circle layering, so the original numSlots slots are reused continuously as the ticker loops around circle by circle. Any number of tasks can be put into the fixed-size slots, and this design supports delays far longer than numSlots × interval without creating additional data structures.
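To make these trade-offs concrete, the sketch below drives a small wheel with circles tick by tick and counts how many ticks pass before a task runs. It is a compact, hypothetical model (single goroutine, manual ticks, no moves or removals) that combines the getPositionAndCircle arithmetic with the circle-decrement rule from scanAndRunTasks. A 60-step delay takes 60 ticks whether the wheel has 60 slots or only 16; in the 16-slot case the task is placed with circle = (60 - 1) / 16 = 3 and simply goes around more often.

```go
package main

import (
	"container/list"
	"fmt"
)

type job struct {
	name   string
	circle int // extra rounds to wait before running
}

type circleWheel struct {
	slots     []*list.List
	tickedPos int
}

func newCircleWheel(n int) *circleWheel {
	w := &circleWheel{slots: make([]*list.List, n), tickedPos: n - 1}
	for i := range w.slots {
		w.slots[i] = list.New()
	}
	return w
}

// add schedules a job `steps` intervals from now (steps >= 1).
func (w *circleWheel) add(name string, steps int) {
	n := len(w.slots)
	pos := (w.tickedPos + steps) % n
	w.slots[pos].PushBack(&job{name: name, circle: (steps - 1) / n})
}

// tick advances one slot and returns the jobs that are due on this tick.
func (w *circleWheel) tick() []string {
	w.tickedPos = (w.tickedPos + 1) % len(w.slots)
	l := w.slots[w.tickedPos]
	var due []string
	for e := l.Front(); e != nil; {
		next := e.Next()
		if j := e.Value.(*job); j.circle > 0 {
			j.circle-- // a later round: leave it in place
		} else {
			l.Remove(e)
			due = append(due, j.name)
		}
		e = next
	}
	return due
}

// ticksUntilRun is a hypothetical helper counting ticks until a single job runs.
func ticksUntilRun(numSlots, steps int) int {
	w := newCircleWheel(numSlots)
	w.add("job", steps)
	for t := 1; ; t++ {
		if len(w.tick()) > 0 {
			return t
		}
	}
}

func main() {
	fmt.Println(ticksUntilRun(16, 60)) // 60: 59 no-op ticks, then the task runs
	fmt.Println(ticksUntilRun(60, 60)) // 60 as well, with circle = 0
}
```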
go-zero also contains many other practical toolkits for improving service performance and development efficiency.
Project address
https://github.com/zeromicro/go-zero
You are welcome to use go-zero and star it to support us!
Original Link: https://dev.to/kevwan/implement-a-timing-wheel-for-millions-of-concurrent-tasks-30oi