
一、模块定位与职责scheduler-worker是分布式任务调度系统中的执行者负责接收调度中心下发的任务并执行。如果把调度中心比作快递公司的总部那么 Worker 就是真正的快递员——他们真正干活。Worker 的职责可以概括为四件事连接与注册启动时主动连接到调度中心告诉对方“我在这里”保持在线通过心跳机制让调度中心知道它还活着执行任务收到任务后真正去执行业务逻辑缓存加速通过多级缓存提升数据访问性能二、启动与连接Worker 如何上线2.1 启动流程Worker 的启动入口是SchedulerClient.main()启动过程如下1. 创建 Netty 客户端 └── NioEventLoopGroup处理 I/O 事件 └── Bootstrap配置连接参数 └── Pipeline 添加 Handler ├── MessageDecoder字节 → Message ├── MessageEncoderMessage → 字节 └── ClientHandler业务处理 2. 连接调度中心127.0.0.1:8080 └── 同步等待连接建立 3. 生成 Worker ID └── worker- System.currentTimeMillis() 4. 发送注册消息TYPE_REGISTER └── BodyWorkerInfo JSON包含 workerId、host、port 等 5. 启动心跳线程 └── 每 30 秒发送一次心跳 6. 阻塞等待连接关闭2.2 心跳线程实现心跳是 Worker 向调度中心证明自己“还活着”的方式private void startHeartbeat() { Thread heartbeatThread new Thread(() - { while (channel ! null channel.isActive()) { try { Thread.sleep(30000); // 30 秒间隔 channel.writeAndFlush(Message.heartbeat()); System.out.println(Send heartbeat); } catch (InterruptedException e) { break; } } }); heartbeatThread.setDaemon(true); // 守护线程 heartbeatThread.start(); }为什么用守护线程守护线程不会阻止 JVM 退出如果 Worker 主线程因为连接断开而退出心跳线程会自动结束避免资源泄漏为什么是 30 秒调度中心的心跳超时阈值是 60 秒30 秒间隔意味着连续丢失 2 次心跳才会被判定为离线足够容忍网络抖动又不会浪费太多带宽三、任务执行入口ClientHandler3.1 消息处理流程ClientHandler是 Worker 处理所有网络消息的地方protected void channelRead0(ChannelHandlerContext ctx, Message msg) { switch (msg.getType()) { case TYPE_HEARTBEAT: // 收到心跳响应打印日志即可 System.out.println(Heartbeat response received); break; case TYPE_REQUEST: // 1. 解析任务 JobContext job JsonUtil.fromJson(new String(msg.getBody()), JobContext.class); // 2. 幂等检查 if (executedTasks.contains(job.getTaskId())) { // 已执行过直接返回成功 return ExecutionResult.success(job.getJobId(), job.getTaskId(), Already executed); } // 3. 执行任务 ExecutionResult result execute(job); // 4. 记录已执行幂等 if (result.getSuccess()) { executedTasks.add(job.getTaskId()); } // 5. 返回结果 ctx.writeAndFlush(Message.response(JsonUtil.toJson(result).getBytes())); break; case TYPE_CACHE_MIGRATE: // 接收缓存迁移消息 CacheMigrationMessage migrateMsg JsonUtil.fromJson( new String(msg.getBody()), CacheMigrationMessage.class ); // 预加载热点 Key for (String key : migrateMsg.getHotKeys()) { cacheService.get(key, () - loadFromDB(key)); } break; } }3.2 幂等检查为什么需要它在分布式环境中同一个任务可能被调度中心重复下发场景为什么重复网络超时调度中心没收到响应认为任务失败重新下发调度中心重启从数据库恢复任务时已执行的任务可能被重新加载重试机制任务超时后进入重试队列重新下发幂等检查的方式内存检查executedTasks是一个ConcurrentHashMap.newKeySet()存储已执行的taskId数据库检查task_id有唯一索引重复插入会失败两层防护保证同一个任务不会被执行两次。面试可能问“如果 Worker 重启了executedTasks里的数据丢失了怎么办”回答重启后从数据库加载已成功执行的任务 ID初始化executedTasks。或者依赖数据库唯一索引作为最后一道防线。四、任务执行execute()4.1 执行逻辑private ExecutionResult execute(JobContext job) { try { // 第一步分片感知预热 if (job.getPreloadKeys() ! null) { for (String key : job.getPreloadKeys()) { cacheService.get(key, () - { System.out.println([Preload] Loading from DB: key); return preloaded_data; }); } System.out.println([Preload] Preloaded job.getPreloadKeys().size() keys); } // 第二步执行业务逻辑 String userId job.getParams(); String userData cacheService.get(user: userId, () - { System.out.println([DB] Querying database for user: userId); return {\name\:\User userId \,\level\:1}; }); System.out.println(Executing job: job.getJobName() , shard: job.getShardingItem()); System.out.println(User data from cache: userData); // 模拟耗时操作 Thread.sleep(500); return ExecutionResult.success(job.getJobId(), job.getTaskId(), Job executed successfully); } catch (Exception e) { return ExecutionResult.failure(job.getJobId(), job.getTaskId(), Execution failed: e.getMessage()); } }4.2 执行流程解析任务到达 │ ▼ ① 分片感知预热 │ 提前加载 preloadKeys 到 L1 缓存 │ 这样后续业务逻辑执行时数据已经在本地了 ▼ ② 业务逻辑执行 │ 从 params 中提取业务参数 │ 通过多级缓存读取数据L1 → L2 → DB ▼ ③ 返回结果 │ 成功 → ExecutionResult.success() │ 失败 → ExecutionResult.failure()4.3 为什么要有预热步骤预热步骤是分片感知预热的体现调度中心下发任务时会携带preloadKeys列表这些 Key 是调度中心根据分片信息预先计算出来的Worker 在执行任务前先加载这些 Key让它们进入 L1 缓存业务逻辑执行时数据已经在本地了L1 命中率极高五、多级缓存CacheService5.1 为什么需要多级缓存如果所有数据都从数据库读取性能和并发能力会受限。缓存的本质是用空间换时间。缓存层级技术访问延迟适用场景L1 本地缓存Caffeine微秒级高频访问的热点数据L2 分布式缓存Redis毫秒级跨节点共享的数据DBMySQL数十毫秒所有数据的最终来源5.2 读取路径五层防护请求 key │ ▼ ① Bloom Filter防穿透 │ 判断 key 是否可能存在 │ 不存在 → 直接返回 null不查任何存储 │ ▼ ② Caffeine L1本地缓存 │ 命中 → 返回数据 │ 未命中 → 继续 │ ▼ ③ 互斥锁 double-check防击穿 │ 相同 key 只有一个线程能通过 │ 获取锁后再次检查 L1 │ ▼ ④ Redis L2分布式缓存 │ 命中 → 回填 L1 → 返回数据 │ 未命中 → 继续 │ ▼ ⑤ DataLoader数据库 │ 从 DB 加载数据 │ 写入 Redis随机 TTL 300~360s L1 Bloom Filter │ 如果 DB 无数据 → 缓存空值 NULLTTL 60s5.3 缓存穿透防护布隆过滤器问题查询一个不存在的 key每次请求都穿透到数据库。解决方案布隆过滤器 空值缓存// 初始化布隆过滤器 private final BloomFilterString bloomFilter BloomFilter.create( Funnels.stringFunnel(StandardCharsets.UTF_8), 100000, // 预计插入 10 万条 0.01 // 误判率 1% ); // 查询时 if (!bloomFilter.mightContain(key)) { System.out.println([Cache] Bloom filter: key not exists, skip); return null; // 直接返回不查 DB }布隆过滤器的特点判断“不存在”是绝对准确的判断“存在”可能有误判1% 的概率误判时空值缓存 NULL 作为兜底5.4 缓存击穿防护互斥锁问题一个热点 Key 过期后大量请求同时涌入全部打到数据库。解决方案按 key 加互斥锁 double-checkReentrantLock lock keyLocks.computeIfAbsent(key, k - new ReentrantLock()); lock.lock(); try { // double-check获取锁后再次检查 L1 String value localCache.getIfPresent(key); if (value ! null) { return value; } // 从 Redis 或 DB 加载 // ... } finally { lock.unlock(); keyLocks.remove(key); }关键点只有第一个请求会去加载数据其他请求等待。数据加载完成后后续请求直接从缓存读取。5.5 缓存雪崩防护随机 TTL问题大量 Key 同时过期导致 DB 压力暴增。解决方案在基础过期时间上增加随机偏移int baseExpire 300; // 基础 300 秒 int randomOffset random.nextInt(60); // 0~60 秒随机 int expireTime baseExpire randomOffset; jedis.setex(key, expireTime, value); System.out.println([Cache] Set with expire: expireTime s);不同 Key 的过期时间在 300~360 秒之间均匀分布不会同时过期。六、缓存迁移接收当调度中心检测到某个 Worker 下线时会发送TYPE_CACHE_MIGRATE消息把该 Worker 的热点 Key 迁移到其他 Worker。case TYPE_CACHE_MIGRATE: String migrateJson new String(msg.getBody()); CacheMigrationMessage migrateMsg JsonUtil.fromJson(migrateJson, CacheMigrationMessage.class); System.out.println([Migration] Received cache migration, keys: migrateMsg.getHotKeys().size()); for (String key : migrateMsg.getHotKeys()) { cacheService.get(key, () - { System.out.println([Migration] Loading key: key); return {\migrated\:true}; }); } System.out.println([Migration] Preloaded migrateMsg.getHotKeys().size() keys); break;迁移后新 Worker 的本地缓存已经包含了这些热点 Key任务执行时不会因为缓存缺失而变慢。七、缓存配置CacheConfigpublic class CacheConfig { public static K, V CacheK, V createLocalCache() { return Caffeine.newBuilder() .maximumSize(10000) // 最大 10000 条 .expireAfterWrite(60, TimeUnit.SECONDS) // 60 秒过期 .recordStats() // 记录命中率 .build(); } }为什么选择 Caffeine对比项CaffeineGuava CacheEhcache性能最优中等一般内存管理最优W-TinyLFU一般LRU一般Spring 集成原生支持支持支持维护活跃度高中中Caffeine 是目前 Java 生态中最优秀的本地缓存库。八、关键设计决策总结设计决策原因权衡Worker 用独立线程发心跳不受业务处理阻塞影响多一个线程使用 Caffeine 做 L1 缓存微秒级访问极高 QPS内存占用布隆过滤器在 Worker 侧避免无效请求穿透到 Redis/DB内存占用 误判率缓存迁移使用同步加载保证迁移完成后再处理任务迁移期间任务可能等待executedTasks用ConcurrentHashMap多线程并发处理任务内存占用九、常见面试问题Q1Worker 本地缓存和 Redis 缓存的数据一致性问题怎么处理缓存一致性是分布式系统的经典难题。我采用的是最终一致性策略更新时先更新数据库再删除 Redis 缓存本地缓存通过短过期时间60 秒来控制不一致窗口如果业务对一致性要求高可以禁用本地缓存Q2布隆过滤器的误判率怎么来的布隆过滤器使用多个哈希函数。当数据量超过预期时哈希碰撞概率增加。这就是为什么需要配置expectedInsertions和fpp误判率。1% 的误判率意味着 100 个不存在的 Key 中可能有 1 个被误判为存在然后由空值缓存兜底。Q3如果 Worker 在执行任务时宕机了任务怎么办调度中心的心跳检测会感知 Worker 下线然后在removeWorker方法中把该 Worker 的runningTasks放入重试队列最终由其他 Worker 接管。Q4Worker 收到了 TYPE_CACHE_MIGRATE 消息但消息中的 HotKey 数量很多会不会导致大量内存占用每次迁移最多 10 个 Key每个任务预热 10 个 Key而且只是预热 Key 的元数据不是预热整个数据内容。实际内存占用很小。十、三个模块总结模块核心职责关键技术scheduler-common通信协议、数据模型、工具Netty 编解码、Jacksonscheduler-core管理 Worker、调度任务、处理故障Netty、一致性 Hash、MySQLscheduler-worker执行任务、缓存加速Caffeine、Redis、Guava一句话总结common 定义“怎么说”core 决定“谁来做”worker 负责“怎么做”。