April 11, 2022 11:28 am GMT

Implement a timing wheel for millions of concurrent tasks.

For systems with many delayed in-process tasks, handling each task with its own timer leaves a large number of idle goroutines and consumes a lot of memory. A large number of goroutines also costs more CPU time to schedule.

This article introduces the TimingWheel in go-zero, which lets developers schedule large numbers of delayed tasks. For delayed tasks, two options are usually available.

  1. Timer: used for one-off tasks. A timer represents a single event in the future. You tell the timer how long you want to wait, and it provides a channel that will be notified at that time.
  2. TimingWheel: maintains an array of time slots, where each slot holds a linked list of stored tasks. Once it starts, a ticker fires at a fixed interval and the tasks in one slot are executed on each tick.

Option 2 reduces the cost of maintaining tasks from O(n log n) with a priority queue to O(1) per task with a doubly linked list. Executing tasks only requires scanning the tasks stored in the slot for the current tick, O(N), without the O(n log n) cost of inserting and removing elements that a priority queue incurs.
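
To make option 2 concrete, here is a minimal, single-goroutine sketch of a timing wheel that is advanced by hand instead of by a real ticker. It is not go-zero's implementation: the wheel and task types and the add and tick methods are assumptions made for this illustration, and there is no locking, moving, or removal.

```go
package main

import (
	"container/list"
	"fmt"
)

// task is a hypothetical entry type for this sketch, not go-zero's timingEntry.
type task struct {
	key    string
	circle int // full revolutions left before the task is due
}

// wheel is a simplified timing wheel: a fixed array of doubly linked lists.
type wheel struct {
	slots     []*list.List
	tickedPos int
}

func newWheel(numSlots int) *wheel {
	w := &wheel{slots: make([]*list.List, numSlots), tickedPos: numSlots - 1}
	for i := range w.slots {
		w.slots[i] = list.New()
	}
	return w
}

// add schedules key to fire after the given number of ticks (steps >= 1).
// It is a single PushBack, so the cost does not depend on the number of tasks.
func (w *wheel) add(key string, steps int) {
	pos := (w.tickedPos + steps) % len(w.slots)
	circle := (steps - 1) / len(w.slots)
	w.slots[pos].PushBack(&task{key: key, circle: circle})
}

// tick advances one slot and returns the keys that are due in that slot.
func (w *wheel) tick() []string {
	w.tickedPos = (w.tickedPos + 1) % len(w.slots)
	l := w.slots[w.tickedPos]
	var due []string
	for e := l.Front(); e != nil; {
		next := e.Next()
		t := e.Value.(*task)
		if t.circle > 0 {
			t.circle-- // not due yet, wait for another revolution
		} else {
			due = append(due, t.key)
			l.Remove(e)
		}
		e = next
	}
	return due
}

func main() {
	w := newWheel(4)
	w.add("a", 1)
	w.add("b", 3)
	w.add("c", 6) // longer than one revolution of 4 slots
	for i := 1; i <= 6; i++ {
		fmt.Printf("tick %d: %v\n", i, w.tick())
	}
}
```

With 4 slots, "a" is returned on tick 1, "b" on tick 3, and "c" on tick 6 after waiting out one full revolution.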

Let's look at our own use of TimingWheel in go-zero:

TimingWheel in cache

Let's start with the use of TimingWheel in the cache from the collection package.

    timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v interface{}) {
        key, ok := k.(string)
        if !ok {
            return
        }

        cache.Del(key)
    })
    if err != nil {
        return nil, err
    }

    cache.timingWheel = timingWheel

This is the initialization of the cache, which also initializes a TimingWheel to clean up expired keys. NewTimingWheel takes three arguments:

  • interval: time interval to check the tasks
  • numSlots: the number of time slots
  • execute: the function to process tasks

The execute function in the cache deletes the expired key, and TimingWheel controls when these expiration calls happen.

Initialization

    // The initialization of TimingWheel
    func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execute,
        ticker timex.Ticker) (*TimingWheel, error) {
        tw := &TimingWheel{
            interval:      interval,                                 // time frame interval
            ticker:        ticker,                                   // the ticker to trigger the execution
            slots:         make([]*list.List, numSlots),             // the slots to put tasks
            timers:        NewSafeMap(),                             // map to store task with {key, value}
            tickedPos:     numSlots - 1,                             // the previous ticked position
            execute:       execute,                                  // execute function
            numSlots:      numSlots,                                 // the number of slots
            setChannel:    make(chan timingEntry),                   // the channel to set tasks
            moveChannel:   make(chan baseEntry),                     // the channel to move tasks
            removeChannel: make(chan interface{}),                   // the channel to remove tasks
            drainChannel:  make(chan func(key, value interface{})),  // the channel to drain tasks
            stopChannel:   make(chan lang.PlaceholderType),          // the channel to stop TimingWheel
        }

        // Prepare all the lists stored in the slots
        tw.initSlots()
        // Start the asynchronous processing goroutine; tasks are passed to it over channels
        go tw.run()

        return tw, nil
    }

The diagram above is a more visual representation of the "time wheel" behind TimingWheel; the details below are explained with reference to it.

go tw.run() creates a goroutine to do the tick notification.

    func (tw *TimingWheel) run() {
        for {
            select {
            // the ticker advances time -> onTick() -> scanAndRunTasks()
            case <-tw.ticker.Chan():
                tw.onTick()
            // adding a task sends it to setChannel
            case task := <-tw.setChannel:
                tw.setTask(&task)
            ...
            }
        }
    }

As you can see, the ticker starts at initialization and keeps advancing through the time slots one interval at a time; on each tick, the tasks in the current slot's list are fetched and handed to execute.
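
The pattern behind run() is a single goroutine that owns all of the state and receives every operation over a channel. The sketch below shows that pattern in isolation under simplified assumptions: the loop type, its channel names, and the events helper are invented for this example, and a manually driven channel stands in for the ticker.

```go
package main

import "fmt"

// loop is a hypothetical, stripped-down event loop; all names are illustrative.
type loop struct {
	ticks chan struct{} // stands in for the ticker channel
	sets  chan string   // stands in for setChannel
	stop  chan struct{}
	done  chan struct{}
	log   []string // only touched by run(); safe to read once done is closed
}

func newLoop() *loop {
	l := &loop{
		ticks: make(chan struct{}),
		sets:  make(chan string),
		stop:  make(chan struct{}),
		done:  make(chan struct{}),
	}
	go l.run()
	return l
}

// run is the only goroutine that mutates l.log, so no mutex is needed.
func (l *loop) run() {
	defer close(l.done)
	for {
		select {
		case <-l.ticks:
			l.log = append(l.log, "tick")
		case key := <-l.sets:
			l.log = append(l.log, "set "+key)
		case <-l.stop:
			return
		}
	}
}

// events drives the loop with a fixed sequence and returns what it recorded.
func events() []string {
	l := newLoop()
	l.sets <- "a" // unbuffered: returns once run() has received the value
	l.ticks <- struct{}{}
	l.sets <- "b"
	close(l.stop)
	<-l.done
	return l.log
}

func main() {
	fmt.Println(events()) // [set a tick set b]
}
```

Because every channel is unbuffered and the operations are sent from one goroutine, the loop processes them strictly in the order they were sent.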

Task Operation

The next step is to set the cache key.

    func (c *Cache) Set(key string, value interface{}) {
        c.lock.Lock()
        _, ok := c.data[key]
        c.data[key] = value
        c.lruCache.add(key)
        c.lock.Unlock()

        expiry := c.unstableExpiry.AroundDuration(c.expiry)
        if ok {
            c.timingWheel.MoveTimer(key, expiry)
        } else {
            c.timingWheel.SetTimer(key, value, expiry)
        }
    }

  1. Check whether the key exists in the data map
  2. If it exists, update its expiry by calling MoveTimer()
  3. Otherwise, set the key with its expiry by calling SetTimer()

So the use of TimingWheel is clear: developers can add or update tasks according to their needs.

Also, if we read the source code, we find that SetTimer() and MoveTimer() simply send the task to a channel, and the goroutine running run() continuously takes the task operations off those channels.

SetTimer() -> setTask():

  • task does not exist: getPositionAndCircle() -> PushBack to list -> setTimerPosition()
  • task exists: get from timers -> moveTask()

MoveTimer() -> moveTask()

Both call chains above can end up in the same function: moveTask()

    func (tw *TimingWheel) moveTask(task baseEntry) {
        // timers: Map => get [positionEntry "pos, task"] by key
        val, ok := tw.timers.Get(task.key)
        if !ok {
            return
        }

        timer := val.(*positionEntry)
        // {delay < interval} => delay is less than a time frame interval,
        // the task should be executed immediately
        if task.delay < tw.interval {
            threading.GoSafe(func() {
                tw.execute(timer.item.key, timer.item.value)
            })
            return
        }

        // delay >= interval: compute the new pos and circle on the time wheel from the delay
        pos, circle := tw.getPositionAndCircle(task.delay)
        if pos >= timer.pos {
            timer.item.circle = circle
            // Record the offset of the move
            timer.item.diff = pos - timer.pos
        } else if circle > 0 {
            // move to the level of (circle-1)
            circle--
            timer.item.circle = circle
            // because it's an array, add numSlots [which is the equivalent of going to the next level]
            timer.item.diff = tw.numSlots + pos - timer.pos
        } else {
            // The new position comes before the old one and the task is still on the first level:
            // mark the old task for deletion and requeue it for execution
            timer.item.removed = true
            newItem := &timingEntry{
                baseEntry: task,
                value:     timer.item.value,
            }
            tw.slots[pos].PushBack(newItem)
            tw.setTimerPosition(pos, newItem)
        }
    }

The above process covers the following cases.

  • delay < interval: the delay is shorter than a single tick of the wheel, so the task is executed immediately
  • delay >= interval: the task's position changes.
    • new pos >= old pos: store the new circle and diff = newPos - oldPos
    • new pos < old pos and newCircle > 0: move circle down one level (circle - 1) and compensate by adding numSlots, so diff = numSlots + newPos - oldPos
    • otherwise (new pos < old pos and newCircle = 0, i.e. the delay has simply been shortened): mark the old task as removed, push a new entry into the new slot, and wait for the wheel to reach it
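
The three position cases can be worked through with a small pure function. This is only a sketch of the branching in moveTask() for a delay that is already known to be at least one interval; the name planMove and its return shape are assumptions of this example, not part of go-zero.

```go
package main

import "fmt"

// planMove mirrors the three position branches of moveTask for a task that
// currently sits in slot oldPos. pos and circle are the values computed from
// the new delay. It returns the circle and diff to store on the task, and
// whether the task must instead be marked removed and re-queued at pos.
// The function itself is hypothetical and exists only for illustration.
func planMove(oldPos, pos, circle, numSlots int) (newCircle, diff int, requeue bool) {
	switch {
	case pos >= oldPos:
		return circle, pos - oldPos, false
	case circle > 0:
		// borrow one revolution and express the move as a forward offset
		return circle - 1, numSlots + pos - oldPos, false
	default:
		return 0, 0, true
	}
}

func main() {
	const numSlots = 16

	c, d, r := planMove(3, 10, 0, numSlots)
	fmt.Println(c, d, r) // 0 7 false

	c, d, r = planMove(10, 3, 2, numSlots)
	fmt.Println(c, d, r) // 1 9 false

	c, d, r = planMove(10, 3, 0, numSlots)
	fmt.Println(c, d, r) // 0 0 true
}
```

In the second call the task stays in slot 10 until its circle reaches 0; the scan then relocates it by diff, and (10 + 9) % 16 = 3 is exactly the new position.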

Execute

As described in the initialization section, the ticker in run() keeps advancing, and each advance mainly passes the tasks in the current slot's list to the execute func. Let's start with what happens on each tick.

    // The ticker fires once every interval
    func (tw *TimingWheel) onTick() {
        // update the current tick position on every execution
        tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots
        // Get the chain of tasks stored in the tick position at this time
        l := tw.slots[tw.tickedPos]
        tw.scanAndRunTasks(l)
    }

Next is how execute gets called.

    func (tw *TimingWheel) scanAndRunTasks(l *list.List) {
        // store the tasks {key, value} that currently need to be executed
        // [the arguments needed by execute, passed in turn to execute]
        var tasks []timingTask

        for e := l.Front(); e != nil; {
            task := e.Value.(*timingEntry)
            // marked for deletion: do the real deletion during the scan and delete the data from the map
            if task.removed {
                next := e.Next()
                l.Remove(e)
                tw.timers.Del(task.key)
                e = next
                continue
            } else if task.circle > 0 {
                // the wheel has reached this slot, but the task is not on the first level yet,
                // so it drops down one level and stays in the same slot (pos is not modified)
                task.circle--
                e = e.Next()
                continue
            } else if task.diff > 0 {
                // a diff was recorded by moveTask, so the task has to be queued again in its new slot
                next := e.Next()
                l.Remove(e)
                pos := (tw.tickedPos + task.diff) % tw.numSlots
                tw.slots[pos].PushBack(task)
                tw.setTimerPosition(pos, task)
                task.diff = 0
                e = next
                continue
            }

            // the cases above are all non-executable; tasks that can be executed are added to tasks
            tasks = append(tasks, timingTask{
                key:   task.key,
                value: task.value,
            })
            next := e.Next()
            l.Remove(e)
            tw.timers.Del(task.key)
            e = next
        }

        // range over tasks and pass each one to execute
        tw.runTasks(tasks)
    }

The branches are explained in the comments and should be read together with moveTask() above: the circle countdown and the diff calculation are the key points linking the two functions.

The diff calculation in turn depends on the calculation of pos and circle.
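
The order in which the scan checks its branches matters, so it can help to see it as a single decision function. The sketch below keeps only the three fields the scan inspects; the entry type, the decide function, and the action names are invented for this illustration.

```go
package main

import "fmt"

// entry holds only the fields the scan looks at. It is a hypothetical
// stand-in for the timingEntry used in the article's code.
type entry struct {
	removed bool
	circle  int
	diff    int
}

// decide returns what a scan would do with e, checking the branches in the
// same order as scanAndRunTasks. The action names are made up for this sketch.
func decide(e entry) string {
	switch {
	case e.removed:
		return "drop" // superseded by a re-queued copy
	case e.circle > 0:
		return "wait" // circle is decremented, the task stays in its slot
	case e.diff > 0:
		return "relocate" // the task moves to (tickedPos + diff) % numSlots
	default:
		return "run"
	}
}

func main() {
	entries := []entry{
		{removed: true, circle: 2},
		{circle: 1, diff: 9},
		{diff: 9},
		{},
	}
	for _, e := range entries {
		fmt.Printf("%+v -> %s\n", e, decide(e))
	}
}
```

Note that a task with both circle > 0 and diff > 0 waits first and is only relocated on the pass where its circle has reached 0.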

    // interval: 4min, d: 60min, numSlots: 16, tickedPos = 15
    // steps = 15, pos = 14, circle = 0
    func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos int, circle int) {
        steps := int(d / tw.interval)
        pos = (tw.tickedPos + steps) % tw.numSlots
        circle = (steps - 1) / tw.numSlots
        return
    }

With tickedPos at its initial value of numSlots - 1, the above process can be simplified to the following.

    steps = d / interval
    pos = (steps - 1) % numSlots
    circle = (steps - 1) / numSlots

Summary

  1. TimingWheel relies on the ticker to drive time forward; on each tick it takes the tasks out of the doubly linked list for the current time slot and passes them to execute. Because it advances by a fixed internal time step, some ticks do nothing for a given task: for a 60s task with interval = 1s, 59 ticks are no-ops.

  2. To extend the time range, tasks are layered by circle, so the original numSlots slots are reused over and over as the ticker loops around the wheel circle by circle. Any number of tasks can be put into the fixed-size slots, so this design removes the limit on long delays without creating additional data structures.

go-zero also contains many other practical toolkits that help improve service performance and development efficiency.

Project address

https://github.com/zeromicro/go-zero

Welcome to use go-zero and star to support us!


Original Link: https://dev.to/kevwan/implement-a-timing-wheel-for-millions-of-concurrent-tasks-30oi
