RKVision开发日志(十二)

再次感慨一下代码水平和项目管理还是差的太远太远了,想着用更好一点的框架,更成熟一点的驱动,有没有更好的方案,到头来写了改改了写,浪费时间

昨天重新简单规划了一下框架,今天在开始之前先简单规划一下需要修改的内容

之前相机进程之前是写的opencv的capture直接读取,这次改成用gst读取,并封装restart,这意味着要跨进程传递命令。

  1. 首先写一个进程间通信通道,实现主进程和相机进程通信,并控制相机进程,并封装控制指令
  2. 主进程建立一个子线程接收回传回的状态信息
  3. 修改相机进程,实现进程开启、关闭、重启,并创建监听子线程
  4. 修改service,lifespan, 对外暴露接口
  5. 本地测试,上机测试

一、命令/状态IPC

主进程和子进程点对点通信可以依赖queue,这里就不用python的multiprocess manager了

主进程 –control–> 子进程
子进程 –status–> 主进程

先归纳一下主进程对相机进程可能的控制指令:

CMD命令 Message消息
开启/关闭 相机路径
获取/设置参数 参数字典
子进程返回的信息:
功能 status状态 Message消息
开启/关闭相机 OK/ERR None
获取/设置参数 OK/ERR 参数
分析了一下,主进程对算法进程的消息大致上也差不多,可以开始编写
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 定义指令枚举
class CommandEnum(str, Enum):
START = "start"
STOP = "stop"
GET = "get_params"
SET = "set_params"

class StatusEnum(str, Enum):
OK = "ok"
ERR = "err"

# 3. 定义控制消息模型 (主 -> 子)
class ControlMessage(BaseModel):
cmd: CommandEnum
params: Optional[Dict[str, Any]] = None # 携带参数

# 4. 定义状态消息模型 (子 -> 主)
class StatusMessage(BaseModel):
status: StatusEnum
msg: Optional[Any] = None # 携带数据,如果出错,记录错误信息

接下来,封装为endpoint和manager,初始化后分发给进程两端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class MasterPoint:
"""
进程间通信端点
负责单个进程的IPC通信
在各个子进程中初始化该端点
"""
def __init__(self,
recv_queue:Queue,
send_queue:Queue):
self.recv_queue = recv_queue
self.send_queue = send_queue


def send(self, msg: ControlMessage):
self.send_queue.put(msg)

def recv(self) -> StatusMessage:
return self.recv_queue.get(timeout=1)

def close(self):
self.recv_queue.close()
self.send_queue.close()

class SlavePoint:
"""
进程间通信端点
负责单个进程的IPC通信
在各个子进程中初始化该端点
"""
def __init__(self,
recv_queue:Queue,
send_queue:Queue):
self.recv_queue = recv_queue
self.send_queue = send_queue

def send(self, msg: StatusMessage):
self.send_queue.put(msg)

def recv(self) -> ControlMessage:
return self.recv_queue.get(timeout=1)


class ChannelManager:
"""
进程间的IPC通道
负责主进程与子进程之间的控制与数据传输
维护两个消息队列:
主进程 --control--> 子进程
子进程 --status--> 主进程
在控制进程中初始化该管理器
"""
def __init__(self,
maxsize = 0):
self.maxsize = maxsize
self.master2slave = Queue(maxsize=self.maxsize)
self.slave2master = Queue(maxsize=self.maxsize)

def create(self) -> Tuple[MasterPoint, SlavePoint]:
# 主进程:收 slave2master,发 master2slave
master = MasterPoint(self.slave2master, self.master2slave)
# 子进程:收 master2slave,发 slave2master
slave = SlavePoint(self.master2slave, self.slave2master)
return master, slave


def refresh(self):
# 关闭旧队列
try:
if hasattr(self, 'master2slave'):
self.master2slave.close()
except Exception:
pass

try:
if hasattr(self, 'slave2master'):
self.slave2master.close()
except Exception:
pass

# 创建新队列
self.master2slave = Queue(maxsize=self.maxsize)
self.slave2master = Queue(maxsize=self.maxsize)

和之前相机与算法的通信通道大同小异,这里不多赘述了。接下来写到进程管理器中,并测试

没什么问题

二、配套代码

这样不太好测试,所以顺便把manager、service写了,并由web调用

说实话有点后悔没用multiprocess.manager了,跨进程调用很麻烦,而且还自定义通信内容,两边要分别解码,平白增加了很多代码量

简单记录一下传递链:

在API层做依赖注入,把start绑定到了camera_service.start()

1
2
3
4
5
6
7
8
9
10
11
12
13
# API层
def get_camera_service(request: Request) -> CameraService:
"""
获取相机单例
"""
return request.app.state.camera_service

@router.post("/api/camera/start", response_model=CameraStatus, summary="启动摄像头")
async def start_camera(service: CameraService = Depends(get_camera_service)):
try:
return service.start()
except RuntimeError as e:
raise HTTPException(status_code=500, detail=str(e))

在service层实现了状态管理,并调用manager.send_message,向相机进程发送了命令,其实到这里职责也很清晰,但是这个命令就已经稍微有点难崩了

1
2
3
4
5
6
7
8
9
10
11
12
# service层
def start(self) -> CameraStatus:
with self._lock:
try:
if self._state is CameraState.RUNNING:
return CameraStatus(ok=False, message="摄像头已在运行", status=self._state)
self.process_manager.send_message("camera", ControlMessage(cmd=CommandEnum.START))
self._state = CameraState.RUNNING
return CameraStatus(ok=True, message="摄像头已启动", status=self._state)
except Exception as e:
self._state = CameraState.STOPPED
raise RuntimeError(f"启动摄像头失败: {e}") from e

然后在管理器中,发送指令的命令长这个样子:

1
2
3
    def send_message(self, process_name: str, message: ControlMessage):
        # 负责给对应进程发送消息
        self.process_dict[process_name].send(message)

然后在相机进程中解析命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def parse_control(self):
"""
解析指令
"""
try:
recv = self.from_backend.recv()
if recv.cmd == CommandEnum.START:
self.pipeline.start()
elif recv.cmd == CommandEnum.STOP:
self.pipeline.stop()
elif recv.cmd == CommandEnum.SET:
if recv.params is not None and "displaybin" in recv.params:
if recv.params["displaybin"]:
self.pipeline.control_branch("displaybin", True)
else:
self.pipeline.control_branch("displaybin", False)
self.from_backend.send(StatusMessage(status=StatusEnum.OK, msg="相机控制指令已处理"))

except Empty:
# 捕获超时异常
# 这里表示“这一轮没有收到消息”,什么都不用做,直接返回
pass

except Exception as e:
logger.exception(f"{self.name}解析指令发生异常: {e}")

到这里就很不优雅了,做了一个非常丑陋的指令解析,最后发送了gst指令的管道

和AI沟通了一下,有注册器方法,RPC方法等,这样在服务层就可以跨进程调用方法了,研究一下,明天写

明天争取把gst的AppBin写出来,这样就可以作为一个比较完整的demo进行演示了

又是这样…白天写一天代码有缺陷,晚上想想又给推翻了….


RKVision开发日志(十二)
http://blog.mingxuan.xin/2026/01/21/20260121/
作者
Obscure
发布于
2026年1月21日
许可协议