2-2 Stream模块

 

Stream Server

两个关键知识点

特点

静态视频,非RTMP

RTMP:用在直播上的,Client端不断有Input,别的Client端不断有Output。复杂度比较高

Streaming部分:

代码

关键点:

  1. 长连:发一个Request过来,不断向Client输出数据流,按照整个视频的进程走时间
  2. 流控:多路长连接同时保持的时候,不断发起连接不断打开视频,总有一天服务器会Crash掉。所以我们需要一个「流控机制」
    1. 我们的「流控」只控制Connection的部分

结构说明

文件结构:

特点:没有需要和db交互的东东

/streamserver

/../main.go

/../handlers.go

/../defs.go

/../limiter.go


结构简单,功能单一,所以不用像api一样那么多层。

limiter.go

流控方案

流控:在网站的Server online的时候,一定有攻击者发起请求。当请求达到一定数量的时候,会导致你所在的Server的连接数不够。如果不仅连接数消耗完、并且带宽占完之后,你的Server就处于不可用的状态。

更可怕的情况:当你的RAM也消耗完,你的系统就会Crash

为了保护系统,需要做流控

bucket token算法

bucket:类似一个箱子,箱子里有好多token 1...n

bucket放在Server里:

说明:

需要注意:防止bucket的协程安全问题

实现

type ConnLimiter struct {
	concurrentConn int
	bucket chan int
}

func NewConnLimiter(cc int) *ConnLimiter {
	return &ConnLimiter{
		concurrentConn: cc,
		bucket: make(chan int, cc), // buffer channel *1
	}
}

// 获取Token, 返回获取结果
func (cl *ConnLimiter) GetConn() bool {
	if len(cl.bucket) >= cl.concurrentConn {
		log.Printf("达到上限了")
		return false
	}

	cl.bucket <- 1
	return true
}


func (cl *ConnLimiter) ReleaseConn() {
    c := <-cl.bucket
	log.Printf("来了一个新Token")
}

提示

  1. 有了buffer,可以保证在一定的缓冲区间内,做消息同步的事情
  2. 简单的流控代码实现,却可以起到非常大的作用

response.go

我们提供的这两个服务(上传文件、流媒体播放)主要是传流数据的,所以只有在出错的时候才会用到response

func sendErrorResponse(w http.ResponseWriter, sc int, errMsg string) {
	w.WriterHeader(sc)
	io.WriterString(w, errMsg)
}

main

type middleWareHandler struct {
	r *httprouter.Router
	l *ConnLimiter
}


func NewMiddleWareHandler(r *httprouter.Router cc int) http.Handler {
	m := middleWareHandler{}
	m.r = r
	m.l = NewConnLimiter(cc) // cc: 流控值
	return m
}


func (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if !m.l.GetConn() {
		sendErrorResponse(w, http.StatusTooManyRequest, "Too many request")
		return
	}
	
	m.r.ServerHTTTP(w, r)
	//	把Token还回去
	m.l.ReleaseConn() // TODO: 测试下这句,会不会被上一句Block住. 如果不会,实际上是起不到限流作用的...
}


func RegisterHandlers() *httprouter.Router {
	router := httprouter.New()

	router.GET("/videos/:vid-id", streamHandler)
	router.POST("/upload/:vid-id", uploadHandler)
	return router
}


func main() {
	r := RegisterHandlers()
	mh := NewMiddleWareHandler(r, 2)
	http.ListenAndServer(":9000", mh) // 监听所有networking interface的9000端口
}

提示:

handlers.go - streamHandler

func streamHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
	vid := p.ByName("vid-id") // 获取video id
	videolink := VIDEO_DIR + vid // 映射到我们的文件夹里 VIDEO_DIR -> "./videos/"
	
	// 打开video
	video, err := os.Open(videolink) // *4
	defer video.Close()

	if err != nil {
		sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
		return
	}
	
	w.Header().Set("Content-Type", "video/mp4") // *2
	http.ServeContent(w, r, "", time.Now(), video) // *3
}

提示:

  1. Streaming实现方案:
    1. 方案一:
      1. 自己将Video的内容格式化成二进制的data stream
      2. 用stream的方式传到Client端
      3. 好处:
        1. 传二进制数据流的速度、带宽均可控
      4. 坏处:
        1. 实现复杂
    2. 方案二:
      1. 我们的方案
      2. 通常规模不大的视频点播,都是这样用的
  2. 设置Content-Type的必要性
    1. 我们存的文件可能是没有扩展名的,而实际的二进制码流,是mp4格式的
    2. 这样设置,浏览器会自动将其作为Video MP4来解析

*3 http.ServeContent(实现流媒体的核心方法) vs io.Copy vs io.Write

*4 os.Open vs ioutil.ReadAll()


func Open(name string) (file *File, err error)

os.Open打开一个文件用于读取。如果操作成功,返回的文件对象的方法可用于读取数据;对应的文件描述符具有O_RDONLY模式。如果出错,错误底层类型是*PathError。



func ReadAll(r io.Reader) ([]byte, error)

ReadAll从r读取数据直到EOF或遇到error,返回读取的数据和遇到的错误。成功的调用返回的err为nil而非EOF。因为本函数定义为读取r直到EOF,它不会将读取返回的EOF视为应报告的错误。

handlers.go - uploadHandler

//	将Client本地的文件,以流的形式,传到服务端
func uploadHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
	//	静态检查
	r.Body = http.MaxBytesReader(w, r.Body, MAX_UPLOAD_SIZE) // *1    1024 * 1024 * 50 = 50MB
	if err := r.ParseMultipartForm(MAX_UPLOAD_SIZE); err != nil { // *2
		sendErrorResponse(w, http.StatusBadRequest, "File is too big")
		return
	}

	//	读表单
	file, _, err := r.FormFile("file") // *3 *4
	if err != nil {
		sendErrorResponse(w, http.StatusInternalServerError, "读文件过程中出问题")
		return
	}

	data, err := ioutil.ReadAll(file) // 读出来
	if err != nil {
		log.Printf("Read file error: %v", err)
		sendErrorResponse(w, http.StatusInternalServerError, "读出错了")
	}
	filename := p.ByName("vid-id")
	err = ioutil.WriteFile(VIDEO_DIR + fn, data, 0666) // path, data, permission 尽量不要用777,权限太大了
	if err != nil {
		log.Printf("Write file error: %v", err)
		sendErrorResponse(w, http.StatusInternalServerError, "")
		return
	}

	w.WriterHeader(http.StatusCreated)
	io.WriteString(w, "Upload Successfully")
}

提示:

  1. MaxBytesReader
    1. 用来限定最大Bytes,IOReader最大能读到缓冲区的大小是多少
    2. 参数:
      1. 参数三:bytes
    3. 文档

      1. MaxBytesReader类似io.LimitReader,但它是用来限制接收到的请求的Body的大小的。

      2. 不同于io.LimitReader,本函数返回一个ReadCloser,返回值的Read方法在读取的数据超过大小限制时会返回非EOF错误,其Close方法会关闭下层的io.ReadCloser接口r。

      3. MaxBytesReader预防客户端因为意外或者蓄意发送的“大”请求,以避免尺寸过大的请求浪费服务端资源。

  2. ParseMultipartForm
    1. HTML的tag里面,有一个表单形式,叫multipart form
    2. Request里面,有一个专门的方法,去Parse 这个 multipart form的
    3. 如果超过了参数值,一定会返回一个错误,这是需要返回bad request
    4. 文档
      1. ParseMultipartForm将请求的主体作为multipart/form-data解析
      2. 请求的整个主体都会被解析:
        1. 得到的文件记录最多maxMemery字节保存在内存
        2. 其余部分保存在硬盘的temp文件里。
      3. 如果必要,ParseMultipartForm会自行调用ParseForm
      4. 重复调用本方法是无意义的
  3. FormFile
    1. <form name="file">
      1. form这个tag里面,有个属性叫name
      2. form file就是name所对应的值,就是它的key值
      3. 写前端页面的时候,一定要记得把这个name设置为file,否则会读不出来
  4. _代表的:一个handler
    1. 里面是整个form的Content的handler
    2. 我们可以拿到头部,做一些文件类型验证之类的
    3. 前端也可以做验证,需要把accept="video/*"会自动检测

文件上传解析

拿文件

r.FormFIle

以key为键,查询r.MultipartForm字段,得到结果中的第一个文件和它的信息。

如果必要,本函数会隐式调用ParseMultipartForm和ParseForm。

查询失败会返回ErrMissingFile错误。

r.MultiparForm


MultipartForm是解析好的多部件表单,包括上传的文件。

本字段只有在调用ParseMultipartForm后才有效。

在客户端,会忽略请求中的本字段而使用Body替代。

测试

upload.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Test Upload File</title>
</head>
<body>
    <form enctype="multipart/form-data" action="http://localhost:9001/upload/testid" method="post">
        <input type="file" name="file" />
        <button type="submit" value="Upload File"/>
    </form>
</body>
</html>

温馨提示:

1.需要提前创建好videos/目录,否则路径不存在无法写文件

2.IDEA的run按钮,生成的可执行文件,放置在了项目的根目录里