2-2 Stream模块
Mon, Jul 2, 2018
Stream Server
两个关键知识点
- Upload files
- Token bucket - 限流
特点
静态视频,非RTMP
RTMP:用在直播上的,Client端不断有Input,别的Client端不断有Output。复杂度比较高
Streaming部分:
- 独立的服务,可独立部署
- 统一的api格式

代码
关键点:
- 长连:发一个Request过来,不断向Client输出数据流,按照整个视频的进程走时间
- 流控:多路长连接同时保持的时候,不断发起连接不断打开视频,总有一天服务器会Crash掉。所以我们需要一个「流控机制」
- 我们的「流控」只控制Connection的部分
结构说明
文件结构:
特点:没有需要和db交互的东东
/streamserver
/../main.go
/../handlers.go
/../defs.go
/../limiter.go
结构简单,功能单一,所以不用像api一样那么多层。
limiter.go
流控方案
流控:在网站的Server online的时候,一定有攻击者发起请求。当请求达到一定数量的时候,会导致你所在的Server的连接数不够。如果不仅连接数消耗完、并且带宽占完之后,你的Server就处于不可用的状态。
更可怕的情况:当你的RAM也消耗完,你的系统就会Crash
为了保护系统,需要做流控
bucket token算法
bucket:类似一个箱子,箱子里有好多token 1...n
bucket放在Server里:
- 当一个Request来的时候,就给他发一个Token
- 当Request拿到Response返回之后,我们把Token还回来,Bucket里面的数量会还原一个
- 当Bucket里的数量是个定值的时候,如20,同时进来的Request就最多只有20个。起到了流控的作用
说明:
- 只有拿到token的Request才是合法、会被处理的Request
需要注意:防止bucket的协程安全问题
- 如果加锁,会影响性能 & 违背Golang设计初衷
- 我们使用channel同步协程之间的信息
实现
type ConnLimiter struct {
concurrentConn int
bucket chan int
}
func NewConnLimiter(cc int) *ConnLimiter {
return &ConnLimiter{
concurrentConn: cc,
bucket: make(chan int, cc), // buffer channel *1
}
}
// 获取Token, 返回获取结果
func (cl *ConnLimiter) GetConn() bool {
if len(cl.bucket) >= cl.concurrentConn {
log.Printf("达到上限了")
return false
}
cl.bucket <- 1
return true
}
func (cl *ConnLimiter) ReleaseConn() {
c := <-cl.bucket
log.Printf("来了一个新Token")
} |
提示
- 有了buffer,可以保证在一定的缓冲区间内,做消息同步的事情
- 简单的流控代码实现,却可以起到非常大的作用
response.go
我们提供的这两个服务(上传文件、流媒体播放)主要是传流数据的,所以只有在出错的时候才会用到response
func sendErrorResponse(w http.ResponseWriter, sc int, errMsg string) {
w.WriterHeader(sc)
io.WriterString(w, errMsg)
} |
main
type middleWareHandler struct {
r *httprouter.Router
l *ConnLimiter
}
func NewMiddleWareHandler(r *httprouter.Router cc int) http.Handler {
m := middleWareHandler{}
m.r = r
m.l = NewConnLimiter(cc) // cc: 流控值
return m
}
func (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !m.l.GetConn() {
sendErrorResponse(w, http.StatusTooManyRequest, "Too many request")
return
}
m.r.ServerHTTTP(w, r)
// 把Token还回去
m.l.ReleaseConn() // TODO: 测试下这句,会不会被上一句Block住. 如果不会,实际上是起不到限流作用的...
}
func RegisterHandlers() *httprouter.Router {
router := httprouter.New()
router.GET("/videos/:vid-id", streamHandler)
router.POST("/upload/:vid-id", uploadHandler)
return router
}
func main() {
r := RegisterHandlers()
mh := NewMiddleWareHandler(r, 2)
http.ListenAndServer(":9000", mh) // 监听所有networking interface的9000端口
} |
提示:
- 加入流控方案
handlers.go - streamHandler
func streamHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
vid := p.ByName("vid-id") // 获取video id
videolink := VIDEO_DIR + vid // 映射到我们的文件夹里 VIDEO_DIR -> "./videos/"
// 打开video
video, err := os.Open(videolink) // *4
defer video.Close()
if err != nil {
sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
return
}
w.Header().Set("Content-Type", "video/mp4") // *2
http.ServeContent(w, r, "", time.Now(), video) // *3
} |
提示:
- Streaming实现方案:
- 方案一:
- 自己将Video的内容格式化成二进制的data stream
- 用stream的方式传到Client端
- 好处:
- 传二进制数据流的速度、带宽均可控
- 坏处:
- 实现复杂
- 方案二:
- 我们的方案
- 通常规模不大的视频点播,都是这样用的
- 方案一:
- 设置Content-Type的必要性
- 我们存的文件可能是没有扩展名的,而实际的二进制码流,是mp4格式的
- 这样设置,浏览器会自动将其作为Video MP4来解析
*3 http.ServeContent(实现流媒体的核心方法) vs io.Copy vs io.Write
func ServeContent(w ResponseWriter, req *Request, name string, modtime time.Time, content io.ReadSeeker)
- 参数:
- 参数一:Response
- 参数二:Request
- 参数三:名字
- 参数四:时间参数,调用了一个IOReadxxx的方法,能保证视频播放的顺利和通畅
- 参数五:Open的视频文件
- 函数说明:
- 该函数使用提供的content参数 - 一个「ReadeSeeker」提供的内容来回复请求
- ReadeSeeker - 一个interface,Reader & Seeker
- Seeker
Seeker接口用于包装基本的移位方法
Seek方法设定下一次读写的位置
- 该函数相对于io.Copy的优点是可以处理范围类请求,设定MIME类型,并且处理了If-Modified-Since请求.
- 如果参数Response的header中,未设定Content-type类型:
- 该函数首先通过文件扩展名来判断类型
- 如果失效的话,读取content的第一块数据并将他传递给DetectContentType进行类型判断,之后会设置Content-Type头
- name不会用于别的地方,甚至于它可以是空字符串,也永远不会发送到response里
- 如果modtime不是Time零值,函数会在Response的Header里设置Last-Modified头
- 如果请求包括一个If-Modified-Since header,该函数利用modtime来决定是否发送该content.
- 参数Content的seek方法必须有效 - 该函数利用Seek功能来决定content的大小
- 该函数使用提供的content参数 - 一个「ReadeSeeker」提供的内容来回复请求
*4 os.Open vs ioutil.ReadAll()
func Open(name string) (file *File, err error)
os.Open打开一个文件用于读取。如果操作成功,返回的文件对象的方法可用于读取数据;对应的文件描述符具有O_RDONLY模式。如果出错,错误底层类型是*PathError。
func ReadAll(r io.Reader) ([]byte, error)
ReadAll从r读取数据直到EOF或遇到error,返回读取的数据和遇到的错误。成功的调用返回的err为nil而非EOF。因为本函数定义为读取r直到EOF,它不会将读取返回的EOF视为应报告的错误。
handlers.go - uploadHandler
// 将Client本地的文件,以流的形式,传到服务端
func uploadHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
// 静态检查
r.Body = http.MaxBytesReader(w, r.Body, MAX_UPLOAD_SIZE) // *1 1024 * 1024 * 50 = 50MB
if err := r.ParseMultipartForm(MAX_UPLOAD_SIZE); err != nil { // *2
sendErrorResponse(w, http.StatusBadRequest, "File is too big")
return
}
// 读表单
file, _, err := r.FormFile("file") // *3 *4
if err != nil {
sendErrorResponse(w, http.StatusInternalServerError, "读文件过程中出问题")
return
}
data, err := ioutil.ReadAll(file) // 读出来
if err != nil {
log.Printf("Read file error: %v", err)
sendErrorResponse(w, http.StatusInternalServerError, "读出错了")
}
filename := p.ByName("vid-id")
err = ioutil.WriteFile(VIDEO_DIR + fn, data, 0666) // path, data, permission 尽量不要用777,权限太大了
if err != nil {
log.Printf("Write file error: %v", err)
sendErrorResponse(w, http.StatusInternalServerError, "")
return
}
w.WriterHeader(http.StatusCreated)
io.WriteString(w, "Upload Successfully")
} |
提示:
- MaxBytesReader
- 用来限定最大Bytes,IOReader最大能读到缓冲区的大小是多少
- 参数:
- 参数三:bytes
文档
MaxBytesReader类似io.LimitReader,但它是用来限制接收到的请求的Body的大小的。
不同于io.LimitReader,本函数返回一个ReadCloser,返回值的Read方法在读取的数据超过大小限制时会返回非EOF错误,其Close方法会关闭下层的io.ReadCloser接口r。
MaxBytesReader预防客户端因为意外或者蓄意发送的“大”请求,以避免尺寸过大的请求浪费服务端资源。
- ParseMultipartForm
- HTML的tag里面,有一个表单形式,叫multipart form
- Request里面,有一个专门的方法,去Parse 这个 multipart form的
- 如果超过了参数值,一定会返回一个错误,这是需要返回bad request
- 文档
- ParseMultipartForm将请求的主体作为multipart/form-data解析
- 请求的整个主体都会被解析:
- 得到的文件记录最多maxMemery字节保存在内存
- 其余部分保存在硬盘的temp文件里。
- 如果必要,ParseMultipartForm会自行调用ParseForm
- 重复调用本方法是无意义的
- FormFile
- <form name="file">
- form这个tag里面,有个属性叫name
- form file就是name所对应的值,就是它的key值
- 写前端页面的时候,一定要记得把这个name设置为file,否则会读不出来
- <form name="file">
- _代表的:一个handler
- 里面是整个form的Content的handler
- 我们可以拿到头部,做一些文件类型验证之类的
- 前端也可以做验证,需要把accept="video/*"会自动检测
文件上传解析
拿文件
r.FormFIle
以key为键,查询r.MultipartForm字段,得到结果中的第一个文件和它的信息。
如果必要,本函数会隐式调用ParseMultipartForm和ParseForm。
查询失败会返回ErrMissingFile错误。
r.MultiparForm
MultipartForm是解析好的多部件表单,包括上传的文件。
本字段只有在调用ParseMultipartForm后才有效。
在客户端,会忽略请求中的本字段而使用Body替代。
测试
upload.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Test Upload File</title>
</head>
<body>
<form enctype="multipart/form-data" action="http://localhost:9001/upload/testid" method="post">
<input type="file" name="file" />
<button type="submit" value="Upload File"/>
</form>
</body>
</html> |
温馨提示:
1.需要提前创建好videos/目录,否则路径不存在无法写文件
2.IDEA的run按钮,生成的可执行文件,放置在了项目的根目录里