2-2 Stream模块
Mon, Jul 2, 2018
Stream Server
两个关键知识点
- Upload files
- Token bucket - 限流
特点
静态视频,非RTMP
RTMP:用在直播上的,Client端不断有Input,别的Client端不断有Output。复杂度比较高
Streaming部分:
- 独立的服务,可独立部署
- 统一的api格式
代码
关键点:
- 长连:发一个Request过来,不断向Client输出数据流,按照整个视频的进程走时间
- 流控:多路长连接同时保持的时候,不断发起连接不断打开视频,总有一天服务器会Crash掉。所以我们需要一个「流控机制」
- 我们的「流控」只控制Connection的部分
结构说明
文件结构:
特点:没有需要和db交互的东东
/streamserver
/../main.go
/../handlers.go
/../defs.go
/../limiter.go
结构简单,功能单一,所以不用像api一样那么多层。
limiter.go
流控方案
流控:在网站的Server online的时候,一定有攻击者发起请求。当请求达到一定数量的时候,会导致你所在的Server的连接数不够。如果不仅连接数消耗完、并且带宽占完之后,你的Server就处于不可用的状态。
更可怕的情况:当你的RAM也消耗完,你的系统就会Crash
为了保护系统,需要做流控
bucket token算法
bucket:类似一个箱子,箱子里有好多token 1...n
bucket放在Server里:
- 当一个Request来的时候,就给他发一个Token
- 当Request拿到Response返回之后,我们把Token还回来,Bucket里面的数量会还原一个
- 当Bucket里的数量是个定值的时候,如20,同时进来的Request就最多只有20个。起到了流控的作用
说明:
- 只有拿到token的Request才是合法、会被处理的Request
需要注意:防止bucket的协程安全问题
- 如果加锁,会影响性能 & 违背Golang设计初衷
- 我们使用channel同步协程之间的信息
实现
type ConnLimiter struct { concurrentConn int bucket chan int } func NewConnLimiter(cc int) *ConnLimiter { return &ConnLimiter{ concurrentConn: cc, bucket: make(chan int, cc), // buffer channel *1 } } // 获取Token, 返回获取结果 func (cl *ConnLimiter) GetConn() bool { if len(cl.bucket) >= cl.concurrentConn { log.Printf("达到上限了") return false } cl.bucket <- 1 return true } func (cl *ConnLimiter) ReleaseConn() { c := <-cl.bucket log.Printf("来了一个新Token") } |
提示
- 有了buffer,可以保证在一定的缓冲区间内,做消息同步的事情
- 简单的流控代码实现,却可以起到非常大的作用
response.go
我们提供的这两个服务(上传文件、流媒体播放)主要是传流数据的,所以只有在出错的时候才会用到response
func sendErrorResponse(w http.ResponseWriter, sc int, errMsg string) { w.WriterHeader(sc) io.WriterString(w, errMsg) } |
main
type middleWareHandler struct { r *httprouter.Router l *ConnLimiter } func NewMiddleWareHandler(r *httprouter.Router cc int) http.Handler { m := middleWareHandler{} m.r = r m.l = NewConnLimiter(cc) // cc: 流控值 return m } func (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !m.l.GetConn() { sendErrorResponse(w, http.StatusTooManyRequest, "Too many request") return } m.r.ServerHTTTP(w, r) // 把Token还回去 m.l.ReleaseConn() // TODO: 测试下这句,会不会被上一句Block住. 如果不会,实际上是起不到限流作用的... } func RegisterHandlers() *httprouter.Router { router := httprouter.New() router.GET("/videos/:vid-id", streamHandler) router.POST("/upload/:vid-id", uploadHandler) return router } func main() { r := RegisterHandlers() mh := NewMiddleWareHandler(r, 2) http.ListenAndServer(":9000", mh) // 监听所有networking interface的9000端口 } |
提示:
- 加入流控方案
handlers.go - streamHandler
func streamHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) { vid := p.ByName("vid-id") // 获取video id videolink := VIDEO_DIR + vid // 映射到我们的文件夹里 VIDEO_DIR -> "./videos/" // 打开video video, err := os.Open(videolink) // *4 defer video.Close() if err != nil { sendErrorResponse(w, http.StatusInternalServerError, "Internal Error") return } w.Header().Set("Content-Type", "video/mp4") // *2 http.ServeContent(w, r, "", time.Now(), video) // *3 } |
提示:
- Streaming实现方案:
- 方案一:
- 自己将Video的内容格式化成二进制的data stream
- 用stream的方式传到Client端
- 好处:
- 传二进制数据流的速度、带宽均可控
- 坏处:
- 实现复杂
- 方案二:
- 我们的方案
- 通常规模不大的视频点播,都是这样用的
- 方案一:
- 设置Content-Type的必要性
- 我们存的文件可能是没有扩展名的,而实际的二进制码流,是mp4格式的
- 这样设置,浏览器会自动将其作为Video MP4来解析
*3 http.ServeContent(实现流媒体的核心方法) vs io.Copy vs io.Write
func ServeContent(w ResponseWriter, req *Request, name string, modtime time.Time, content io.ReadSeeker)
- 参数:
- 参数一:Response
- 参数二:Request
- 参数三:名字
- 参数四:时间参数,调用了一个IOReadxxx的方法,能保证视频播放的顺利和通畅
- 参数五:Open的视频文件
- 函数说明:
- 该函数使用提供的content参数 - 一个「ReadeSeeker」提供的内容来回复请求
- ReadeSeeker - 一个interface,Reader & Seeker
- Seeker
Seeker接口用于包装基本的移位方法
Seek方法设定下一次读写的位置
- 该函数相对于io.Copy的优点是可以处理范围类请求,设定MIME类型,并且处理了If-Modified-Since请求.
- 如果参数Response的header中,未设定Content-type类型:
- 该函数首先通过文件扩展名来判断类型
- 如果失效的话,读取content的第一块数据并将他传递给DetectContentType进行类型判断,之后会设置Content-Type头
- name不会用于别的地方,甚至于它可以是空字符串,也永远不会发送到response里
- 如果modtime不是Time零值,函数会在Response的Header里设置Last-Modified头
- 如果请求包括一个If-Modified-Since header,该函数利用modtime来决定是否发送该content.
- 参数Content的seek方法必须有效 - 该函数利用Seek功能来决定content的大小
- 该函数使用提供的content参数 - 一个「ReadeSeeker」提供的内容来回复请求
*4 os.Open vs ioutil.ReadAll()
func Open(name string) (file *File, err error)
os.Open打开一个文件用于读取。如果操作成功,返回的文件对象的方法可用于读取数据;对应的文件描述符具有O_RDONLY模式。如果出错,错误底层类型是*PathError。
func ReadAll(r io.Reader) ([]byte, error)
ReadAll从r读取数据直到EOF或遇到error,返回读取的数据和遇到的错误。成功的调用返回的err为nil而非EOF。因为本函数定义为读取r直到EOF,它不会将读取返回的EOF视为应报告的错误。
handlers.go - uploadHandler
// 将Client本地的文件,以流的形式,传到服务端 func uploadHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) { // 静态检查 r.Body = http.MaxBytesReader(w, r.Body, MAX_UPLOAD_SIZE) // *1 1024 * 1024 * 50 = 50MB if err := r.ParseMultipartForm(MAX_UPLOAD_SIZE); err != nil { // *2 sendErrorResponse(w, http.StatusBadRequest, "File is too big") return } // 读表单 file, _, err := r.FormFile("file") // *3 *4 if err != nil { sendErrorResponse(w, http.StatusInternalServerError, "读文件过程中出问题") return } data, err := ioutil.ReadAll(file) // 读出来 if err != nil { log.Printf("Read file error: %v", err) sendErrorResponse(w, http.StatusInternalServerError, "读出错了") } filename := p.ByName("vid-id") err = ioutil.WriteFile(VIDEO_DIR + fn, data, 0666) // path, data, permission 尽量不要用777,权限太大了 if err != nil { log.Printf("Write file error: %v", err) sendErrorResponse(w, http.StatusInternalServerError, "") return } w.WriterHeader(http.StatusCreated) io.WriteString(w, "Upload Successfully") } |
提示:
- MaxBytesReader
- 用来限定最大Bytes,IOReader最大能读到缓冲区的大小是多少
- 参数:
- 参数三:bytes
文档
MaxBytesReader类似io.LimitReader,但它是用来限制接收到的请求的Body的大小的。
不同于io.LimitReader,本函数返回一个ReadCloser,返回值的Read方法在读取的数据超过大小限制时会返回非EOF错误,其Close方法会关闭下层的io.ReadCloser接口r。
MaxBytesReader预防客户端因为意外或者蓄意发送的“大”请求,以避免尺寸过大的请求浪费服务端资源。
- ParseMultipartForm
- HTML的tag里面,有一个表单形式,叫multipart form
- Request里面,有一个专门的方法,去Parse 这个 multipart form的
- 如果超过了参数值,一定会返回一个错误,这是需要返回bad request
- 文档
- ParseMultipartForm将请求的主体作为multipart/form-data解析
- 请求的整个主体都会被解析:
- 得到的文件记录最多maxMemery字节保存在内存
- 其余部分保存在硬盘的temp文件里。
- 如果必要,ParseMultipartForm会自行调用ParseForm
- 重复调用本方法是无意义的
- FormFile
- <form name="file">
- form这个tag里面,有个属性叫name
- form file就是name所对应的值,就是它的key值
- 写前端页面的时候,一定要记得把这个name设置为file,否则会读不出来
- <form name="file">
- _代表的:一个handler
- 里面是整个form的Content的handler
- 我们可以拿到头部,做一些文件类型验证之类的
- 前端也可以做验证,需要把accept="video/*"会自动检测
文件上传解析
拿文件
r.FormFIle
以key为键,查询r.MultipartForm字段,得到结果中的第一个文件和它的信息。
如果必要,本函数会隐式调用ParseMultipartForm和ParseForm。
查询失败会返回ErrMissingFile错误。
r.MultiparForm
MultipartForm是解析好的多部件表单,包括上传的文件。
本字段只有在调用ParseMultipartForm后才有效。
在客户端,会忽略请求中的本字段而使用Body替代。
测试
upload.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Test Upload File</title> </head> <body> <form enctype="multipart/form-data" action="http://localhost:9001/upload/testid" method="post"> <input type="file" name="file" /> <button type="submit" value="Upload File"/> </form> </body> </html> |
温馨提示:
1.需要提前创建好videos/目录,否则路径不存在无法写文件
2.IDEA的run按钮,生成的可执行文件,放置在了项目的根目录里