一、相机进程基类 相机进程继承进程类,主要负责:
读取配置文件,根据配置文件设置的width,height,fps配置相机
开启相机捕获进程,将帧数据写入一段共享内存中,同时将帧号、时间戳写入队列
监听消息队列,执行命令
需要提前设计:
不同的相机硬件所需的media_pipline不同,把这部分需要写一个方法,方便后续子类重写
执行的命令包括:停止图像采集进程,开启/关闭预览图像生成,开启/关闭图像保存
二、shm共享内存 在设计共享内存时,要搞得参数有点多,比如说内存大小,共享帧长度、数据格式等,还有结束后要对内存空间进行释放,先实验一下,然后再进行封装。首先在control进程中创建一个共享内存和队列简单实验一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 def start (self ) -> None : logger.info("控制进程正在启动..." ) msg_queue = Queue(maxsize=BUFFER_SIZE) self .camera_process = CameraProcess(self .config.cameras[0 ], self .stop_event, msg_queue) self .algo_process = AlgoProcess(self .config.algo, self .stop_event, msg_queue) n_bytes = int (BUFFER_SIZE * np.prod(SHAPE) * np.dtype(DTYPE).itemsize) shm = shared_memory.SharedMemory(name='img_buffer' , create=True , size=n_bytes) self .camera_process.start() self .algo_process.start() try : while not self .stop_event.is_set(): time.sleep(0.1 ) if self .camera_process is not None : if not self .camera_process.is_alive(): logger.info(f"{self.camera_process.name} 进程已停止" ) self .camera_process = None break except Exception as e: logger.error(f"控制进程发生错误: {e} " ) finally : shm.close() shm.unlink() self .stop()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 def run (self ) -> None : self .before_run() self .existing_shm = shared_memory.SharedMemory(name='img_buffer' ) shared_array = np.ndarray((BUFFER_SIZE, *SHAPE), dtype=DTYPE, buffer=self .existing_shm.buf) frame_idx = 0 try : if self .cap is None : raise RuntimeError("相机初始化失败!" ) while not self .stop_event.is_set(): ret, frame = self .cap.read() slot_idx = frame_idx % BUFFER_SIZE shared_array[slot_idx][:] = frame[:] timestamp = time.time() self .queue.put({ 'slot_idx' : slot_idx, 'frame_id' : frame_idx, 'timestamp' : timestamp }) frame_idx += 1 if not ret: if self .is_camera: logger.warning(f"{self.name} : 捕获失败!" ) break else : logger.info(f"{self.name} : 视频播放结束,重新开始循环播放。" ) self .cap.set (cv2.CAP_PROP_POS_FRAMES, 0 ) continue time.sleep(0.1 ) except Exception as e: logger.error(f"{self.name} 相机进程发生错误: {e} " ) finally : self .stop()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 def run (self ) -> None : self .before_run() existing_shm = shared_memory.SharedMemory(name='img_buffer' ) shared_array = np.ndarray((BUFFER_SIZE, *SHAPE), dtype=DTYPE, buffer=existing_shm.buf) try : while not self .stop_event.is_set(): try : meta = self .queue.get(timeout=1 ) slot_idx = meta['slot_idx' ] frame = shared_array[slot_idx] cv2.putText(frame, f"ID: {meta['frame_id' ]} " , (50 , 50 ), cv2.FONT_HERSHEY_SIMPLEX, 1 , (0 , 255 , 0 ), 2 ) cv2.imshow("Consumer Process" , frame) if cv2.waitKey(1 ) & 0xFF == ord ('q' ): self .stop_event.set () except : continue finally : existing_shm.close() cv2.destroyAllWindows() self .stop()
需要注意的是,shm需要在各个进程中close回收内存,在主进程中unlink断开连接。
三、进程间通信优化 现在的业务逻辑都写在control中,后面如果需要重启/关闭等操作时显然需要进行重写,为了逻辑清晰,可以考虑把这一部分写到ipc.py中,以进程通信的方式创建,简化代码逻辑,但是经过一通封装实际上并没有太简化,只是看上去少了两行)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import numpy as npfrom app.core import CameraConfigfrom multiprocessing import Queue,shared_memoryfrom multiprocessing.shared_memory import SharedMemorydef camera_to_algo (config:CameraConfig ) -> tuple [Queue, SharedMemory]: """ 主进程/控制进程中调用 解析配置文件, 创建共享帧地址, 返回值为通信队列和共享内存 """ width = config.width height = config.height channels = 3 shape = (height, width, channels) DTYPE = np.uint8 bytes = int (config.buffer * np.prod(shape) * np.dtype(DTYPE).itemsize) msg_queue = Queue(config.buffer) shm = shared_memory.SharedMemory(name=config.name, create=True , size=bytes ) return msg_queue, shmdef init_shm (config:CameraConfig ) -> tuple [np.ndarray, SharedMemory]: """ 子进程/相机进程/算法进程中调用 解析配置文件 获取/提交共享内存的内容 返回帧数组和shm地址 """ width = config.width height = config.height channels = 3 shape = (height, width, channels) DTYPE = np.uint8 shm = shared_memory.SharedMemory(name=config.name) shared_array = np.ndarray((config.buffer, *shape), dtype=DTYPE, buffer=shm.buf) return shared_array, shm