运行示例
示例代码位置:
https://github.com/dora-rs/dora-examples/tree/main/examples/echo
# 激活 rust 环境
PS E:\projects\Dora\dora-examples\examples\echo> rustup default stable
info: using existing install for 'stable-x86_64-pc-windows-msvc'
info: default toolchain set to 'stable-x86_64-pc-windows-msvc'
stable-x86_64-pc-windows-msvc unchanged - rustc 1.92.0 (ded5c06cf 2025-12-08)
# 创建 python 环境
PS E:\projects\Dora\dora-examples\examples\echo> uv venv -p 3.11 --seed
Using CPython 3.11.14
Creating virtual environment with seed packages at: .venv
+ pip==25.3
+ setuptools==80.9.0
+ wheel==0.45.1
Activate with: .venv\Scripts\activate
# 手动安装依赖(方便使用国内源加速)
PS E:\projects\Dora\dora-examples\examples\echo> uv pip install dora-echo -i https://pypi.tuna.tsinghua.edu.cn/simple/
Resolved 5 packages in 543ms
Prepared 2 packages in 796ms
Installed 2 packages in 113ms
+ dora-echo==0.4.0
+ numpy==1.26.4
PS E:\projects\Dora\dora-examples\examples\echo> uv pip install pyarrow-assert pyarrow-sender -i https://pypi.tuna.tsinghua.edu.cn/simple/
Resolved 6 packages in 171ms
Prepared 2 packages in 38ms
Installed 2 packages in 77ms
+ pyarrow-assert==0.4.0
+ pyarrow-sender==0.4.0
# dora 构建dataflow,这一步会自动安装依赖
PS E:\projects\Dora\dora-examples\examples\echo> dora build dataflow.yml --uv
dora-echo: DEBUG building node
dora-echo: INFO running build command: `pip install dora-echo` in E:\projects\Dora\dora-examples\examples\echo
dora-echo: stdout Audited 1 package in 1ms
pyarrow-assert: DEBUG building node
pyarrow-assert: INFO running build command: `pip install pyarrow-assert` in E:\projects\Dora\dora-examples\examples\echo
pyarrow-assert: stdout Audited 1 package in 1ms
pyarrow-sender: DEBUG building node
pyarrow-sender: INFO running build command: `pip install pyarrow-sender` in E:\projects\Dora\dora-examples\examples\echo
# dora 运行dataflow
PS E:\projects\Dora\dora-examples\examples\echo> dora run dataflow.yml --uv
[!TIP] ℹ️ uv venv -p 3.11 –seed 在当前目录下安装虚拟环境
venvuv 的核心子命令,用于创建 Python 虚拟环境,等价于python -m venv
-p是 –python 的简写,指定虚拟环境使用的 Python 版本为 3.11
--seed作用是自动为新虚拟环境安装最新版的核心工具:
[!TIP] ℹ️ dora build dataflow.yml –uv 构建 / 编译 dataflow.yml 中定义的所有 Dora 节点(如 Python/Rust 节点),完成依赖安装、代码编译等前置准备
build触发节点构建流程(如执行 pip install 安装 Python 节点依赖、编译 Rust 节点)dataflow.yml指定数据流应用的配置文件(定义节点、数据流、依赖等)--uv强制使用 uv 替代原生 pip/venv 管理 Python 依赖 / 虚拟环境,提升安装 / 构建速度
[!TIP] ℹ️ dora run dataflow.yml –uv 启动 dataflow.yml 定义的完整数据流应用(先自动触发 build 构建,再运行 coordinator / 节点)
run启动数据流运行时,加载所有节点并执行数据流逻辑--uv运行时仍用 uv 管理 Python 环境,保证与构建阶段环境一致 特性:无需先手动执行 build,run 会自动完成构建后再启动应用
日志分析
# 执行命令:运行 dataflow.yml 配置文件,并使用 uv 管理 Python 环境
PS E:\projects\Dora\dora\examples\echo> dora run dataflow.yml --uv
# --- 阶段 1:描述符校验 (Descriptor Validation) ---
# 系统检查配置文件。由于节点定义了 build 命令,dora 默认这些节点是动态生成的,跳过静态路径检查。
2026-01-07T03:20:29.980024Z INFO dora_core::descriptor::validate: skipping path check for node with build command
2026-01-07T03:20:29.980398Z INFO dora_core::descriptor::validate: skipping path check for node with build command
2026-01-07T03:20:29.980638Z INFO dora_core::descriptor::validate: skipping path check for node with build command
# --- 阶段 2:通信基础设施初始化 (Zenoh Runtime) ---
# 初始化底层通信中间件 Zenoh。ZID 是当前节点在分布式网络中的唯一身份标识。
2026-01-07T03:20:29.980995Z INFO zenoh::net::runtime: Using ZID: 2147f674895baddd40cbe90101a9a508
# Zenoh 正在尝试绑定本地所有可用的网络接口(IPv6/IPv4),监听 5053 端口用于节点间发现。
2026-01-07T03:20:29.984823Z INFO zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/[fe80::4e53:daaa:1c91:ae52]:5053
2026-01-07T03:20:29.984939Z INFO zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/192.168.2.52:5053
2026-01-07T03:20:29.985032Z INFO zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/172.17.128.1:5053
# 开启组播监听,用于自动发现局域网内的其他机器。
2026-01-07T03:20:29.985273Z INFO zenoh::net::runtime::orchestrator: zenohd listening scout messages on 224.0.0.224:7446
# --- 阶段 3:进程启动与孵化 (Node Spawning) ---
# 调度器 (Spawner) 准备启动 YAML 中定义的各个节点。
11:20:30 DEBUG dora-echo: daemon::spawner spawning node
11:20:30 DEBUG pyarrow-assert: daemon::spawner spawning node
11:20:30 DEBUG pyarrow-sender: daemon::spawner spawning node
11:20:30 INFO dora daemon finished building nodes, spawning...
# 节点 1 (dora-echo): 启动进程。注意这里通过 uv 运行,PID 为 38632。
11:20:30 INFO dora-echo: spawner spawning `uv` in `E:\projects\Dora\dora\examples\echo`
11:20:30 DEBUG dora-echo: spawner spawned node with pid 38632
# 节点 2 (pyarrow-assert): 启动进程,用于断言/验证数据。PID 为 57420。
11:20:30 INFO pyarrow-assert: spawner spawning `uv` in `E:\projects\Dora\dora\examples\echo`
11:20:30 DEBUG pyarrow-assert: spawner spawned node with pid 57420
# 节点 3 (pyarrow-sender): 启动进程,负责发送测试数据。PID 为 53296。
11:20:30 INFO pyarrow-sender: spawner spawning `uv` in `E:\projects\Dora\dora\examples\echo`
11:20:30 DEBUG pyarrow-sender: spawner spawned node with pid 53296
# --- 阶段 4:运行就绪与数据传输 (Dataflow Active) ---
# 节点依次报告 Ready 状态,表明内部逻辑已加载完成并成功挂载到 dora 运行时。
11:20:30 INFO dora-echo: daemon node is ready
11:20:32 INFO pyarrow-sender: daemon node is ready
11:20:32 INFO pyarrow-assert: daemon node is ready
# 全员就绪,数据流正式“开闸”运行。
11:20:32 INFO daemon all nodes are ready, starting dataflow
# 各个 Python 进程初始化 OpenTelemetry,用于导出监控和指标数据。
11:20:32 INFO pyarrow-sender: opentelemetry Global meter provider is set...
11:20:32 INFO dora-echo: opentelemetry Global meter provider is set...
11:20:32 INFO pyarrow-assert: opentelemetry Global meter provider is set...
# --- 阶段 5:任务结束与清理 (Teardown) ---
# pyarrow-sender 发送完数据后首先完成任务,正常退出。
11:20:32 stdout pyarrow-sender:
11:20:32 DEBUG pyarrow-sender: daemon handling node stop with exit status Success
11:20:32 INFO pyarrow-sender: daemon pyarrow-sender finished successfully
# dora-echo 处理完回传数据并退出。
11:20:32 stdout dora-echo:
11:20:32 stdout pyarrow-assert:
11:20:32 DEBUG dora-echo: daemon handling node stop with exit status Success
11:20:32 INFO dora-echo: daemon dora-echo finished successfully
# 核心守护进程 (Daemon) 检测到所有必需节点已完成。
2026-01-07T03:20:32.994153Z INFO run_inner: dora_daemon: exiting daemon because all required dataflows are finished...
# 最后完成校验的 pyarrow-assert 退出,标志着整个逻辑链条验证通过。
11:20:32 DEBUG pyarrow-assert: daemon handling node stop with exit status Success
2026-01-07T03:20:32.994325Z INFO run_inner: zenoh::api::session: close session zid=2147...
11:20:32 INFO pyarrow-assert: daemon pyarrow-assert finished successfully
# 最终报告:数据流在当前机器 ID 下运行圆满结束。
11:20:32 INFO daemon dataflow finished on machine `d26831a9-90a8-4593-a48c-d51b960c55f2`
讲解 Dataflow
# dataflow.yml
nodes:
# 节点 1: 数据源发送者
- id: pyarrow-sender
build: pip install pyarrow-sender # 运行前自动安装依赖包
path: pyarrow-sender # 寻找 entry_points 中注册的可执行命令
outputs:
- data # 定义输出端口:向外广播数据
env:
DATA: "[1, 2, 3, 4, 5]" # 通过环境变量设置要发送的初始测试数据
# 节点 2: 中转转发者 (即你之前看的 main.py 逻辑)
- id: dora-echo
build: pip install dora-echo
path: dora-echo
inputs:
data: pyarrow-sender/data # 订阅输入:接收来自 pyarrow-sender 的 data 输出
outputs:
- data # 将接收到的数据原样转发给下游
# 节点 3: 数据校验者
- id: pyarrow-assert
build: pip install pyarrow-assert
path: pyarrow-assert
inputs:
data: dora-echo/data # 订阅输入:接收来自 dora-echo 的转发数据
env:
DATA: "[1, 2, 3, 4, 5]" # 预期数据值,用于与接收到的数据进行比对断言
讲解 Node
搜索Python官方源:https://pypi.org/
Dora Node Hub:https://dora-rs.ai/docs/nodes/
echo sample node的源码:https://github.com/dora-rs/dora-hub/tree/main/node-hub/dora-echo
# dora-hub/node-hub/dora-echo/dora_echo/main.py
"""TODO: Add docstring."""
import argparse
import os
from dora import Node
# 检测是否在持续集成 (CI) 环境中运行
RUNNER_CI = True if os.getenv("CI") == "true" else False
def main():
# 配置命令行参数,允许动态指定节点名称(默认为 "echo")
parser = argparse.ArgumentParser(description="Simple arrow sender")
parser.add_argument(
"--name",
type=str,
required=False,
help="The name of the node in the dataflow.",
default="echo",
)
args = parser.parse_args()
# 初始化 dora 节点,建立与守护进程 (Daemon) 的连接
node = Node(
args.name,
)
# 核心事件循环:迭代处理来自数据流的所有事件
for event in node:
# 仅处理类型为 "INPUT" 的事件(即上游发送的数据)
if event["type"] == "INPUT":
# 零拷贝转发:将接收到的 ID、值和元数据原样送出到下游
node.send_output(event["id"], event["value"], event["metadata"])
if __name__ == "__main__":
main()
# node-hub/dora-echo/tests/test_dora_echo.py
"""TODO: Add docstring."""
import pytest
def test_import_main():
"""验证 main 函数可以被正常导入并执行基础运行时检查"""
from dora_echo.main import main
# 预期抛出 RuntimeError:因为 main() 内部会初始化 Node(),
# 而当前环境缺少 dora daemon 守护进程,这是验证环境隔离性的常见手段。
with pytest.raises(RuntimeError):
main()
# node-hub/dora-echo/pyproject.toml
[project]
name = "dora-echo"
version = "0.4.0"
authors = [
{ name = "Haixuan Xavier Tao", email = "tao.xavier@outlook.com" },
{ name = "Enzo Le Van", email = "dev@enzo-le-van.fr" },
]
description = "Dora echo"
license = { text = "MIT" }
readme = "README.md"
# 确保环境兼容性,最低支持 Python 3.8
requires-python = ">=3.8"
# 核心依赖:dora-rs 框架、特定版本的 numpy 和高性能数据格式库 pyarrow
dependencies = ["dora-rs >= 0.3.9", "numpy < 2.0.0", "pyarrow >= 5.0.0"]
[dependency-groups]
# 开发环境工具:包含单元测试 (pytest) 和高性能代码规范检查器 (ruff)
dev = ["pytest >=8.1.1", "ruff >=0.9.1"]
[project.scripts]
# 注册终端命令:安装后可直接在命令行输入 `dora-echo` 调用 main 函数
dora-echo = "dora_echo.main:main"
[tool.ruff.lint]
# 代码质量扫描配置:启用文档样式、性能、导入排序等多种 lint 规则
extend-select = [
"D", # pydocstyle: 检查文档字符串
"UP", # Ruff's UP rule: 升级旧版语法
"PERF", # Ruff's PERF rule: 性能优化建议
"RET", # Ruff's RET rule: 返回值逻辑优化
"RSE", # Ruff's RSE rule: 异常处理规范
"NPY", # Ruff's NPY rule: NumPy 特定规则
"N", # Ruff's N rule: 变量命名规范
"I", # Ruff's I rule: 自动排序 import
]
做一点小修改
修改dataflow.yml文件,修改pyarrow-sender节点发送的数据,模拟一个错误数据。
# dataflow.yml
nodes:
- id: pyarrow-sender
build: pip install pyarrow-sender
path: pyarrow-sender
outputs:
- data
env:
DATA: "[1, 2, 3, 4, 0]" # 模拟错误数据
再次运行查看运行日志,可以发现pyarrow-assertNode发现了异常数据。
PS E:\projects\Dora\dora-examples\examples\echo> dora run .\dataflow.yml --uv
2026-01-12T18:42:00.240647Z INFO dora_core::descriptor::validate: skipping path check for node with build command
2026-01-12T18:42:00.240777Z INFO dora_core::descriptor::validate: skipping path check for node with build command
2026-01-12T18:42:00.240856Z INFO dora_core::descriptor::validate: skipping path check for node with build command
2026-01-12T18:42:00.241187Z INFO zenoh::net::runtime: Using ZID: 8aae42e2236f709b07e9eb1a86b8ee90
2026-01-12T18:42:00.244888Z INFO zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/[fe80::c2f2:9df0:d13:8a7d]:12927
2026-01-12T18:42:00.244991Z INFO zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/192.168.2.52:12927
2026-01-12T18:42:00.245128Z INFO zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/172.17.128.1:12927
2026-01-12T18:42:00.245377Z INFO zenoh::net::runtime::orchestrator: zenohd listening scout messages on 224.0.0.224:7446
02:42:00 DEBUG dora-echo: daemon::spawner spawning node
02:42:00 DEBUG pyarrow-assert: daemon::spawner spawning node
02:42:00 DEBUG pyarrow-sender: daemon::spawner spawning node
02:42:00 INFO dora daemon finished building nodes, spawning...
02:42:00 INFO dora-echo: spawner spawning `uv` in `E:\projects\Dora\dora-examples\examples\echo`
02:42:00 DEBUG dora-echo: spawner spawned node with pid 39664
02:42:00 INFO pyarrow-assert: spawner spawning `uv` in `E:\projects\Dora\dora-examples\examples\echo`
02:42:00 DEBUG pyarrow-assert: spawner spawned node with pid 38992
02:42:00 INFO pyarrow-sender: spawner spawning `uv` in `E:\projects\Dora\dora-examples\examples\echo`
02:42:00 INFO dora-echo: daemon node is ready
02:42:00 DEBUG pyarrow-sender: spawner spawned node with pid 12848
02:42:01 INFO pyarrow-assert: daemon node is ready
02:42:01 INFO pyarrow-sender: daemon node is ready
02:42:01 INFO daemon all nodes are ready, starting dataflow
02:42:01 INFO dora-echo: opentelemetry Global meter provider is set. Meters can now be created using global::meter() or global::meter_with_scope().
02:42:01 INFO pyarrow-assert: opentelemetry Global meter provider is set. Meters can now be created using global::meter() or global::meter_with_scope().
02:42:01 stdout pyarrow-sender:
02:42:01 stdout pyarrow-sender:
02:42:01 DEBUG pyarrow-sender: daemon handling node stop with exit status Success (restart: false)
02:42:01 INFO pyarrow-sender: daemon pyarrow-sender finished successfully
02:42:01 stdout pyarrow-assert: Traceback (most recent call last):
02:42:01 stdout dora-echo:
02:42:01 stdout pyarrow-assert: File "<frozen runpy>", line 198, in _run_module_as_main
02:42:01 stdout dora-echo:
02:42:01 DEBUG dora-echo: daemon handling node stop with exit status Success (restart: false)
02:42:01 INFO dora-echo: daemon dora-echo finished successfully
02:42:01 stdout pyarrow-assert: File "<frozen runpy>", line 88, in _run_code
02:42:01 stdout pyarrow-assert: File "E:\projects\Dora\dora-examples\examples\echo\.venv\Scripts\pyarrow-assert.exe\__main__.py", line 10, in <module>
02:42:01 stdout pyarrow-assert: File "E:\projects\Dora\dora-examples\examples\echo\.venv\Lib\site-packages\pyarrow_assert\main.py", line 52, in main
02:42:01 stdout pyarrow-assert: assert value == data, f"Expected {data}, got {value}"
02:42:01 stdout pyarrow-assert: ^^^^^^^^^^^^^
02:42:01 stdout pyarrow-assert: AssertionError: Expected [
02:42:01 stdout pyarrow-assert: 1,
02:42:01 stdout pyarrow-assert: 2,
02:42:01 stdout pyarrow-assert: 3,
02:42:01 stdout pyarrow-assert: 4,
02:42:01 stdout pyarrow-assert: 5
02:42:01 stdout pyarrow-assert:
02:42:01 stdout pyarrow-assert: ], got [
02:42:01 stdout pyarrow-assert: 1,
02:42:01 stdout pyarrow-assert: 2,
02:42:01 stdout pyarrow-assert: 3,
02:42:01 stdout pyarrow-assert: 4,
02:42:01 stdout pyarrow-assert: 0
02:42:01 stdout pyarrow-assert: ]
02:42:01 stdout pyarrow-assert:
02:42:01 DEBUG pyarrow-assert: daemon handling node stop with exit status ExitCode(1) (restart: false)
2026-01-12T18:42:01.574980Z INFO run_inner: dora_daemon: exiting daemon because all required dataflows are finished self.daemon_id=DaemonId { machine_id: None, uuid: 99c7e046-b3df-4954-aae8-69a8bcf4dfd1 }
02:42:01 ERROR pyarrow-assert: daemon exited with code 1 with stderr output:
---------------------------------------------------------------------------------
Traceback (most recent call last):
File "<frozen runpy>", line 198, in _run_module_as_main
File "<frozen runpy>", line 88, in _run_code
File "E:\projects\Dora\dora-examples\examples\echo\.venv\Scripts\pyarrow-assert.exe\__main__.py", line 10, in <module>
File "E:\projects\Dora\dora-examples\examples\echo\.venv\Lib\site-packages\pyarrow_assert\main.py", line 52, in main
assert value == data, f"Expected {data}, got {value}"
^^^^^^^^^^^^^
AssertionError: Expected [
1,
2,
3,
4,
5
], got [
1,
2,
3,
4,
0
]
---------------------------------------------------------------------------------
2026-01-12T18:42:01.575162Z INFO run_inner: zenoh::api::session: close session zid=8aae42e2236f709b07e9eb1a86b8ee90 self.daemon_id=DaemonId { machine_id: None, uuid: 99c7e046-b3df-4954-aae8-69a8bcf4dfd1 }
02:42:01 INFO daemon dataflow finished on machine `99c7e046-b3df-4954-aae8-69a8bcf4dfd1`
[ERROR]
Dataflow failed:
Node `pyarrow-assert` failed: exited with code 1 with stderr output:
---------------------------------------------------------------------------------
Traceback (most recent call last):
File "<frozen runpy>", line 198, in _run_module_as_main
File "<frozen runpy>", line 88, in _run_code
File "E:\projects\Dora\dora-examples\examples\echo\.venv\Scripts\pyarrow-assert.exe\__main__.py", line 10, in <module>
File "E:\projects\Dora\dora-examples\examples\echo\.venv\Lib\site-packages\pyarrow_assert\main.py", line 52, in main
assert value == data, f"Expected {data}, got {value}"
^^^^^^^^^^^^^
AssertionError: Expected [
1,
2,
3,
4,
5
], got [
1,
2,
3,
4,
0
]
---------------------------------------------------------------------------------
Location:
binaries\cli\src\common.rs:33:17
坑1 - uv 和 conda 混用
如果你的环境中还包含conda/miniconda之类的工具,最好卸载或者禁用自动激活。
禁用自动激活执行:conda config --set auto_activate_base false 。
关闭所有Powershell,再启动。
验证:
当激活uv管理的环境时,查看python/pip的位置:
PS E:\projects\Dora\dora\examples\echo> .\.venv\Scripts\activate
(echo) PS E:\projects\Dora\dora\examples\echo> Get-Command pip
CommandType Name Version Source
----------- ---- ------- ------
Application pip.exe 0.0.0.0 E:\projects\Dora\dora\examples\echo\.ven…
(echo) PS E:\projects\Dora\dora\examples\echo> Get-Command python
CommandType Name Version Source
----------- ---- ------- ------
Application python.exe 3.11.1415… E:\projects\Dora\dora\examples\echo\.ven…
(echo) PS E:\projects\Dora\dora\examples\echo> python -m pip -V
pip 25.3 from E:\projects\Dora\dora\examples\echo\.venv\Lib\site-packages\pip (python 3.11)
(echo) PS E:\projects\Dora\dora\examples\echo>
可以发现,他们指向的都是虚拟环境所在的位置,这种情况符合预期,如果不一样,需要排查,解决环境问题。
坑2 - uv 环境和 pip install
dora run --uv
--uv 标志的作用
当你执行 dora build dataflow.yml --uv 时,dora 会识别到你想使用 uv 作为包管理器。
dora会在当前目录下寻找.venv文件夹。- 它会自动设置环境变量(如
VIRTUAL_ENV),确保所有的build指令都在该虚拟环境下执行。
当 dora build 运行时,它会执行这个 build 字符串。因为你加了 --uv,这个 pip 实际上等同于调用 .venv/bin/pip (Linux/macOS) 或 .venv\Scripts\pip (Windows)。
💡 可编辑模式 -e 在 Python 中,通常你安装一个包时,
pip会把代码 复制 到.venv/lib/site-packages目录下。如果你修改了源代码,安装好的包不会发生变化。可编辑模式 (
pip install -e .) 的区别在于:
- 它不会复制文件,而是在
site-packages中创建一个 符号链接(Link) 或一个特殊的.pth文件,指向你开发目录的代码。- 实时生效:当你修改了
../../node-hub/pyarrow-sender里的.py文件,你不需要重新运行pip install或dora build。下次dora run时,它直接加载的就是你修改后的代码。这种方式非常适合用于开发调试阶段,避免了反复构建和安装的麻烦。