
1. 项目概述从手动刷榜到自动化预警如果你是一名安全工程师、运维人员或者只是对网络安全动态保持高度关注的开发者那么“刷CVE”这个词对你来说一定不陌生。每天上班第一件事可能就是打开美国国家标准与技术研究院NIST的国家漏洞数据库NVD网站或者几个安全资讯聚合站手动翻看有没有影响自己公司技术栈的新漏洞。这个过程枯燥、重复还容易遗漏关键信息。一个高危漏洞的预警晚上线几小时可能就意味着一次通宵应急。这个项目的核心就是用Python写一个“数字哨兵”让它7x24小时不间断地替我们盯着NVD一旦有新的漏洞记录发布就立刻解析关键信息并通过钉钉或飞书机器人将结构化的预警消息推送到我们的工作群。这不仅仅是“偷懒”更是将安全运维的响应动作从“事后补救”前置到“实时感知”。手动刷一次可能只需要几分钟但机器可以每秒都在刷并且永不疲倦不错过任何一次更新。整个脚本的逻辑链条非常清晰定时触发 - 访问NVD官方数据源 - 解析JSON格式的漏洞数据 - 过滤和格式化关键信息 - 调用群聊机器人API发送消息。实现它你不需要是安全专家但需要了解基本的Python爬虫更准确地说是API调用和HTTP请求知识。接下来我会带你从零开始拆解每一个环节分享我在搭建这个系统时踩过的坑和总结的经验让你能快速拥有自己的漏洞监控机器人。2. 核心组件与工具选型解析在动手写代码之前选择合适的工具和库至关重要。一个好的选型能让你事半功倍避免后期陷入兼容性和维护的泥潭。2.1 为什么选择NVD的官方API市面上有很多漏洞数据源比如各大安全厂商的公告、第三方聚合平台。我选择NVD的官方API主要基于以下几点考量权威性与及时性NVD是美国政府官方维护的漏洞数据库CVE编号的权威发布机构。虽然它可能不是最快的有些漏洞先在社交媒体或厂商处披露但它是信息最标准、最规范的源头避免了二手信息转述可能带来的偏差。结构化数据NVD提供了完善的RESTful API返回的数据是标准的JSON格式包含了漏洞描述、CVSS评分、受影响产品列表CPE、参考链接等结构化字段。这极大方便了我们进行程序化处理无需从HTML页面中费力地解析文本。免费与无限制NVD的API对公众免费开放没有调用频率的严格限制但建议合理使用避免滥用。这对于我们构建一个长期、稳定的监控工具来说至关重要。注意NVD API的Base URL是https://services.nvd.nist.gov/rest/json/cves/2.0。你需要熟悉其查询参数比如利用lastModStartDate和lastModEndDate来获取特定时间段内修改的漏洞这是我们实现增量抓取的关键。2.2 Python生态库的选择Python的强大在于其丰富的库生态。针对这个项目我们只需要几个核心库requests毫无疑问这是处理HTTP请求的瑞士军刀。它比Python内置的urllib更简洁易用能轻松处理JSON响应。我们将用它来调用NVD API和钉钉/飞书的机器人接口。schedule或apscheduler我们需要脚本定时运行。schedule库语法非常直观类似自然语言schedule.every(2).hours.do(job)适合简单的定时任务。如果需求更复杂比如需要持久化、分布式调度那么APScheduler是更企业级的选择。对于本项目schedule的轻量级特性完全够用。jsonPython标准库用于解析NVD返回的JSON数据。通常和requests的.json()方法配合使用。datetime/time用于处理时间生成查询API时的时间窗口参数以及控制任务循环。为什么不直接用Scrapy这类重型爬虫框架因为我们的目标数据源是结构化的API而非需要解析DOM的网页。requests直截了当更轻量依赖更少部署也更方便。2.3 消息推送平台钉钉与飞书机器人对比选择钉钉还是飞书取决于你的团队主要使用哪个协作工具。两者的机器人机制大同小异。钉钉机器人创建非常简单在群设置中添加“智能群助手”即可获得一个Webhook地址。它支持文本、链接、Markdown和ActionCard等消息格式。消息频率限制相对宽松但对于安全预警Markdown格式能很好地展示带格式的文本是我们的首选。飞书机器人创建流程类似在群组中添加“自定义机器人”。飞书机器人同样支持文本、富文本post、卡片等消息。飞书的“富文本”格式功能非常强大可以构建复杂的图文混排消息体。从实现难度上讲两者几乎没有区别都是向一个特定的HTTPS URL发起一个带有特定JSON消息体的POST请求。在后面的代码中我会展示如何适配这两种格式。一个实用的技巧是将机器人的Webhook URL和密钥放在配置文件或环境变量中而不是硬编码在脚本里这样更容易切换和保密。3. 脚本核心逻辑与代码逐行拆解接下来我们进入核心部分一步步构建脚本。我会假设你有一个基本的Python开发环境Python 3.6并且已经用pip install requests schedule安装了必要的库。3.1 第一步构建NVD API查询函数我们的目标是获取“最新”的漏洞。NVD API提供了多种查询方式最符合我们需求的是基于“最后修改时间”进行过滤。因为一个新漏洞被收录或者一个旧漏洞的信息被更新都会改变其“最后修改时间”。import requests import json from datetime import datetime, timedelta def fetch_recent_cves(hours_ago2): 获取最近N小时内修改过的CVE记录。 :param hours_ago: 查询多少小时前的数据默认2小时。 :return: 包含CVE列表的字典或请求失败时返回None。 base_url https://services.nvd.nist.gov/rest/json/cves/2.0 # 计算时间范围 end_time datetime.utcnow() start_time end_time - timedelta(hourshours_ago) # 格式化时间为API要求的格式 (RFC 3339) start_str start_time.strftime(%Y-%m-%dT%H:%M:%S.%f)[:-3] Z end_str end_time.strftime(%Y-%m-%dT%H:%M:%S.%f)[:-3] Z params { lastModStartDate: start_str, lastModEndDate: end_str, # 可以添加其他过滤参数例如结果数量 startIndex, resultsPerPage resultsPerPage: 50 # 每次最多获取50条防止数据过多 } try: response requests.get(base_url, paramsparams, timeout30) response.raise_for_status() # 如果状态码不是200抛出HTTPError异常 return response.json() except requests.exceptions.RequestException as e: print(f[错误] 请求NVD API失败: {e}) return None except json.JSONDecodeError as e: print(f[错误] 解析JSON响应失败: {e}) return None关键点解析时间格式NVD API要求时间格式为RFC 3339例如2023-10-27T10:00:00.000Z。strftime中的%f是微秒[:-3]是截取到毫秒三位因为API只需要毫秒精度。增量抓取通过lastModStartDate和lastModEndDate我们实现了增量查询。脚本每次运行只抓取从上一次检查到当前时刻之间新增或更新的漏洞效率极高也避免了重复处理。错误处理网络请求必须包含超时timeout和异常捕获。raise_for_status()能帮我们快速发现401、404、500等HTTP错误。频率考量resultsPerPage默认为50对于2小时窗口通常足够。如果你将定时任务间隔设得很长比如一天可能需要调大这个值或者分页获取使用startIndex参数。3.2 第二步解析与过滤漏洞数据NVD返回的JSON结构比较庞大我们只需要提取最关键的信息用于预警。def parse_cve_data(nvd_data): 从NVD返回的原始数据中解析出我们关心的字段。 :param nvd_data: fetch_recent_cves 函数返回的字典。 :return: 一个包含简化后CVE信息的字典列表。 if not nvd_data or vulnerabilities not in nvd_data: return [] cve_list [] for item in nvd_data[vulnerabilities]: cve_item item.get(cve, {}) cve_id cve_item.get(id, N/A) # 获取描述通常取英文描述 descriptions cve_item.get(descriptions, []) description_en next((desc[value] for desc in descriptions if desc[lang] en), No description available.) # 获取CVSS v3.1 或 v2.0 的严重性评分 metrics cve_item.get(metrics, {}) cvss_v31 metrics.get(cvssMetricV31, [{}])[0] cvss_v2 metrics.get(cvssMetricV2, [{}])[0] cvss_data cvss_v31.get(cvssData, cvss_v2.get(cvssData, {})) base_score cvss_data.get(baseScore, N/A) severity cvss_data.get(baseSeverity, cvss_v2.get(baseSeverity, N/A)) # 获取受影响的产品配置CPE这里取第一个作为代表 configurations cve_item.get(configurations, []) affected_products [] for config in configurations: for node in config.get(nodes, []): for cpe_match in node.get(cpeMatch, []): vulnerable cpe_match.get(vulnerable, False) criteria cpe_match.get(criteria, ) if vulnerable and criteria: # 简化CPE字符串例如cpe:2.3:a:apache:log4j:*:*:*:*:*:*:*:* affected_products.append(criteria) # 去重并只取前3个避免消息过长 affected_products list(set(affected_products))[:3] # 获取参考链接 references cve_item.get(references, []) ref_urls [ref.get(url, ) for ref in references[:2]] # 取前两个链接 cve_info { id: cve_id, description: description_en[:200] ... if len(description_en) 200 else description_en, # 截断长描述 baseScore: base_score, severity: severity, affectedProducts: affected_products, references: ref_urls, published: cve_item.get(published, N/A) } cve_list.append(cve_info) return cve_list关键点解析数据结构导航NVD的JSON响应是嵌套的。使用.get()方法安全地访问字典键值避免因键不存在而抛出KeyError。CVSS评分处理CVSS v3.1是当前主流标准但一些老漏洞可能只有v2.0评分。代码中做了降级处理优先使用v3.1。CPE处理受影响产品列表CPE可能非常冗长。这里做了简化只提取前3个独特的产品标识并截断描述都是为了最终推送的消息简洁明了。信息过滤并非所有漏洞都值得报警。你可以在这里添加过滤逻辑例如只处理baseScore 7.0高危及以上的漏洞或者只关注特定供应商如cpe:2.3:a:microsoft:的产品。这能有效减少“警报疲劳”。3.3 第三步格式化消息并推送到钉钉这是将数据转化为 actionable 信息的一步。我们需要把解析好的CVE列表格式化成钉钉机器人能发送的、对人类友好的消息。def send_dingtalk_message(webhook_url, secret, cve_list, keywordNone): 发送CVE预警消息到钉钉群。 :param webhook_url: 钉钉机器人的Webhook地址。 :param secret: 钉钉机器人的加签密钥如果设置了。 :param cve_list: parse_cve_data 函数返回的列表。 :param keyword: 如果机器人设置了关键词则需要提供。 if not cve_list: print([信息] 近期无新增或更新的CVE无需发送消息。) return # 构建消息标题和正文 title f⚠️ 发现 {len(cve_list)} 个新CVE漏洞 text f### {title}\n\n for idx, cve in enumerate(cve_list, 1): severity_emoji {CRITICAL: , HIGH: ⚠️, MEDIUM: , LOW: ℹ️}.get(cve[severity], ❓) text f**{idx}. {cve[id]} ({severity_emoji} {cve[severity]} - CVSS: {cve[baseScore]})**\n text f {cve[description]}\n if cve[affectedProducts]: text f**受影响产品:** {, .join(cve[affectedProducts])}\n if cve[references]: for ref in cve[references]: text f[参考链接]({ref}) text f\n**发布时间:** {cve[published]}\n\n text ---\n\n # 钉钉Markdown消息体 message { msgtype: markdown, markdown: { title: title, text: text }, at: { isAtAll: False # 是否所有人慎用 # atMobiles: [138xxxx8888] # 可以指定具体手机号 } } # 如果机器人设置了加签需要计算签名 timestamp str(round(time.time() * 1000)) sign_string f{timestamp}\n{secret} if secret: import hmac, hashlib, base64 hmac_code hmac.new(secret.encode(utf-8), sign_string.encode(utf-8), digestmodhashlib.sha256).digest() sign base64.b64encode(hmac_code).decode(utf-8) webhook_url_with_sign f{webhook_url}timestamp{timestamp}sign{sign} else: webhook_url_with_sign webhook_url # 发送请求 headers {Content-Type: application/json} try: resp requests.post(webhook_url_with_sign, headersheaders, datajson.dumps(message), timeout10) resp.raise_for_status() result resp.json() if result.get(errcode) 0: print(f[成功] 钉钉消息发送成功。) else: print(f[失败] 钉钉API返回错误: {result}) except Exception as e: print(f[错误] 发送钉钉消息失败: {e})关键点解析消息格式使用了钉钉支持的Markdown语法可以让消息层次更清晰加粗、标题、引用、链接。安全签名如果创建钉钉机器人时选择了“加签”安全设置就必须在URL后附加时间戳和签名。上述代码包含了签名计算逻辑。如果没加签直接使用Webhook URL即可。提醒at字段可以控制是否所有人或特定成员。对于高频的安全预警不建议频繁所有人以免造成骚扰。可以考虑只在发现“CRITICAL”级别漏洞时才相关责任人。错误处理同样要捕获网络和API错误。钉钉机器人接口会返回JSON格式的响应其中errcode为0表示成功。3.4 第四步飞书机器人适配飞书机器人的消息体格式略有不同主要使用“post”格式的富文本。这里给出一个转换示例def send_feishu_message(webhook_url, cve_list): 发送CVE预警消息到飞书群。 :param webhook_url: 飞书机器人的Webhook地址。 :param cve_list: parse_cve_data 函数返回的列表。 if not cve_list: return # 飞书 post 消息体结构 elements [] for cve in cve_list: # 为每个CVE构建一个内容块 severity_tag { CRITICAL: {tag: lark_md, content: ** CRITICAL**}, HIGH: {tag: lark_md, content: **⚠️ HIGH**}, MEDIUM: {tag: lark_md, content: ** MEDIUM**}, LOW: {tag: lark_md, content: **ℹ️ LOW**}, }.get(cve[severity], {tag: plain_text, content: UNKNOWN}) product_text \n.join([f{p} for p in cve[affectedProducts]]) if cve[affectedProducts] else 暂无 ref_links \n.join([f- {ref} for ref in cve[references]]) if cve[references] else 暂无 element { tag: div, text: { tag: lark_md, content: f**{cve[id]}** | CVSS: **{cve[baseScore]}** | {severity_tag[content]}\n{cve[description]}\n**产品:** {product_text}\n**参考:** {ref_links} } } elements.append(element) elements.append({tag: hr}) # 分隔线 message { msg_type: post, content: { post: { zh_cn: { title: f发现 {len(cve_list)} 个新CVE漏洞, content: [elements] } } } } headers {Content-Type: application/json} try: resp requests.post(webhook_url, headersheaders, datajson.dumps(message), timeout10) resp.raise_for_status() print(f[成功] 飞书消息发送成功。) except Exception as e: print(f[错误] 发送飞书消息失败: {e})飞书的“post”格式更灵活可以构建更复杂的卡片式布局。这里使用了简单的div和hr标签来组织内容。3.5 第五步组装与定时调度最后我们把所有函数串联起来并用schedule库实现定时任务。import time import schedule def job(): print(f[{datetime.now().strftime(%Y-%m-%d %H:%M:%S)}] 开始检查NVD更新...) # 1. 获取数据 raw_data fetch_recent_cves(hours_ago1) # 检查过去1小时内的更新 if not raw_data: return # 2. 解析数据 cves parse_cve_data(raw_data) if not cves: print( 未解析到有效的CVE信息。) return print(f 发现 {len(cves)} 个新CVE。) # 3. 发送告警 (这里以钉钉为例) # 请将以下参数替换为你自己的 DINGTALK_WEBHOOK https://oapi.dingtalk.com/robot/send?access_tokenYOUR_TOKEN DINGTALK_SECRET YOUR_SECRET # 如果没加签留空字符串 send_dingtalk_message(DINGTALK_WEBHOOK, DINGTALK_SECRET, cves) # 每2小时运行一次任务 schedule.every(2).hours.do(job) # 也可以每分钟运行一次用于测试 # schedule.every(1).minutes.do(job) print(CVE监控机器人已启动按 CtrlC 退出。) while True: schedule.run_pending() time.sleep(60) # 每分钟检查一次是否有任务需要执行关键点解析定时策略schedule.every(2).hours.do(job)设置了每2小时执行一次。你可以根据对及时性的要求进行调整如every(30).minutes。NVD的更新并非实时通常2-4小时检查一次是合理的频率既能及时捕获更新又不会对NVD服务器造成不必要的压力。持续运行while True循环配合time.sleep(60)让脚本持续在后台运行。这是最简单的守护进程模式。在生产环境中你可能会考虑使用systemd服务、supervisor或将其部署为云函数Serverless以实现更好的进程管理和高可用。配置外置强烈建议将DINGTALK_WEBHOOK和DINGTALK_SECRET等敏感信息从代码中移除放入配置文件如config.ini或环境变量中管理。4. 部署方案与进阶优化脚本写好了怎么让它稳定、可靠地跑起来4.1 本地运行与服务器部署本地测试直接在命令行运行python cve_monitor.py。确保网络通畅能访问services.nvd.nist.gov。首次运行可能会因为SSL证书问题报错请确保你的Python环境证书完整。Linux服务器部署推荐将脚本和配置文件上传到服务器。使用nohup命令在后台运行nohup python3 cve_monitor.py monitor.log 21 。这样即使关闭SSH连接脚本也会继续运行日志输出到monitor.log。更优方案使用 systemd。创建一个服务文件如/etc/systemd/system/cve-monitor.service可以更好地管理启动、停止、重启和查看日志。[Unit] DescriptionCVE Monitor Bot Afternetwork.target [Service] Typesimple Useryour_username WorkingDirectory/path/to/your/script ExecStart/usr/bin/python3 /path/to/your/script/cve_monitor.py Restarton-failure RestartSec10 [Install] WantedBymulti-user.target然后使用sudo systemctl start cve-monitor启动sudo systemctl enable cve-monitor设置开机自启。4.2 进阶功能与优化思路一个基础的监控脚本已经完成但要让其真正融入工作流还可以考虑以下优化漏洞过滤与分级报警现在的脚本是“有洞就报”。你可以增加过滤规则例如基于CVSS分数只推送baseScore 7.0高危及以上的漏洞。基于产品关键词只关注你们公司使用的技术栈比如Apache,nginx,Redis,WordPress等。可以在CPE或描述信息中进行关键词匹配。基于漏洞类型通过描述中的关键词过滤例如只关注“远程代码执行(RCE)”、“权限提升”、“SQL注入”等高危类型。# 在 parse_cve_data 函数内或之后添加过滤 def filter_cves(cve_list, min_score7.0, keywordsNone): filtered [] for cve in cve_list: try: if float(cve[baseScore]) min_score: continue except ValueError: pass # 如果baseScore不是数字跳过评分过滤 if keywords: # 检查描述或产品中是否包含关键词 text_to_check f{cve[description]} { .join(cve[affectedProducts])}.lower() if not any(kw.lower() in text_to_check for kw in keywords): continue filtered.append(cve) return filtered数据持久化与去重将抓取到的CVE ID存入一个简单的数据库如SQLite或文件。每次抓取后只发送数据库中不存在的“新”CVE。这可以避免因NVD数据更新如评分修改而导致同一CVE被重复推送。多平台与多渠道通知除了钉钉/飞书可以很容易地扩展支持企业微信、Slack、电子邮件甚至短信。抽象一个“消息发送器”接口让通知逻辑更清晰。添加监控与告警脚本本身也可能出错网络中断、API变更。可以添加“心跳”功能让脚本定期向一个特定频道发送“我还活着”的消息。如果长时间没有心跳说明脚本可能已经挂掉需要人工干预。容器化部署使用Docker将脚本及其依赖打包成镜像。这样可以实现环境隔离、一键部署和快速水平扩展虽然本项目通常单实例即可。5. 常见问题与故障排查实录在实际运行中你肯定会遇到各种各样的问题。下面是我踩过的一些坑和解决方案问题1脚本运行后收不到任何消息日志显示“未解析到有效的CVE信息”。排查思路检查时间窗口hours_ago参数可能设得太小比如1分钟而恰好这段时间NVD没有更新。先调大到12或24小时测试。打印原始响应在fetch_recent_cves函数中临时添加print(response.text)查看NVD API是否真的返回了数据。可能返回的是{resultsPerPage: 0, ...}。检查网络连接确保运行脚本的服务器或电脑能够访问https://services.nvd.nist.gov。可以尝试用curl或浏览器直接访问API地址。验证JSON解析确认返回的数据是有效的JSON。有时API可能返回错误HTML页面。问题2钉钉/飞书机器人消息发送失败返回错误码。钉钉常见错误{errcode:310000, errmsg:sign not match}时间戳和签名不匹配。检查服务器时间是否准确使用UTC时间以及签名计算代码是否正确。确保timestamp是毫秒级。{errcode:300001, errmsg:message is empty}消息内容为空。检查cve_list是否为空或者消息体构建逻辑是否有误。{errcode:-1, errmsg:系统繁忙}通常是网络问题或钉钉服务暂时不可用稍后重试即可。飞书常见错误{code:9499, msg:Bad Request}通常是消息体格式不符合飞书要求。仔细对照飞书开放平台的文档检查message字典的结构特别是post格式下的content字段嵌套。问题3消息格式混乱在手机上显示错位。原因与解决Markdown或富文本的渲染在不同平台PC端 vs 移动端和不同客户端版本上可能有差异。简化格式尽量使用最基本的Markdown语法如**加粗**、[链接](url)、- 列表。避免使用复杂的嵌套或表格。控制长度过长的消息会被折叠影响阅读。确保描述被截断产品列表只显示最重要的几项。使用分隔线用---或hr标签清晰分隔不同CVE条目提升可读性。问题4脚本运行一段时间后自动停止。排查思路内存泄漏虽然本脚本很简单但也要确保在循环中没有无意中累积大量数据。schedule库本身是稳定的。未捕获的异常某个网络请求超时或解析出错导致整个线程崩溃。确保在job()函数的最外层也有try...except捕获所有异常并记录日志。系统调度如果是用nohup运行检查是否被系统杀掉了OOM Killer。可以查看系统日志/var/log/syslog或dmesg。最佳实践使用systemd或supervisor托管它们能自动重启失败的进程。问题5如何避免被NVD API限制或封禁遵守规则NVD虽然没有严格的速率限制但明确要求不得进行滥用性访问。设置合理的间隔对于监控目的每30分钟到2小时请求一次是完全合理的。切勿使用秒级或分钟级的高频请求。使用缓存如果需要对同一个CVE ID进行多次查询例如获取详细信息可以考虑在本地缓存结果避免重复请求。设置User-Agent在requests.get()中添加一个合理的headers例如{User-Agent: MyCveMonitorBot/1.0 (contact: your-emailexample.com)}以示友好。这个项目从构思到实现再到持续优化是一个典型的“用自动化解放生产力”的过程。它技术门槛不高但带来的效率提升和安全感是实实在在的。最关键的一步就是现在动手把它跑起来。先从最简单的版本开始收到第一条漏洞告警消息你会获得巨大的正反馈。然后再根据你的实际需求逐步添加过滤规则、持久化、监控等功能让它真正成为你安全运维体系中一个可靠的自动化节点。