再次感慨一下代码水平和项目管理还是差的太远太远了,想着用更好一点的框架,更成熟一点的驱动,有没有更好的方案,到头来写了改改了写,浪费时间
昨天重新简单规划了一下框架,今天在开始之前先简单规划一下需要修改的内容
之前相机进程之前是写的opencv的capture直接读取,这次改成用gst读取,并封装restart,这意味着要跨进程传递命令。
- 首先写一个进程间通信通道,实现主进程和相机进程通信,并控制相机进程,并封装控制指令
- 主进程建立一个子线程接收回传回的状态信息
- 修改相机进程,实现进程开启、关闭、重启,并创建监听子线程
- 修改service,lifespan, 对外暴露接口
- 本地测试,上机测试
一、命令/状态IPC
主进程和子进程点对点通信可以依赖queue,这里就不用python的multiprocess manager了
主进程 –control–> 子进程
子进程 –status–> 主进程
先归纳一下主进程对相机进程可能的控制指令:
| CMD命令 |
Message消息 |
| 开启/关闭 |
相机路径 |
| 获取/设置参数 |
参数字典 |
| 子进程返回的信息: |
|
| 功能 |
status状态 |
Message消息 |
| 开启/关闭相机 |
OK/ERR |
None |
| 获取/设置参数 |
OK/ERR |
参数 |
| 分析了一下,主进程对算法进程的消息大致上也差不多,可以开始编写 |
|
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| class CommandEnum(str, Enum): START = "start" STOP = "stop" GET = "get_params" SET = "set_params"
class StatusEnum(str, Enum): OK = "ok" ERR = "err"
class ControlMessage(BaseModel): cmd: CommandEnum params: Optional[Dict[str, Any]] = None
class StatusMessage(BaseModel): status: StatusEnum msg: Optional[Any] = None
|
接下来,封装为endpoint和manager,初始化后分发给进程两端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
| class MasterPoint: """ 进程间通信端点 负责单个进程的IPC通信 在各个子进程中初始化该端点 """ def __init__(self, recv_queue:Queue, send_queue:Queue): self.recv_queue = recv_queue self.send_queue = send_queue
def send(self, msg: ControlMessage): self.send_queue.put(msg)
def recv(self) -> StatusMessage: return self.recv_queue.get(timeout=1) def close(self): self.recv_queue.close() self.send_queue.close()
class SlavePoint: """ 进程间通信端点 负责单个进程的IPC通信 在各个子进程中初始化该端点 """ def __init__(self, recv_queue:Queue, send_queue:Queue): self.recv_queue = recv_queue self.send_queue = send_queue
def send(self, msg: StatusMessage): self.send_queue.put(msg)
def recv(self) -> ControlMessage: return self.recv_queue.get(timeout=1)
class ChannelManager: """ 进程间的IPC通道 负责主进程与子进程之间的控制与数据传输 维护两个消息队列: 主进程 --control--> 子进程 子进程 --status--> 主进程 在控制进程中初始化该管理器 """ def __init__(self, maxsize = 0): self.maxsize = maxsize self.master2slave = Queue(maxsize=self.maxsize) self.slave2master = Queue(maxsize=self.maxsize) def create(self) -> Tuple[MasterPoint, SlavePoint]: master = MasterPoint(self.slave2master, self.master2slave) slave = SlavePoint(self.master2slave, self.slave2master) return master, slave
def refresh(self): try: if hasattr(self, 'master2slave'): self.master2slave.close() except Exception: pass try: if hasattr(self, 'slave2master'): self.slave2master.close() except Exception: pass self.master2slave = Queue(maxsize=self.maxsize) self.slave2master = Queue(maxsize=self.maxsize)
|
和之前相机与算法的通信通道大同小异,这里不多赘述了。接下来写到进程管理器中,并测试
没什么问题
二、配套代码
这样不太好测试,所以顺便把manager、service写了,并由web调用
说实话有点后悔没用multiprocess.manager了,跨进程调用很麻烦,而且还自定义通信内容,两边要分别解码,平白增加了很多代码量
简单记录一下传递链:
在API层做依赖注入,把start绑定到了camera_service.start()上
1 2 3 4 5 6 7 8 9 10 11 12 13
| def get_camera_service(request: Request) -> CameraService: """ 获取相机单例 """ return request.app.state.camera_service
@router.post("/api/camera/start", response_model=CameraStatus, summary="启动摄像头") async def start_camera(service: CameraService = Depends(get_camera_service)): try: return service.start() except RuntimeError as e: raise HTTPException(status_code=500, detail=str(e))
|
在service层实现了状态管理,并调用manager.send_message,向相机进程发送了命令,其实到这里职责也很清晰,但是这个命令就已经稍微有点难崩了
1 2 3 4 5 6 7 8 9 10 11 12
| def start(self) -> CameraStatus: with self._lock: try: if self._state is CameraState.RUNNING: return CameraStatus(ok=False, message="摄像头已在运行", status=self._state) self.process_manager.send_message("camera", ControlMessage(cmd=CommandEnum.START)) self._state = CameraState.RUNNING return CameraStatus(ok=True, message="摄像头已启动", status=self._state) except Exception as e: self._state = CameraState.STOPPED raise RuntimeError(f"启动摄像头失败: {e}") from e
|
然后在管理器中,发送指令的命令长这个样子:
1 2 3
| def send_message(self, process_name: str, message: ControlMessage): self.process_dict[process_name].send(message)
|
然后在相机进程中解析命令:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| def parse_control(self): """ 解析指令 """ try: recv = self.from_backend.recv() if recv.cmd == CommandEnum.START: self.pipeline.start() elif recv.cmd == CommandEnum.STOP: self.pipeline.stop() elif recv.cmd == CommandEnum.SET: if recv.params is not None and "displaybin" in recv.params: if recv.params["displaybin"]: self.pipeline.control_branch("displaybin", True) else: self.pipeline.control_branch("displaybin", False) self.from_backend.send(StatusMessage(status=StatusEnum.OK, msg="相机控制指令已处理"))
except Empty: pass except Exception as e: logger.exception(f"{self.name}解析指令发生异常: {e}")
|
到这里就很不优雅了,做了一个非常丑陋的指令解析,最后发送了gst指令的管道
和AI沟通了一下,有注册器方法,RPC方法等,这样在服务层就可以跨进程调用方法了,研究一下,明天写
明天争取把gst的AppBin写出来,这样就可以作为一个比较完整的demo进行演示了
又是这样…白天写一天代码有缺陷,晚上想想又给推翻了….