2-3 Scheduler模块
Tue, Jul 3, 2018
概念
Scheduler:调度器
作用:处理延时型任务(不能马上给结果的)(异步任务、周期性任务、瞬时可能并不会有结果的任务)
原理:延时任务们分发到scheduler,scheduler根据timer的interval等属性,进行定时触发
说明:异步任务才需要Scheduler
为什么需要scheduler
我们项目中有异步任务 - 延时删除视频
- 我们在删除数据的时候,一般先在source delete
- source delete 用户看不到,但是其实是存在的。会把这个任务下发下去,在scheduler中过一周或一个月,才真正删除。那么这个schedler会每隔一个月运行一次。
Scheduler Server包含什么
- Server
- Scheduler是一个独立的服务
- 接收任务用的
- Timer
- 定时触发任务的
- Task Runner (生产者/消费者模型下)
- 描述timer每次触发的时候,下面的任务该怎么做
- 很多中情况,任务分为两种:
- 读取任务
- 执行任务
代码结构
/scheduler
/../taskrunner/
/../../trmain.go
/../../task.go dispatcher和executor里面真正跑着的东西
/../../runner.go 整个dispatcher和executor的逻辑,会在这个runner里面
/../../defs.go 提前定义好的一些东西,如 task的格式、在channel里面的消息类型等
/../main.go
/../handlers.go
/../response.go
/../dbops/ 所有的任务都是从db里读出来的,任务的处理情况需要回写到数据库。
Tasker Runner架构
大框架由一个Timer启动,Timer里面有一个Task Runner,Task Runner分为三部分:
- Dispatcher(Producer)
- Executor(Consumer)
- channel(用来通信的)
Dispatcher把它得到的任务,通过Channel发送给Executor,Executor会去读这些内容,然后做些操作。
Task Runner
trmain.go
用法
- task runner里有dispatcher和executor。我们需要将所有的任务集中起来,在tsmain里初始化并将它跑起来
- 在外面的大main里面,将tsmain的内容初始化,并启动起来
说明:taskrunner这个包,是具有独立功能、可独自运行的包,而非只提供方法调用的包
runner
逻辑:
runnner有一个对象,
- 在runner里面跑一个常驻的任务,如:startDispatcher
- 这个任务会长时间等待runner的channel
- channel分为两部分:
- control channel
- 用来dispatcher和executer互相交换信息,提醒对方:
- executer会说:我的任务已经处理完了,你来给我发新任务,或你去读新的任务
- dispatcher会说:我的任务已经读完了,你可以开始处理了
- 用来dispatcher和executer互相交换信息,提醒对方:
- data channel
- 真正的数据的channel
- control channel
- channel分为两部分:
- 这个任务会长时间等待runner的channel
type controlChan chan string // control channel type dataChan chan interface{} // 要下发的数据 type fn func(dc dataChan) error // dispatcher和executer const ( READY_TO_DISPATCH = "d" // dispatcher收到这个消息后,会开始做事情,下发数据给datachannel里面 READY_TO_EXECUTE = "e" // dispatcher数据下发完之后,会把这个信息发给executor,executor会从datachannel里读下发下来的数据,做它的一些操作 CLOSE = "c" // 不论executor还是dispatcher出了问题,或者,没有任务可做的时候,会发送一个close,我们会把常驻的任务取消掉,结束整个过程 ) // 都是control channel里面传递的消息,分为了上述三种 // Runner的定义 - 关键 type Runner struct { Controller controlChan Error controlChan // 用于返回CLOSE的,这个信息需要和其他控制信息分开两个channel展示,便于维护 Data dataChan dataSize int longlived bool // 我们的Runner里面起了很多channel,当我们把常驻任务退出的时候,我们有两个选择:要么把资源回收,要么不回收,下次再使用的时候能复用。yes则不回收 Dispatcher fn Executor fn } // size: 每次下发多少数据 func NewRunner(size int, longlived bool, d fn, e fn) *Runner { return &Runner { Controller: make(chan string, 1), // 我们的过程应该是非阻塞的,所以用带buffer的channel Error: make(chan string, 1), Data: make(chan interface{}, size), longlived: longlived, dataSize: size, Dispatcher: d, Executor: e, } } // 常驻任务 func (r *Runner) startDispatch() { defer func() { // 根据标识位判断是否需要释放channel资源 if !r.longLived { close(r.Controller) close(r.Data) close(r.Error) }() } for { select { case c := <- r.Controller: if c == READY_TO_DISPATCH { err := r.Dispatcher(r.Data) // READY之后,把data channel传给dispatcher. 此时的data channel已经是空闲channel了,dispatcher会把我们的任务写到data channel里面 - 这个逻辑是我们写Dispatcher的时候实现的。 if err != nil { r.Error <- CLOSE } else { // 没报错,说明dispatcher已经正确的把任务下发了,已经写到data channel里面了。 r.Controller <- READY_TO_EXECUTE } } if c == READY_TO_EXECUTE { err := r.Executor(r.Data) if err != nil { r.Error <- CLOSE } else { // 进来这里,说明r.Executor已经把发进来的Data处理完了,那么我们就发一个READY_TO_DISPATCH的信号,继续到 r.Controller <- READY_TO_DISPATCH } } case e := <- r.Error: // 一旦有Error,我们就close if e == CLOSE { return } default: } } } func (r *Runner) StartAll() { r.Controller <- READY_TO_DISPATCH r.startDispatch() } |
以上,消费者Executor和生产者Dispatcher,会在for循环里互相发消息,转换他们之间的角色,让他们进行异步的消息传输和任务处理,这样可以把他俩完美的解耦开,Executor和Dispatcher各自管处理各自的事情就好。
Runner的使用和测试
func TestRunner(t *testing.T) { d := func(dc dataChan) error { for i := 0; i < 30; i++ { dc <- i log.Printf("Dispatcher sent: %v", i) } return nil } e := func(dc dataChan) error { forloop: for { select { case d := <- dc: log.Printf("Executor received: %v", d) default: break forloop // 这里是为了:当我们把datachannel中的所有东西都取完之后,没有新东西来的情况下,打破这个for死循环,以使dispatcher有机会执行到。 } } return nil } runner := NewRunner(30, false, d, e) go runner.StartAll() // 如果不用goroutine起,会被里面的死循环blocking住,下面的Sleep方法失效 time.Sleep(3 * time.Second) } |
Task
task部分定制化非常强
我们项目中最重要的Task:延时删除
dispatcher: 从数据库中读,需要删除的信息。读到这些信息放到datachannel,executer把这些信息删掉
我们新建了一个表,用来存需要删除的video的id的
internal.go - 操作数据库的方法们
流程:
- 通过api将video_id写到数据库里
- dispatcher从数据库里拿到video_id,交给datachannel
- executor从datachannel中读video_id,做delete操作
// read - 从数据库里把需要删掉的数据的id读出来 // 为什么传count进来?因为我们要批量拿数据,以减小数据库压力,而不能逐条去拿 func ReadVideoDeletionRecord(count int) ([]string, error) { stmtOut, err := dbConn.Prepare("SELECT video_id FROM video_del_rec LIMIT ?") var ids []string if err != nil { return ids, err } rows, err := stmtOut.Query(count) if err != nil { log.Printf("Query VideoDeletionRecord error :%v", err) // 生产环境不能这样大log,而是要做好log的分级 return ids, err } for rows.Next() { var id string if err := rows.Scan(&id); err != nil { return ids, err } ids = append(ids, id) } defer stmtOut.Close() return ids, nil } // 写数据库 - executor删掉视频之后,需要写数据库更改数据 func DelViewDeletetionRecord(vid string) error { stmtDel, err := dbConn.Prepare("DELETE FROM video_del_rec WHERE video_id=?") if err != nil { return err } _, err = stmtDel.Exec(vid) if err != nil { log.Printf("删除出错了:%v", err) return err } defer stmtOut.Close return nil } |
tasks.go
func VideoClearDispatcher(dc dataChan) error { res, err := dbops.ReadVideoDeletionRecord(3) // 生产环境需要把3作为参数传进来 if err != nil { log.Printf("err: %v", err) return err } if len(res) == 0 { return errors.New("All tasks finished") } for _, id := range res { dc <- id } return nil } func VideoClearExecutor(dc dataChan) error { errMap := &sync.map{} // 用一个map把所有的err都带出去 forloop: for { select { case vid := <- dc: go func(id interface{}) { // 我们希望删除操作并发执行。不过有缺陷:可能删除操作还没做,已经要读下一轮「待删除数据」了,导致读到的待删数据有重复。不影响我们的读写过程 if err := deleteVideo(id.(string)); err != nil { errMap.Store(id, err) return } if err := dbops.DelVideoDeletionRecord(id.(string)); err != nil { errMap.Store(id, err) return } }(vid) default: break forloop } errMap.Range(func(k, v interface{}) bool { err = v.(error) if err != nil { // 只要有一个error,就返回之 return false } } return err } func deleteVideo(vid string) error { err := os.Remove("./videos/" + vid) // 报错且不是「不存在」 if err != nil && !os.IsNotExist(err) { log.Printf("删除操作报错") return err; } return nil } |
Timer部分
Timer的逻辑:
- 设置Timer
- start timer
- 一直跑下去
- trigger之后,将所有的task跑一边
- task包装在runner里,所以把runner跑一遍
- 需要区分runner是不是longlived的
timer、task、runner、longlived
type Worker struct { ticker *time.Ticker // 这个ticker会不断接受系统发来的时间,达到我们想要的时间间隔的时候自动触发我们的事情 runner *Runner // Runner的定义在上面 } func NewWorker(interval time.Duration, r *Runner) *Worker { return &Worker { ticker: time.NewTickers(interval * time.Second), runner: r, } } func (w *Worker) startWorker() { for { select { case <- w.ticker.C: go w.runner.StartAll() } } } func Start() { // 每次处理多少个数据(删几个文件),是否常驻,任务的Dispatcher,任务的Executor r := NewRunner(3, true, VideoClearDispatcher, VideoClearExecutor) w := NewWorker(3, r) go w.startWorker() } |
提示:
- time.Ticker{}.C
- C:就是ticker传过来的Channel,timer会把每一个ticks传到这个channel里。
The channel on which the ticks are delivered.
RESTful API部分
总结任务逻辑
- user 通过 api service 发布任务:delete video
- api service 调用 scheduler,给scheduler所属的表里,写一条记录:要删除的视频
- 起一个timer
- timer到点儿了,起一个runner,runner读表,执行,然后删除video
func AddVideoDeletionRecord(vid string) error { stmtIns, err := dbConn.Prepare("INSERT INTO video_del_rec (video_id) VALUES(?)") if err != nil { return err } _,err = stmtIns.Exec(vid) if err != nil { log.Printf("AddVideoDeletionRecord err: %v", err) return err } defer stmtIns.Close() return nil } |
main.go
func RegisterHandlers() *httprouter.Router { router := httprouter.New() router.GET("/video-delete-record/:vid-id", vidDelRecHandler) return router } func main() { go taskrunner.Start() // 我们不知道这个Start里面是不是死锁类型的,所以需要起一个goroutine。http.ListenAndServe这个东东是死循环,所以这个子线程的函数不会还没执行就死掉。 r := RegisterHandlers() http.ListenAndServe(":9001", r) } |
handler.go
func vidDelRecHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params){ vid := p.ByName("vid-id") if len(vid) == 0 { sendResponse(w, 400, "video id should not be empty") return } err := dbops.AddVideoDeletionRecord(vid) if err != nil { sendResponse(w, 500, "Internal server error") return } sendResponse(w, 200, "") return } |