Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG]: Intermittent CI failures on tests/test_kafka_source_stage_pipe.py::test_kafka_source_commit #1217

Closed
2 tasks done
dagardner-nv opened this issue Sep 21, 2023 · 0 comments
Assignees
Labels
bug Something isn't working

Comments

@dagardner-nv
Copy link
Contributor

Version

23.11

Which installation method(s) does this occur on?

No response

Describe the bug.

Bug occurring semi-regularly in CI.

____________________ test_kafka_source_commit[use_cpp-1000] ____________________

num_records = 1000
config = Config(debug=False, log_level=30, log_config_file=None, plugins=None, mode=<PipelineModes.OTHER: 'OTHER'>, feature_len...peline_batch_size=256, num_threads=1, model_max_batch_size=8, edge_buffer_size=128, class_labels=[], ae=None, fil=None)
kafka_bootstrap_servers = 'localhost:25359'
kafka_topics = KafkaTopics(input_topic='morpheus_unittest_input_1695163236.5183914', output_topic='morpheus_unittest_output_1695163236.5184')

    @pytest.mark.kafka
    @pytest.mark.parametrize('num_records', [10, 100, 1000])
    def test_kafka_source_commit(num_records, config, kafka_bootstrap_servers: str,
                                 kafka_topics: typing.Tuple[str, str]) -> None:
    
        data = [{'v': i} for i in range(num_records)]
        num_written = write_data_to_kafka(kafka_bootstrap_servers, kafka_topics.input_topic, data)
        assert num_written == num_records
    
        pipe = LinearPipeline(config)
        pipe.set_source(
            KafkaSourceStage(config,
                             bootstrap_servers=kafka_bootstrap_servers,
                             input_topic=kafka_topics.input_topic,
                             auto_offset_reset="earliest",
                             poll_interval="1seconds",
                             group_id='morpheus',
                             client_id='morpheus_kafka_source_commit',
                             stop_after=num_records,
                             async_commits=False))
    
        pipe.add_stage(OffsetChecker(config, bootstrap_servers=kafka_bootstrap_servers, group_id='morpheus'))
        pipe.add_stage(TriggerStage(config))
    
        pipe.add_stage(DeserializeStage(config))
        pipe.add_stage(SerializeStage(config))
        comp_stage = pipe.add_stage(
            CompareDataFrameStage(config, pd.DataFrame(data=data), include=[r'^v$'], reset_index=True))
>       pipe.run()

test_kafka_source_stage_pipe.py:186: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/opt/conda/envs/morpheus/lib/python3.10/site-packages/morpheus/pipeline/pipeline.py:606: in run
    asyncio.run(self.run_async())
/opt/conda/envs/morpheus/lib/python3.10/asyncio/runners.py:44: in run
    return loop.run_until_complete(main)
/opt/conda/envs/morpheus/lib/python3.10/asyncio/base_events.py:649: in run_until_complete
    return future.result()
/opt/conda/envs/morpheus/lib/python3.10/site-packages/morpheus/pipeline/pipeline.py:584: in run_async
    await self.join()
/opt/conda/envs/morpheus/lib/python3.10/site-packages/morpheus/pipeline/pipeline.py:329: in join
    await self._mrc_executor.join_async()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <morpheus_offset_checker-2279; OffsetChecker(bootstrap_servers=localhost:25359, group_id=morpheus)>
x = <morpheus._lib.messages.MessageMeta object at 0x7f9a3c356170>

    def _offset_checker(self, x):
        at_least_one_gt = False
        new_offsets = self._client.list_consumer_group_offsets(self._group_id)
    
        if self._offsets is not None:
            for (topic_partition, prev_offset) in self._offsets.items():
                new_offset = new_offsets[topic_partition]
    
                assert new_offset.offset >= prev_offset.offset
    
                if new_offset.offset > prev_offset.offset:
                    at_least_one_gt = True
    
>           assert at_least_one_gt
E           assert False

test_kafka_source_stage_pipe.py:145: AssertionError
----------------------------- Captured stdout call -----------------------------
[2023-09-19 22:40:36,801] INFO Creating topic morpheus_unittest_input_1695163236.5183914 with configuration {} and initial partition assignment HashMap(0 -> ArrayBuffer(25359)) (kafka.zk.AdminZkClient)
[2023-09-19 22:40:36,822] INFO [ReplicaFetcherManager on broker 25359] Removed fetcher for partitions Set(morpheus_unittest_input_1695163236.5183914-0) (kafka.server.ReplicaFetcherManager)
[2023-09-19 22:40:36,826] INFO [LogLoader partition=morpheus_unittest_input_1695163236.5183914-0, dir=/tmp/pytest-of-root/pytest-0/session-scoped0/kafka-server-25359/logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
[2023-09-19 22:40:36,827] INFO Created log for partition morpheus_unittest_input_1695163236.5183914-0 in /tmp/pytest-of-root/pytest-0/session-scoped0/kafka-server-25359/logs/morpheus_unittest_input_1695163236.5183914-0 with properties {} (kafka.log.LogManager)
[2023-09-19 22:40:36,828] INFO [Partition morpheus_unittest_input_1695163236.5183914-0 broker=25359] No checkpointed highwatermark is found for partition morpheus_unittest_input_1695163236.5183914-0 (kafka.cluster.Partition)
[2023-09-19 22:40:36,828] INFO [Partition morpheus_unittest_input_1695163236.5183914-0 broker=25359] Log loaded for partition morpheus_unittest_input_1695163236.5183914-0 with initial high watermark 0 (kafka.cluster.Partition)
[2023-09-19 22:40:37,529] INFO [GroupCoordinator 25359]: Dynamic member with unknown member id joins group morpheus in Empty state. Created a new member id morpheus_kafka_source_commit-95caad11-9116-4646-974e-aa209e30a01d and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2023-09-19 22:40:37,531] INFO [GroupCoordinator 25359]: Preparing to rebalance group morpheus in state PreparingRebalance with old generation 16 (__consumer_offsets-39) (reason: Adding new member morpheus_kafka_source_commit-95caad11-9116-4646-974e-aa209e30a01d with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
[2023-09-19 22:40:40,531] INFO [GroupCoordinator 25359]: Stabilized group morpheus generation 17 (__consumer_offsets-39) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2023-09-19 22:40:40,534] INFO [GroupCoordinator 25359]: Assignment received from leader morpheus_kafka_source_commit-95caad11-9116-4646-974e-aa209e30a01d for group morpheus for generation 17. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2023-09-19 22:40:41,870] INFO [GroupCoordinator 25359]: Preparing to rebalance group morpheus in state PreparingRebalance with old generation 17 (__consumer_offsets-39) (reason: Removing member morpheus_kafka_source_commit-95caad11-9116-4646-974e-aa209e30a01d on LeaveGroup; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
[2023-09-19 22:40:41,870] INFO [GroupCoordinator 25359]: Group morpheus with generation 18 is now empty (__consumer_offsets-39) (kafka.coordinator.group.GroupCoordinator)
[2023-09-19 22:40:41,871] INFO [GroupCoordinator 25359]: Member MemberMetadata(memberId=morpheus_kafka_source_commit-95caad11-9116-4646-974e-aa209e30a01d, groupInstanceId=None, clientId=morpheus_kafka_source_commit, clientHost=/172.18.0.2, sessionTimeoutMs=60000, rebalanceTimeoutMs=300000, supportedProtocols=List(range, roundrobin)) has left group morpheus through explicit `LeaveGroup`; client reason: not provided (kafka.coordinator.group.GroupCoordinator)
----------------------------- Captured stderr call -----------------------------
E20230919 22:40:40.941561 22823 context.cpp:124] /linear_segment_0/morpheus_offset_checker-2279; rank: 0; size: 1; tid: 140300064384768: set_exception issued; issuing kill to current runnable. Exception msg: AssertionError: assert False

At:
  /__w/Morpheus/Morpheus/morpheus/tests/test_kafka_source_stage_pipe.py(145): _offset_checker
E20230919 22:40:41.869378 22822 kafka_source.cpp:380] Exception in rebalance_loop. Msg: std::exception
E20230919 22:40:41.872720 17919 runner.cpp:189] Runner::await_join - an exception was caught while awaiting on one or more contexts/instances - rethrowing
E20230919 22:40:41.872822 17919 segment_instance.cpp:270] segment::SegmentInstance - an exception was caught while awaiting on one or more nodes - rethrowing
E20230919 22:40:41.872867 17919 pipeline_instance.cpp:225] pipeline::PipelineInstance - an exception was caught while awaiting on segments - rethrowing
Exception occurred in pipeline. Rethrowing
Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/morpheus/pipeline/pipeline.py", line 329, in join
    await self._mrc_executor.join_async()
  File "/__w/Morpheus/Morpheus/morpheus/tests/test_kafka_source_stage_pipe.py", line 145, in _offset_checker
    assert at_least_one_gt
AssertionError: assert False
E20230919 22:40:41.874922 17919 service.cpp:136] mrc::service: service was not joined before being destructed; issuing join

Minimum reproducible example

No response

Relevant log output

Click here to see error details

[Paste the error here, it will be hidden by default]

Full env printout

Click here to see environment details

[Paste the results of print_env.sh here, it will be hidden by default]

Other/Misc.

No response

Code of Conduct

  • I agree to follow Morpheus' Code of Conduct
  • I have searched the open bugs and have found no duplicates for this bug report
@dagardner-nv dagardner-nv added the bug Something isn't working label Sep 21, 2023
rapids-bot bot pushed a commit that referenced this issue Sep 22, 2023
* Fixes intermittent failures in `tests/test_kafka_source_stage_pipe.py::test_kafka_source_commit`
* The test checked the offsets in a stage, however the C++ impl for the source stage performs a commit after calling `on_next`. This also limited us to only testing sync commits.
* Updated test performs the offset check after the pipeline completes when we can be assured that all commits have completed.

Fxies #1217

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1212
@github-project-automation github-project-automation bot moved this from Todo to Done in Morpheus Boards Sep 22, 2023
dagardner-nv added a commit to dagardner-nv/Morpheus that referenced this issue Oct 4, 2023
* Fixes intermittent failures in `tests/test_kafka_source_stage_pipe.py::test_kafka_source_commit`
* The test checked the offsets in a stage, however the C++ impl for the source stage performs a commit after calling `on_next`. This also limited us to only testing sync commits.
* Updated test performs the offset check after the pipeline completes when we can be assured that all commits have completed.

Fxies nv-morpheus#1217

- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: nv-morpheus#1212
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
Status: Done
Development

No branches or pull requests

1 participant