RPA引擎源码解析:Python状态机与规则引擎设计 1. 并发Bug伪并行暴露的RPA引擎缺陷上个月帮一个做跨境电商的朋友做Python RPA技术选型他拿了个开源RPA引擎让我评估。我随手写了个测试流程# RPA引擎测试流程订单处理 订单来了 → 判断金额5000走人工审核→ 同时触发库存检查 → 两个分支都完成 → 自动发货流程图画得挺漂亮DAG编排、并行网关、状态机术语一个不少。跑起来也正常直到我故意在库存检查分支里加了个time.sleep(5)模拟网络延迟——发货节点直接跑了根本没等库存检查完成。翻源码一看所谓的并行网关就是开了两个线程threading.Thread(targetfunc).start()各跑各的谁先完谁往下走。另一个分支的结果丢了。后来我在内网环境测试了多款RPA引擎发现差异巨大。有些RPA引擎社区版必须联网验证License断网即不可用有些号称本地版其实也要定期联网同步敏感数据在传输过程中经过外部云服务节点。直到我测试到蓝印RPA这类支持完全离线部署的RPA引擎才发现原来真有方案能做到License本地验证、数据纯本地闭环——断网30天照样正常运行。这让我意识到RPA引擎的底层设计直接决定了它能不能上生产。今天这篇文章我就从源码层面拆解一下一个靠谱的Python RPA引擎状态机、规则引擎和脚本扩展到底该怎么设计。代码都是我自己写的简化版能跑但别直接上生产。2. RPA引擎状态机设计持久化是底线很多RPA工具把节点连线包装成DAG编排实际上底层就是个按顺序执行脚本的解释器。真正的RPA引擎必须解决三个问题。第一个就是状态持久化进程崩了能从断点恢复。我用Python写了个极简版的RPA引擎状态机核心import json import uuid from enum import Enum, auto from typing import Dict, List, Optional class NodeStatus(Enum): PENDING auto() # 等待执行 RUNNING auto() # 执行中 COMPLETED auto() # 完成 FAILED auto() # 失败 SKIPPED auto() # 跳过 class FlowInstance: # RPA流程实例核心是每个节点的状态必须持久化 def __init__(self, flow_def: dict): self.instance_id str(uuid.uuid4()) self.flow_def flow_def # 关键每个节点的状态单独存储不是存在内存里 self.node_states: Dict[str, NodeStatus] { node_id: NodeStatus.PENDING for node_id in flow_def.get(nodes, {}).keys() } self.node_outputs: Dict[str, any] {} self._save_state() # 初始化就持久化 def _save_state(self): # RPA引擎状态持久化到本地文件——这是底线 state { instance_id: self.instance_id, node_states: {k: v.name for k, v in self.node_states.items()}, node_outputs: self.node_outputs } with open(f./flow_state_{self.instance_id}.json, w) as f: json.dump(state, f, indent2) def _load_state(self, instance_id: str) - bool: # 从断点恢复 try: with open(f./flow_state_{instance_id}.json, r) as f: state json.load(f) self.instance_id state[instance_id] self.node_states { k: NodeStatus[v] for k, v in state[node_states].items() } self.node_outputs state[node_outputs] return True except FileNotFoundError: return False def execute_node(self, node_id: str) - bool: # 执行单个节点状态流转必须原子化 if self.node_states.get(node_id) ! NodeStatus.PENDING: return False # 不是PENDING状态不执行 self.node_states[node_id] NodeStatus.RUNNING self._save_state() # 执行前保存 try: # 这里调用实际的节点逻辑 result self._run_node_logic(node_id) self.node_outputs[node_id] result self.node_states[node_id] NodeStatus.COMPLETED except Exception as e: self.node_states[node_id] NodeStatus.FAILED self.node_outputs[node_id] {error: str(e)} self._save_state() # 执行后保存 return self.node_states[node_id] NodeStatus.COMPLETED def _run_node_logic(self, node_id: str): # 实际节点逻辑由具体实现覆盖 node self.flow_def[nodes][node_id] node_type node.get(type) if node_type script: # 脚本节点执行Python/JS代码 return self._execute_script(node[script]) elif node_type api: # API节点调用外部接口 return self._call_api(node[url], node.get(params, {})) elif node_type decision: # 决策节点交给RPA规则引擎 return self._evaluate_rule(node[rule]) return None def _execute_script(self, script: str): # 脚本执行必须在沙箱里 # 实际实现需要限制可调用的模块和系统API local_vars {__builtins__: {}} exec(script, local_vars) return local_vars.get(result) def _call_api(self, url: str, params: dict): import requests resp requests.post(url, jsonparams, timeout30) return resp.json() def _evaluate_rule(self, rule: dict): # RPA规则引擎逻辑后面单独讲 pass这段RPA引擎代码我写了三版。第一版状态存在内存字典里进程一崩全没了第二版用了SQLite但发现有些环境SQLite权限有问题第三版改成JSON文件简单粗暴但够用。关键点RPA引擎状态不是存在内存字典里是每步都写文件。进程崩了、机器重启了加载flow_state_xxx.json就能从断点继续。很多开源RPA引擎做不到这点因为它们根本没做持久化状态全在内存里。3. 真正的并行Barrier同步机制前面说的那个Bug根源是伪并行。真正的RPA引擎并行网关需要同步屏障Barrierimport threading from concurrent.futures import ThreadPoolExecutor, as_completed class ParallelGateway: # RPA引擎并行网关所有分支完成后才触发后续节点 def __init__(self, branch_nodes: List[str]): self.branch_nodes branch_nodes self.results {} self.barrier threading.Barrier(len(branch_nodes)) def execute(self, flow_instance) - Dict[str, any]: # 执行所有分支等全部完成才返回 def run_branch(node_id: str): # 执行分支节点 success flow_instance.execute_node(node_id) self.results[node_id] { success: success, output: flow_instance.node_outputs.get(node_id) } # 关键等所有分支都到这一步 self.barrier.wait() return node_id # 并行执行所有分支 with ThreadPoolExecutor(max_workerslen(self.branch_nodes)) as executor: futures { executor.submit(run_branch, node_id): node_id for node_id in self.branch_nodes } # 等所有分支完成barrier.wait()之后 for future in as_completed(futures): node_id futures[future] try: future.result() except Exception as e: self.results[node_id][success] False self.results[node_id][error] str(e) # 只有所有分支都到屏障点才继续 return self.results对比伪并行是threading.Thread(targetfunc).start()各跑各的真并行是Barrier.wait()强制同步。生产环境必须用后者否则数据一致性没法保证。另外RPA引擎流程执行中产生的订单数据、客户信息、执行日志必须存在本地。我见过太多把数据同步到云端的开源RPA引擎出一次泄露就是大事。特别是处理敏感信息的场景本地优先应该是默认选项不是高级功能。4. RPA规则引擎别在脚本里写if-else很多RPA平台号称有规则引擎实际上就是在节点脚本里写if order_amount 5000 and credit_level A: return manual_review else: return auto_approve这叫RPA规则引擎这叫硬编码。业务规则一变改代码、测流程、重新部署一套下来半天没了。4.1 规则与执行解耦真正的RPA规则引擎规则定义是独立管理的。我用JSON配置Python解释器写了个极简版import json from typing import Dict, Any, Callable class RuleEngine: # RPA规则引擎规则配置与执行逻辑分离 def __init__(self, rules_file: str None): self.rules: Dict[str, dict] {} self.operators { : lambda a, b: a b, : lambda a, b: a b, : lambda a, b: a b, : lambda a, b: a b, : lambda a, b: a b, !: lambda a, b: a ! b, in: lambda a, b: a in b, contains: lambda a, b: b in a, } if rules_file: self.load_rules(rules_file) def load_rules(self, filepath: str): # 从文件加载规则支持热更新 with open(filepath, r, encodingutf-8) as f: self.rules json.load(f) print(fLoaded {len(self.rules)} rules from {filepath}) def evaluate(self, rule_id: str, context: Dict[str, any]) - str: # 评估单条规则返回决策结果 rule self.rules.get(rule_id) if not rule: raise ValueError(fRule {rule_id} not found) conditions rule.get(conditions, []) logic rule.get(logic, AND) # AND / OR results [] for condition in conditions: result self._evaluate_condition(condition, context) results.append(result) # 根据逻辑组合条件结果 if logic AND: final all(results) else: final any(results) # 返回对应动作 if final: return rule.get(action_if_true, default) else: return rule.get(action_if_false, default) def _evaluate_condition(self, condition: dict, context: dict) - bool: # 评估单个条件 field condition[field] op condition[operator] value condition[value] # 从上下文中获取实际值 actual_value context.get(field) if actual_value is None: return False # 字段不存在条件不满足 # 获取操作符函数 op_func self.operators.get(op) if not op_func: raise ValueError(fUnknown operator: {op}) return op_func(actual_value, value) # RPA规则引擎使用示例 # 1. 定义规则文件rules.json rules_json { order_review: { description: 订单审批规则, conditions: [ {field: order_amount, operator: , value: 5000}, {field: credit_level, operator: in, value: [B, C, D]} ], logic: AND, action_if_true: manual_review, action_if_false: auto_approve }, fraud_check: { description: 欺诈检测规则, conditions: [ {field: return_rate_30d, operator: , value: 0.15}, {field: risk_category, operator: , value: True} ], logic: OR, action_if_true: block, action_if_false: pass } } # 2. 保存规则 with open(rules.json, w) as f: json.dump(rules_json, f, indent2) # 3. 执行RPA规则引擎 engine RuleEngine(rules.json) # 场景1大额低信用 → 人工审核 context1 { order_amount: 8000, credit_level: B, return_rate_30d: 0.05, risk_category: False } result1 engine.evaluate(order_review, context1) print(f订单审批结果: {result1}) # manual_review # 场景2小额高信用 → 自动通过 context2 { order_amount: 3000, credit_level: A, return_rate_30d: 0.05, risk_category: False } result2 engine.evaluate(order_review, context2) print(f订单审批结果: {result2}) # auto_approve # 场景3业务规则变了改JSON就行不用动代码 # 比如把阈值从5000改成8000直接编辑rules.jsonRPA规则引擎自动加载这段RPA规则引擎代码我实际跑过规则热更新没问题。但有个坑要注意如果规则文件被外部编辑器占用Windows下可能会报文件锁错误生产环境建议用文件监听重试机制。RPA规则引擎好处业务人员改JSON配置不用碰代码支持热更新改完立即生效条件可以组合AND/OR扩展复杂规则4.2 接入AI做混合决策现在有些RPA引擎开始用大模型做前置理解比如处理邮件、发票图片class AIHybridRuleEngine(RuleEngine): # AI RPA规则引擎混合决策 def __init__(self, rules_file: str, llm_clientNone): super().__init__(rules_file) self.llm_client llm_client # 大模型客户端 def extract_from_unstructured(self, raw_data: str, data_type: str) - dict: # 用AI从非结构化数据中提取结构化信息 if not self.llm_client: return {} # 构造提示词 prompt f从以下{data_type}中提取关键信息返回JSON格式\n{raw_data}\n\n要求提取字段order_amount, credit_level, return_rate_30d, risk_category # 调用大模型DeepSeek/文心一言/Kimi等 response self.llm_client.chat(prompt) # 解析AI返回的结构化数据 try: extracted json.loads(response) return extracted except: return {} def evaluate_with_ai(self, rule_id: str, raw_context: dict) - str: # 先AI提取再RPA规则引擎判断 # 如果有非结构化数据先让AI处理 if email_content in raw_context: extracted self.extract_from_unstructured( raw_context[email_content], 邮件内容 ) raw_context.update(extracted) # 再用传统RPA规则引擎判断 return self.evaluate(rule_id, raw_context)但要注意AI的延迟和成本是问题。如果RPA引擎平台把AI费用包在订阅费里不告诉你单价后期账单可能很刺激。更透明的做法是平台只提供接入能力费用你自己和模型商结算用多少付多少成本完全可控。5. RPA脚本扩展RPA引擎是胶水不是孤岛选RPA引擎平台时技术团队最爱问支持Python吗但这只是入门门槛。5.1 外部系统对接好的RPA引擎应该能对接各种外部系统。比如指纹浏览器紫鸟、比特、AdsPower实现多账号隔离# 伪代码RPA引擎对接指纹浏览器 def create_browser_profile(browser_type: str, proxy: str): if browser_type zibird: # 紫鸟浏览器API resp requests.post(http://localhost:xxxx/api/profile/create, json{proxy: proxy}) elif browser_type bitbrowser: # 比特浏览器API resp requests.post(http://localhost:xxxx/v1/profile, json{proxy: proxy}) return resp.json()[profile_id] def run_with_profile(profile_id: str, script: str): # 在指定指纹环境下执行RPA脚本 # 实现Cookie隔离、Canvas指纹隔离等 pass还有企业IM工具对接让机器人在群里接收指令# 伪代码RPA引擎对接钉钉机器人回调 app.route(/dingtalk/callback, methods[POST]) def dingtalk_callback(): data request.json msg_text data.get(text, {}).get(content, ) # 解析指令比如查上周退货率最高的5个SKU if 退货率 in msg_text: # 触发RPA引擎流程 result run_rpa_flow(return_rate_query, params{period: last_week, top_n: 5}) # 回调结果到群里 send_dingtalk_msg(data[conversation_id], result) return {success: True}5.2 多模式触发from apscheduler.schedulers.background import BackgroundScheduler import watchdog.events import watchdog.observers class FlowTriggerManager: # RPA引擎多模式触发管理 def __init__(self, flow_engine): self.engine flow_engine self.scheduler BackgroundScheduler() self.scheduler.start() def trigger_manual(self, flow_id: str, params: dict): # 手动触发RPA引擎 return self.engine.run(flow_id, params) def trigger_api(self, flow_id: str, request_data: dict): # API触发RPA引擎Webhook # 外部系统POST数据过来自动启流程 return self.engine.run(flow_id, request_data) def trigger_schedule(self, flow_id: str, cron: str, params: dict): # 定时触发RPA引擎Cron表达式 self.scheduler.add_job( self.engine.run, cron, **self._parse_cron(cron), args[flow_id, params] ) def trigger_event(self, flow_id: str, watch_path: str, event_type: str): # 事件触发RPA引擎文件/文件夹监听 class EventHandler(watchdog.events.FileSystemEventHandler): def on_created(self, event): if not event.is_directory: self.engine.run(flow_id, {file_path: event.src_path}) observer watchdog.observers.Observer() observer.schedule(EventHandler(), watch_path, recursiveTrue) observer.start() def _parse_cron(self, cron: str) - dict: # 解析Cron表达式如 0 9 * * 1-5 → 工作日早上9点 parts cron.split() return { hour: parts[1], minute: parts[0], day_of_week: parts[4] }5.3 打包独立应用这是被低估的RPA引擎能力。如果你要把RPA方案卖给客户或者给公司内部用直接让他们看流程图不现实。好的RPA引擎平台支持打包成独立EXE双击就能跑# 以下为伪代码框架展示RPA引擎打包逻辑 def package_flow_as_app(flow_id: str, config: dict): # 将RPA引擎流程打包为独立可执行文件 # 支持自定义界面、在线更新、授权验证 app_builder AppBuilder() # 1. 打包RPA引擎核心 app_builder.add_engine_core() # 2. 嵌入自定义UI if config.get(custom_ui): app_builder.add_ui_files(config[ui_files]) # 3. 配置触发方式 trigger_mode config.get(trigger, manual) app_builder.set_trigger(trigger_mode) # manual/api/schedule # 4. 配置更新机制 if config.get(auto_update): app_builder.enable_auto_update( update_urlconfig[update_url], check_intervalconfig.get(check_interval, 3600) ) # 5. 授权验证可选 if config.get(license): app_builder.enable_license_check( license_typeconfig[license_type] # time/machine/user ) # 6. 输出EXE output_path app_builder.build( output_nameconfig[app_name], iconconfig.get(icon), versionconfig.get(version, 1.0.0) ) return output_path实际场景你给客户做了一个自动抓取竞品价格的RPA引擎工具打包成EXE发过去。客户双击运行界面简洁只显示开始抓取按钮。你后续更新了抓取逻辑客户打开应用自动检测新版本一键更新不用你重新发文件。我之前用蓝印RPA做过一个发票自动录入的交付项目就是打包成EXE给客户。整个过程最爽的是客户那边完全断网环境但EXE照样跑数据存在本地OCR用的本地模型不需要联网。这种纯本地闭环的RPA引擎能力在金融、政务场景是刚需。5.4 数据安全本地优先import os import hashlib from cryptography.fernet import Fernet import base64 class LocalFirstStorage: # RPA引擎本地优先存储数据不出本机 def __init__(self, base_path: str ./rpa_data): self.base_path base_path os.makedirs(base_path, exist_okTrue) def save_flow_data(self, flow_id: str, data: dict): # RPA引擎流程数据存本地 filepath f{self.base_path}/{flow_id}_data.json with open(filepath, w, encodingutf-8) as f: json.dump(data, f, ensure_asciiFalse, indent2) def save_execution_log(self, instance_id: str, logs: list): # RPA引擎执行日志存本地 log_dir f{self.base_path}/logs os.makedirs(log_dir, exist_okTrue) filepath f{log_dir}/{instance_id}.log with open(filepath, a, encodingutf-8) as f: for log in logs: f.write(json.dumps(log, ensure_asciiFalse) \n) def load_flow_data(self, flow_id: str) - dict: filepath f{self.base_path}/{flow_id}_data.json with open(filepath, r, encodingutf-8) as f: return json.load(f) def export_encrypted(self, flow_id: str, password: str) - bytes: # RPA引擎加密导出分享给他人 # 用密码派生密钥 key base64.urlsafe_b64encode( hashlib.sha256(password.encode()).digest()[:32] ) f Fernet(key) data self.load_flow_data(flow_id) encrypted f.encrypt(json.dumps(data).encode()) return encrypted6. AI Agent现阶段更适合查询类场景最近RPA圈最热的是AI Agent。概念很性感在钉钉、飞书、企微里机器人说句话就能触发RPA引擎流程。我实际测过几个号称支持Agent的RPA引擎平台发现落地质量参差不齐。好的方面接入DeepSeek-V4后语义理解确实强了自然语言指令能结合上下文推断。坑的方面响应延迟是大问题——大模型推理需要时间用户发一条指令等10秒才有反应体验很差。而且Agent的幻觉在RPA引擎场景后果更严重AI误解指令可能直接操作生产数据。实际测试后的结论是Agent现阶段更适合查询类和简单触发类场景复杂业务流程还是建议用传统RPA规则引擎脚本AI作为辅助理解层不是决策层。不过也有做得不错的案例。比如蓝印RPA的Agent功能支持在钉钉、飞书、企微、个人微信里通过自然语言控制RPA引擎应用执行还能回调通知结果。这种IM内闭环的RPA引擎体验比单纯的多轮对话更实用因为执行结果能直接推回群里不用切换界面查看。7. 开源RPA引擎选型个人开发者抓这四条看了这么多RPA引擎源码如果让我给一个简洁的选型框架1. RPA引擎要硬状态机、持久化、异常恢复、并发控制必须扎实。不要只看界面漂不漂亮找个复杂流程跑一遍中途杀进程看能不能从断点恢复。2. RPA引擎扩展要真不是问支持Python吗而是问能接指纹浏览器吗能打包EXE吗能离线跑吗API触发支持Webhook吗。这些细节决定你能不能用。3. RPA引擎成本要透成本结构要透明避免隐藏费用。AI能力如果平台包在订阅费里问清楚调用次数限制和超额单价。更透明的做法是平台只提供接入能力费用你自己和模型商结算成本可控。4. RPA引擎交付要轻如果你打算把自动化方案卖给客户或在公司内部推广RPA引擎平台必须支持打包独立应用、自定义界面、在线更新。否则你每改一次逻辑都要去客户那边重新部署效率太低。拆完RPA引擎源码我的感受是RPA引擎的进化本质上是在易用性和灵活性之间找平衡。太易用的RPA引擎平台纯拖拽遇到复杂场景就卡住太灵活的RPA引擎平台纯脚本业务人员又用不了。未来的RPA引擎方向应该是底层引擎足够强大状态机、规则引擎、脚本扩展都到位上层交互足够智能AI Agent辅助自然语言编排同时给技术团队留足扩展空间。至于具体选哪个RPA引擎平台我的建议是先列清楚你的场景需求拿真实业务流程去跑一遍不要只看功能清单打勾。文档上的RPA引擎功能和产线里能稳定跑三个月的RPA引擎功能往往是两回事。