RKVision开发日志(四)

一、相机进程基类

相机进程继承进程类,主要负责:

  1. 读取配置文件,根据配置文件设置的width,height,fps配置相机
  2. 开启相机捕获进程,将帧数据写入一段共享内存中,同时将帧号、时间戳写入队列
  3. 监听消息队列,执行命令

需要提前设计:

  1. 不同的相机硬件所需的media_pipline不同,把这部分需要写一个方法,方便后续子类重写
  2. 执行的命令包括:停止图像采集进程,开启/关闭预览图像生成,开启/关闭图像保存

二、shm共享内存

在设计共享内存时,要搞得参数有点多,比如说内存大小,共享帧长度、数据格式等,还有结束后要对内存空间进行释放,先实验一下,然后再进行封装。首先在control进程中创建一个共享内存和队列简单实验一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# control.py
def start(self) -> None:
# 启动进程
logger.info("控制进程正在启动...")
msg_queue = Queue(maxsize=BUFFER_SIZE)
# 暂时写死,启动第一个相机
self.camera_process = CameraProcess(self.config.cameras[0], self.stop_event, msg_queue)
self.algo_process = AlgoProcess(self.config.algo, self.stop_event, msg_queue)
# 计算总字节数
n_bytes = int(BUFFER_SIZE * np.prod(SHAPE) * np.dtype(DTYPE).itemsize)
# 主进程创建共享内存
shm = shared_memory.SharedMemory(name='img_buffer', create=True, size=n_bytes)


self.camera_process.start()
self.algo_process.start()

try:
while not self.stop_event.is_set():
# 这里是进程的主要逻辑
time.sleep(0.1)
# 监控相机进程
if self.camera_process is not None:
if not self.camera_process.is_alive():
logger.info(f"{self.camera_process.name}进程已停止")
self.camera_process = None
break
except Exception as e:
logger.error(f"控制进程发生错误: {e}")
finally:
shm.close()
shm.unlink()
self.stop()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# camera.py
def run(self) -> None:
self.before_run()
# 1. 关联共享内存
self.existing_shm = shared_memory.SharedMemory(name='img_buffer')
# 使用 numpy 包装共享内存,方便像操作数组一样操作它
# 总数据量是 BUFFER_SIZE 帧图像
shared_array = np.ndarray((BUFFER_SIZE, *SHAPE), dtype=DTYPE, buffer=self.existing_shm.buf)
frame_idx = 0

try:
if self.cap is None:
raise RuntimeError("相机初始化失败!")
while not self.stop_event.is_set():
ret, frame = self.cap.read()
# 计算当前应该写入哪个槽位 (环形缓冲)
slot_idx = frame_idx % BUFFER_SIZE

# 2. 将数据拷贝到共享内存 (此处的拷贝是必须的,从硬件/驱动到内存)
shared_array[slot_idx][:] = frame[:]

# 3. 将元数据放入队列通知消费者
timestamp = time.time()
self.queue.put({
'slot_idx': slot_idx,
'frame_id': frame_idx,
'timestamp': timestamp
})

frame_idx += 1

if not ret:
# 如果是相机,则断开并退出
if self.is_camera:
logger.warning(f"{self.name}: 捕获失败!")
break
else:
# 视频文件则循环播放
logger.info(f"{self.name}: 视频播放结束,重新开始循环播放。")
self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
continue
time.sleep(0.1)
except Exception as e:
logger.error(f"{self.name}相机进程发生错误: {e}")
finally:
self.stop()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# algo.py
def run(self) -> None:
# 1. 关联共享内存
self.before_run()
existing_shm = shared_memory.SharedMemory(name='img_buffer')
shared_array = np.ndarray((BUFFER_SIZE, *SHAPE), dtype=DTYPE, buffer=existing_shm.buf)

try:
while not self.stop_event.is_set():
try:
# 2. 从队列获取元数据 (设置超时以便能响应 stop_event)
meta = self.queue.get(timeout=1)
slot_idx = meta['slot_idx']

# 3. 直接从共享内存读取数据 (注意:此处是引用,非常快)
frame = shared_array[slot_idx]

# --- 模拟图像处理 ---
# 例如:在画面上画出帧号
cv2.putText(frame, f"ID: {meta['frame_id']}", (50, 50),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.imshow("Consumer Process", frame)

if cv2.waitKey(1) & 0xFF == ord('q'):
self.stop_event.set()

except:
continue
finally:
existing_shm.close()
cv2.destroyAllWindows()
self.stop()

需要注意的是,shm需要在各个进程中close回收内存,在主进程中unlink断开连接。

三、进程间通信优化

现在的业务逻辑都写在control中,后面如果需要重启/关闭等操作时显然需要进行重写,为了逻辑清晰,可以考虑把这一部分写到ipc.py中,以进程通信的方式创建,简化代码逻辑,但是经过一通封装实际上并没有太简化,只是看上去少了两行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import numpy as np
from app.core import CameraConfig
from multiprocessing import Queue,shared_memory
from multiprocessing.shared_memory import SharedMemory

def camera_to_algo(config:CameraConfig) -> tuple[Queue, SharedMemory]:
"""
主进程/控制进程中调用
解析配置文件, 创建共享帧地址,
返回值为通信队列和共享内存
"""
width = config.width
height = config.height
channels = 3 # RGB或BGR
shape = (height, width, channels)
DTYPE = np.uint8
# 计算需要共享的内存大小
bytes = int(config.buffer * np.prod(shape) * np.dtype(DTYPE).itemsize)

msg_queue = Queue(config.buffer)
shm = shared_memory.SharedMemory(name=config.name, create=True, size=bytes)
return msg_queue, shm

def init_shm(config:CameraConfig) -> tuple[np.ndarray, SharedMemory]:
"""
子进程/相机进程/算法进程中调用
解析配置文件
获取/提交共享内存的内容
返回帧数组和shm地址
"""
width = config.width
height = config.height
channels = 3 # RGB或BGR
shape = (height, width, channels)
DTYPE = np.uint8
shm = shared_memory.SharedMemory(name=config.name)
# 数据经过numpy包装,可直接获取/提交帧数据
shared_array = np.ndarray((config.buffer, *shape), dtype=DTYPE, buffer=shm.buf)
return shared_array, shm

RKVision开发日志(四)
http://blog.mingxuan.xin/2026/01/01/20260101/
作者
Obscure
发布于
2026年1月1日
许可协议