物联网视频监控服务(三)-监控服务端 篇
创始人
2025-05-28 03:09:29
0

概述

此篇文章主要描述 监控服务端(video_server) 开发部分;

功能点

  • 接收视频上传功能;
  • 利用opencv动态检测 视频帧是否变化,标记变更部分及显示当前时间;
  • 根据被监测环境是否变化(180s内无变化降低图片发送频率,有变化立刻恢复图片发送频率),将 动态调节图片发送频率 的指令消息 传给emqx;
  • 提供视频存储功能;
  • 实时视频帧通过udp协议转发给客户端功能实现直播功能;
  • 视频下载功能;

使用技术

  • 语言:go
  • 软件或框架:gin(go语言的web框架)+emqx(MQTT协议的实现)+gocv(opencv的go语言实现)
  • 应用层协议:MQTT,HTTP
  • 传输层协议:UDP,TCP

系统设计

UDP直播 数据流概览

在这里插入图片描述

MQTT消息处理 数据流概览

在这里插入图片描述

项目包含服务概览

  • 此项目共启动2个服务,2个端口,具体如下
  • UDP服务: 负责视频帧的 接收及转发,利用gocv监测动态变化,视频存储,图片发送频率指令传输给emqx 功能;
  • gin web端服务: 负责 历史视频下载功能;

项目涉及第三方软件服务概览

MQTT协议

  • 一种基于发布订阅模式的通信协议,是应用层协议,基于TCP/IP协议;
  • 因其实现精简,能耗低,传输数据量小,考虑网络不稳定因素,及 提供服务质量(QoS:保证消息不丢失) 常用于物联网,安卓推送等情景中;
  • Broker:处理,存储 消息的服务,如emqx
  • Topic: 消息的类型,订阅者会订阅到指定的Topic,才能收到此Topic下的消息内容(payload)
  • PayLoad:消息的内容

emqx

  • 使用Erlang语言开发,是MQTT协议的一种实现软件,其还支持MQTT-SN、CoAP、 LwM2M、LoRaWAN 和 WebSocket等协议;
  • 其支持不同平台及docker部署安装方式
  • 有完善的 web端管理页面功能

UDP服务设计概览

风险分析

  • 涉及功能太多
  • gocv动态监测 需要大量计算
  • 网络不稳定
  • 如上这些问题可能造成 直播卡顿;视频帧处理速度小于硬件发送速度造成数据丢失;发送频率指令到emqx延迟; 等等问题;

解决方案

  • 使用goroutine(go协程),将同步改为异步,保证每个协程 只处理单一功能,不阻塞整个服务;
  • 使用channel作为 goroutine 间通信方式,且channel必须有容量限制,超过容量限制 就舍去数据(数据基本是视屏帧数据及emxq指令数据,丢失不敏感),保证不阻塞上游服务;

涉及channel及功能如下

  • ImgReceiveChannel:从摄像头的UDP客户端接收到的待处理的图片切片channel,容量为100,超过处理不了就丢弃
  • ImgSendChannel:将图片数据转发给用户客户端进行直播的 图片切片channel,容量为20,超过处理不了就丢弃
  • MqttMessagesChannel:将消息发送到mqtt服务器的channel,容量为2,超过处理不了就丢弃

涉及goroutine及功能如下

  • udp_service.UDPReceive(ImgReceiveChannel, ImgSendChannel):接收UDP数据,并发送给ImgReceiveChannel,需要直播时,再将数据发送给ImgSendChannel,当channel缓存满时,丢弃数据;
  • udp_service.UDPSend(ImgSendChannel):发送UDP数据给客户端(直播使用),当channel缓存满时,丢弃数据;
  • Convert2IMG(ImgReceiveChannel, MqttMessagesChannel):处理图片的协程,用opencv进行动态变化比对,将变化的视频帧存入视频文件;根据某时间段内 视频帧有无变化,动态变更esp32-cam发送视频帧频率;
  • mqtt_service.MqttPublish(MqttMessagesChannel):发布 视频帧频率 指令消息到emqx的topic;

UDP服务主要代码部分

VideoHandler入口函数

//
//  @Description: 处理视频帧
//
func VideoHandler() {//从摄像头的UDP客户端接收到的待处理的图片切片channel,容量为100,超过处理不了就丢弃ImgReceiveChannel := make(chan []byte, 100)//将图片数据转发给用户客户端进行直播的 图片切片channel,容量为20,超过处理不了就丢弃ImgSendChannel := make(chan []byte, 20)// 将消息发送到mqtt服务器的channel,容量为2MqttMessagesChannel := make(chan []string, 2)//udp服务接收及发送数据go udp_service.UDPReceive(ImgReceiveChannel, ImgSendChannel)go udp_service.UDPSend(ImgSendChannel)//发布消息到mqtt协议服务的broker(emqx)go mqtt_service.MqttPublish(MqttMessagesChannel)//处理视频帧数据Convert2IMG(ImgReceiveChannel, MqttMessagesChannel)
}

udp_service.UDPReceive函数


var SendToClient bool
var SendToClientAddr *net.UDPAddr//
//  @Description: 接收UDP数据,并发送给ImgReceiveChannel,需要直播时,再将数据发送给ImgSendChannel,当channel缓存满时,丢弃数据
//  @param ImgReceiveChannel: 从摄像头的UDP客户端接收到的待处理的图片切片channel,容量为100,超过处理不了就丢弃
//  @param ImgSendChannel: 将图片数据转发给用户客户端进行直播的 图片切片channel,容量为20,超过处理不了就丢弃
//
func UDPReceive(ImgReceiveChannel chan<- []byte, ImgSendChannel chan<- []byte) {SendToClient = falsefor {//定义一个切片sliceData := make([]byte, 65500)//将读取的数据存到数组中n, addr, err := UDPListen.ReadFromUDP(sliceData)if err != nil {glog.Log.Error(fmt.Sprintf("读取UDP数据失败,err:%v", err))continue}if n == 0 || n > 65500 {continue}//截取前10个字符串messageType := string(sliceData[:10])imgData := sliceData[10:]if messageType == "clientPlay" {glog.Log.Info("接收到客户端的直播请求")SendToClient = trueSendToClientAddr = addr} else if messageType == "clientStop" {SendToClient = false} else if messageType == "cameraSend" {glog.Log.Info("接收到图片数据")select {case ImgReceiveChannel <- imgData:glog.Log.Info("发送数据到图片处理channel")default:glog.Log.Info("发送数据到图片处理channel阻塞,不发送")}}if SendToClient && messageType == "cameraSend" {select {case ImgSendChannel <- imgData:glog.Log.Info("发送数据到转发channel")default:glog.Log.Info("发送数据到转发channel阻塞,不发送")}}}
}

UDPSend函数

//
//  @Description: 发送UDP数据给客户端(直播使用),当channel缓存满时,丢弃数据
//  @param ImgSendChannel:
//
func UDPSend(ImgSendChannel <-chan []byte) {for {Img, ok := <-ImgSendChannelif !ok {glog.Log.Info("发送数据到图片处理channel关闭")break}glog.Log.Info("发送直播数据到客户端")_, err := UDPListen.WriteToUDP(Img, SendToClientAddr)if err != nil {panic(fmt.Sprintf("转发UDP数据失败 err:%v", err))}}
}

Convert2IMG函数


import ("fmt""gocv.io/x/gocv""image""image/color""time""video_server/pkg/glog""video_server/pkg/gtime""video_server/pkg/mqtt_service""video_server/pkg/udp_service""video_server/pkg/utils"
)const MqttTopic string = "camera_frq"
const TimeCycle time.Duration = 60
const NoDiffTimesCount int = 3//
//  @Description: 处理图片的协程,功能如下:
// 		用opencv进行动态变化比对,将变化的视频帧存入视频文件;
//		根据某时间段内 视频帧有无变化,动态变更esp32-cam发送视频帧频率
//  @param ImgChannel:从摄像头的UDP客户端接收到的待处理的图片切片channel,容量为100,超过处理不了就丢弃
//  @param MqttMessagesChannel:将消息发送到mqtt服务器的channel,容量为2
//
func Convert2IMG(ImgChannel <-chan []byte, MqttMessagesChannel chan<- []string) {redColor := color.RGBA{255, 0, 0, 0}es := gocv.GetStructuringElement(gocv.MorphEllipse, image.Point{9, 4})//摄像头发送图片频率 0:高 1:低sendImgFrequency := utils.SendImgFrequencyHigh// 3个固定时间段内 图片都相同,表示环境无变化,通过mqtt协议通知cam降低发送帧率(节约资源[电力,带宽]), 如果 后续 检测到图片变化,则再次通知其恢复频率;noDiffTimes := 0//一直死循环for {var writer *gocv.VideoWriter//定义一个运行的时间周期timeCycle := gtime.TimeCycle(TimeCycle)stop := false//基准图片,作用为 检测视频帧是否变化的基准图片basicImg := gocv.NewMat()//存入到磁盘的视频文件writerfileName := fmt.Sprintf(utils.VideoFileFmt, gtime.GetCurrentTime())//视频帧之间是否不同(检测监控环境是否有变化)isDiffImg := false//此for循环保证timeCycle的channel一直运行for {select {case _ = <-timeCycle:glog.Log.Debug("执行时间周期耗尽")stop = truedefault://开始处理 视频帧tempImg, ok := <-ImgChannelif !ok {glog.Log.Info("ImgChannel关闭,停止循环")break}//从数组中读取 前n个有效长度的数据,也就是 一张图片不会超过10万个byte,而n的值就是 读取图片的byte个数img, err := gocv.IMDecode(tempImg, gocv.IMReadColor)if err != nil {glog.Log.Error(fmt.Sprintf("解析图片数据失败,err:%v", err))break}//当basicImg没有赋值时,对其进行赋值if basicImg.Empty() {basicImg = img//图片转为黑白色gocv.CvtColor(basicImg, &basicImg, gocv.ColorBGRToGray)// 高斯模糊,image.Point的值 越大,则越模糊,注意 值除以2余数必须为1gocv.GaussianBlur(basicImg, &basicImg, image.Point{21, 21}, 0, 0, gocv.BorderDefault)writer, err = gocv.VideoWriterFile(fileName, "MJPG", 10, basicImg.Cols(), basicImg.Rows(), true)if err != nil {fmt.Printf("error opening video writer device: %v\n", fileName)return}continue}//进行图片处理//每隔固定时间 获取基准帧数据//将现有图片进行比对,不同就存为mp4//若 3个固定时间段内 图片都相同,表示环境无变化,通过mqtt协议通知cam降低发送帧率(节约资源[电力,带宽]), 如果 后续 检测到图片变化,则再次通知其恢复频率;//	gocv比对图片//图片转为黑白色gray_frame := gocv.NewMat()gocv.CvtColor(img, &gray_frame, gocv.ColorBGRToGray)// 高斯模糊,image.Point的值 越大,则越模糊,注意 值除以2余数必须为1gocv.GaussianBlur(gray_frame, &gray_frame, image.Point{21, 21}, 0, 0, gocv.BorderDefault)//对比2个图片不同点diffImg := gocv.NewMat()gocv.AbsDiff(basicImg, gray_frame, &diffImg)gocv.Threshold(diffImg, &diffImg, 25, 255, gocv.ThresholdBinary)gocv.Dilate(diffImg, &diffImg, es)cnts := gocv.FindContours(diffImg, gocv.RetrievalExternal, gocv.ChainApproxSimple)for i := 0; i < cnts.Size(); i++ {timeCycle := cnts.At(i)if gocv.ContourArea(timeCycle) < 1500 {continue}isDiffImg = truegocv.Rectangle(&img, gocv.BoundingRect(timeCycle), redColor, 1)}//if !isDiffImg {//	glog.Log.Info("没有不同点,跳过此视屏帧")//	continue//}//检测到不同点if isDiffImg && sendImgFrequency != utils.SendImgFrequencyHigh {glog.Log.Debug("检测到不同点,恢复发送图片频率")sendImgFrequency = utils.SendImgFrequencyHighSendMqttMessageToChannel(MqttMessagesChannel, "0")}gocv.PutText(&img, gtime.GetCurrentTime(), image.Point{10, 20}, gocv.FontHersheyComplexSmall, 1, redColor, 1)//存储到mp4中err = writer.Write(img)if err != nil {panic(fmt.Sprintf("视频写入磁盘失败 %v", err))}glog.Log.Debug("图片解析成功,开始展示")}//时间周期已到,收尾工作if stop {glog.Log.Debug("停止执行任务,时间耗尽")//视频writer收尾,保证视频文件保存正常err := writer.Close()if err != nil {glog.Log.Error("关闭视频失败....")}glog.Log.Debug("已经关闭视频")break}}if !isDiffImg {noDiffTimes += 1}//降低摄像头发送频率if noDiffTimes > NoDiffTimesCount {if sendImgFrequency != utils.SendImgFrequencyLow {glog.Log.Debug("检测到环境最近无变化,调低摄像头发送频率")sendImgFrequency = utils.SendImgFrequencyLowSendMqttMessageToChannel(MqttMessagesChannel, "1")}noDiffTimes = 0}}
}//
//  @Description:
//  @param MqttMessagesChannel: mqtt消息发送到channel中
//  @param message:
//
func SendMqttMessageToChannel(MqttMessagesChannel chan<- []string, message string) {select {case MqttMessagesChannel <- []string{MqttTopic, message}:glog.Log.Info("发送消息到mqtt成功")default:glog.Log.Info("mqtt channel缓存已满,丢弃此消息")}
}

MqttPublish函数

//
//  @Description: 发布消息到topic
//  @param client:
//  @param topic:
//  @param message:
//
func MqttPublish(MqttMessagesChannel <-chan []string) {for {messages, ok := <-MqttMessagesChannelif !ok {glog.Log.Info("MqttMessagesChannel channel关闭,此mqtt不再发布消息")break}topic := messages[0]message := messages[1]fmt.Printf("开始发布消息:topic:%v,message:%v\n", topic, message)token := (*MqttClient).Publish(topic, 2, true, message)if token.Wait() && token.Error() != nil {panic(fmt.Sprintf("发布消息失败:topic:%v,message:%v,err:%v", topic, message, token.Error()))}fmt.Printf("发布消息成功:topic:%v,message:%v\n", topic, message)}
}

gin web端服务主要代码部分

QueryVideoHandler函数

package video_actionimport ("fmt""github.com/gin-gonic/gin""strings""video_server/pkg/api_error""video_server/pkg/app""video_server/pkg/gtime""video_server/pkg/utils"
)//
//  QueryFields
//  @Description: 入参结构体
//
type QueryFields struct {StartTime string `json:"start_time" binding:"required"`EndTime   string `json:"end_time" binding:"required"`
}//
//  @Description: 根据入参的 时间段,查询符合条件的所有视频名称
//  @param c:
//  /video/query
//
func QueryVideoHandler(c *gin.Context) {// 用户入参var params QueryFields// 生成上下文环境及入参赋值ctx := app.NewGin(c, ¶ms)startTime := params.StartTimeendTime := params.EndTime//获取文件列表files := utils.VideoFileHandler(utils.VideoFileDir)//获取 符合时间段的视频名称resultFiles := GetGtStartTimeFiles(files, startTime, endTime)// 结果数据结构ctx.Success(resultFiles)
}//
//  @Description: 获取 大于 开始时间,小于结束时间的文件
//  @param FileNames:
//  @param StartTime:
//  @param EndTime:
//  @return *[]string:
//
func GetGtStartTimeFiles(FileNames *[]string, StartTime, EndTime string) *[]string {var FilterFiles []string//类型转换StartTimeObj, err := gtime.StringToTime(StartTime, gtime.DateTimeFormat)if err != nil {panic(fmt.Sprintf("时间字符串类型转为Time类型失败,StartTime:%v,err:%v", StartTime, err))}EndTimeObj, err := gtime.StringToTime(EndTime, gtime.DateTimeFormat)if err != nil {panic(fmt.Sprintf("时间字符串类型转为Time类型失败,StartTime:%v,err:%v", StartTime, err))}//校验入参StartTime必须err := api_error.New(504, "开始日期必须小于结束日期")panic(err)}//循环遍历视频文件名称for i := range *FileNames {fileName := (*FileNames)[i]//	切分获取文件前缀(字符串格式的时间)fileTime := strings.Split(fileName, ".avi")[0]fileTimeObj, err := gtime.StringToTime(fileTime, gtime.DateTimeFormat)if err != nil {panic(fmt.Sprintf("时间字符串类型转为Time类型失败,fileTime:%v,err:%v", fileTime, err))}//取 StartTime之后 EndTime之前 的数据if StartTimeObj.Before(*fileTimeObj) && fileTimeObj.Before(*EndTimeObj) {FilterFiles = append(FilterFiles, fileName)}}return &FilterFiles
}

DownloadVideoHandler函数

package video_actionimport ("fmt""github.com/gin-gonic/gin""os""path/filepath""strings""video_server/pkg/app""video_server/pkg/glog""video_server/pkg/utils"
)//
//  @Description: 下载视频文件
// /video/download
//  @param c:
//
func DownloadVideoHandler(c *gin.Context) {// 生成上下文环境及入参赋值fileName := c.Query("FileName")ctx := app.NewGin(c, nil)// 校验文件后缀 及 . 的个数 判断文件名是否合法if !strings.HasSuffix(fileName, ".avi") || len(strings.Split(fileName, ".")) > 2 {utils.StatusCodeHandler(ctx, 500, "文件名错误")return}// 判断文件是否存在// 拼接文件绝对路径pwd, err := os.Getwd()if err != nil {panic(fmt.Sprintf("获取当前路径失败 err:%v", err))}fileAbsPath := filepath.Join(pwd, utils.VideoFileDir, fileName)if _, err := os.Stat(fileAbsPath); os.IsNotExist(err) {utils.StatusCodeHandler(ctx, 500, "文件不存在")return}glog.Log.Info("文件检测结束")// 返回文件c.File(fileAbsPath)
}

CI/CD及docker部分

Dockerfile文件

FROM gocv/opencv:4.5.4 AS build
ENV GO111MODULE=on
ENV GOPROXY=https://goproxy.cn/,https://mirrors.aliyun.com/goproxy/,direct
WORKDIR  /releaseADD . .
RUN go mod tidy && go mod vendor
RUN GOOS=linux CGO_ENABLED=1 GOARCH=amd64 go build -ldflags="-s -w" -installsuffix cgo -o video_server main.goFROM gocv/opencv:4.5.4ENV LANG C.UTF-8WORKDIR /dataCOPY --from=build /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
COPY --from=build /release/video_server .# 设置时区
RUN mkdir log video_file
RUN cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \echo 'Asia/Shanghai' > /etc/timezone \&& cp /etc/apt/sources.list /etc/apt/sources.list.bak \&& sed -i 's/deb.debian.org/mirrors.aliyun.com/g' /etc/apt/sources.list \&& apt-get update \&& apt-get install -y vimEXPOSE 7069
CMD ["./video_server"]

docker-compose.yml文件

version: '3.8'services:# 启动video_server服务vs:container_name: vsimage: xxx/video_server:8f3f15b0restart: alwaysenvironment:LOC_CFG: /data/config/config.ymlvolumes:- ./config/video_server.yml:/data/config/config.ymlports:- "7069:7069"- "9090:9090/udp"# 启动emqx服务emqx:container_name: emqximage: emqx/emqx:latestrestart: alwaysports:- "1883:1883"- "8083:8083"- "8084:8084"- "8883:8883"- "18083:18083"

CI/CD文件(.gitlab-ci.yml)

variables:PROJ_NAME: "video_server"PUBLIC_REGISTRY: "registry.cn-hangzhou.aliyuncs.com/busy_service/$PROJ_NAME:$CI_COMMIT_SHORT_SHA"PRIVATE_REGISTRY: "registry-vpc.cn-hangzhou.aliyuncs.com/busy_service/$PROJ_NAME:$CI_COMMIT_SHORT_SHA"stages:- build- deployjob_build:stage: buildscript:- docker login --username $REGISTRY_USER --password $REGISTRY_PWD registry.cn-hangzhou.aliyuncs.com- docker build -t $PROJ_NAME:latest .- docker tag $PROJ_NAME:latest $PUBLIC_REGISTRY- docker push $PUBLIC_REGISTRY- docker rmi $PUBLIC_REGISTRY $PROJ_NAME:latest#  when: manualtags:- xxx-runner-build# 部署到服务器
job_deploy:stage: deploy#  when: manualscript:# 进入到docker-compose.yml所在文件夹- cd /home/xxx/xxx- docker login --username $REGISTRY_USER --password $REGISTRY_PWD registry.cn-hangzhou.aliyuncs.com# 修改 版本名称# -i:源文件修改# s:替换# 此命令含义为 替换 busy_service/video_server:xxxx 为新版本号- sed -i "s!busy_service\/video_server:[0-9a-z]*!busy_service\/video_server:$CI_COMMIT_SHORT_SHA!" docker-compose.yml- docker-compose up -d vstags:- xxx-runner

相关链接
video_server此项目GitHub地址
GoCV MORE EXAMPLES

相关内容

热门资讯

Ubuntu系统搭建 一、创建环境常见问题1.1 windows11下打开虚拟机蓝屏问题参考这篇文章,控制面...
性能优化搞得好,Tomcat少... Tomcat基本使用 什么是Web服务器 web服务器的定义 其实并没有标准定义,...
2023还有人不知道kuber... 文章目录Kubernetes(K8s)一、Openstack&VM1、**认识虚拟化****1.1*...
笨鸟学数据结构(绪论) 数据结构的定义按某种逻辑关系组织起来的一批数据,按一定的映象方式把它存放在计算机的存储...
不使用cocoapods-ar... 不使用cocoapods-art插件情况下与jfrog协作原理下载索引创建git仓库或者更新git仓...
微机原理 || push p... 考试真的考了push和pop ,那个加减到底是什么? PUSH 源   ...
使用Spring Boot和C... 原理 Spring Boot是一个基于Spring框架的快速开发应用程序的框架,其提供...
python数据类型常见操作 目录 一、python常见的赋值方式 1.交互性赋值方式 2.连续性赋值方式 3.单独赋值方式 二...
系统架构:经典三层架构 引言 经典三层架构是分层架构中最原始最典型的分层模式,其他分层架构都是其变种或扩展&#...
c++ 流 stream Text Stream: 有解析(parse)和格式化&#...
【springboot】web... 5、视图解析与模板引擎 视图解析:SpringBoot默认不支持 JSP,...
【Java注释】如何自定义注解... 一,如何自定义注解 1.1 在编译时进行格式检查(JDK内置的三个基本注解) ...
Redis案例实战_微信抢红包 目录需求分析架构设计编码实现拓展 需求分析 首先想到发红包的流程 1.发红包 2.抢红包 3.记录红...
关于朋友的思考 关于朋友的思考 朋友就是你高兴时想见的人,烦恼时想找的人,得到对方帮助...
Cobalt Strike--... 获取凭证和哈希      要dump哈希,通过 [beacon] → Access →...
第二章 运算方法和运算器 引入:1. 运算器的运算功能 计算机能够进行的运算包括:算术运算和逻辑运...
DevData Talks 直... 📊本期分享 本期 DevData Talks 邀请到了微众银行研发效能负责人余伟老师...
postgresql基本操作与... postgresql基本操作与基本对象 postgresql是一个C/S架构的大型软件࿰...
【洛谷 P1028】[NOIP... [NOIP2001 普及组] 数的计算 题目描述 给出自然数 nnn,要求按如下方式构...
实验一 Java Web 入门 一、实验目标: 1、了解并学会配置MyEclipse集成开发环境,学会在...