1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
| class EndPoint: """ 进程间通信端点 负责单个进程的IPC通信 在各个子进程中初始化该端点 """ def __init__(self, config: CameraConfig, send_q: Queue, recv_q: Queue): self.config = config self.shape = (self.config.height, self.config.width, 3) self.frame_size = int(self.config.height * self.config.width * 3 * np.dtype(np.uint8).itemsize) self.shm: SharedMemory = SharedMemory(name=config.name, size=self.frame_size * self.config.buffer) self.send_queue: Queue = send_q self.recv_queue: Queue = recv_q
def write_frame(self, time: float, frame: np.ndarray): """ 仅生产者调用 生产者接收消费者返回的可用共享内存下标 向可用地址写入一帧图像数据 生产者消息格式:{ "idx": 共享内存下标, "timestamp": 该帧时间戳 } """ msg = self.recv_queue.get() offset = msg['idx'] * self.frame_size target_frame = np.ndarray(self.shape, dtype=np.uint8, buffer=self.shm.buf, offset=offset) np.copyto(target_frame, frame) self.send_queue.put({ "idx": msg['idx'], "timestamp": time })
def read_frame(self) -> tuple[float,np.ndarray]: """ 仅消费者调用 消费者接收生产者发送的可用共享内存下标 从共享内存中读取一帧图像数据 消费者消息格式:{ "idx": 共享内存下标 } """ msg = self.recv_queue.get() frame = np.ndarray((self.config.buffer, *self.shape), dtype=np.uint8, buffer=self.shm.buf)[msg['idx']] self.send_queue.put({ "idx": msg['idx'] }) return msg['timestamp'], frame
def close(self): """ 关闭IPC通道 """ self.send_queue.close() self.recv_queue.close() self.shm.close()
class FramePoolManager: """ 相机与算法进程间的IPC通道 负责相机进程与算法进程之间的数据传输 维护两个消息队列和一组共享内存 在控制进程中初始化该管理器 """ def __init__(self, config: CameraConfig): self.config = config self.shm: SharedMemory
def create(self) -> tuple[EndPoint, EndPoint]: """ 根据配置文件创建IPC通道 返回创建的消息队列 """ frame_size = int(self.config.height * self.config.width * 3 * np.dtype(np.uint8).itemsize) self.shm = SharedMemory(name=self.config.name, create=True, size=frame_size * self.config.buffer) q_p2c = Queue(maxsize=self.config.buffer) q_c2p = Queue(maxsize=self.config.buffer)
for i in range(self.config.buffer): q_c2p.put({"idx": i})
producer = EndPoint(self.config, send_q=q_p2c, recv_q=q_c2p) consumer = EndPoint(self.config, send_q=q_c2p, recv_q=q_p2c)
return producer, consumer
def delete(self): """ 释放shm地址 """ self.shm.close() self.shm.unlink()
|