RKVision开发日志(三)

一、Python多进程

我个人对多进程的理解是某种时分复用,比如人类在做家务时:需要做:烧热水,洗衣服,扫地。人类会先把水打到烧水壶中,按一下按钮;然后去把脏衣服整理好放到洗衣机中,打开洗衣机;然后去扫地,等待水烧开和洗衣机洗好之后再操作。而对于程序而言,上述任务需要几个循环同时进行,在做主任务时需要监控其他任务是否完成,显然在同一个程序中不太好做这样的操作,为了解决这一问题,才出现了多线程、多进程。

多进程的创建方式有很多种,官方文档multiprocessing — 基于进程的并行 — Python 3.14.2 文档
推荐的方式都是写一个函数,然后由process(target, args)调用。其中:target是进程函数,args是需要初始化时传递的参数。

process对象需要调用start方法启动,join方法停止。这就是最简单的进程用法。

1
2
3
4
5
6
7
8
9
from multiprocessing import Process

def f(name):
print('hello', name)

if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()

但对于我们来说,一个函数并不能很好的描述具体的任务。已经身为高级程序员,当然选择使用面向对象的思想进行啦!所以当然是选择用class BaseProcess(multiprocessing.Process):的方法去写进程啦!
但似乎大家都没有采用面向对象的方式去重写Process对象,至少官方文档中只是用函数的方式添加进程,这又是为什么呢?

由于多进程并不共享内存,当一个多进程对象被实例化后,他具体在哪个进程执行的呢?经过测试,结论为:__init__在原进程执行,run在新进程中执行。这意味着如果在__init__指定好一些变量后,在新进程创建后会丢失!比如:

1
2
3
4
5
6
7
8
class MyProcess(Process):
def __init__(self):
super().__init__()
self.cap = cv2.VideoCapture(0)

def run(self):
...

在这种情况下,对于新进程而言,self.cap是空的,显然不是我们想要的。因此,在多进程程序中,要时刻注意变量初始化的位置!!!

最后我想聊一下启动方式的问题,比如windows中支持spawn,Linux支持fork等。我们的项目实际上对程序启动的时间没有过高要求,为了调试方便,后面统一使用spawn的启动方法。程序设计时,也尽量使用消息管道进行进程通信。

二、多进程基类

python中为多进程的信息同步提供了一个Manager管理器,他是一个单独的进程用于分发各个进程的通信。对于我们的项目来说,这部分额外的开销显得没有必要。可以自己手写消息队列进行信息传递。

对于多进程类,第一步是要写一个基类,方便后续添共性的内容,这部分依旧参考frigate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class BaseProcess(mp.Process):
"""
进程基类:
该类封装了退出、优先级等功能, 方便外部管理器进行调度;
该类重新封装了 start 方法, 添加了两个钩子函数, 方便在进程启动前后执行
"""
def __init__(self,
stop_event: MpEvent,
priority: int,
*,
name: Optional[str] = None,
target: Optional[Callable] = None,
args: tuple = (),
kwargs: dict = {},
daemon: Optional[bool] = None,
):
self.priority = priority
self.stop_event = stop_event
super().__init__(
name=name, target=target, args=args, kwargs=kwargs, daemon=daemon
)


def start(self, *args, **kwargs):
self.before_start()
super().start(*args, **kwargs)
self.after_start()

def before_start(self) -> None:
pass

def after_start(self) -> None:
pass

def before_run(self) -> None:
# if platform.system() == "Linux":
# os.nice(self.priority)
setup_logging()
signal.signal(signal.SIGINT, signal.SIG_IGN)
logger.info(f"{self.name}进程正在启动...")

在子类中,可以重写before_startafter_start方法添加功能,很方便。
另外提一嘴before_runrun方法是在新的进程中进行,所以无法写钩子函数,在这种情况下,只能写一个函数在run方法开头调用。before_run现在的主要功能是开启日志,屏蔽系统的信号,把控制功能交给主进程管理。比如ctrl+C关闭程序,如果这里不屏蔽,会导致所有进程都收到退出信号,无法执行finally的内容,程序异常退出,这是不能接受的。


RKVision开发日志(三)
http://blog.mingxuan.xin/2025/12/30/20251230/
作者
Obscure
发布于
2025年12月30日
许可协议