[🐛 BUG] Never receiving Data Node notifications/update with Core event #2319

Closed
1 of 7 tasks
FlorianJacta opened this issue Dec 11, 2024 · 0 comments · Fixed by #2316
Assignees
Labels
Core: Job & Orchestrator Core Related to Taipy Core 💥Malfunction Addresses an identified problem. 🟥 Priority: Critical Must be addressed as soon as possible 🔒 Staff only Can only be assigned to the Taipy R&D team

Comments

@FlorianJacta
Member

FlorianJacta commented Dec 11, 2024

What went wrong? 🤔

Taipy does not always publish an event when a data node is updated. As a result, I cannot write simple code that refreshes my UI from Core events whenever a data node changes.

Expected Behavior

I should receive all the appropriate events.

Steps to Reproduce Issue

Run this code and the scenario:

from taipy.core import Status
from taipy.core.notification import (
    CoreEventConsumerBase,
    Event,
    EventEntityType,
    EventOperation,
    Notifier,
)
import taipy as tp
import taipy.gui.builder as tgb
from taipy.gui import Gui, State, notify
from taipy.core import Scenario, DataNode
from taipy import Config

import time
import pandas as pd


def some_task_1(input_data):
    time.sleep(2)
    print("hello world")
    return "hello world"


def some_task_2(intermediate_data):
    time.sleep(2)
    print(f"{intermediate_data} from Taipy!")
    return f"{intermediate_data} from Taipy!"


Config.configure_job_executions(mode="standalone", max_nb_of_workers=2)
input_data_cfg = Config.configure_data_node(
    id="input_data", storage_type="csv", default_data={"x": [1, 2, 3], "y": [4, 5, 6]}
)
intermediate_data_cfg = Config.configure_data_node(id="intermediate_data")
output_data_cfg = Config.configure_data_node(id="output_data")
task_1_cfg = Config.configure_task(
    id="task_1",
    function=some_task_1,
    input=input_data_cfg,
    output=intermediate_data_cfg,
    skippable=True,
)
task_2_cfg = Config.configure_task(
    id="task_2",
    function=some_task_2,
    input=intermediate_data_cfg,
    output=output_data_cfg,
    skippable=True,
)
scenario_cfg = Config.configure_scenario(
    id="scenario",
    task_configs=[task_1_cfg, task_2_cfg],
)

def notify_user(state, data_node, scenarios):
    if state.scenario.id in scenarios:
        if data_node.config_id == "input_data":
            state.input_data = data_node.read()
        if data_node.config_id == "intermediate_data":
            notify(state, "success", "Task 1 is finished!")
        if data_node.config_id == "output_data":
            state.output_data = data_node.read()
            notify(state, "success", "Submission is finished!")


class SpecificCoreConsumer(CoreEventConsumerBase):
    def __init__(self, gui: Gui):
        self.gui = gui

        reg_id, queue = Notifier.register(
            # attribute_name="last_edit_date",
        )  # Adapt the registration to the events you want to listen to
        super().__init__(reg_id, queue)

    def process_event(self, event: Event):
        if (
            event.entity_type == EventEntityType.DATA_NODE
            and event.operation == EventOperation.UPDATE
            and event.attribute_name == "last_edit_date"
        ):
            print(
                f"Received event: {event.operation} - {event.entity_type} - {event.entity_id}\n"
                f"                {event.attribute_name} - {event.attribute_value}\n"
            )
            data_node: DataNode = tp.get(event.entity_id)
            scenarios = data_node.get_parents().get("scenario")
            self.gui.broadcast_callback(
                notify_user,
                [data_node, scenarios],
            )


def change_scenario(state: State):
    state.input_data = state.scenario.input_data.read()
    state.output_data = state.scenario.output_data.read()


def run_scenario(state: State):
    print("Running scenario...")
    state.scenario.submit()
    print("Done!")


def on_exception(state, function_name: str, ex: Exception):
    print(ex, "in ", function_name)


if __name__ == "__main__":
    tp.Orchestrator().run()
    scenario: Scenario = tp.create_scenario(scenario_cfg)

    input_data: pd.DataFrame = scenario.input_data.read()
    output_data: int = scenario.output_data.read()

    with tgb.Page() as page:
        tgb.scenario_selector(scenario, on_change=change_scenario)
        tgb.button("Run Scenario", on_action=run_scenario)
        tgb.chart("{input_data}")
        tgb.input("{output_data}")

    gui = Gui(page)
    SpecificCoreConsumer(gui).start()
    gui.run(debug=True)
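
To make "missing events" concrete, here is a minimal sketch of the check, written with the standard library only so it runs without Taipy. `FakeEvent`, `is_data_node_edit`, `missing_updates`, and the entity ids are hypothetical stand-ins, not Taipy's API. The filter mirrors the condition used in `process_event` above, and the simulated queue is an illustrative case, not a trace captured from this bug.

```python
from dataclasses import dataclass
from queue import SimpleQueue
from typing import Optional, Set


@dataclass(frozen=True)
class FakeEvent:
    """Hypothetical stand-in for a Core event (not Taipy's Event class)."""
    entity_type: str
    operation: str
    entity_id: str
    attribute_name: Optional[str] = None


def is_data_node_edit(event: FakeEvent) -> bool:
    """Same three-part condition as process_event in the report."""
    return (
        event.entity_type == "DATA_NODE"
        and event.operation == "UPDATE"
        and event.attribute_name == "last_edit_date"
    )


def missing_updates(events: SimpleQueue, expected_ids: Set[str]) -> Set[str]:
    """Drain the queue and return the expected ids that never got an edit event."""
    seen = set()
    while not events.empty():
        event = events.get()
        if is_data_node_edit(event):
            seen.add(event.entity_id)
    return expected_ids - seen


# Illustrative case: two data nodes should be announced, only one is.
queue = SimpleQueue()
queue.put(FakeEvent("JOB", "UPDATE", "JOB_task_1", "status"))
queue.put(FakeEvent("DATA_NODE", "UPDATE", "DATANODE_intermediate_data", "last_edit_date"))

missing = missing_updates(
    queue, {"DATANODE_intermediate_data", "DATANODE_output_data"}
)
print(missing)
```

With a real consumer, the same comparison could be done by recording `event.entity_id` inside `process_event` and comparing the recorded set against the ids of the scenario's written data nodes once the submission finishes.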

Versions of Taipy

  • develop
  • 4.0.1

Acceptance Criteria

  • A unit test reproducing the bug is added.
  • Any new code is covered by a unit test.
  • Check code coverage is at least 90%.
  • The bug reporter validated the fix.
  • Related issue(s) in taipy-doc are created for documentation and Release Notes are updated.

Code of Conduct

  • I have checked the existing issues.
  • I am willing to work on this issue (optional)
@FlorianJacta FlorianJacta added 💥Malfunction Addresses an identified problem. Core Related to Taipy Core 🟥 Priority: Critical Must be addressed as soon as possible labels Dec 11, 2024
@jrobinAV jrobinAV self-assigned this Dec 11, 2024
@jrobinAV jrobinAV added Core: Job & Orchestrator 🔒 Staff only Can only be assigned to the Taipy R&D team labels Dec 11, 2024