Skip to content
This repository has been archived by the owner on Jan 2, 2024. It is now read-only.

Commit

Permalink
Fix unit tests. The tests are failing because the code is wrong.
Browse files Browse the repository at this point in the history
  • Loading branch information
jrobinAV committed Nov 14, 2023
1 parent 9327ac0 commit acb6521
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 47 deletions.
2 changes: 1 addition & 1 deletion src/taipy/core/job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def status(self, val):
if self.submit_id:
from ..submission._submission_manager_factory import _SubmissionManagerFactory

_SubmissionManagerFactory._build_manager()._get(self.submit_id).update_submission_status()
_SubmissionManagerFactory._build_manager()._get(self.submit_id)._update_submission_status()

@property # type: ignore
@_self_reload(_MANAGER_NAME)
Expand Down
2 changes: 1 addition & 1 deletion src/taipy/core/submission/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def __gt__(self, other):
def __ge__(self, other):
return self.creation_date.timestamp() == other.creation_date.timestamp() or self > other

def update_submission_status(self):
def _update_submission_status(self):
submission_status = SubmissionStatus.UNDEFINED
blocked = False
pending = False
Expand Down
157 changes: 113 additions & 44 deletions tests/core/notification/test_events_published.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,80 +39,149 @@ def identity(x):
return x


def test_event_published():
register_id_0, register_queue_0 = Notifier.register()
all_evts = AllCoreEventConsumer(register_id_0, register_queue_0)
all_evts.start()

def test_events_published_for_scenario_creation():
input_config = Config.configure_data_node("the_input")
output_config = Config.configure_data_node("the_output")
task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
sc_config = Config.configure_scenario(
"the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
)

register_id_0, register_queue_0 = Notifier.register()
all_evts = AllCoreEventConsumer(register_id_0, register_queue_0)
all_evts.start()
# Create a scenario only trigger 6 creation events (for cycle, data node(x2), task, sequence and scenario)
scenario = tp.create_scenario(sc_config)
tp.create_scenario(sc_config)

assert_true_after_time(lambda: all_evts.event_collected == 6, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.CYCLE] == 1, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.DATA_NODE] == 2, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.TASK] == 1, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.SEQUENCE] == 1, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.SCENARIO] == 1, time=10)
assert_true_after_time(lambda: all_evts.operation_collected[EventOperation.CREATION] == 6, time=10)
all_evts.stop()


def test_no_event_published_for_getting_scenario():
input_config = Config.configure_data_node("the_input")
output_config = Config.configure_data_node("the_output")
task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
sc_config = Config.configure_scenario(
"the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
)
scenario = tp.create_scenario(sc_config)

register_id_0, register_queue_0 = Notifier.register()
all_evts = AllCoreEventConsumer(register_id_0, register_queue_0)
all_evts.start()
# Get all scenarios does not trigger any event
tp.get_scenarios()
assert_true_after_time(lambda: all_evts.event_collected == 6, time=10)
assert_true_after_time(lambda: all_evts.event_collected == 0, time=10)

# Get one scenario does not trigger any event
sc = tp.get(scenario.id)
assert_true_after_time(lambda: all_evts.event_collected == 6, time=10)
assert_true_after_time(lambda: all_evts.event_collected == 0, time=10)

# Write input manually trigger 4 data node update events (for last_edit_date, editor_id, editor_expiration_date
# and edit_in_progress)
sc.the_input.write("test")
assert_true_after_time(lambda: all_evts.event_collected == 10, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.CYCLE] == 1, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.DATA_NODE] == 6, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.TASK] == 1, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.SEQUENCE] == 1, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.SCENARIO] == 1, time=10)
assert_true_after_time(lambda: all_evts.operation_collected[EventOperation.CREATION] == 6, time=10)
all_evts.stop()


def test_events_published_for_writing_dn():
input_config = Config.configure_data_node("the_input")
output_config = Config.configure_data_node("the_output")
task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
sc_config = Config.configure_scenario(
"the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
)
scenario = tp.create_scenario(sc_config)
register_id_0, register_queue_0 = Notifier.register()
all_evts = AllCoreEventConsumer(register_id_0, register_queue_0)
all_evts.start()

# Write input manually trigger 4 data node update events
# for last_edit_date, editor_id, editor_expiration_date and edit_in_progress
scenario.the_input.write("test")
assert_true_after_time(lambda: all_evts.event_collected == 4, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.DATA_NODE] == 4, time=10)
assert_true_after_time(lambda: all_evts.operation_collected[EventOperation.UPDATE] == 4, time=10)
assert_true_after_time(lambda: all_evts.attr_name_collected["last_edit_date"] == 1, time=10)
assert_true_after_time(lambda: all_evts.attr_name_collected["editor_id"] == 1, time=10)
assert_true_after_time(lambda: all_evts.attr_name_collected["editor_expiration_date"] == 1, time=10)
assert_true_after_time(lambda: all_evts.attr_name_collected["edit_in_progress"] == 1, time=10)
all_evts.stop()

# Submit a scenario triggers 12 events:

def test_events_published_for_scenario_submission():
input_config = Config.configure_data_node("the_input")
output_config = Config.configure_data_node("the_output")
task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
sc_config = Config.configure_scenario(
"the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
)
scenario = tp.create_scenario(sc_config)
scenario.the_input.write("test")
register_id_0, register_queue_0 = Notifier.register()
all_evts = AllCoreEventConsumer(register_id_0, register_queue_0)
all_evts.start()
# Submit a scenario triggers:
# 1 scenario submission event
# 7 data node update events (for last_edit_date, editor_id(x2), editor_expiration_date(x2) and edit_in_progress(x2))
# 7 dn update events (for last_edit_date, editor_id(x2), editor_expiration_date(x2) and edit_in_progress(x2))
# 1 job creation event
# 3 job update events (for status: PENDING, RUNNING and COMPLETED)
# 1 submission creation event
# 1 submission update event for jobs
# 3 submission update events (for status: PENDING, RUNNING and COMPLETED)
scenario.submit()
assert_true_after_time(lambda: all_evts.event_collected == 17, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.SCENARIO] == 1, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.DATA_NODE] == 7, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.JOB] == 4, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.SUBMISSION] == 5, time=10)

sc.submit()
assert_true_after_time(lambda: all_evts.event_collected == 30, time=10)
assert_true_after_time(lambda: all_evts.operation_collected[EventOperation.SUBMISSION] == 1, time=10)
assert_true_after_time(lambda: all_evts.operation_collected[EventOperation.CREATION] == 2, time=10)
assert_true_after_time(lambda: all_evts.operation_collected[EventOperation.UPDATE] == 14, time=10)

assert_true_after_time(lambda: all_evts.attr_name_collected["last_edit_date"] == 1, time=10)
assert_true_after_time(lambda: all_evts.attr_name_collected["editor_id"] == 2, time=10)
assert_true_after_time(lambda: all_evts.attr_name_collected["editor_expiration_date"] == 2, time=10)
assert_true_after_time(lambda: all_evts.attr_name_collected["edit_in_progress"] == 2, time=10)
assert_true_after_time(lambda: all_evts.attr_name_collected["status"] == 3, time=10)
assert_true_after_time(lambda: all_evts.attr_name_collected["jobs"] == 1, time=10)
assert_true_after_time(lambda: all_evts.attr_name_collected["submission_status"] == 3, time=10)
all_evts.stop()


def test_events_published_for_scenario_deletion():
input_config = Config.configure_data_node("the_input")
output_config = Config.configure_data_node("the_output")
task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
sc_config = Config.configure_scenario(
"the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
)
scenario = tp.create_scenario(sc_config)
scenario.the_input.write("test")
scenario.submit()
register_id_0, register_queue_0 = Notifier.register()
all_evts = AllCoreEventConsumer(register_id_0, register_queue_0)
all_evts.start()
# Delete a scenario trigger 8 deletion events
# 1 scenario deletion event
# 1 cycle deletion event
# 2 dn deletion events (for input and output)
# 1 task deletion event
# 1 sequence deletion event
# 1 job deletion event
# 1 submission deletion event
tp.delete(scenario.id)
assert_true_after_time(lambda: all_evts.event_collected == 8, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.CYCLE] == 1, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.DATA_NODE] == 13, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.DATA_NODE] == 2, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.TASK] == 1, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.SEQUENCE] == 1, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.SCENARIO] == 2, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.JOB] == 4, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.SUBMISSION] == 8, time=10)
assert_true_after_time(lambda: all_evts.operation_collected[EventOperation.CREATION] == 8, time=10)
assert_true_after_time(lambda: all_evts.operation_collected[EventOperation.UPDATE] == 21, time=10)
assert_true_after_time(lambda: all_evts.operation_collected[EventOperation.SUBMISSION] == 1, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.SCENARIO] == 1, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.JOB] == 1, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.SUBMISSION] == 1, time=10)

# Delete a scenario trigger 7 update events
tp.delete(scenario.id)
assert_true_after_time(lambda: all_evts.event_collected == 37, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.CYCLE] == 2, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.DATA_NODE] == 15, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.TASK] == 2, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.SEQUENCE] == 2, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.SCENARIO] == 3, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.JOB] == 5, time=10)
assert_true_after_time(lambda: all_evts.entity_type_collected[EventEntityType.SUBMISSION] == 8, time=10)
assert_true_after_time(lambda: all_evts.operation_collected[EventOperation.CREATION] == 8, time=10)
assert_true_after_time(lambda: all_evts.operation_collected[EventOperation.UPDATE] == 21, time=10)
assert_true_after_time(lambda: all_evts.operation_collected[EventOperation.SUBMISSION] == 1, time=10)
assert_true_after_time(lambda: all_evts.operation_collected[EventOperation.DELETION] == 7, time=10)
assert_true_after_time(lambda: all_evts.operation_collected[EventOperation.DELETION] == 8, time=10)

all_evts.stop()
2 changes: 1 addition & 1 deletion tests/core/submission/test_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __test_update_submission_status(job_ids, job_statuses, expected_submission_s
submission.jobs = jobs

assert submission.submission_status == SubmissionStatus.UNDEFINED
submission.update_submission_status()
submission._update_submission_status()
assert submission.submission_status == expected_submission_status


Expand Down

0 comments on commit acb6521

Please sign in to comment.