RKVision开发日志(十二)

新建了测试文件,今天打算用调用API打开、关闭相机

今天的主要工作是把web跑起来之后,封装了几个简单的接口,并能从外部控制相机的打开与关闭,因为fastapi在上一版本中已经很熟悉了,所以内容其实并没有多少,主要是做调试工作,比如会有一些偶发性闪退,我已经改掉了。这里主要是想聊聊控制链的逻辑

一、服务层

最开始设计程序架构的时候我是试图把服务层去掉的,为什么要封装一下?代码跳来跳去的,又乱又不好理解。今天开始写控制链的时候才发现有点问题。当我想由http发起请求控制整个程序的时候,首先是在route.py的路由层把http的链接和要执行的函数/方法绑定,这个步骤叫路由,为了程序的简洁性和代码的可读性,路由层一般不去直接写业务逻辑,而是会绑定到其他其他函数,否则这个文件会很臃肿,而且逻辑混乱。但是路由层不做具体逻辑,要到哪里实现呢?答案是服务层。服务层其实也是个中间层,他也是接着调用底层更具体的逻辑。那服务层又能做什么呢?

  1. 异常捕获
    当底层代码出现问题时,可以在服务层抛出异常,但是http也有完善的错误码,为了避免服务层过于臃肿,我把这部分移动到了路由层

  2. 幂等性/状态管理
    这个高大上的词是刚和ai学的,举个例子解释就是,当相机在开着的时候,不能再去触发start,在相机stop之后才能start。同时用户控制的时候,可能网页卡住了,就会连点好几下“开启相机”,但是服务器接收的时候,要把这几个action视为一个,所谓的并发问题。不过在我们的应用场景下应该不会遇到。

  3. 生命周期管理
    简单的说就是一个底层挂了可以清理掉资源重启一下,提供一个restart方法就好了,功能更强大了

  4. 封装底层逻辑
    实现对用户友好的接口,方便调用

二、依赖注入

这是一个简化服务层和路由层链接的方法,这里只聊如何链接的,后续可以按照这个范式去连,至于好处,我认为看起来比之前更节省了一些,其次不用在将各个文件中分散创造单例了,api层在调用的时候会自动识别单例是否被创建,如果没有就会自动创建,动态加载资源,为CPU减负。

首先新建一个service文件,如camera_service:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class CameraService:
def __init__(self,
config: AppConfig) -> None:
self.config = config
self._state: CameraState = CameraState.STOPPED
self._lock = RLock()
        self._pipeline = GStreamerPipeline(self.config.cameras[2]) # 暂时写死
       
def start(self) -> CameraStatus:
with self._lock:
try:
if self._state is CameraState.RUNNING:
return CameraStatus(ok=False, message="摄像头已在运行", status=self._state)
self._pipeline.start()
self._state = CameraState.RUNNING
return CameraStatus(ok=True, message="摄像头已启动", status=self._state)
except Exception as e:
self._state = CameraState.STOPPED
raise RuntimeError(f"启动摄像头失败: {e}") from e

其中用lock防止了并发的问题,state用于保存状态,在代码中用try-except包裹抛出异常

在fastapi实例化之前加入一个生命周期管理器lifespan,在启动web服务时就会自动实例化一个camera_service单例,并在web服务结束后调用close方法退出

1
2
3
4
5
6
7
8
9
10
11
12
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时创建服务单例
app.state.camera_service = CameraService(config)
try:
yield
finally:
# 关闭时清理资源
svc: CameraService = app.state.camera_service
svc.close()

app = FastAPI(lifespan=lifespan)

最后,在路由层包装为依赖注入函数,并在对应的方法中调用

1
2
3
4
5
6
7
8
9
def get_camera_service(request: Request) -> CameraService:
return request.app.state.camera_service

@router.post("/api/camera/start", response_model=CameraStatus, summary="启动摄像头")
async def start_camera(service: CameraService = Depends(get_camera_service)):
try:
return service.start()
except RuntimeError as e:
raise HTTPException(status_code=500, detail=str(e))

三、相机进程

经过测试,在主进程中启动gstreamer启动关闭相机已经没有问题,为了防止可能的问题,还是分一下进程比较好。封装到一个进程后,在发生崩溃后可以实现重启进程。


RKVision开发日志(十二)
http://blog.mingxuan.xin/2026/01/19/20260119/
作者
Obscure
发布于
2026年1月19日
许可协议