2-3 Scheduler模块

 

概念

Scheduler:调度器

作用:处理延时型任务(不能马上给结果的)(异步任务、周期性任务、瞬时可能并不会有结果的任务)

原理:延时任务们分发到scheduler,scheduler根据timer的interval等属性,进行定时触发

说明:异步任务才需要Scheduler

为什么需要scheduler

我们项目中有异步任务 - 延时删除视频

Scheduler Server包含什么

代码结构

/scheduler

/../taskrunner/

/../../trmain.go

/../../task.go       dispatcher和executor里面真正跑着的东西

/../../runner.go     整个dispatcher和executor的逻辑,会在这个runner里面

/../../defs.go      提前定义好的一些东西,如 task的格式、在channel里面的消息类型等

/../main.go

/../handlers.go

/../response.go

/../dbops/   所有的任务都是从db里读出来的,任务的处理情况需要回写到数据库。

Tasker Runner架构

大框架由一个Timer启动,Timer里面有一个Task Runner,Task Runner分为三部分:

  1. Dispatcher(Producer)
  2. Executor(Consumer)
  3. channel(用来通信的)

Dispatcher把它得到的任务,通过Channel发送给Executor,Executor会去读这些内容,然后做些操作。

Task Runner

trmain.go 

用法

说明:taskrunner这个包,是具有独立功能、可独自运行的包,而非只提供方法调用的包

runner

逻辑:

runnner有一个对象,


type controlChan chan string // control channel
type dataChan chan interface{} // 要下发的数据
type fn func(dc dataChan) error // dispatcher和executer


const (
	READY_TO_DISPATCH = "d" // dispatcher收到这个消息后,会开始做事情,下发数据给datachannel里面
	READY_TO_EXECUTE = "e" // dispatcher数据下发完之后,会把这个信息发给executor,executor会从datachannel里读下发下来的数据,做它的一些操作
	CLOSE = "c"  // 不论executor还是dispatcher出了问题,或者,没有任务可做的时候,会发送一个close,我们会把常驻的任务取消掉,结束整个过程
) // 都是control channel里面传递的消息,分为了上述三种


//	Runner的定义 - 关键
type Runner struct {
	Controller controlChan
	Error controlChan // 用于返回CLOSE的,这个信息需要和其他控制信息分开两个channel展示,便于维护
	Data dataChan
	dataSize int
	longlived bool // 我们的Runner里面起了很多channel,当我们把常驻任务退出的时候,我们有两个选择:要么把资源回收,要么不回收,下次再使用的时候能复用。yes则不回收
	Dispatcher fn
	Executor fn
}


// size: 每次下发多少数据
func NewRunner(size int, longlived bool, d fn, e fn) *Runner {
	return &Runner {
		Controller: make(chan string, 1), // 我们的过程应该是非阻塞的,所以用带buffer的channel
		Error: make(chan string, 1),
		Data: make(chan interface{}, size),
		longlived: longlived,
		dataSize: size,
		Dispatcher: d,
		Executor: e,
	}
}

// 常驻任务
func (r *Runner) startDispatch() {

	defer func() {
		// 根据标识位判断是否需要释放channel资源
		if !r.longLived {
			close(r.Controller)
			close(r.Data)
			close(r.Error)
		}()
	}

	for {
		select {
			case c := <- r.Controller:
				if c == READY_TO_DISPATCH {
					err := r.Dispatcher(r.Data) // READY之后,把data channel传给dispatcher. 此时的data channel已经是空闲channel了,dispatcher会把我们的任务写到data channel里面 - 这个逻辑是我们写Dispatcher的时候实现的。
					if err != nil { r.Error <- CLOSE } else {
						//	没报错,说明dispatcher已经正确的把任务下发了,已经写到data channel里面了。
						r.Controller <- READY_TO_EXECUTE
					}
				}
				
				if c == READY_TO_EXECUTE {
					err := r.Executor(r.Data)
					if err != nil {
						r.Error <- CLOSE
					} else {
						//	进来这里,说明r.Executor已经把发进来的Data处理完了,那么我们就发一个READY_TO_DISPATCH的信号,继续到
						r.Controller <- READY_TO_DISPATCH
					}
				}
			case e := <- r.Error:
				// 一旦有Error,我们就close
				if e == CLOSE {
					return
				}
			default:
		}
	}
}

func (r *Runner) StartAll() {
	r.Controller <- READY_TO_DISPATCH
	r.startDispatch()
}

以上,消费者Executor和生产者Dispatcher,会在for循环里互相发消息,转换他们之间的角色,让他们进行异步的消息传输和任务处理,这样可以把他俩完美的解耦开,Executor和Dispatcher各自管处理各自的事情就好。

Runner的使用和测试

func TestRunner(t *testing.T) {
	d := func(dc dataChan) error {
		for i := 0; i < 30; i++ {
			dc <- i
			log.Printf("Dispatcher sent: %v", i)
		}
		return nil
	}

	e := func(dc dataChan) error {
		forloop:
			for {
				select {
				case d := <- dc:
					log.Printf("Executor received: %v", d)
				default:
					break forloop // 这里是为了:当我们把datachannel中的所有东西都取完之后,没有新东西来的情况下,打破这个for死循环,以使dispatcher有机会执行到。
				}
			}

		return nil
	}

	runner := NewRunner(30, false, d, e)
	go runner.StartAll() // 如果不用goroutine起,会被里面的死循环blocking住,下面的Sleep方法失效

	time.Sleep(3 * time.Second)
} 

Task

task部分定制化非常强

我们项目中最重要的Task:延时删除

dispatcher: 从数据库中读,需要删除的信息。读到这些信息放到datachannel,executer把这些信息删掉


我们新建了一个表,用来存需要删除的video的id的

internal.go - 操作数据库的方法们

流程:

  1. 通过api将video_id写到数据库里
  2. dispatcher从数据库里拿到video_id,交给datachannel
  3. executor从datachannel中读video_id,做delete操作


// read - 从数据库里把需要删掉的数据的id读出来
// 为什么传count进来?因为我们要批量拿数据,以减小数据库压力,而不能逐条去拿
func ReadVideoDeletionRecord(count int) ([]string, error) {
	stmtOut, err := dbConn.Prepare("SELECT video_id FROM video_del_rec LIMIT ?")
	var ids []string
	if err != nil {
		return ids, err
	}	
	
	rows, err := stmtOut.Query(count)
	if err != nil {
		log.Printf("Query VideoDeletionRecord error :%v", err) // 生产环境不能这样大log,而是要做好log的分级
		return ids, err
	}


	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err != nil {
			return ids, err
		}
		ids = append(ids, id)
	}
	
	defer stmtOut.Close()
	return ids, nil
}


// 写数据库 - executor删掉视频之后,需要写数据库更改数据
func DelViewDeletetionRecord(vid string) error {
	stmtDel, err := dbConn.Prepare("DELETE FROM video_del_rec WHERE video_id=?")
	if err != nil {
		return err
	}
	_, err = stmtDel.Exec(vid)
	if err != nil {
		log.Printf("删除出错了:%v", err)
		return err
	}
	defer stmtOut.Close
	return nil
}

tasks.go

func VideoClearDispatcher(dc dataChan) error {
	res, err := dbops.ReadVideoDeletionRecord(3) // 生产环境需要把3作为参数传进来
	if err != nil {
		log.Printf("err: %v", err)
		return err
	}

	if len(res) == 0 {
		return errors.New("All tasks finished")
	}

	for _, id := range res {
		dc <- id
	}

	return nil
}

func VideoClearExecutor(dc dataChan) error {
	errMap := &sync.map{} // 用一个map把所有的err都带出去	
	forloop:
		for {
			select {
				case vid := <- dc:
					go func(id interface{}) { // 我们希望删除操作并发执行。不过有缺陷:可能删除操作还没做,已经要读下一轮「待删除数据」了,导致读到的待删数据有重复。不影响我们的读写过程
						if err := deleteVideo(id.(string)); err != nil { errMap.Store(id, err) return }
						if err := dbops.DelVideoDeletionRecord(id.(string)); err != nil { errMap.Store(id, err) return }
					}(vid)
				default:
					break forloop
		}
	errMap.Range(func(k, v interface{}) bool {
		err = v.(error)
		if err != nil { // 只要有一个error,就返回之
			return false
		}
	}

	return err
}


func deleteVideo(vid string) error {
	err := os.Remove("./videos/" + vid)
	//	报错且不是「不存在」
	if err != nil && !os.IsNotExist(err) {
		log.Printf("删除操作报错")
		return err;
	}
	return nil
}

Timer部分

Timer的逻辑:

  1. 设置Timer
  2. start timer
    1. 一直跑下去
    2. trigger之后,将所有的task跑一边
      1. task包装在runner里,所以把runner跑一遍
      2. 需要区分runner是不是longlived的

timer、task、runner、longlived

type Worker struct {
	ticker *time.Ticker // 这个ticker会不断接受系统发来的时间,达到我们想要的时间间隔的时候自动触发我们的事情
	runner *Runner // Runner的定义在上面
}


func NewWorker(interval time.Duration, r *Runner) *Worker {
	return &Worker {
		ticker: time.NewTickers(interval * time.Second),
		runner: r,
	}
}

func (w *Worker) startWorker() {
	for {
		select {
		case <- w.ticker.C:
			go w.runner.StartAll()
		}
	}
}


func Start() {
	// 每次处理多少个数据(删几个文件),是否常驻,任务的Dispatcher,任务的Executor
	r := NewRunner(3, true, VideoClearDispatcher, VideoClearExecutor)
	w := NewWorker(3, r)
	go w.startWorker()
}

提示:

  1. time.Ticker{}.C
    1. C:就是ticker传过来的Channel,timer会把每一个ticks传到这个channel里。
    2. The channel on which the ticks are delivered.

RESTful API部分

总结任务逻辑

  1. user 通过 api service 发布任务:delete video
  2. api service 调用 scheduler,给scheduler所属的表里,写一条记录:要删除的视频
  3. 起一个timer
  4. timer到点儿了,起一个runner,runner读表,执行,然后删除video
func AddVideoDeletionRecord(vid string) error {
	stmtIns, err := dbConn.Prepare("INSERT INTO video_del_rec (video_id) VALUES(?)")
	if err != nil {
		return err
	}

	_,err = stmtIns.Exec(vid)
	if err != nil {
		log.Printf("AddVideoDeletionRecord err: %v", err)
		return err
	}

	defer stmtIns.Close()
	return nil
}


main.go

func RegisterHandlers() *httprouter.Router {
	router := httprouter.New()

	router.GET("/video-delete-record/:vid-id", vidDelRecHandler)
	return router
}

func main() {
	go taskrunner.Start() // 我们不知道这个Start里面是不是死锁类型的,所以需要起一个goroutine。http.ListenAndServe这个东东是死循环,所以这个子线程的函数不会还没执行就死掉。
	r := RegisterHandlers()
	http.ListenAndServe(":9001", r)
}


handler.go

func vidDelRecHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params){
    vid := p.ByName("vid-id")

    if len(vid) == 0 {
        sendResponse(w, 400, "video id should not be empty")
        return 
    }

    err := dbops.AddVideoDeletionRecord(vid)
    if err != nil {
        sendResponse(w, 500, "Internal server error")
        return
    }

    sendResponse(w, 200, "")
    return
}