此篇文章主要描述 监控服务端(video_server) 开发部分;
//
// @Description: 处理视频帧
//
func VideoHandler() {//从摄像头的UDP客户端接收到的待处理的图片切片channel,容量为100,超过处理不了就丢弃ImgReceiveChannel := make(chan []byte, 100)//将图片数据转发给用户客户端进行直播的 图片切片channel,容量为20,超过处理不了就丢弃ImgSendChannel := make(chan []byte, 20)// 将消息发送到mqtt服务器的channel,容量为2MqttMessagesChannel := make(chan []string, 2)//udp服务接收及发送数据go udp_service.UDPReceive(ImgReceiveChannel, ImgSendChannel)go udp_service.UDPSend(ImgSendChannel)//发布消息到mqtt协议服务的broker(emqx)go mqtt_service.MqttPublish(MqttMessagesChannel)//处理视频帧数据Convert2IMG(ImgReceiveChannel, MqttMessagesChannel)
}
var SendToClient bool
var SendToClientAddr *net.UDPAddr//
// @Description: 接收UDP数据,并发送给ImgReceiveChannel,需要直播时,再将数据发送给ImgSendChannel,当channel缓存满时,丢弃数据
// @param ImgReceiveChannel: 从摄像头的UDP客户端接收到的待处理的图片切片channel,容量为100,超过处理不了就丢弃
// @param ImgSendChannel: 将图片数据转发给用户客户端进行直播的 图片切片channel,容量为20,超过处理不了就丢弃
//
func UDPReceive(ImgReceiveChannel chan<- []byte, ImgSendChannel chan<- []byte) {SendToClient = falsefor {//定义一个切片sliceData := make([]byte, 65500)//将读取的数据存到数组中n, addr, err := UDPListen.ReadFromUDP(sliceData)if err != nil {glog.Log.Error(fmt.Sprintf("读取UDP数据失败,err:%v", err))continue}if n == 0 || n > 65500 {continue}//截取前10个字符串messageType := string(sliceData[:10])imgData := sliceData[10:]if messageType == "clientPlay" {glog.Log.Info("接收到客户端的直播请求")SendToClient = trueSendToClientAddr = addr} else if messageType == "clientStop" {SendToClient = false} else if messageType == "cameraSend" {glog.Log.Info("接收到图片数据")select {case ImgReceiveChannel <- imgData:glog.Log.Info("发送数据到图片处理channel")default:glog.Log.Info("发送数据到图片处理channel阻塞,不发送")}}if SendToClient && messageType == "cameraSend" {select {case ImgSendChannel <- imgData:glog.Log.Info("发送数据到转发channel")default:glog.Log.Info("发送数据到转发channel阻塞,不发送")}}}
}
//
// @Description: 发送UDP数据给客户端(直播使用),当channel缓存满时,丢弃数据
// @param ImgSendChannel:
//
func UDPSend(ImgSendChannel <-chan []byte) {for {Img, ok := <-ImgSendChannelif !ok {glog.Log.Info("发送数据到图片处理channel关闭")break}glog.Log.Info("发送直播数据到客户端")_, err := UDPListen.WriteToUDP(Img, SendToClientAddr)if err != nil {panic(fmt.Sprintf("转发UDP数据失败 err:%v", err))}}
}
import ("fmt""gocv.io/x/gocv""image""image/color""time""video_server/pkg/glog""video_server/pkg/gtime""video_server/pkg/mqtt_service""video_server/pkg/udp_service""video_server/pkg/utils"
)const MqttTopic string = "camera_frq"
const TimeCycle time.Duration = 60
const NoDiffTimesCount int = 3//
// @Description: 处理图片的协程,功能如下:
// 用opencv进行动态变化比对,将变化的视频帧存入视频文件;
// 根据某时间段内 视频帧有无变化,动态变更esp32-cam发送视频帧频率
// @param ImgChannel:从摄像头的UDP客户端接收到的待处理的图片切片channel,容量为100,超过处理不了就丢弃
// @param MqttMessagesChannel:将消息发送到mqtt服务器的channel,容量为2
//
func Convert2IMG(ImgChannel <-chan []byte, MqttMessagesChannel chan<- []string) {redColor := color.RGBA{255, 0, 0, 0}es := gocv.GetStructuringElement(gocv.MorphEllipse, image.Point{9, 4})//摄像头发送图片频率 0:高 1:低sendImgFrequency := utils.SendImgFrequencyHigh// 3个固定时间段内 图片都相同,表示环境无变化,通过mqtt协议通知cam降低发送帧率(节约资源[电力,带宽]), 如果 后续 检测到图片变化,则再次通知其恢复频率;noDiffTimes := 0//一直死循环for {var writer *gocv.VideoWriter//定义一个运行的时间周期timeCycle := gtime.TimeCycle(TimeCycle)stop := false//基准图片,作用为 检测视频帧是否变化的基准图片basicImg := gocv.NewMat()//存入到磁盘的视频文件writerfileName := fmt.Sprintf(utils.VideoFileFmt, gtime.GetCurrentTime())//视频帧之间是否不同(检测监控环境是否有变化)isDiffImg := false//此for循环保证timeCycle的channel一直运行for {select {case _ = <-timeCycle:glog.Log.Debug("执行时间周期耗尽")stop = truedefault://开始处理 视频帧tempImg, ok := <-ImgChannelif !ok {glog.Log.Info("ImgChannel关闭,停止循环")break}//从数组中读取 前n个有效长度的数据,也就是 一张图片不会超过10万个byte,而n的值就是 读取图片的byte个数img, err := gocv.IMDecode(tempImg, gocv.IMReadColor)if err != nil {glog.Log.Error(fmt.Sprintf("解析图片数据失败,err:%v", err))break}//当basicImg没有赋值时,对其进行赋值if basicImg.Empty() {basicImg = img//图片转为黑白色gocv.CvtColor(basicImg, &basicImg, gocv.ColorBGRToGray)// 高斯模糊,image.Point的值 越大,则越模糊,注意 值除以2余数必须为1gocv.GaussianBlur(basicImg, &basicImg, image.Point{21, 21}, 0, 0, gocv.BorderDefault)writer, err = gocv.VideoWriterFile(fileName, "MJPG", 10, basicImg.Cols(), basicImg.Rows(), true)if err != nil {fmt.Printf("error opening video writer device: %v\n", fileName)return}continue}//进行图片处理//每隔固定时间 获取基准帧数据//将现有图片进行比对,不同就存为mp4//若 3个固定时间段内 图片都相同,表示环境无变化,通过mqtt协议通知cam降低发送帧率(节约资源[电力,带宽]), 如果 后续 检测到图片变化,则再次通知其恢复频率;// gocv比对图片//图片转为黑白色gray_frame := gocv.NewMat()gocv.CvtColor(img, &gray_frame, gocv.ColorBGRToGray)// 高斯模糊,image.Point的值 越大,则越模糊,注意 值除以2余数必须为1gocv.GaussianBlur(gray_frame, &gray_frame, image.Point{21, 21}, 0, 0, gocv.BorderDefault)//对比2个图片不同点diffImg := gocv.NewMat()gocv.AbsDiff(basicImg, gray_frame, &diffImg)gocv.Threshold(diffImg, &diffImg, 25, 255, gocv.ThresholdBinary)gocv.Dilate(diffImg, &diffImg, es)cnts := gocv.FindContours(diffImg, gocv.RetrievalExternal, gocv.ChainApproxSimple)for i := 0; i < cnts.Size(); i++ {timeCycle := cnts.At(i)if gocv.ContourArea(timeCycle) < 1500 {continue}isDiffImg = truegocv.Rectangle(&img, gocv.BoundingRect(timeCycle), redColor, 1)}//if !isDiffImg {// glog.Log.Info("没有不同点,跳过此视屏帧")// continue//}//检测到不同点if isDiffImg && sendImgFrequency != utils.SendImgFrequencyHigh {glog.Log.Debug("检测到不同点,恢复发送图片频率")sendImgFrequency = utils.SendImgFrequencyHighSendMqttMessageToChannel(MqttMessagesChannel, "0")}gocv.PutText(&img, gtime.GetCurrentTime(), image.Point{10, 20}, gocv.FontHersheyComplexSmall, 1, redColor, 1)//存储到mp4中err = writer.Write(img)if err != nil {panic(fmt.Sprintf("视频写入磁盘失败 %v", err))}glog.Log.Debug("图片解析成功,开始展示")}//时间周期已到,收尾工作if stop {glog.Log.Debug("停止执行任务,时间耗尽")//视频writer收尾,保证视频文件保存正常err := writer.Close()if err != nil {glog.Log.Error("关闭视频失败....")}glog.Log.Debug("已经关闭视频")break}}if !isDiffImg {noDiffTimes += 1}//降低摄像头发送频率if noDiffTimes > NoDiffTimesCount {if sendImgFrequency != utils.SendImgFrequencyLow {glog.Log.Debug("检测到环境最近无变化,调低摄像头发送频率")sendImgFrequency = utils.SendImgFrequencyLowSendMqttMessageToChannel(MqttMessagesChannel, "1")}noDiffTimes = 0}}
}//
// @Description:
// @param MqttMessagesChannel: mqtt消息发送到channel中
// @param message:
//
func SendMqttMessageToChannel(MqttMessagesChannel chan<- []string, message string) {select {case MqttMessagesChannel <- []string{MqttTopic, message}:glog.Log.Info("发送消息到mqtt成功")default:glog.Log.Info("mqtt channel缓存已满,丢弃此消息")}
}
//
// @Description: 发布消息到topic
// @param client:
// @param topic:
// @param message:
//
func MqttPublish(MqttMessagesChannel <-chan []string) {for {messages, ok := <-MqttMessagesChannelif !ok {glog.Log.Info("MqttMessagesChannel channel关闭,此mqtt不再发布消息")break}topic := messages[0]message := messages[1]fmt.Printf("开始发布消息:topic:%v,message:%v\n", topic, message)token := (*MqttClient).Publish(topic, 2, true, message)if token.Wait() && token.Error() != nil {panic(fmt.Sprintf("发布消息失败:topic:%v,message:%v,err:%v", topic, message, token.Error()))}fmt.Printf("发布消息成功:topic:%v,message:%v\n", topic, message)}
}
package video_actionimport ("fmt""github.com/gin-gonic/gin""strings""video_server/pkg/api_error""video_server/pkg/app""video_server/pkg/gtime""video_server/pkg/utils"
)//
// QueryFields
// @Description: 入参结构体
//
type QueryFields struct {StartTime string `json:"start_time" binding:"required"`EndTime string `json:"end_time" binding:"required"`
}//
// @Description: 根据入参的 时间段,查询符合条件的所有视频名称
// @param c:
// /video/query
//
func QueryVideoHandler(c *gin.Context) {// 用户入参var params QueryFields// 生成上下文环境及入参赋值ctx := app.NewGin(c, ¶ms)startTime := params.StartTimeendTime := params.EndTime//获取文件列表files := utils.VideoFileHandler(utils.VideoFileDir)//获取 符合时间段的视频名称resultFiles := GetGtStartTimeFiles(files, startTime, endTime)// 结果数据结构ctx.Success(resultFiles)
}//
// @Description: 获取 大于 开始时间,小于结束时间的文件
// @param FileNames:
// @param StartTime:
// @param EndTime:
// @return *[]string:
//
func GetGtStartTimeFiles(FileNames *[]string, StartTime, EndTime string) *[]string {var FilterFiles []string//类型转换StartTimeObj, err := gtime.StringToTime(StartTime, gtime.DateTimeFormat)if err != nil {panic(fmt.Sprintf("时间字符串类型转为Time类型失败,StartTime:%v,err:%v", StartTime, err))}EndTimeObj, err := gtime.StringToTime(EndTime, gtime.DateTimeFormat)if err != nil {panic(fmt.Sprintf("时间字符串类型转为Time类型失败,StartTime:%v,err:%v", StartTime, err))}//校验入参StartTime必须err := api_error.New(504, "开始日期必须小于结束日期")panic(err)}//循环遍历视频文件名称for i := range *FileNames {fileName := (*FileNames)[i]// 切分获取文件前缀(字符串格式的时间)fileTime := strings.Split(fileName, ".avi")[0]fileTimeObj, err := gtime.StringToTime(fileTime, gtime.DateTimeFormat)if err != nil {panic(fmt.Sprintf("时间字符串类型转为Time类型失败,fileTime:%v,err:%v", fileTime, err))}//取 StartTime之后 EndTime之前 的数据if StartTimeObj.Before(*fileTimeObj) && fileTimeObj.Before(*EndTimeObj) {FilterFiles = append(FilterFiles, fileName)}}return &FilterFiles
}
package video_actionimport ("fmt""github.com/gin-gonic/gin""os""path/filepath""strings""video_server/pkg/app""video_server/pkg/glog""video_server/pkg/utils"
)//
// @Description: 下载视频文件
// /video/download
// @param c:
//
func DownloadVideoHandler(c *gin.Context) {// 生成上下文环境及入参赋值fileName := c.Query("FileName")ctx := app.NewGin(c, nil)// 校验文件后缀 及 . 的个数 判断文件名是否合法if !strings.HasSuffix(fileName, ".avi") || len(strings.Split(fileName, ".")) > 2 {utils.StatusCodeHandler(ctx, 500, "文件名错误")return}// 判断文件是否存在// 拼接文件绝对路径pwd, err := os.Getwd()if err != nil {panic(fmt.Sprintf("获取当前路径失败 err:%v", err))}fileAbsPath := filepath.Join(pwd, utils.VideoFileDir, fileName)if _, err := os.Stat(fileAbsPath); os.IsNotExist(err) {utils.StatusCodeHandler(ctx, 500, "文件不存在")return}glog.Log.Info("文件检测结束")// 返回文件c.File(fileAbsPath)
}
FROM gocv/opencv:4.5.4 AS build
ENV GO111MODULE=on
ENV GOPROXY=https://goproxy.cn/,https://mirrors.aliyun.com/goproxy/,direct
WORKDIR /releaseADD . .
RUN go mod tidy && go mod vendor
RUN GOOS=linux CGO_ENABLED=1 GOARCH=amd64 go build -ldflags="-s -w" -installsuffix cgo -o video_server main.goFROM gocv/opencv:4.5.4ENV LANG C.UTF-8WORKDIR /dataCOPY --from=build /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
COPY --from=build /release/video_server .# 设置时区
RUN mkdir log video_file
RUN cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \echo 'Asia/Shanghai' > /etc/timezone \&& cp /etc/apt/sources.list /etc/apt/sources.list.bak \&& sed -i 's/deb.debian.org/mirrors.aliyun.com/g' /etc/apt/sources.list \&& apt-get update \&& apt-get install -y vimEXPOSE 7069
CMD ["./video_server"]
version: '3.8'services:# 启动video_server服务vs:container_name: vsimage: xxx/video_server:8f3f15b0restart: alwaysenvironment:LOC_CFG: /data/config/config.ymlvolumes:- ./config/video_server.yml:/data/config/config.ymlports:- "7069:7069"- "9090:9090/udp"# 启动emqx服务emqx:container_name: emqximage: emqx/emqx:latestrestart: alwaysports:- "1883:1883"- "8083:8083"- "8084:8084"- "8883:8883"- "18083:18083"
variables:PROJ_NAME: "video_server"PUBLIC_REGISTRY: "registry.cn-hangzhou.aliyuncs.com/busy_service/$PROJ_NAME:$CI_COMMIT_SHORT_SHA"PRIVATE_REGISTRY: "registry-vpc.cn-hangzhou.aliyuncs.com/busy_service/$PROJ_NAME:$CI_COMMIT_SHORT_SHA"stages:- build- deployjob_build:stage: buildscript:- docker login --username $REGISTRY_USER --password $REGISTRY_PWD registry.cn-hangzhou.aliyuncs.com- docker build -t $PROJ_NAME:latest .- docker tag $PROJ_NAME:latest $PUBLIC_REGISTRY- docker push $PUBLIC_REGISTRY- docker rmi $PUBLIC_REGISTRY $PROJ_NAME:latest# when: manualtags:- xxx-runner-build# 部署到服务器
job_deploy:stage: deploy# when: manualscript:# 进入到docker-compose.yml所在文件夹- cd /home/xxx/xxx- docker login --username $REGISTRY_USER --password $REGISTRY_PWD registry.cn-hangzhou.aliyuncs.com# 修改 版本名称# -i:源文件修改# s:替换# 此命令含义为 替换 busy_service/video_server:xxxx 为新版本号- sed -i "s!busy_service\/video_server:[0-9a-z]*!busy_service\/video_server:$CI_COMMIT_SHORT_SHA!" docker-compose.yml- docker-compose up -d vstags:- xxx-runner
相关链接
video_server此项目GitHub地址
GoCV MORE EXAMPLES