现在的方案中,以开启相机为例,我要在/api/camera/start上发送{“action”: “start”},然后在相机进程if action == “start”:,然后再去执行实际的start代码,这至少需要维护两对字典,可以说是非常的丑陋了
ZMQ确实是更好的通信方案,今天白天有点事情,今晚突击修改一下吧,希望在明天之前修改好并演示
今日任务安排:
- 在双端安装ZMQ库,尝试引用查看是否报错
- 让AI写一个最小实现的脚本测试功能
- 实际部署测试
- 实现AppBin,完成跨进程视频流获取
一、ZMQ
ZMQ本体就类似我昨天写的双向queue,只是封装了一下。首先写一个基类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| import zmq import json import traceback
class RemoteError(RuntimeError): pass
class RpcServer: def __init__(self, address: str): self.ctx = zmq.Context.instance() self.sock = self.ctx.socket(zmq.REP) self.sock.bind(address)
self.methods = { "__introspect__": lambda: sorted([k for k in self.methods.keys() if not k.startswith("_")]), }
def run(self) -> None: """ 处理RPC请求, 在循环中调用该方法 """ req = self.sock.recv_json() try: method = req["method"] args = req.get("args", []) kwargs = req.get("kwargs", {}) fn = self.methods[method] result = fn(*args, **kwargs) self.sock.send_json({ "ok": True, "result": result }) except Exception as e: self.sock.send_json({ "ok": False, "error_type": type(e).__name__, "error_message": str(e), "trace": traceback.format_exc(), }) def close(self) -> None: self.sock.close() self.ctx.destroy()
class RpcClient: def __init__(self, address: str, timeout:int = 1000): self.ctx = zmq.Context.instance() self.sock = self.ctx.socket(zmq.REQ) self.sock.connect(address)
self.sock.setsockopt(zmq.RCVTIMEO, timeout) self.sock.setsockopt(zmq.SNDTIMEO, timeout)
def call(self, method: str, *args, **kwargs): self.sock.send_json({"method": method, "args": list(args), "kwargs": kwargs}) resp = self.sock.recv_json() if resp.get("ok"): return resp.get("result") raise RemoteError(f"{resp.get('error_type')}: {resp.get('error_message')}") def __getattr__(self, name: str): def _method(*args, **kwargs): return self.call(name, *args, **kwargs) return _method
def close(self) -> None: self.sock.close() self.ctx.destroy()
|
第二步,写相机进程的rpc服务器和客户端
首先,暂时手动维护两个字典:
1 2 3 4 5 6 7 8 9 10 11 12 13
| class CameraProcess(BaseProcess): self.methods = { "start": self.pipeline.start, "stop": self.pipeline.stop, "control_branch": self.pipeline.control_branch, "set_display": lambda enabled: self.pipeline.control_branch("displaybin", enabled), } class CameraRpcClient(RpcClient): def start(self): return self.call("start") def stop(self): return self.call("stop") def set_display(self, enabled: bool): return self.call("set_display", enabled) def control_branch(self, name: str, enabled: bool): return self.call("control_branch", name, enabled)
|
先手动绑定,之后有事件再修改吧
最后,在进程管理器manager中初始化RPC服务,并传递给相机进程和服务中,这部分很简单
1 2 3 4 5 6 7 8 9 10 11
| CAMERA_SOCKET = f"ipc:///tmp/camera.sock"
camera_rpc = RpcServer(CAMERA_SOCKET) self.camera_client = CameraRpcClient(CAMERA_SOCKET)
self.camera_process = CameraProcess(self.camera_config, self.stop_event, producer, camera_rpc)
|
二、部署测试
报了个错误,是因为子进程实例化的地方错了,重新在run方法中实例化了一下,没有任何问题,service层也非常简洁了,完美
上板测试的时候发现,极小概率会导致发出未手动导致的卡死,报错:Operation cannot be accomplished in current state
后续要捕获超时异常,并自动重启RPC,另外回头想办法把.get的报错搞掉,看着不太舒服
测得空载时占用0%,开相机时程序占用5%,主要是3A算法占用较高,开启相机+画面预览的占用在10%左右


三、AppBin
几乎没什么好说的,复制粘贴之前的displaybin稍微改改就可以了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| class AppSinkBin(Gst.Bin): """ 发送到appsink: sink ghost -> appsink """ def __init__(self, name : str): super().__init__(name=name) self._build()
def _build(self) -> None: self.conv = Gst.ElementFactory.make("videoconvert", "conv") self.cap = Gst.ElementFactory.make("capsfilter", "cap") self.sink = Gst.ElementFactory.make("appsink", "sink")
if not all([self.conv, self.cap, self.sink]): raise RuntimeError("AppSinkBin: failed to create elements") caps_str = f"video/x-raw,format=BGR" self.cap.set_property("caps", Gst.Caps.from_string(caps_str))
self.sink.set_property("emit-signals", True) self.sink.set_property("sync", False) self.sink.set_property("drop", bool(self.drop)) self.add(self.conv) self.add(self.cap) self.add(self.sink)
if not self.conv.link(self.cap): raise RuntimeError(f"AppSinkBin: conv->cap link failed, {caps_str}") if not self.cap.link(self.sink): raise RuntimeError("AppSinkBin: cap->appsink link failed")
ghost_sink = Gst.GhostPad.new("sink", self.conv.get_static_pad("sink")) self.add_pad(ghost_sink)
|
然后主管道中添加分支:
1 2
| self.appsink_bin = AppSinkBin(name = "appsinkbin") self._add_branch(self.appsink_bin, drop=False)
|
本地测试观察到视频变慢了,可能有点问题,先不管他,设备测试没有问题,实时性很好,CPU占用大概到15%左右
时间有点晚了,天都要亮了,python拿appsink似乎有点麻烦,要再研究一下