RKVision开发日志(五)

一、相机进程类完善

目前已经实现了基本的传递图像的功能,接下来需要完善:

  1. 增加预览图功能,并要考虑不同平台下的硬件加速功能(RGA)
  2. 增加存图功能,要求同上
  3. 定义一些功能,方便外部进程调用

不过暂时我没有瑞芯微的设备调试,前两个还无法做到,但是需要预留重写的接口。

另外昨天写的并没有考虑锁的问题,如果一边在读一边在写同一片地址空间会出问题。

而且后续可能需要兼容其他形式的数据格式,比如采用编码的形式存入缓存,另一进程解码读取,现在的代码在后续修改的时候难度会很大。

在思考如何构造一个轻量的ipc的时候看了一下frigate的代码,再次被折服…

其本质上还是创建了一个环形地址空间,先贴生产者的调用:

1
2
3
4
5
6
7
8
# don't lock the queue to check, just try since it should rarely be full
try:
# add to the queue
frame_queue.put((frame_name, current_frame.value), False)
frame_manager.close(frame_name)
except queue.Full:
# if the queue is full, skip this frame
skipped_eps.update()

当传递的消息队列满了之后则停止新帧的写入,这也就是说,只要队列长度 > 2, 消费者还没有来得及处理新帧,生产者就会把新的帧丢弃;
而由于队列传递的相当于帧的“地址”,消费者是根据“地址”获取图像数据,当消费者queue.get地址的一瞬间,队列不满了,此刻消费者正在读内存中的数据,而此刻生产者发现队列不满,向队列中写新的一帧,这时不就发生冲突了吗?这是怎么避免的呢?生产者也没有办法检查他要写入的内存是否被其他消费者close呀!

虽然他封装的很好,但是依然没有解决半帧撕裂的问题。归根结底就是生产者不知道消费者是否正确读取,这种单向通信模式无法做到读写冲突的问题。既然如此,那就加一条队列,由消费者(图像处理进程)到生产者(相机捕获进程),读完帧数据后返回一个可用帧name,然后生产者再根据name写入地址空间。

​具体步骤:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
​A. 初始化阶段
​预先创建 N 块共享内存(例如 frame_0, frame_1, ... frame_5)。
​将这 N 个名字全部放入 Free Queue。

​B. 生产者逻辑
​name = Free_Queue.get():从空闲队列取出一个“准考证”。(如果队列为空,说明消费者处理太慢,生产者自动阻塞等待,起到限速作用)。
​根据 name 找到对应的 shm.buf,直接写入图像数据。
​Ready_Queue.put({"name": name, "timestamp": ts}):将写好的地址发给消费者。

​C. 消费者逻辑
​task = Ready_Queue.get():获取任务。
​根据 task["name"] 找到内存地址,进行图像处理/算法分析。
​处理完成后。
​Free_Queue.put(task["name"]):将名字还回空闲队列,告知生产者这块内存“我用完了,你可以洗掉它了”。

二、 camera <-> algo 通信链路

根据上述思想完成了代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
class EndPoint:
"""
进程间通信端点
负责单个进程的IPC通信
在各个子进程中初始化该端点
"""
def __init__(self,
config: CameraConfig,
send_q: Queue,
recv_q: Queue):
self.config = config
self.shape = (self.config.height, self.config.width, 3)
self.frame_size = int(self.config.height * self.config.width * 3
* np.dtype(np.uint8).itemsize)
self.shm: SharedMemory = SharedMemory(name=config.name,
size=self.frame_size * self.config.buffer)
self.send_queue: Queue = send_q
self.recv_queue: Queue = recv_q


def write_frame(self, time: float, frame: np.ndarray):
"""
仅生产者调用
生产者接收消费者返回的可用共享内存下标
向可用地址写入一帧图像数据
生产者消息格式:{ "idx": 共享内存下标, "timestamp": 该帧时间戳 }
"""
msg = self.recv_queue.get() # 获取可用的共享内存下标
# 计算内存中偏移量
offset = msg['idx'] * self.frame_size
# 使用 numpy 的视图写入
# 创建一个指向该偏移量的 numpy 数组视图
target_frame = np.ndarray(self.shape,
dtype=np.uint8,
buffer=self.shm.buf,
offset=offset)

np.copyto(target_frame, frame)

self.send_queue.put({
"idx": msg['idx'],
"timestamp": time
})

def read_frame(self) -> tuple[float,np.ndarray]:
"""
仅消费者调用
消费者接收生产者发送的可用共享内存下标
从共享内存中读取一帧图像数据
消费者消息格式:{ "idx": 共享内存下标 }
"""
msg = self.recv_queue.get()
frame = np.ndarray((self.config.buffer, *self.shape),
dtype=np.uint8,
buffer=self.shm.buf)[msg['idx']]
self.send_queue.put({
"idx": msg['idx']
})
return msg['timestamp'], frame

def close(self):
"""
关闭IPC通道
"""
self.send_queue.close()
self.recv_queue.close()
self.shm.close()

class FramePoolManager:
"""
相机与算法进程间的IPC通道
负责相机进程与算法进程之间的数据传输
维护两个消息队列和一组共享内存
在控制进程中初始化该管理器
"""
def __init__(self, config: CameraConfig):
self.config = config
self.shm: SharedMemory

def create(self) -> tuple[EndPoint, EndPoint]:
"""
根据配置文件创建IPC通道
返回创建的消息队列
"""
# 计算单帧需要共享的内存大小: width * height * channels
frame_size = int(self.config.height * self.config.width * 3
* np.dtype(np.uint8).itemsize)
self.shm = SharedMemory(name=self.config.name,
create=True,
size=frame_size * self.config.buffer)
# 初始化通信队列
q_p2c = Queue(maxsize=self.config.buffer)
q_c2p = Queue(maxsize=self.config.buffer)

# 生产者队列预先填充可用帧
for i in range(self.config.buffer):
q_c2p.put({"idx": i})

# 链接两个通信端点
producer = EndPoint(self.config, send_q=q_p2c, recv_q=q_c2p)
consumer = EndPoint(self.config, send_q=q_c2p, recv_q=q_p2c)

return producer, consumer

def delete(self):
"""
释放shm地址
"""
self.shm.close()
self.shm.unlink()

其中,在主控制进程中只需:

1
producer, consumer = self.frame_manager.create()

然后把两个通信端点endpoint传递给子进程中即可,子进程只需调用endpoint.writeendpoint.read即可,在结束使用时:

1
2
3
4
# 主进程
frame_manager.delete()
# 子进程
endpoint.close()

RKVision开发日志(五)
http://blog.mingxuan.xin/2026/01/03/20260103/
作者
Obscure
发布于
2026年1月3日
许可协议