Skip to content

[Bug] [chunjun-connector-starrocks] 任务结束时最后一批 stream load 失败被静默忽略,存在丢数风险(waitAsyncFlushingDone 只等出队、不等写完) #1968

@Addicted-Coder

Description

@Addicted-Coder

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

问题描述

批量同步任务结束时(close → flush(null, true)),如果最后一批数据的 stream load 失败,
异常会被静默忽略,任务以 FINISHED 成功结束,目标表静默少最多一个 batch 的数据。

版本

master 分支 StreamLoadManager.java

代码分析

三个关键事实:

  1. flushQueue 容量为 1:new LinkedBlockingDeque<>(1)
  2. waitAsyncFlushingDone() 的实现是 offer 一个空 entity,然后 checkFlushException()
  3. 消费侧 asyncFlush()poll 出数据,之后才执行 doStreamLoad

由 1+3 可知:offer(空entity) 成功的充要条件是"队列有空位",即真实数据已被 poll 走,
并不代表已写完。waitAsyncFlushingDone 实际是入队屏障,不是完成屏障。

时间上,主线程的 checkFlushException 在 offer 成功后立即执行;而最后一批的失败结论要经过
HTTP 响应/超时 + 最多 maxRetries 次重试(每轮 Thread.sleep(1000L * Math.min(i+1, 10))),
数秒到数十秒后才会执行到 flushException = e。因此最后一批一旦失败,该检查必然漏掉,
不是低概率竞态。它只能抓住"之前某一批已失败"的情况。

失败时序

  1. close() → flush(null, true) → flushInternal(key, true)
  2. offer(真实数据) 成功,flush 线程 poll 走,开始 doStreamLoad
  3. waitAsyncFlushingDone():offer(空entity) 立即成功(队列已空)→ checkFlushException()
    此刻写入仍在进行,flushException == null,通过
  4. close() 末尾的 checkFlushException() 同样通过 → task FINISHED
  5. 数秒后 flush 线程重试耗尽,执行 flushException = e —— 已无人检查
  6. flushThread 是 daemon,TM JVM 随任务结束退出时,线程可能在重试 sleep 或 HTTP 半途被
    直接杀死,连 flushException 赋值和 warn 日志都来不及执行,事后无任何排查痕迹

影响

  • 任务 FINISHED、写出统计正常,但目标表少数据,且日志无痕迹
  • 即使最后一批不报错,close 也没有等它写完;stream load 按 label 原子提交,
    JVM 退出杀死飞行中的请求同样会丢这一批

附带发现

asyncFlush() 的 javadoc 写明 "return false if met eof and flush thread will exit",
循环中也有 if (!asyncFlush()) break;,但方法体内没有任何路径返回 false,
EOF 收尾协议似乎未实现完。

What you expected to happen

期望行为

flush(waitUtilDone=true) / close 应等待所有已入队批次处理完成(而非仅出队),
并在任务结束前把 flushException 传播到主线程。

可能的修复方向(供参考)

  1. 最小改动:waitAsyncFlushingDone 连续 offer 两个空 entity 再 check。容量为 1 时,
    第二个 offer 成功必然发生在真实数据处理结束、第一个空 entity 被 poll 之后;此时若失败,
    flushException 已先行置位(volatile 可见)。注:依赖"容量=1 + 消费线程异常后不退出"
    两个隐式前提,建议注释 + 单测固定。
  2. 完成式同步:BufferEntity 携带 CompletableFuture,doStreamLoad 成功 complete、
    重试耗尽 completeExceptionally,waitUtilDone 时对已入队 entity 执行 future.get(timeout)。
  3. 补全 EOF 协议:close 发显式 EOF entity,消费线程遇到则 return false 退出循环,
    主线程 join(timeout) 后做最终 check;flushThread 不设 daemon。

How to reproduce

在待同步数据中的最后一条插入一行必定写失败的数据即可复现

Anything else

No response

Version

master

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions