Go语言在音视频领域的技术应用

「这是我参与11月更文挑战的第8天,活动详情查看:2021 最后一次更文挑战

简介

流媒体如今已经成为工业上的一个重要技术了,比如:直播网站、视频监控传输、APP直播等,如何实现一个高并发的视频网站,这就涉及到语言技术的选型以及流媒体技术的使用,本节将主要介绍如何使用Golang来实现一个流媒体视频网站。

大纲

  • 背景
  • 为什么选择Go以及Go的一些优势
  • GoLang的简介以及实现一个webserver工具链
  • Golang的channel并发模式
  • 用Golang完成一个流媒体网站
  • 网站部署

背景

为什么选择Go以及Go的一些优势

为什么会选择Go来开发视频网站呢?这其实主要体现在Go语言的优势。那么Go有哪些优势呢?

  • 开发效率高,不管用其他语言,都需要很多其他的配置或插件,就连全家桶配套齐全的Java语言都会需要一个Servlet引擎,如:tomcat、jetty等。但Go在这方面,提供了得天独厚的能力。大部分功能和内容已经集成在了pkg。包括开发完整的开发工具链(tools、test、benchmark、builtin.etc),包括Go命令(go test、go install、go build)。这些都是完整的,直接下载Go后即可使用。
  • 另一方面,部署简单,go属于编译性语言,而且是能够编译多个平台可执行文件的语言。Compile once,run everywhere,直接编译后生成二进制文件,直接运行。
  • 良好的native http库、集成模板引擎,无需添加第三方框架。

GoLang的简介以及实现一个webserver工具链

Go语言是一种编译性语言,而且它的目标是兼具Python等动态语言的开发速度和集成C/C++等编译语言的性能与安全性。

Go中有一些常见的工具链,比如:

  • go build,编译go文件,可以跨平台编译:env GOOS=linux GOARCH=amd64 go build,在CI/CD中,这是一个非常有用的命令。
  • go install,这也是编译,但与build的区别是编译后将输出文件打包成库放在pkg下。
  • go get,用于获取go的第三方包,常见的是:go get -u git地址,表示从git上获取某个资源并安装到本地。
  • go fmt,统一代码风格、排版。
  • go test,运行当前目录下的tests,“go test -v” 会打印所有的信息。
  • go的test文件一般以XXX_test.go命名

要点:

  • 使用TestMain作为初始化test,并且使用Run()来调用其它tests可以完成一些需要初始化操作的testing,如:数据库、文件加载等。
func TestMain(m *testing.M) {
    fmt.Println("Test begin")
    m.Run()
}
复制代码
  • 如果没在其中加Run(),除了TestMain的其它的tests都不被执行。
func TestPrint(t *testing.T) {
    fmt.Println("Test print")
}

func TestMain(m *testing.M) {
    fmt.Println("Test begin")
    //m.Run()
}
复制代码

按照上面说的,如果没有执行Run()方法,则TestPrint函数不会被执行。

Golang的channel并发模式

在 Go 中,既然有了协程,那么这些协程之间如何通信呢?Go 提供了一个 channel(通道) 来解决。

声明一个 channel

在 Go 语言中,声明一个 channel 非常简单,使用内置的 make 函数即可,如下:

ch:=make(chan string)
复制代码

其中 chan 是一个关键字,表示是 channel 类型。后面的 string 表示 channel 里的数据是 string 类型。通过 channel 的声明也可以看到,chan 是一个集合类型。

定义好 chan 后就可以使用了,一个 chan 的操作只有两种:发送和接收:

  • 发送:向 chan 发送值,把值放在 chan 中,操作符为 chan <-

  • 接收:获取 chan 中的值,操作符为 <- chan

示例:

package main

import "fmt"

func main() {

	ch := make(chan string)

	go func() {

		fmt.Println("码疯窝在香嗝喱辣")

		ch <- "发送数据者:码疯窝在香嗝喱辣"

	}()

	fmt.Println("I am main goroutine")

	v := <- ch

	fmt.Println("接收到的chan中的值为:",v)

}
复制代码

我们先来执行看看打印结果:

I am main goroutine

码疯窝在香嗝喱辣

接收到的chan中的值为:送数据者:码疯窝在香嗝喱辣
复制代码

从运行结果可以看出:达到了使用 time.Sleep 函数的效果。

相信应该明白为什么程序不会在新的 goroutine 完成之前退出了,因为通过 make 创建的 chan 中没有值,而 main goroutine 又想从 chan 中获取值,获取不到就一直等待,等到另一个 goroutine 向 chan 发送值为止。

无缓冲 channel

上面的示例中,使用 make 创建的 chan 就是一个无缓冲 channel,它的容量是 0,不能存储任何数据。所以无缓冲 channel 只起到传输数据的作用,数据并不会在 channel 中做任何停留。这也意味着,无缓冲 channel 的发送和接收操作是同时进行的,它也被称为同步 channel。

有缓冲 channel

有缓冲 channel 类似一个可阻塞的队列,内部的元素先进先出。通过 make 函数的第二个参数可以指定 channel 容量的大小,进而创建一个有缓冲 channel,如:

cacheCh := make(chan int,5)
复制代码

定义了一个容量为 5 的元素为 int 类型的 chan。

一个有缓冲 channel 具备以下特点:

  • 有缓冲 channel 的内部有一个缓冲队列

  • 发送操作是向队列的尾部插入元素,如果队列已满,则阻塞等待,直到另一个 goroutine 执行,接收操作释放队列的空间

  • 接收操作是从队列的头部获取元素并把它从队列中删除,如果队列为空,则阻塞等待,直到另一个 goroutine 执行,发送操作插入新的元素

cache := make(chan int,5)

cache <- 2

cache <- 3

fmt.Println("容量:",cap(cache),",元素个数:",len(cache))
复制代码

无缓冲 channel 其实就是一个容量大小为 0 的 channel。比如 make(chan int,0)

关闭 channel

通过内置函数 close 即可关闭 channel。如果一个 channel 被关闭了,就不能向里面发送数据了,如果发送的话,会引起 painc 异常。但是还可以接收 channel 里的数据,如果 channel 里没有数据的话,接收的数据是元素类型的零值。

单向 channel

所谓单向,即可要不发送,要么只能接收。所以单向 channel 的声明也很简单,只需要在声明的时候带上 <- 操作符即可,如下:

send := make(chan <- int)
receive := make(<- chan int)
复制代码

用Golang完成一个流媒体网站

业务模块

API接口设计

  • 分层
  • Restful风格设计
  • CRUD区分资源操作
  • 返回码规范

首先,我们写个启动类:

package main 

import (
	"net/http"
	"github.com/julienschmidt/httprouter"
)

type middleWareHandler struct {
	r *httprouter.Router
}

func NewMiddleWareHandler(r *httprouter.Router) http.Handler {
	m := middleWareHandler{}
	m.r = r
	return m
}

func (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	//check session
	validateUserSession(r)

	m.r.ServeHTTP(w, r)
}

func RegisterHandlers() *httprouter.Router {
	router := httprouter.New()

	router.POST("/user", CreateUser)

	router.POST("/user/:user_name", Login)

	return router
}

func main() {
	r := RegisterHandlers()
	mh := NewMiddleWareHandler(r)
	http.ListenAndServe(":1000", mh)
}
复制代码

在这里我们实现了注册、登录以及一些初始化监听端口。接下来,我们需要看看对于后端视频处理时,主要关心的是session:

package session

import (
	"time"
	"sync"
	"github.com/avenssi/video_server/api/defs"
	"github.com/avenssi/video_server/api/dbops"
	"github.com/avenssi/video_server/api/utils"
)

var sessionMap *sync.Map 

func init() {
	sessionMap = &sync.Map{}
}

func nowInMilli() int64{
	return time.Now().UnixNano()/1000000
}

func deleteExpiredSession(sid string) {
	sessionMap.Delete(sid)
	dbops.DeleteSession(sid)
}

func LoadSessionsFromDB() {
	r, err := dbops.RetrieveAllSessions()
	if err != nil {
		return
	}

	r.Range(func(k, v interface{}) bool{
		ss := v.(*defs.SimpleSession)
		sessionMap.Store(k, ss)
		return true
	})
}

func GenerateNewSessionId(un string) string {
	id, _ := utils.NewUUID()
	ct := nowInMilli()
	ttl := ct + 30 * 60 * 1000// Severside session valid time: 30 min

	ss := &defs.SimpleSession{Username: un, TTL: ttl}
	sessionMap.Store(id, ss)
	dbops.InsertSession(id, ttl, un)

	return id
}

func IsSessionExpired(sid string) (string, bool) {
	ss, ok := sessionMap.Load(sid)
	if ok {
		ct := nowInMilli()
		if ss.(*defs.SimpleSession).TTL < ct {
			deleteExpiredSession(sid)
			return "", true
		}

		return ss.(*defs.SimpleSession).Username, false
	}

	return "", true
}
复制代码

从上面的代码中,可以看到,Go主要引用了相关的视频插件库:avenssi/video_server等来处理缓存session。这也是为什么选择go开发后端的一个原因。

同时,我们还定义了一个错误码信息:

package defs

type Err struct {
	Error string `json:"error"`
	ErrorCode string `json:"error_code"`  
}

type ErrResponse struct {
	HttpSC int
	Error Err
}

var (
	ErrorRequestBodyParseFailed = ErrResponse{HttpSC: 400, Error: Err{Error: "Request body is not correct", ErrorCode: "001"}}
	ErrorNotAuthUser = ErrResponse{HttpSC: 401, Error: Err{Error: "User authentication failed.", ErrorCode: "002"}}
	ErrorDBError = ErrResponse{HttpSC: 500, Error: Err{Error: "DB ops failed", ErrorCode: "003"}}
	ErrorInternalFaults = ErrResponse{HttpSC: 500, Error: Err{Error: "Internal service error", ErrorCode: "004"}}
)
复制代码

以上对于业务层中处理主要逻辑就是这些了,下面主要讲scheduler和stream server。

scheduler

scheduler主要是来调度任务,那么主要是哪些任务呢?主要是那些普通api无法立即给结果的任务。比如:我们视频网站需要一些视频审核、数据恢复的需求。这时候,我们需要做一些short delay,用户看不到,但后台还是存在的。这就需要scheduler异步处理。还比如有些周期性的任务。

在Scheduler中,还存在Timer,定时器主要用来作定时处理task的。

在本小节中,我们采用runner的生产、消费者模式实现。具体代码如下:

package taskrunner

import (
)

type Runner struct {
	Controller controlChan
	Error controlChan
	Data dataChan
	dataSize int
	longLived bool
	Dispatcher fn 
	Executor fn
}

func NewRunner(size int, longlived bool, d fn, e fn) *Runner {
	return &Runner {
		Controller: make(chan string, 1),
		Error: make(chan string, 1),
		Data: make(chan interface{}, size),
		longLived: longlived,
		dataSize: size,
		Dispatcher: d,
		Executor: e,
	}
}

func (r *Runner) startDispatch() {
	defer func() {
		if !r.longLived {
			close(r.Controller)
			close(r.Data)
			close(r.Error)
		}
	}()

	for {
		select {
		case c :=<- r.Controller:
			if c == READY_TO_DISPATCH {
				err := r.Dispatcher(r.Data)
				if err != nil {
					r.Error <- CLOSE
				} else {
					r.Controller <- READY_TO_EXECUTE
				}
			}

			if c == READY_TO_EXECUTE {
				err := r.Executor(r.Data)
				if err != nil {
					r.Error <- CLOSE
				} else {
					r.Controller <- READY_TO_DISPATCH
				}
			}
		case e :=<- r.Error:
			if e == CLOSE {
				return
			}
		default:

		}
	}
}

func (r *Runner) StartAll() {
	r.Controller <- READY_TO_DISPATCH
	r.startDispatch()
}
复制代码

Runner是可以复用的,而接下来介绍的Task是定制Runner的。比如:我们延迟删除视频。

我们先拿到数据,看看:

package dbops

import (
	"log"
	_ "github.com/go-sql-driver/mysql"
)

func ReadVideoDeletionRecord(count int) ([]string, error) {
	stmtOut, err := dbConn.Prepare("SELECT video_id FROM video_del_rec LIMIT ?")

	var ids []string

	if err != nil {
		return ids, err
	}

	rows, err := stmtOut.Query(count)
	if err != nil {
		log.Printf("Query VideoDeletionRecord error: %v", err)
		return ids, err
	}

	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err != nil {
			return ids, err
		}

		ids = append(ids, id)
	}

	defer stmtOut.Close()
	return ids, nil
}

func DelVideoDeletionRecord(vid string) error {
	stmtDel, err := dbConn.Prepare("DELETE FROM video_del_rec WHERE video_id=?")
	if err != nil {
		return err
	}

	_, err = stmtDel.Exec(vid)
	if err != nil {
		log.Printf("Deleting VideoDeletionRecord error: %v", err)
		return err
	}

	defer stmtDel.Close()
	return nil
}
复制代码

拿到数据后,需要处理,这时需要task:

package taskrunner

import (
	"os"
	"errors"
	"log"
	"sync"
	"github.com/avenssi/video_server/scheduler/dbops"
)

func deleteVideo(vid string) error {
	err := os.Remove(VIDEO_PATH + vid)

	if err != nil && !os.IsNotExist(err) {
		log.Printf("Deleting video error: %v", err)
		return err
	}

	return nil
}

func VideoClearDispatcher(dc dataChan) error {
	res, err := dbops.ReadVideoDeletionRecord(3)
	if err != nil {
		log.Printf("Video clear dispatcher error: %v", err)
		return err
	}

	if len(res) == 0 {
		return errors.New("All tasks finished")
	}

	for _, id := range res {
		dc <- id
	}

	return nil
}

func VideoClearExecutor(dc dataChan) error {
	errMap := &sync.Map{}
	var err error

	forloop:
		for {
			select {
			case vid :=<- dc:
				go func(id interface{}) {
					if err := deleteVideo(id.(string)); err != nil {
						errMap.Store(id, err)
						return
					}
					if err := dbops.DelVideoDeletionRecord(id.(string)); err != nil {
						errMap.Store(id, err)
						return 
					}
				}(vid)
			default:
				break forloop
			}
		}

	errMap.Range(func(k, v interface{}) bool {
		err = v.(error)
		if err != nil {
			return false
		}
		return true
	})

	return err
}
复制代码

以上就是关于异步、定时处理视频流信息过程。

stream server

  • Streaming

  • Upload files

Streaming主要区别于普通的链接,它需要保持长链接,与短链接是不一样的,当发送一个request过来,会不断与客户端输出数据流,而且会很长。所以在多路长链接同时保持的时候,出现一个问题,如果不断的发起链接、打开网页,最终会把我们的服务给crash掉,所以,我们需要进行流控:limit,这里的流控可能只在connect时候进行限制。

package main 

import (
	"log"
)

type ConnLimiter struct {
	concurrentConn int
	bucket chan int
}

func NewConnLimiter(cc int) *ConnLimiter {
	return &ConnLimiter {
		concurrentConn: cc,
		bucket: make(chan int, cc),
	}
}

func (cl *ConnLimiter) GetConn() bool {
	if len(cl.bucket) >= cl.concurrentConn {
		log.Printf("Reached the rate limitation.")
		return false
	}

	cl.bucket <- 1
	return true
}

func (cl *ConnLimiter) ReleaseConn() {
	c :=<- cl.bucket
	log.Printf("New connction coming: %d", c)
}
复制代码

加了流控后,我们需要在http middleware中嵌入流控,同样,我们在启动时,都需要注册router以及http server,所以代码如下:

package main 

import (
	"net/http"
	"github.com/julienschmidt/httprouter"
)

type middleWareHandler struct {
	r *httprouter.Router
	l *ConnLimiter
}

func NewMiddleWareHandler(r *httprouter.Router, cc int) http.Handler {
	m := middleWareHandler{}
	m.r = r
	m.l = NewConnLimiter(cc)
	return m
}

func RegisterHandlers() *httprouter.Router {
	router := httprouter.New()

	router.GET("/videos/:vid-id", streamHandler)

	router.POST("/upload/:vid-id", uploadHandler)

	router.GET("/testpage", testPageHandler)

	return router
}

func (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if !m.l.GetConn() {
		sendErrorResponse(w, http.StatusTooManyRequests, "Too many requests")
		return
	}

	m.r.ServeHTTP(w, r)
	defer m.l.ReleaseConn()
}

func main() {
	r := RegisterHandlers()
	mh := NewMiddleWareHandler(r, 2)
	http.ListenAndServe(":2000", mh)
}
复制代码

最后,我们来看看streamHandler如何来处理:

func streamHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
	vid := p.ByName("vid-id")
	vl := VIDEO_DIR + vid

	video, err := os.Open(vl)
	if err != nil {
		log.Printf("Error when try to open file: %v", err)
		sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
		return
	}

	w.Header().Set("Content-Type", "video/mp4")
	http.ServeContent(w, r, "", time.Now(), video)

	defer video.Close()
}
复制代码

我们这里采用比较通用的做法:在拿到流唯一信息后,直接处理。

Upload files时,我们需要做静态检查,然后把数据从中读取:

func uploadHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
	r.Body = http.MaxBytesReader(w, r.Body, MAX_UPLOAD_SIZE)
	if err := r.ParseMultipartForm(MAX_UPLOAD_SIZE); err != nil {
		sendErrorResponse(w, http.StatusBadRequest, "File is too big")
		return 
	}

	file, _, err := r.FormFile("file")
	if err != nil {
		log.Printf("Error when try to get file: %v", err)
		sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
		return 
	}

	data, err := ioutil.ReadAll(file)
	if err != nil {
		log.Printf("Read file error: %v", err)
		sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
	}

	fn := p.ByName("vid-id")
	err = ioutil.WriteFile(VIDEO_DIR + fn, data, 0666)
	if err != nil {
		log.Printf("Write file error: %v", err)
		sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
		return
	}

	w.WriteHeader(http.StatusCreated)
	io.WriteString(w, "Uploaded successfully")
}
复制代码

网站部署

先想给之前的代码进行编译打包:

FROM ubuntu:16.04 as build

ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

RUN apt-get update && apt-get install -y --no-install-recommends \
        g++ \
        ca-certificates \
        wget && \
    rm -rf /var/lib/apt/lists/*

ENV GOLANG_VERSION 1.15.1
RUN wget -nv -O - https://studygolang.com/dl/golang/go1.15.1.linux-amd64.tar.gz \
     | tar -C /usr/local -xz


ENV GOPROXY=https://goproxy.cn,direct
ENV GO111MODULE=on
ENV GOPATH /go
ENV PATH $GOPATH/bin:/usr/local/go/bin:$PATH

WORKDIR /go/src
COPY . .
WORKDIR /go/src/video-service
RUN  sed -i "/runmode/crunmode=pro" /go/src/video-service/conf/app.conf
RUN export CGO_LDFLAGS_ALLOW='-Wl,--unresolved-symbols=ignore-in-object-files' && \
    go install -ldflags="-s -w" -v /go/src/video-service

FROM ubuntu:16.04
WORKDIR /video-service

RUN mkdir -p log
COPY --from=build /go/bin/video-service /video-service
CMD ["./video-service"]
复制代码

接下来,补充部署脚本:

---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  labels:
    app: video-service
  name: video-service
  namespace: system-server
spec:
  replicas: 1
  selector:
    matchLabels:
      app: video-service
  template:
    metadata:
      labels:
        app: video-service
    spec:
      containers:
        - image: {{ cluster_cfg['cluster']['docker-registry']['prefix'] }}video-service
          imagePullPolicy: Always
          name: video-service
          ports:
            - containerPort: 1000
          #livenessProbe:
            #httpGet:
              #path: /api/v1/healthz
              #port: 1000
              #scheme: HTTP
            #initialDelaySeconds: 15
            #periodSeconds: 10
            #timeoutSeconds: 3
            #failureThreshold: 5
          volumeMounts:
            - name: video-service-config
              mountPath: /video-service/conf
      volumes:
        - name: video-service-config
          configMap:
            name: video-service-config
      nodeSelector:
        video-service: "true"
      restartPolicy: Always
复制代码

执行编译命令:

sh build/build.sh
kubectl create -f deploy.yml
复制代码

这里使用K8s部署到机器上。
部署后的服务访问地址:

10.11.3.4:1000
复制代码