RKVision开发日志(九)

一、PyGObject

把昨天命令行rtsp+屏幕显示的命令行控制指令转换为python的形式运行,这里直接让GLM帮我完成了一份demo。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import sys
import gi

# 初始化 GStreamer
gi.require_version('Gst', '1.0')
gi.require_version('GstVideo', '1.0')
from gi.repository import Gst, GLib

def on_bus_message(bus, message, loop):
t = message.type
if t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print(f"Error: {err}", file=sys.stderr)
print(f"Debug Info: {debug}", file=sys.stderr)
loop.quit()
elif t == Gst.MessageType.EOS:
print("End-Of-Stream reached.")
loop.quit()
elif t == Gst.MessageType.STATE_CHANGED:
old, new, pending = message.parse_state_changed()
if old == Gst.State.NULL and new == Gst.State.READY:
print("Pipeline is preparing...")
elif t == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
print(f"Warning: {err}", file=sys.stderr)
return True

def main():
# 1. 初始化 GStreamer
Gst.init(None)

# 2. 创建 Pipeline
pipeline = Gst.Pipeline()

# 3. 创建各个元素
# 源
src = Gst.ElementFactory.make("v4l2src", "src")
    converter = Gst.ElementFactory.make("videoconvert", "converter")
    # converter = Gst.ElementFactory.make("rkvideoconvert", "converter")
scaler = Gst.ElementFactory.make("videoscale", "scaler")
tee = Gst.ElementFactory.make("tee", "tee")

# 分支 1: 显示
queue_display = Gst.ElementFactory.make("queue", "queue_display")
# sink_display = Gst.ElementFactory.make("kmssink", "sink_display")
sink_display = Gst.ElementFactory.make("rkximagesink", "sink_display")

# 分支 2: RTSP 推流
queue_rtsp = Gst.ElementFactory.make("queue", "queue_rtsp")
encoder = Gst.ElementFactory.make("mpph264enc", "encoder")
parser = Gst.ElementFactory.make("h264parse", "parser")
sink_rtsp = Gst.ElementFactory.make("rtspclientsink", "sink_rtsp")

if not src or not converter or not scaler or not tee or \
not queue_display or not sink_display or \
not queue_rtsp or not encoder or not parser or not sink_rtsp:
print("创建元素失败,请检查 GStreamer 插件是否安装完整。", file=sys.stderr)
sys.exit(1)

# 4. 设置属性
src.set_property("device", "/dev/video0")
scaler.set_property("add-borders", True) # 开启黑边补齐
sink_display.set_property("driver-name", "rockchip")
sink_rtsp.set_property("location", "rtsp://127.0.0.1:8554/live")

# 5. 将所有元素添加到 Pipeline
pipeline.add(src)
pipeline.add(converter)
pipeline.add(scaler)
pipeline.add(tee)
pipeline.add(queue_display)
pipeline.add(sink_display)
pipeline.add(queue_rtsp)
pipeline.add(encoder)
pipeline.add(parser)
pipeline.add(sink_rtsp)

# 6. 设置 Caps (格式限制)
# 第一个 Caps: 限制摄像头输出格式
caps_src = Gst.Caps.from_string("video/x-raw,width=1632,height=1224,framerate=30/1")
# 第二个 Caps: 强制转换后的格式为 NV12 并补齐高度到 1232 (16对齐)
caps_scale = Gst.Caps.from_string("video/x-raw,width=1632,height=1232,format=NV12")

# 7. 链接元素

# 链接主链路: Source -> Converter -> Scaler -> Tee
if not src.link_filtered(converter, caps_src):
print("链接 src -> converter 失败", file=sys.stderr)
sys.exit(1)

if not converter.link(scaler):
print("链接 converter -> scaler 失败", file=sys.stderr)
sys.exit(1)

if not scaler.link_filtered(tee, caps_scale):
print("链接 scaler -> tee 失败", file=sys.stderr)
sys.exit(1)

# 链接分支 1 (显示): Tee -> Queue -> KmsSink
# tee.link() 会自动请求一个 src pad
if not tee.link(queue_display):
print("链接 tee -> queue_display 失败", file=sys.stderr)
sys.exit(1)

if not queue_display.link(sink_display):
print("链接 queue_display -> sink_display 失败", file=sys.stderr)
sys.exit(1)

# 链接分支 2 (RTSP): Tee -> Queue -> Encoder -> Parser -> Sink
if not tee.link(queue_rtsp):
print("链接 tee -> queue_rtsp 失败", file=sys.stderr)
sys.exit(1)

if not queue_rtsp.link(encoder):
print("链接 queue_rtsp -> encoder 失败", file=sys.stderr)
sys.exit(1)

if not encoder.link(parser):
print("链接 encoder -> parser 失败", file=sys.stderr)
sys.exit(1)

if not parser.link(sink_rtsp):
print("链接 parser -> sink_rtsp 失败", file=sys.stderr)
sys.exit(1)

# 8. 创建主循环并处理消息
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", on_bus_message, loop)

print("Starting pipeline...")
# 设置管道状态为 PLAYING
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
print("无法启动 Pipeline", file=sys.stderr)
loop.quit()

try:
loop.run()
except KeyboardInterrupt:
pass

# 清理资源
pipeline.set_state(Gst.State.NULL)

if __name__ == "__main__":
main()

打开rtsp服务器:

1
2
cd ~/ffmpeg
sudo ./mediamtx &

回到程序目录运行python文件

1
2
cd ~/Program/rkvision/test
python3 gstreamer_test.py

没什么问题,看一下占用,在30%左右,感觉甚至比昨天命令行的占用还要低一些

GLM提示用rkvideoconvert能进一步减少占用,但是由于RGA版本过高可能会导致BUG,可以试一下

1
2
3
4
5
6
7
root@lubancat:~/Program/rkvision/test# python3 gstreamer_test.py 
mpp[2244]: mpp_info: mpp version: b13eb80 author: Herman Chen 2026-01-08 feat[mpp_ext]: Move parsers to libmpp_ext.so
mpp[2244]: mpp_info: mpp version: b13eb80 author: Herman Chen 2026-01-08 feat[mpp_ext]: Move parsers to libmpp_ext.so
mpp[2244]: mpp_info: mpp version: b13eb80 author: Herman Chen 2026-01-08 feat[mpp_ext]: Move parsers to libmpp_ext.so
mpp[2244]: mpp: unable to create enc vp8 for soc rk3568 unsupported
mpp[2244]: mpp_info: mpp version: b13eb80 author: Herman Chen 2026-01-08 feat[mpp_ext]: Move parsers to libmpp_ext.so
创建元素失败,请检查 GStreamer 插件是否安装完整。

AI写的代码没给具体的定位信息,实际上是不认识rkvideoconvert这个模块导致的,查阅资料发现可以export 一个全局变量指定使用RGA硬件,试一下

1
export GST_VIDEO_CONVERT_USE_RGA=1

没有明显的占用减少,不中不中

抓了一下目前gstreamer所有的插件,还有一个rkximagesink可以用
这个插件主要是做终端显示用的
实际上我目测占用并没有减少多少,大概在20%到30%上下浮动

观察到启动时有报错:

1
2
3
4
librga fail to get driver version! Compatibility mode will be enabled.

376 im2d_rga_impl rga_version_below_minimun_range_user_driver(304): The driver may be compatible, but it is best to update the driver to version 1.2.4. You can try to update the SDK or update the <SDK>/kernel/drivers/video/rockchip/rga3 directory individually. current version: librga 1.9.2, driver .
rga_api version 1.9.2_[1]

依旧版本不匹配,要修一下这个bug。

要么动内核要么动rga,说实话不想动内核,这不方便移植,先看看大概是什么操作,一会再降级RGA

按照8. Linux内核的编译 — [野火]嵌入式Linux镜像构建与部署——基于LubanCat-RK系列板卡 文档这里给的操作看一下
要clone kernel,力竭了,老老实实降档librga了

不过经过一系列艰苦卓绝的配置,性能几乎基本没有任何变化….
我还很担心把内核玩崩了,因为刚刚重启的时候看门狗都timeout了…
不折腾了,就这样吧

二、解析

1. Gstreamer初始化

Gstreamer只是pygobject中的一个小部分,我们需要单独初始化这一部分

1
2
3
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
Gst.init(None)
2. 管道构建

Gst.Pipeline() 是一个“容器”,里面装元素并统一管理状态。

1
pipeline = Gst.Pipeline()
3. 管道元素

元素这个概念是gstreamer中比较特有的部分,不过我认为也比较好理解吧,就像工厂的流水线,要定义好每个部分是做什么的,并赋予其参数,视频流就会在流水线上被各个元素加工,最后输出到他应该去的地方。接下来介绍常用的管道元素及其用法:

1
2
elem = Gst.ElementFactory.make("元素名", "实例名") 
elem.set_property("属性名", 属性值)
  • 输入源
    管道的输入源可以分为相机、文件、网络视频流等。(videotestsrc、audiotestsrc、filesrc、v4l2src、rtspsrc、udpsrc)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 测试源,gst会生成测试图像
src = Gst.ElementFactory.make("videotestsrc", "testsrc")
src.set_property("pattern", 0) # 0=snow, 18=ball 等,具体见 gst-inspect-1.0 videotestsrc
src.set_property("num-buffers", 300) # 只输出 300 帧就自动 EOS,用于测试

# 相机源
src = Gst.ElementFactory.make("v4l2src", "src")
src.set_property("device", "/dev/video0")

# 文件源
src = Gst.ElementFactory.make("filesrc", "filesrc")
src.set_property("location", "/path/to/file.mp4") # 要读取的文件路径

# rtsp源
src = Gst.ElementFactory.make("rtspsrc", "rtsp")
src.set_property("location", "rtsp://user:pass@192.168.1.10:554/stream1")
src.set_property("latency", 2000) # 延迟(毫秒),视网络情况调整
  • 图像转换
    常用的操作一般是格式、分辨率、缩放等操作
1
2
3
4
5
6
7
8
9
10
11
conv = Gst.ElementFactory.make("videoconvert", "vconv")
# 一般不需要额外属性,就放在需要改变 raw 视频格式的地方

scale = Gst.ElementFactory.make("videoscale", "vscale")
# 一般配合 caps 或 capsfilter 来设定输出分辨率,不常用属性直接设置

# 更常用的协商方式
capsfilter = Gst.ElementFactory.make("capsfilter", "caps_src")
capsfilter.set_property("caps", Gst.Caps.from_string(
"video/x-raw,format=NV12,width=1632,height=1224,framerate=30/1"
))
  • 图像编解码
    将视频流编码为h264/h265方便保存/推流,将文件格式的解码等功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# h264 encoder 编码器
enc = Gst.ElementFactory.make("x264enc", "h264enc")
enc.set_property("tune", "zerolatency") # 实时性优化
enc.set_property("speed-preset", "superfast") # 编码速度 vs 压缩率
enc.set_property("bitrate", 2000) # 码率(kbps)

# 调用瑞芯微的mpp进行硬件编码
enc = Gst.ElementFactory.make("mpph264enc", "encoder")

# 解码器
dec = Gst.ElementFactory.make("decodebin", "decoder")
# decodebin 会自动根据输入 caps 选择对应的 demuxer 和 decoder

# mp4封装
mux = Gst.ElementFactory.make("mp4mux", "mp4mux")
mux.set_property("fragment-duration", 2000) # 分片时长(ms),可选
  • 分路操作
    将一路视频管道复制几份送入下游管道,防止整体阻塞需要搭配queue使用
    这里多提一嘴,视频流就像水管,如果下游某个管道堵住了,可能会导致上游和其他支路一起堵住
    所以在分流之后第一件事就是接一个queue,在下游堵住的时候告诉上游直接丢弃
1
2
3
4
5
6
7
8
9
10
tee = Gst.ElementFactory.make("tee", "tee")
# 不必专门设属性,动态请求 request pad:tee.get_request_pad("src_%u")
queue = Gst.ElementFactory.make("queue", "queue")
queue.set_property("max-size-buffers", 500) # 最大缓存 buffer 数量
queue.set_property("max-size-bytes", 10 * 1024 * 1024) # 10 MB
queue.set_property("max-size-time", 2 * Gst.SECOND) # 最多缓存 2 秒
queue.set_property("leaky", 0) # 0=不丢帧,1=上游丢,2=下游丢

# 链接分流器tee和队列:

  • 输出
    gstreamer处理后,接一个元素用于输出,输出类型的元素都叫sink
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
sink = Gst.ElementFactory.make("autovideosink", "vsink")
# 通常只需这样,就能在当前平台选择合适的视频输出

# 调用瑞芯微SDK优化显示输出
sink = Gst.ElementFactory.make("rkximagesink", "sink_display")

# 写入文件中
sink = Gst.ElementFactory.make("filesink", "filesink")
sink.set_property("location", "/path/to/output.mp4")
sink.set_property("sync", True) # 是否按时钟同步写入

# 测试输出,假输出
sink = Gst.ElementFactory.make("fakesink", "fakesink")
sink.set_property("silent", False) # 是否打印一些信息
sink.set_property("dump", True) # 打印接收到的 buffer 信息
sink.set_property("signal-handoffs", True) # 触发 handoff 信号方便调试

# 输出给其他app,可以输出给python程序
sink = Gst.ElementFactory.make("appsink", "appsink")
sink.set_property("emit-signals", True) # 通过“new-sample”信号拿到帧
sink.set_property("max-buffers", 10) # 应用侧最多缓存多少帧
sink.set_property("sync", False) # 不受时钟阻塞(实时处理场景)
  • 其他

gstreamer还提供了记录fps,在图像上绘制等工具,功能很丰富,可自行查询参考,就不赘述了

1
2
3
4
5
# 查看所有元素插件
gst-inspect-1.0

# 查看单个插件的信息(以shmsink为例)
gst-inspect-1.0 shmsink
4. BUS信息

为了兼容性,视频流管道中流动的都是视频数据,而各个插件会产生各自的日志输出,这部分显然不能混在管道中流动,gstreamer的做法是使用一个消息管道,其中包括:

  • 错误消息(ERROR)
  • 状态改变(STATE_CHANGED)
  • 流结束(EOS)
  • 警告(WARNING)
  • 信息(INFO)
  • 元素消息(ELEMENT,如标签、解码信息等)

具体这部分结合日志统一输出就好

三、DEMO

接下来自己手写一个管线,实现:

1
2
3
4
5
6
7
8
9
10
11
Camera(NV12,1632x1224@30fps)
|
v
tee
|
+--> queue -> videoconvert BGR -> appsink(algo.py)
|
+--> queue -> mpph264enc -> filesink -> output.mp4
|
+--> queue -> filesink 544x408@10fps -> mpph264enc MJPEG -> appsink(web.py)

参考资料

RKVision开发日志(九)
http://blog.mingxuan.xin/2026/01/15/20260115/
作者
Obscure
发布于
2026年1月15日
许可协议