|
|
|
@ -58,22 +58,24 @@ func After(delay time.Duration, work func()) {
|
|
|
|
|
func RunQueue() {
|
|
|
|
|
defer log_info("Queue runner done.")
|
|
|
|
|
heap.Init(&queue)
|
|
|
|
|
ticker := time.Tick(10 * time.Millisecond)
|
|
|
|
|
for {
|
|
|
|
|
if len(queue) == 0 {
|
|
|
|
|
time.Sleep(10 * time.Microsecond)
|
|
|
|
|
continue
|
|
|
|
|
select {
|
|
|
|
|
case <-ticker:
|
|
|
|
|
if queue.Len() == 0 {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
queueLock.Lock()
|
|
|
|
|
future, ok := heap.Pop(&queue).(*Future)
|
|
|
|
|
queueLock.Unlock()
|
|
|
|
|
if !ok {
|
|
|
|
|
log_error("there's shit on the work heap")
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if future.ts.After(time.Now()) {
|
|
|
|
|
time.Sleep(future.ts.Sub(time.Now()))
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
if future.ts.Before(time.Now()) {
|
|
|
|
|
future.work()
|
|
|
|
|
} else {
|
|
|
|
|
heap.Push(&queue, future)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|