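After an exception-triggered suspension is manually resumed twice, the channel and pipeline both show a normal status, but the channel is actually hung and no longer syncing #638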

Closed
funnyAnt opened this issue Nov 10, 2018 · 4 comments

funnyAnt commented Nov 10, 2018

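  • Symptom:

An exception during an insert on the node causes the manager to suspend the channel. If the exception is left unresolved and you immediately click resume in the manager UI, then after doing this twice the channel status shows as normal, but the channel is in fact hung and no longer working.

  • Cause
    The exception causes a rollback, which triggers this branch in SelectTask.java:
        if (rversion.get() != startVersion) {// a change occurred: a rollback happened in between, so this data must be discarded
            logger.warn("rollback happend , should skip this data and get new message.");
            canStartSelector.get();// confirm that the rollback has completed
            gotMessage = otterSelector.selector();// whether or not there is data, one s/e/t/l round must run
        }
    At the same time, the rollback reports the error to the manager, which changes the channel status to PAUSE. If the channel status has already become PAUSE by the time ExtractMemoryArbitrateEvent runs, the data is dropped without any processing, and the read position in the EventStore is not rolled back.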

funnyAnt commented Nov 10, 2018

@agapple When the channel status has already become PAUSE in ExtractMemoryArbitrateEvent, the data is dropped without any processing. Was rolling back the read position in the EventStore overlooked?

  • ExtractMemoryArbitrateEvent.java:
```java
public EtlEventData await(Long pipelineId) throws InterruptedException {
    Assert.notNull(pipelineId);

    PermitMonitor permitMonitor = ArbitrateFactory.getInstance(pipelineId, PermitMonitor.class);
    permitMonitor.waitForPermit();// block until permission is granted

    MemoryStageController stageController = ArbitrateFactory.getInstance(pipelineId, MemoryStageController.class);
    Long processId = stageController.waitForProcess(StageType.EXTRACT); // a processId that meets the conditions

    ChannelStatus status = permitMonitor.getChannelPermit();
    if (status.isStart()) {// re-check the current status here, since it can change at any time
        return stageController.getLastData(processId);
    } else {
        logger.warn("pipelineId[{}] extract ignore processId[{}] by status[{}]", new Object[] { pipelineId,
                processId, status });
        // Should a rollback of the EventStore read position be triggered here?
        return await(pipelineId);// recursive call
    }
}
```


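  • Debugging shows that the EventStore rollback is not the problem. When ExtractMemoryArbitrateEvent.java finds the channel in the PAUSE state, it simply discards the signal packet without removing the corresponding processId from MemoryStageController.

  • The code in ExtractMemoryArbitrateEvent is changed as follows: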

        if (status.isStart()) {// re-check the current status here, since it can change at any time
            ......
        } else {
            logger.warn("pipelineId[{}] select ignore processId[{}] by status[{}]", new Object[] { pipelineId,
                    processId, status });
            // Release the processId: the load stage in MemoryStageController waits for the smallest
            // processId to finish Transform before continuing, so without this release it blocks forever
            stageController.clearProgress(processId);
            return await(pipelineId);// recursive call
        }
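To make the blocking rule in that comment concrete, here is a minimal Java sketch of a gate that only lets the load stage proceed with the smallest in-flight processId. `MinProcessGate` and all of its methods are hypothetical names chosen for illustration. This is not Otter's MemoryStageController, only a toy model of the behaviour described above.

```java
import java.util.TreeSet;

/** Toy model (hypothetical): load may only take the smallest in-flight processId. */
final class MinProcessGate {
    private final TreeSet<Long> inFlight = new TreeSet<>();
    private final TreeSet<Long> transformed = new TreeSet<>();

    /** Registers a processId handed out by the select stage. */
    void start(long processId) {
        inFlight.add(processId);
    }

    /** Marks a processId as having finished the transform stage. */
    void markTransformed(long processId) {
        transformed.add(processId);
    }

    /** Forgets a processId entirely, as if it had never been scheduled. */
    void clearProgress(long processId) {
        inFlight.remove(processId);
        transformed.remove(processId);
    }

    /** Returns the processId load may take next, or null if load must keep waiting. */
    Long nextLoadable() {
        if (inFlight.isEmpty()) {
            return null;
        }
        Long min = inFlight.first();
        return transformed.contains(min) ? min : null;
    }

    public static void main(String[] args) {
        MinProcessGate gate = new MinProcessGate();
        gate.start(1L);            // dropped by extract while the channel was paused
        gate.start(2L);
        gate.markTransformed(2L);  // process 2 went through extract and transform
        System.out.println(gate.nextLoadable()); // null: load is stuck behind process 1
        gate.clearProgress(1L);
        System.out.println(gate.nextLoadable()); // 2
    }
}
```

In this model, process 2 is ready but cannot be loaded until process 1 is either transformed or cleared, which matches the hang described in the issue.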

agapple added a commit that referenced this issue Nov 13, 2018
#638 After an exception-triggered suspension is manually resumed twice, the channel and pipeline both show a normal status, but the channel is actually hung and no longer syncing. processId…
agapple closed this as completed Nov 13, 2018
agapple self-assigned this Nov 13, 2018
agapple added the bug label Nov 13, 2018
agapple added this to the v4.2.17 milestone Nov 13, 2018

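  • Calling only stageController.clearProgress(processId)

to remove the processId still leaves problems: first, the select stage loses one semaphore permit; second, the get position in the eventStore is not reset. In testing, setting the pipeline's parallelism to 1 makes the hang reproduce reliably.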
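The lost permit can be illustrated with a small sketch. It assumes, as a simplification, that the select stage takes one permit per batch from a semaphore sized to the pipeline's parallelism and that the permit is only returned when the batch is terminated. `SelectPermitLeak` and `batchesStarted` are hypothetical names, not Otter code.

```java
import java.util.concurrent.Semaphore;

/** Toy model (hypothetical): one select permit per batch, sized by parallelism. */
final class SelectPermitLeak {

    /**
     * Counts how many batches the select stage manages to start when every
     * batch is dropped because the channel is paused.
     *
     * @param releaseOnDrop whether dropping a batch also returns its permit
     */
    static int batchesStarted(int parallelism, int attempts, boolean releaseOnDrop) {
        Semaphore permits = new Semaphore(parallelism);
        int started = 0;
        for (int i = 0; i < attempts; i++) {
            if (!permits.tryAcquire()) {
                break; // no permit left: the select stage stops making progress
            }
            started++;
            if (releaseOnDrop) {
                permits.release(); // the drop path gives the permit back
            }
        }
        return started;
    }

    public static void main(String[] args) {
        System.out.println(batchesStarted(1, 5, false)); // 1
        System.out.println(batchesStarted(3, 5, false)); // 3
        System.out.println(batchesStarted(1, 5, true));  // 5
    }
}
```

Under these assumptions, a single dropped batch exhausts the permits when parallelism is 1, which is consistent with the hang reproducing reliably at that setting. With higher parallelism it takes several drops before the stage stalls.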

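  • In f512a3f I made some changes:
  1. In SelectTask.java, added a sleep that waits for the channel to return to the start state, to avoid the signal-dropping path as far as possible.

  2. In ****MemoryArbitrateEvent.java, replaced the stageController.clearProgress(processId) call with stageController.termin(TerminType.ROLLBACK);

            // Perform a ROLLBACK, which releases the processId, the semaphore permit, and the read position in the EventStore.
            // 1) The load stage in MemoryStageController waits for the smallest processId to finish Transform before continuing; without this release it blocks forever
            // 2) Once the SELECT permits are used up, the selectTask stops
            // 3) If the EventStore read position is not reset and the queue happens to be full and read to the end, new BINLOG data cannot enter
            stageController.termin(TerminType.ROLLBACK);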
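Point 3 in the comment above can be sketched with a bounded store that tracks put, get, and ack cursors. `ToyEventStore` and its methods are hypothetical and only assume that slots become reusable once acknowledged and that a rollback moves the read cursor back to the last acknowledged position. It is not the actual EventStore implementation.

```java
/** Toy model (hypothetical): bounded store with put / get / ack cursors. */
final class ToyEventStore {
    private final int capacity;
    private long put = 0; // next sequence to write
    private long get = 0; // next sequence to read
    private long ack = 0; // everything below this is acknowledged and reusable

    ToyEventStore(int capacity) {
        this.capacity = capacity;
    }

    /** Tries to append one event; fails when unacknowledged events fill the store. */
    boolean tryPut() {
        if (put - ack >= capacity) {
            return false;
        }
        put++;
        return true;
    }

    /** Reads up to batchSize events and returns how many were read. */
    int get(int batchSize) {
        int n = (int) Math.min(batchSize, put - get);
        get += n;
        return n;
    }

    /** Acknowledges everything read so far, freeing those slots. */
    void ack() {
        ack = get;
    }

    /** Moves the read cursor back to the last acknowledged position. */
    void rollback() {
        get = ack;
    }

    public static void main(String[] args) {
        ToyEventStore store = new ToyEventStore(4);
        for (int i = 0; i < 4; i++) {
            store.tryPut();
        }
        store.get(4); // batch is read, then dropped because the channel is paused

        System.out.println(store.get(4));   // 0: nothing left to read
        System.out.println(store.tryPut()); // false: store is full of unacked events

        store.rollback();
        System.out.println(store.get(4));   // 4: the dropped batch is read again
        store.ack();
        System.out.println(store.tryPut()); // true: space is available again
    }
}
```

In this model the dropped batch is never acknowledged, so the store stays full and the reader sees nothing new until a rollback makes the batch readable again.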

@agapple Please check whether this looks right. If it does, I'll submit a new pull request.

agapple added a commit that referenced this issue Nov 15, 2018

linqh1 commented Sep 30, 2020

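I've run into this problem too...
The version is 4.2.17.
The manager shows the channel status as normal, but the sync position never advances. I have to restart the channel to recover.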

linqh1 pushed a commit to linqh1/otter that referenced this issue Dec 7, 2021
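alibaba#638 After an exception-triggered suspension is manually resumed twice, the channel and pipeline both show a normal status, but the channel is actually hung and no longer syncing. processId…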
linqh1 pushed a commit to linqh1/otter that referenced this issue Dec 7, 2021