Search before asking
What happened
问题描述
批量同步任务结束时(close → flush(null, true)),如果最后一批数据的 stream load 失败,
异常会被静默忽略,任务以 FINISHED 成功结束,目标表静默少最多一个 batch 的数据。
版本
master 分支 StreamLoadManager.java
代码分析
三个关键事实:
flushQueue 容量为 1:new LinkedBlockingDeque<>(1)
waitAsyncFlushingDone() 的实现是 offer 一个空 entity,然后 checkFlushException()
- 消费侧
asyncFlush() 先 poll 出数据,之后才执行 doStreamLoad
由 1+3 可知:offer(空entity) 成功的充要条件是"队列有空位",即真实数据已被 poll 走,
并不代表已写完。waitAsyncFlushingDone 实际是入队屏障,不是完成屏障。
时间上,主线程的 checkFlushException 在 offer 成功后立即执行;而最后一批的失败结论要经过
HTTP 响应/超时 + 最多 maxRetries 次重试(每轮 Thread.sleep(1000L * Math.min(i+1, 10))),
数秒到数十秒后才会执行到 flushException = e。因此最后一批一旦失败,该检查必然漏掉,
不是低概率竞态。它只能抓住"之前某一批已失败"的情况。
失败时序
- close() → flush(null, true) → flushInternal(key, true)
- offer(真实数据) 成功,flush 线程 poll 走,开始 doStreamLoad
- waitAsyncFlushingDone():offer(空entity) 立即成功(队列已空)→ checkFlushException()
此刻写入仍在进行,flushException == null,通过
- close() 末尾的 checkFlushException() 同样通过 → task FINISHED
- 数秒后 flush 线程重试耗尽,执行 flushException = e —— 已无人检查
- flushThread 是 daemon,TM JVM 随任务结束退出时,线程可能在重试 sleep 或 HTTP 半途被
直接杀死,连 flushException 赋值和 warn 日志都来不及执行,事后无任何排查痕迹
影响
- 任务 FINISHED、写出统计正常,但目标表少数据,且日志无痕迹
- 即使最后一批不报错,close 也没有等它写完;stream load 按 label 原子提交,
JVM 退出杀死飞行中的请求同样会丢这一批
附带发现
asyncFlush() 的 javadoc 写明 "return false if met eof and flush thread will exit",
循环中也有 if (!asyncFlush()) break;,但方法体内没有任何路径返回 false,
EOF 收尾协议似乎未实现完。
What you expected to happen
期望行为
flush(waitUtilDone=true) / close 应等待所有已入队批次处理完成(而非仅出队),
并在任务结束前把 flushException 传播到主线程。
可能的修复方向(供参考)
- 最小改动:waitAsyncFlushingDone 连续 offer 两个空 entity 再 check。容量为 1 时,
第二个 offer 成功必然发生在真实数据处理结束、第一个空 entity 被 poll 之后;此时若失败,
flushException 已先行置位(volatile 可见)。注:依赖"容量=1 + 消费线程异常后不退出"
两个隐式前提,建议注释 + 单测固定。
- 完成式同步:BufferEntity 携带 CompletableFuture,doStreamLoad 成功 complete、
重试耗尽 completeExceptionally,waitUtilDone 时对已入队 entity 执行 future.get(timeout)。
- 补全 EOF 协议:close 发显式 EOF entity,消费线程遇到则 return false 退出循环,
主线程 join(timeout) 后做最终 check;flushThread 不设 daemon。
How to reproduce
在待同步数据中的最后一条插入一行必定写失败的数据即可复现
Anything else
No response
Version
master
Are you willing to submit PR?
Code of Conduct
Search before asking
What happened
问题描述
批量同步任务结束时(close → flush(null, true)),如果最后一批数据的 stream load 失败,
异常会被静默忽略,任务以 FINISHED 成功结束,目标表静默少最多一个 batch 的数据。
版本
master 分支 StreamLoadManager.java
代码分析
三个关键事实:
flushQueue容量为 1:new LinkedBlockingDeque<>(1)waitAsyncFlushingDone()的实现是 offer 一个空 entity,然后checkFlushException()asyncFlush()先poll出数据,之后才执行doStreamLoad由 1+3 可知:
offer(空entity)成功的充要条件是"队列有空位",即真实数据已被 poll 走,并不代表已写完。waitAsyncFlushingDone 实际是入队屏障,不是完成屏障。
时间上,主线程的 checkFlushException 在 offer 成功后立即执行;而最后一批的失败结论要经过
HTTP 响应/超时 + 最多 maxRetries 次重试(每轮
Thread.sleep(1000L * Math.min(i+1, 10))),数秒到数十秒后才会执行到
flushException = e。因此最后一批一旦失败,该检查必然漏掉,不是低概率竞态。它只能抓住"之前某一批已失败"的情况。
失败时序
此刻写入仍在进行,flushException == null,通过
直接杀死,连 flushException 赋值和 warn 日志都来不及执行,事后无任何排查痕迹
影响
JVM 退出杀死飞行中的请求同样会丢这一批
附带发现
asyncFlush()的 javadoc 写明 "return false if met eof and flush thread will exit",循环中也有
if (!asyncFlush()) break;,但方法体内没有任何路径返回 false,EOF 收尾协议似乎未实现完。
What you expected to happen
期望行为
flush(waitUtilDone=true) / close 应等待所有已入队批次处理完成(而非仅出队),
并在任务结束前把 flushException 传播到主线程。
可能的修复方向(供参考)
第二个 offer 成功必然发生在真实数据处理结束、第一个空 entity 被 poll 之后;此时若失败,
flushException 已先行置位(volatile 可见)。注:依赖"容量=1 + 消费线程异常后不退出"
两个隐式前提,建议注释 + 单测固定。
重试耗尽 completeExceptionally,waitUtilDone 时对已入队 entity 执行 future.get(timeout)。
主线程 join(timeout) 后做最终 check;flushThread 不设 daemon。
How to reproduce
在待同步数据中的最后一条插入一行必定写失败的数据即可复现
Anything else
No response
Version
master
Are you willing to submit PR?
Code of Conduct