定時器是什么
Golang 原生 time 包下可以用來執(zhí)行一些定時任務或者是周期性的任務的一個工具
本文基于 Go 1.14,如果以下文章有哪里不對或者問題的地方,歡迎討論學習
定時器的日常使用
Timer 相關
func NewTimer(d Duration) *Timer
func (t *Timer) Reset(d Duration) bool
func (t *Timer) Stop() bool
func After(d Duration) -chan Time
func AfterFunc(d Duration, f func()) *Timer
func main() {
timer := time.NewTimer(3 * time.Second)
select {
case -timer.C:
fmt.Println("3秒執(zhí)行任務")
}
timer.Stop() // 這里來提高 timer 的回收
}
func main() {
tChannel := time.After(3 * time.Second) // 其內(nèi)部其實是生成了一個 timer
select {
case -tChannel:
fmt.Println("3秒執(zhí)行任務")
}
}
func main() {
timer := time.NewTimer(3 * time.Second)
for {
timer.Reset(4 * time.Second) // 這樣來復用 timer 和修改執(zhí)行時間
select {
case -timer.C:
fmt.Println("每隔4秒執(zhí)行任務")
}
}
}
注意事項:
錯誤使用:time.After 這里會不斷生成 timer,雖然最終會回收,但是會造成無意義的cpu資源消耗
func main() {
for {
select {
case -time.After(3 * time.Second):
fmt.Println("每隔3秒執(zhí)行一次")
}
}
}
正確使用:
func main() {
timer := time.NewTimer(3 * time.Second)
for {
timer.Reset(3 * time.Second) // 這里復用了 timer
select {
case -timer.C:
fmt.Println("每隔3秒執(zhí)行一次")
}
}
}
Ticker 相關
func NewTicker(d Duration) *Ticker
func Tick(d Duration) -chan Time
func (t *Ticker) Stop()
func main() {
ticker := time.NewTicker(3 * time.Second)
for range ticker.C {
fmt.Print("每隔3秒執(zhí)行任務")
}
ticker.Stop()
}
錯誤使用:
func main() {
for {
select {
case -time.Tick(3 * time.Second): // 這里會不斷生成 ticker,而且 ticker 會進行重新調(diào)度,造成泄漏(后面源碼會有解析)
fmt.Println("每隔3秒執(zhí)行一次")
}
}
}
定時器源碼分析
我先給出涉及到過程的相關結(jié)構(gòu)體(!??!要注意 Timer 和 timer 的不同)
type Timer struct {
C -chan Time
r runtimeTimer
}
// Ticker 的結(jié)構(gòu)與 Timer 一致
type Ticker struct {
C -chan Time // 這里就是返回的 channel
r runtimeTimer
}
// If this struct changes,
// adjust ../time/sleep.go:/runtimeTimer.
// 這里是與 runtimeTimer 對應的
type timer struct {
pp puintptr // 對應的當前 P 的指針
when int64 // 需要執(zhí)行的時間
period int64 // 周期,Ticker 會使用
f func(interface{}, uintptr) // 給 channel 推送信息的方式
arg interface{} // 與 f 相關的第一個參數(shù),可以看下面 Ticker 的例子
seq uintptr // 與 f 相關的第二個參數(shù)(后續(xù)我們可以看到)
nextwhen int64 // 下次執(zhí)行的時候
status uint32 // 當前狀態(tài)
}
// P 結(jié)構(gòu)體中的相關 timer 的字段
type p struct {
...
timersLock mutex // 一個 P 中保證 timers 同步鎖
timers []*timer // timers 是四叉小頂堆(后續(xù)代碼會有說明)
numTimers uint32 // timer 的數(shù)量
adjustTimers uint32 // 需要調(diào)整的 timer 的數(shù)量
deletedTimers uint32 // 需要刪除的 timer 的數(shù)量
...
}
我們以 Ticker 為切入點
func NewTicker(d Duration) *Ticker {
if d = 0 {
panic(errors.New("non-positive interval for NewTicker"))
}
c := make(chan Time, 1)
t := Ticker{
C: c,
r: runtimeTimer{
when: when(d),//當前時間+d的時間,可看下面
period: int64(d),//執(zhí)行周期
f: sendTime,
arg: c, // 就是 f 中第一個參數(shù)
},
}
startTimer(t.r)
return t
}
func when(d Duration) int64 {
if d = 0 {
return runtimeNano()
}
t := runtimeNano() + int64(d) //當前時間加上需要等待的時間
if t 0 {
t = 163 - 1 // math.MaxInt64
}
return t
}
func sendTime(c interface{}, seq uintptr) {
select {
case c.(chan Time) - Now():
default:
}
}
從 NewTicker 中我們可以看到,開始執(zhí)行是在 startTimer(),我們進去看下
addtimer
// startTimer adds t to the timer heap.
// 這里已經(jīng)說明了 timers 是一種堆的數(shù)據(jù)結(jié)構(gòu),由于是定時器,
// 最近的最先執(zhí)行,所以猜測以 when 來判斷的小頂堆
func startTimer(t *timer) {
addtimer(t)
}
func addtimer(t *timer) {
if t.when 0 {
t.when = maxWhen //maxWhen 是 163 - 1
}
if t.status != timerNoStatus {
throw("addtimer called with initialized timer")
}
t.status = timerWaiting
when := t.when
pp := getg().m.p.ptr()
lock(pp.timersLock)
cleantimers(pp) // 根據(jù) timer 刪除和修改狀態(tài)進行操作,可以看下面源碼相關
doaddtimer(pp, t)// 添加 timer 的到 timers 堆
unlock(pp.timersLock)
wakeNetPoller(when)
}
// 清理 timers 的源碼部分
func cleantimers(pp *p) {
for {
if len(pp.timers) == 0 {
return
}
t := pp.timers[0]// 從 0 開始,即最小的堆頂開始
if t.pp.ptr() != pp {
throw("cleantimers: bad p")
}
switch s := atomic.Load(t.status); s {
case timerDeleted:
if !atomic.Cas(t.status, s, timerRemoving) {// status 變更為 timerRemoving
continue
}
dodeltimer0(pp) // 這里是刪除 timer 的關鍵部分,刪除堆頂?shù)牟糠植⒄{(diào)整
if !atomic.Cas(t.status, timerRemoving, timerRemoved) { // stauts 變更為 timerRemoved
badTimer() // 這里就是 throw 一個異常
}
atomic.Xadd(pp.deletedTimers, -1)
case timerModifiedEarlier, timerModifiedLater:
if !atomic.Cas(t.status, s, timerMoving) { // stauts 變更為 timerMoving
continue
}
t.when = t.nextwhen // 將執(zhí)行時間設置為其下次執(zhí)行的時候
// -----刪除堆頂位置,并按照其新的執(zhí)行時間加入到對應的位置
dodeltimer0(pp)
doaddtimer(pp, t) // 添加 timer 的關鍵部分
// ------------
if s == timerModifiedEarlier {
atomic.Xadd(pp.adjustTimers, -1)
}
if !atomic.Cas(t.status, timerMoving, timerWaiting) {
badTimer()
}
default:
return
}
}
}
// timer 刪除的源碼部分
//(擴充:func dodeltimer(pp *p, i int) 意思就是刪除指定所索引
// 的位置,然后恢復小頂堆的結(jié)構(gòu),可以看源碼,就不解釋了)
func dodeltimer0(pp *p) {
if t := pp.timers[0]; t.pp.ptr() != pp {
throw("dodeltimer0: wrong P")
} else {
t.pp = 0 // 這里將指針情況
}
// --- 將堆的最后一位 timer 放到堆頂,然后清空最后一位的空間,然后向下調(diào)整---
last := len(pp.timers) - 1
if last > 0 {
pp.timers[0] = pp.timers[last]
}
pp.timers[last] = nil
pp.timers = pp.timers[:last]
if last > 0 {
siftdownTimer(pp.timers, 0)//向下調(diào)整的核心部分
}
// ---------------------
updateTimer0When(pp) //更新當前 p 的最先執(zhí)行 timer 的執(zhí)行時間
atomic.Xadd(pp.numTimers, -1)
}
func updateTimer0When(pp *p) {
if len(pp.timers) == 0 {
atomic.Store64(pp.timer0When, 0)
} else {
atomic.Store64(pp.timer0When, uint64(pp.timers[0].when))
}
}
// timer 增加的源碼部分
func doaddtimer(pp *p, t *timer) {
...
if t.pp != 0 {
throw("doaddtimer: P already set in timer")
}
t.pp.set(pp)
// --- 將 timer 放置到堆的最后一位,然后向上調(diào)整 ---
i := len(pp.timers)
pp.timers = append(pp.timers, t)
siftupTimer(pp.timers, i)// 向上調(diào)整的核心部分
// ---------------------------
if t == pp.timers[0] {
atomic.Store64(pp.timer0When, uint64(t.when))
}
atomic.Xadd(pp.numTimers, 1)
}
當我們已知 timers 是小頂堆的數(shù)據(jù)結(jié)構(gòu)(滿足“當前位置的值小于等于父位置的值“即可,實現(xiàn)方式使用數(shù)組,由下面代碼可以知道是四叉小頂堆,結(jié)構(gòu)如下圖)的情況后,接下來看堆向上或者向下調(diào)整的細節(jié)部分
// timers 堆的向上調(diào)整
func siftupTimer(t []*timer, i int) {
...
when := t[i].when
tmp := t[i]
for i > 0 {
p := (i - 1) / 4 // 由這里可以看出,堆的節(jié)點長度是4
if when >= t[p].when {
break
}
// --- 向上進行調(diào)整,即父節(jié)點下移,當前節(jié)點上移 ---
t[i] = t[p]
i = p
//向上進行調(diào)整
}
if tmp != t[i] {
t[i] = tmp
}
}
//timers 堆的向下調(diào)整
func siftdownTimer(t []*timer, i int) {
n := len(t)
if i >= n {
badTimer()
}
when := t[i].when
tmp := t[i]
for {
// --- 以下部分就是找到當前4個節(jié)點中最小的那個值和在數(shù)組的位置 -----
c := i*4 + 1 // 這里是子節(jié)點最左邊的節(jié)點
c3 := c + 2 // 這里是子節(jié)點第三個節(jié)點
if c >= n {
break
}
w := t[c].when
if c+1 n t[c+1].when w {
w = t[c+1].when
c++
}
if c3 n {
w3 := t[c3].when
if c3+1 n t[c3+1].when w3 {
w3 = t[c3+1].when
c3++
}
if w3 w {
w = w3
c = c3
}
}
//---------------------------------
if w >= when {
break
}
// --- 向下進行調(diào)整,即子節(jié)點上移,當前節(jié)點下移 ---
t[i] = t[c]
i = c
// ---------------
}
if tmp != t[i] {
t[i] = tmp
}
}
既然已經(jīng)知道timer放到四叉小頂堆,那 timer 是怎么執(zhí)行的呢?接下來就是定時器的核心部分入口 runtimer()
runtimer
// 這里執(zhí)行的前提是當前 P 的 timesLock 已經(jīng)鎖了,所以不用擔心并發(fā)問題
func runtimer(pp *p, now int64) int64 {
for {
t := pp.timers[0] //找到 timers 堆的堆頂,為最先執(zhí)行的 timer
if t.pp.ptr() != pp {
throw("runtimer: bad p")
}
switch s := atomic.Load(t.status); s {
case timerWaiting:
if t.when > now { //如果還沒到時間,則返回調(diào)用的時間
return t.when
}
if !atomic.Cas(t.status, s, timerRunning) {
continue
}
runOneTimer(pp, t, now)// 這里是執(zhí)行timer的核心
return 0
case timerDeleted:
if !atomic.Cas(t.status, s, timerRemoving) {
continue
}
dodeltimer0(pp) //刪除 timers 堆頂?shù)?timer
if !atomic.Cas(t.status, timerRemoving, timerRemoved) {
badTimer()
}
atomic.Xadd(pp.deletedTimers, -1)
if len(pp.timers) == 0 {
return -1
}
case timerModifiedEarlier, timerModifiedLater:
if !atomic.Cas(t.status, s, timerMoving) {
continue
}
//刪除堆頂?shù)奈恢?,調(diào)整 timer 到最新的時間,以及進行重新調(diào)整
t.when = t.nextwhen
dodeltimer0(pp)
doaddtimer(pp, t)
if s == timerModifiedEarlier {
atomic.Xadd(pp.adjustTimers, -1)
}
if !atomic.Cas(t.status, timerMoving, timerWaiting) {
badTimer()
}
case timerModifying:
osyield()
case timerNoStatus, timerRemoved:
badTimer()
case timerRunning, timerRemoving, timerMoving:
badTimer()
default:
badTimer()
}
}
}
因此我們知道了執(zhí)行的核心流程是 runOneTimer()
runOneTimer
// 由于是 runtimer 進行調(diào)用,因此也線程安全
func runOneTimer(pp *p, t *timer, now int64) {
...
f := t.f
arg := t.arg
seq := t.seq
if t.period > 0 { //如果有周期,則算出下次 timer 執(zhí)行的時間,并加入到對應的位置(這里就是 Ticker 和 Timer 的區(qū)別)
delta := t.when - now
t.when += t.period * (1 + -delta/t.period)
siftdownTimer(pp.timers, 0)// 將四叉小頂堆向下調(diào)整
if !atomic.Cas(t.status, timerRunning, timerWaiting) {
badTimer()
}
updateTimer0When(pp)//更新當前 P 的最先的 timer 的執(zhí)行時間
} else {
// 從堆頂位置上刪除 timer,并調(diào)整
dodeltimer0(pp)
if !atomic.Cas(t.status, timerRunning, timerNoStatus) {
badTimer()
}
}
...
unlock(pp.timersLock)
f(arg, seq) // 執(zhí)行對應的 f,這里就是我們 Timer.C 來的地方
lock(pp.timersLock)
...
}
從 runtimer 的調(diào)用,我們知道執(zhí)行的入口是 checkTimers(),我們詳細看下
checkTimers
我們可以看下圖,由下圖可知,是通過 Go 里面的調(diào)度中去尋找可執(zhí)行的 timer
我們看下 checkTimers 做了什么
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
if atomic.Load(pp.adjustTimers) == 0 {// 如果沒有需要可調(diào)整的,則直接返回最先執(zhí)行 timer 的時間
next := int64(atomic.Load64(pp.timer0When))
if next == 0 {
return now, 0, false
}
if now == 0 {
now = nanotime()
}
if now next { // 表示還沒有到執(zhí)行時間
if pp != getg().m.p.ptr() || int(atomic.Load(pp.deletedTimers)) = int(atomic.Load(pp.numTimers)/4) { //且要刪除的 Timer數(shù)量小于 Timer總數(shù)的1/4
return now, next, false
}
}
}
lock(pp.timersLock)
adjusttimers(pp)// 可以看下面的源碼解析,當前 p 上的所有 timers 的狀態(tài),該刪除的刪了,該調(diào)整的調(diào)整
rnow = now
if len(pp.timers) > 0 {
if rnow == 0 {
rnow = nanotime()
}
for len(pp.timers) > 0 {
if tw := runtimer(pp, rnow); tw != 0 { // 通過 runtimer(可以看上面的源碼解析) 開始調(diào)用
if tw > 0 {
pollUntil = tw
}
break
}
ran = true
}
}
// 如果可刪除的 Timers 大于 Timer總數(shù)量的1/4,則進行刪除(因為上面執(zhí)行了 runtimer)
if pp == getg().m.p.ptr() int(atomic.Load(pp.deletedTimers)) > len(pp.timers)/4 {
clearDeletedTimers(pp)
}
unlock(pp.timersLock)
return rnow, pollUntil, ran
}
adjusttimers
func adjusttimers(pp *p) {
if len(pp.timers) == 0 {
return
}
if atomic.Load(pp.adjustTimers) == 0 { // 如果需要調(diào)整的 Timer 為 0,則直接返回
...
return
}
var moved []*timer
loop:
for i := 0; i len(pp.timers); i++ {
t := pp.timers[i]
if t.pp.ptr() != pp {
throw("adjusttimers: bad p")
}
switch s := atomic.Load(t.status); s {
case timerDeleted: // 這里就是將部分需要刪除的 Timer 給清理掉
if atomic.Cas(t.status, s, timerRemoving) {
dodeltimer(pp, i)
if !atomic.Cas(t.status, timerRemoving, timerRemoved) {
badTimer()
}
atomic.Xadd(pp.deletedTimers, -1)
i--
}
case timerModifiedEarlier, timerModifiedLater: // 把需要調(diào)整 Timer 放到 moved 中,然后刪除當前堆的數(shù)據(jù)進行堆調(diào)整,后續(xù)將 moved 通過 addAdjustedTimers 添加
if atomic.Cas(t.status, s, timerMoving) {
t.when = t.nextwhen
dodeltimer(pp, i)
moved = append(moved, t)
if s == timerModifiedEarlier {
if n := atomic.Xadd(pp.adjustTimers, -1); int32(n) = 0 {
break loop
}
}
i--
}
case timerNoStatus, timerRunning, timerRemoving, timerRemoved, timerMoving:
badTimer()
case timerWaiting:
case timerModifying:
osyield()
i--
default:
badTimer()
}
}
if len(moved) > 0 {
addAdjustedTimers(pp, moved) // 這里就是將需要調(diào)整的 timer 重新添加進來
}
...
}
addAdjustedTimers
func addAdjustedTimers(pp *p, moved []*timer) {
for _, t := range moved {
doaddtimer(pp, t)// 上文有源碼解析
if !atomic.Cas(t.status, timerMoving, timerWaiting) {
badTimer()
}
}
}
clearDeletedTimers
func clearDeletedTimers(pp *p) {
cdel := int32(0)
cearlier := int32(0)
to := 0
changedHeap := false
timers := pp.timers
nextTimer:
for _, t := range timers {
for {
switch s := atomic.Load(t.status); s {
case timerWaiting:
if changedHeap {
timers[to] = t
siftupTimer(timers, to)
}
to++
continue nextTimer
case timerModifiedEarlier, timerModifiedLater: // 將 timer 狀態(tài)調(diào)整成 timeWaiting,將其放至其正確的執(zhí)行時間位置
if atomic.Cas(t.status, s, timerMoving) {
t.when = t.nextwhen
timers[to] = t
siftupTimer(timers, to)
to++
changedHeap = true
if !atomic.Cas(t.status, timerMoving, timerWaiting) {
badTimer()
}
if s == timerModifiedEarlier {
cearlier++
}
continue nextTimer
}
case timerDeleted: // 將 timerDeleted 轉(zhuǎn)變成 timerRemoved,然后從 timers 堆中刪掉(在當前函數(shù)后面可以看出)
if atomic.Cas(t.status, s, timerRemoving) {
t.pp = 0
cdel++
if !atomic.Cas(t.status, timerRemoving, timerRemoved) {
badTimer()
}
changedHeap = true
continue nextTimer
}
case timerModifying:
osyield()
case timerNoStatus, timerRemoved:
badTimer()
case timerRunning, timerRemoving, timerMoving:
badTimer()
default:
badTimer()
}
}
}
// 在這里對于剩余的空間 設置為 nil 操作(垃圾回收方便)
for i := to; i len(timers); i++ {
timers[i] = nil
}
atomic.Xadd(pp.deletedTimers, -cdel)
atomic.Xadd(pp.numTimers, -cdel)
atomic.Xadd(pp.adjustTimers, -cearlier)
// 在這里進行一次大清理
timers = timers[:to]
pp.timers = timers
updateTimer0When(pp)
...
}
大致執(zhí)行的情況我們看好了,那我們接下來看 Stop() 的源碼部分
deltimer
func (t *Ticker) Stop() {
stopTimer(t.r)
}
func stopTimer(t *timer) bool {
return deltimer(t)
}
func deltimer(t *timer) bool {
for {
switch s := atomic.Load(t.status); s {
case timerWaiting, timerModifiedLater: //將 timer 的 status變更為 timerDeleted ,并deletedTimers 加 1
mp := acquirem()
if atomic.Cas(t.status, s, timerModifying) {
tpp := t.pp.ptr()
if !atomic.Cas(t.status, timerModifying, timerDeleted) { //
badTimer()
}
releasem(mp)
atomic.Xadd(tpp.deletedTimers, 1)
return true
} else {
releasem(mp)
}
case timerModifiedEarlier: //將 timer 的 status 變更為 timerDeleted,然后 adjustTimers 減 1,deletedTimers 加 1
mp := acquirem()
if atomic.Cas(t.status, s, timerModifying) {
tpp := t.pp.ptr()
atomic.Xadd(tpp.adjustTimers, -1)
if !atomic.Cas(t.status, timerModifying, timerDeleted) {
badTimer()
}
releasem(mp)
atomic.Xadd(tpp.deletedTimers, 1)
return true
} else {
releasem(mp)
}
case timerDeleted, timerRemoving, timerRemoved:
return false
case timerRunning, timerMoving:
osyield()
case timerNoStatus:
return false
case timerModifying:
osyield()
default:
badTimer()
}
}
}
后續(xù)調(diào)度中, Timer 的狀態(tài)可以從 timerDeleted 設置成 timerRemoved 并從 timers 堆中去除(注意,這里用了“可以”,可以看上面的狀態(tài)圖了解)
在復用 Timer 的時候,我們經(jīng)常使用 Reset(),我們來看下源碼部分是怎么樣的
modtimer
func (t *Timer) Reset(d Duration) bool {
if t.r.f == nil {
panic("time: Reset called on uninitialized Timer")
}
w := when(d)
active := stopTimer(t.r) // 這里我們上面源碼解釋過了,即將當前的 timer 的 status 設置成 timerDeleted
resetTimer(t.r, w)
return active
}
func resettimer(t *timer, when int64) {
modtimer(t, when, t.period, t.f, t.arg, t.seq)
}
func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) {
if when 0 {
when = maxWhen
}
status := uint32(timerNoStatus)
wasRemoved := false
var mp *m
loop:
for {
// 主要的目的就是將當前的 timer 的狀態(tài)設置成 timerModifying
switch status = atomic.Load(t.status); status {
case timerWaiting, timerModifiedEarlier, timerModifiedLater:
mp = acquirem()
if atomic.Cas(t.status, status, timerModifying) {
break loop
}
releasem(mp)
case timerNoStatus, timerRemoved:
mp = acquirem()
if atomic.Cas(t.status, status, timerModifying) {
wasRemoved = true
break loop
}
releasem(mp)
case timerDeleted:
mp = acquirem()
if atomic.Cas(t.status, status, timerModifying) {
atomic.Xadd(t.pp.ptr().deletedTimers, -1)
break loop
}
releasem(mp)
case timerRunning, timerRemoving, timerMoving:
osyield()
case timerModifying:
osyield()
default:
badTimer()
}
}
t.period = period
t.f = f
t.arg = arg
t.seq = seq
if wasRemoved { // 如果是已經(jīng)被移除的,則要重新加回到 timers 中,且狀態(tài)變更為 timerWaiting
t.when = when
pp := getg().m.p.ptr()
lock(pp.timersLock)
doaddtimer(pp, t)
unlock(pp.timersLock)
if !atomic.Cas(t.status, timerModifying, timerWaiting) {
badTimer()
}
releasem(mp)
wakeNetPoller(when)
} else {
t.nextwhen = when
newStatus := uint32(timerModifiedLater)
if when t.when { //判斷這次新的時間是老的時間的前還是后
newStatus = timerModifiedEarlier
}
adjust := int32(0)
if status == timerModifiedEarlier {
adjust--
}
if newStatus == timerModifiedEarlier {
adjust++
}
if adjust != 0 {
atomic.Xadd(t.pp.ptr().adjustTimers, adjust)
}
if !atomic.Cas(t.status, timerModifying, newStatus) { // 將當前 timer 設置成 timerModifiedEarlier/timerModifiedEarlier
badTimer()
}
releasem(mp)
if newStatus == timerModifiedEarlier {
wakeNetPoller(when)
}
}
}
到此這篇關于Golang 定時器(Timer 和 Ticker),這篇文章就夠了的文章就介紹到這了,更多相關Golang 定時器內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
您可能感興趣的文章:- Golang定時器的2種實現(xiàn)方法與區(qū)別
- golang定時器和超時的使用詳解
- Golang中定時器的陷阱詳解
- 用golang實現(xiàn)一個定時器任務隊列實例
- golang中定時器cpu使用率高的現(xiàn)象詳析
- golang time包下定時器的實現(xiàn)方法
- Golang 定時器的終止與重置實現(xiàn)