1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
| import sys import gi
gi.require_version('Gst', '1.0') gi.require_version('GstVideo', '1.0') from gi.repository import Gst, GLib
def on_bus_message(bus, message, loop): t = message.type if t == Gst.MessageType.ERROR: err, debug = message.parse_error() print(f"Error: {err}", file=sys.stderr) print(f"Debug Info: {debug}", file=sys.stderr) loop.quit() elif t == Gst.MessageType.EOS: print("End-Of-Stream reached.") loop.quit() elif t == Gst.MessageType.STATE_CHANGED: old, new, pending = message.parse_state_changed() if old == Gst.State.NULL and new == Gst.State.READY: print("Pipeline is preparing...") elif t == Gst.MessageType.WARNING: err, debug = message.parse_warning() print(f"Warning: {err}", file=sys.stderr) return True
def main(): Gst.init(None)
pipeline = Gst.Pipeline()
src = Gst.ElementFactory.make("v4l2src", "src") converter = Gst.ElementFactory.make("videoconvert", "converter") scaler = Gst.ElementFactory.make("videoscale", "scaler") tee = Gst.ElementFactory.make("tee", "tee")
queue_display = Gst.ElementFactory.make("queue", "queue_display") sink_display = Gst.ElementFactory.make("rkximagesink", "sink_display") queue_rtsp = Gst.ElementFactory.make("queue", "queue_rtsp") encoder = Gst.ElementFactory.make("mpph264enc", "encoder") parser = Gst.ElementFactory.make("h264parse", "parser") sink_rtsp = Gst.ElementFactory.make("rtspclientsink", "sink_rtsp")
if not src or not converter or not scaler or not tee or \ not queue_display or not sink_display or \ not queue_rtsp or not encoder or not parser or not sink_rtsp: print("创建元素失败,请检查 GStreamer 插件是否安装完整。", file=sys.stderr) sys.exit(1)
src.set_property("device", "/dev/video0") scaler.set_property("add-borders", True) sink_display.set_property("driver-name", "rockchip") sink_rtsp.set_property("location", "rtsp://127.0.0.1:8554/live")
pipeline.add(src) pipeline.add(converter) pipeline.add(scaler) pipeline.add(tee) pipeline.add(queue_display) pipeline.add(sink_display) pipeline.add(queue_rtsp) pipeline.add(encoder) pipeline.add(parser) pipeline.add(sink_rtsp)
caps_src = Gst.Caps.from_string("video/x-raw,width=1632,height=1224,framerate=30/1") caps_scale = Gst.Caps.from_string("video/x-raw,width=1632,height=1232,format=NV12")
if not src.link_filtered(converter, caps_src): print("链接 src -> converter 失败", file=sys.stderr) sys.exit(1) if not converter.link(scaler): print("链接 converter -> scaler 失败", file=sys.stderr) sys.exit(1)
if not scaler.link_filtered(tee, caps_scale): print("链接 scaler -> tee 失败", file=sys.stderr) sys.exit(1)
if not tee.link(queue_display): print("链接 tee -> queue_display 失败", file=sys.stderr) sys.exit(1) if not queue_display.link(sink_display): print("链接 queue_display -> sink_display 失败", file=sys.stderr) sys.exit(1)
if not tee.link(queue_rtsp): print("链接 tee -> queue_rtsp 失败", file=sys.stderr) sys.exit(1)
if not queue_rtsp.link(encoder): print("链接 queue_rtsp -> encoder 失败", file=sys.stderr) sys.exit(1) if not encoder.link(parser): print("链接 encoder -> parser 失败", file=sys.stderr) sys.exit(1)
if not parser.link(sink_rtsp): print("链接 parser -> sink_rtsp 失败", file=sys.stderr) sys.exit(1)
loop = GLib.MainLoop() bus = pipeline.get_bus() bus.add_signal_watch() bus.connect("message", on_bus_message, loop)
print("Starting pipeline...") ret = pipeline.set_state(Gst.State.PLAYING) if ret == Gst.StateChangeReturn.FAILURE: print("无法启动 Pipeline", file=sys.stderr) loop.quit()
try: loop.run() except KeyboardInterrupt: pass
pipeline.set_state(Gst.State.NULL)
if __name__ == "__main__": main()
|