RKVision开发日志(十三)

现在的方案中,以开启相机为例,我要在/api/camera/start上发送{“action”: “start”},然后在相机进程if action == “start”:,然后再去执行实际的start代码,这至少需要维护两对字典,可以说是非常的丑陋了

ZMQ确实是更好的通信方案,今天白天有点事情,今晚突击修改一下吧,希望在明天之前修改好并演示

今日任务安排:

  1. 在双端安装ZMQ库,尝试引用查看是否报错
  2. 让AI写一个最小实现的脚本测试功能
  3. 实际部署测试
  4. 实现AppBin,完成跨进程视频流获取

一、ZMQ

ZMQ本体就类似我昨天写的双向queue,只是封装了一下。首先写一个基类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import zmq
import json
import traceback

class RemoteError(RuntimeError):
pass

class RpcServer:
def __init__(self, address: str):
self.ctx = zmq.Context.instance()
self.sock = self.ctx.socket(zmq.REP)
self.sock.bind(address)

# 方法注册表
self.methods = {
# __introspect__ 方法, 返回所有方法名
"__introspect__": lambda: sorted([k for k in self.methods.keys() if not k.startswith("_")]),
}

def run(self) -> None:
"""
处理RPC请求, 在循环中调用该方法
"""
req = self.sock.recv_json()
try:
method = req["method"]
args = req.get("args", [])
kwargs = req.get("kwargs", {})
fn = self.methods[method]
result = fn(*args, **kwargs)
self.sock.send_json({
"ok": True,
"result": result
})
except Exception as e:
self.sock.send_json({
"ok": False,
"error_type": type(e).__name__,
"error_message": str(e),
"trace": traceback.format_exc(),
})

def close(self) -> None:
self.sock.close()
self.ctx.destroy()

class RpcClient:
def __init__(self, address: str, timeout:int = 1000):
self.ctx = zmq.Context.instance()
self.sock = self.ctx.socket(zmq.REQ)
self.sock.connect(address)


# 超时:避免 service 卡死
self.sock.setsockopt(zmq.RCVTIMEO, timeout)
self.sock.setsockopt(zmq.SNDTIMEO, timeout)

def call(self, method: str, *args, **kwargs):
self.sock.send_json({"method": method, "args": list(args), "kwargs": kwargs})
resp = self.sock.recv_json()
if resp.get("ok"):
return resp.get("result")
raise RemoteError(f"{resp.get('error_type')}: {resp.get('error_message')}")

def __getattr__(self, name: str):
# 仅在属性不存在时触发
def _method(*args, **kwargs):
return self.call(name, *args, **kwargs)
return _method

def close(self) -> None:
self.sock.close()
self.ctx.destroy()

第二步,写相机进程的rpc服务器和客户端

首先,暂时手动维护两个字典:

1
2
3
4
5
6
7
8
9
10
11
12
13
class CameraProcess(BaseProcess):
self.methods = {
"start": self.pipeline.start,
"stop": self.pipeline.stop,
"control_branch": self.pipeline.control_branch,
"set_display": lambda enabled: self.pipeline.control_branch("displaybin", enabled),
}
class CameraRpcClient(RpcClient):
# 绑定RPC方法
def start(self): return self.call("start")
def stop(self): return self.call("stop")
def set_display(self, enabled: bool): return self.call("set_display", enabled)
def control_branch(self, name: str, enabled: bool): return self.call("control_branch", name, enabled)

先手动绑定,之后有事件再修改吧

最后,在进程管理器manager中初始化RPC服务,并传递给相机进程和服务中,这部分很简单

1
2
3
4
5
6
7
8
9
10
11
# 初始化IPC
CAMERA_SOCKET = f"ipc:///tmp/camera.sock"

camera_rpc = RpcServer(CAMERA_SOCKET)
self.camera_client = CameraRpcClient(CAMERA_SOCKET)

# 初始化相机进程
self.camera_process = CameraProcess(self.camera_config,
self.stop_event,
producer,
camera_rpc)

二、部署测试

报了个错误,是因为子进程实例化的地方错了,重新在run方法中实例化了一下,没有任何问题,service层也非常简洁了,完美

上板测试的时候发现,极小概率会导致发出未手动导致的卡死,报错:Operation cannot be accomplished in current state

后续要捕获超时异常,并自动重启RPC,另外回头想办法把.get的报错搞掉,看着不太舒服

测得空载时占用0%,开相机时程序占用5%,主要是3A算法占用较高,开启相机+画面预览的占用在10%左右


三、AppBin

几乎没什么好说的,复制粘贴之前的displaybin稍微改改就可以了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class AppSinkBin(Gst.Bin):
"""
发送到appsink: sink ghost -> appsink
"""
def __init__(self,
name : str):
super().__init__(name=name)
self._build()

def _build(self) -> None:
# videoconvert: 负责把 NV12/I420/etc 转成 BGR
self.conv = Gst.ElementFactory.make("videoconvert", "conv")
# capsfilter: 强制输出 BGR(可附带分辨率/帧率)
self.cap = Gst.ElementFactory.make("capsfilter", "cap")
# appsink: 给 python 拉帧
self.sink = Gst.ElementFactory.make("appsink", "sink")

if not all([self.conv, self.cap, self.sink]):
raise RuntimeError("AppSinkBin: failed to create elements")

caps_str = f"video/x-raw,format=BGR"
self.cap.set_property("caps", Gst.Caps.from_string(caps_str))

self.sink.set_property("emit-signals", True) # 允许 new-sample 信号
self.sink.set_property("sync", False) # 关闭时间同步
self.sink.set_property("drop", bool(self.drop))
self.add(self.conv)
self.add(self.cap)
self.add(self.sink)

if not self.conv.link(self.cap):
raise RuntimeError(f"AppSinkBin: conv->cap link failed, {caps_str}")
if not self.cap.link(self.sink):
raise RuntimeError("AppSinkBin: cap->appsink link failed")

# 暴露给外部链接的入口 pad
ghost_sink = Gst.GhostPad.new("sink", self.conv.get_static_pad("sink"))
self.add_pad(ghost_sink)

然后主管道中添加分支:

1
2
self.appsink_bin = AppSinkBin(name = "appsinkbin")
self._add_branch(self.appsink_bin, drop=False)

本地测试观察到视频变慢了,可能有点问题,先不管他,设备测试没有问题,实时性很好,CPU占用大概到15%左右

时间有点晚了,天都要亮了,python拿appsink似乎有点麻烦,要再研究一下


RKVision开发日志(十三)
http://blog.mingxuan.xin/2026/01/22/20260122/
作者
Obscure
发布于
2026年1月22日
许可协议