Details tab blank for Mapped Tasks #36690

Closed
1 of 2 tasks
tanmaykansara opened this issue Jan 9, 2024 · 3 comments
Labels
area:core, kind:bug, needs-triage, pending-response

Comments

Apache Airflow version

2.8.0

If "Other Airflow 2 version" selected, which one?

No response

What happened?

In previous versions, the Details tab for a mapped task showed how many task instances were running, scheduled, and so on. In 2.8 it is blank.

What you think should happen instead?

The tab should show how many task instances are running, scheduled, or queued, along with the success and failure counts.
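
To make the request concrete, here is a minimal sketch of the kind of per-state summary being asked for. The state names come from this report; the `summarize_states` helper and the sample data are hypothetical and are not how Airflow computes or renders the tab.

```python
from collections import Counter

# States mentioned in this report, in display order.
STATES = ["running", "scheduled", "queued", "success", "failed"]


def summarize_states(instance_states):
    """Return a count for every state in STATES, including zeros."""
    counts = Counter(instance_states)
    return {state: counts.get(state, 0) for state in STATES}


# Hypothetical states for six mapped task instances.
sample = ["success", "success", "running", "queued", "failed", "success"]
summary = summarize_states(sample)
print(summary)
```

States with no instances are reported as zero rather than omitted, so the summary always has the same shape.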

How to reproduce

Select a mapped task, then open the Details tab.

Operating System

EKS Amazon AMI

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

tanmaykansara added the area:core, kind:bug, and needs-triage labels Jan 9, 2024
aritra24 (Collaborator) commented Jan 9, 2024

@tanmaykansara, could you share specific steps to reproduce this? I do not see this behaviour when using the sample DAG here on the latest main.

tanmaykansara (Author) commented Jan 9, 2024

Hi,
Here is the code snippet that I can share:

```python
import importlib
import json
import traceback
from datetime import timedelta

from airflow.decorators import task

# Excerpt from the DAG body. logger, secret_name, bucket, sql_to_use,
# additional_params, dag_id, concurrency, batch_size, executor_config_template,
# task_execution_timeout, error_logger, error_logger_name, core_database and the
# snowflake_helpers, generic_helpers and social_ingest_helpers modules are
# defined elsewhere in the project.


@task  # Decorator assumed; it is not shown in the original snippet.
def get_work_items_task():
    logger.info(f"secret_name:{secret_name}")
    logger.info(f"bucket={bucket}")

    # Dynamically import the SQL based on the variable 'sql_to_use'
    sql_module = importlib.import_module(f"includes.sql.update_social_posts.{sql_to_use}")
    if not hasattr(sql_module, "sql"):
        raise ImportError(f"No SQL found in module {sql_to_use}")
    sql = sql_module.sql
    logger.info(f"Additional Params:{additional_params}")
    results = snowflake_helpers.select_data_from_snowflake(
        secret_name, sql, additional_params,
        dag_id=dag_id, calling_function_name="get_work_items_task",
    )
    # Each chunk becomes the input of one mapped task instance.
    chunks = generic_helpers.split_results_into_batches(results, concurrency, batch_size)
    return chunks


@task(
    multiple_outputs=True,
    executor_config=executor_config_template,
    execution_timeout=timedelta(seconds=task_execution_timeout),
)
def ingest_data_task(batch):
    logger.info(f"len(batch)={len(batch)}")
    logger.info(batch_size)
    logger.info(f"batch:{batch}")

    # Process each row in the batch.
    for data in batch:
        try:
            id = data[0]
            platform_post_id = data[1]
            platform_url = data[2]
            handle_name = data[3]
            next_scheduled_update = data[4]

            platform_code = ""
            if "instagram.com" in platform_url:
                url_short_parts = platform_url.rstrip("/").split("/")
                platform_code = url_short_parts[-1]

            # Ingest social media feed data from the platform for this social post.
            social_ingest_helpers.ingest_social_post_data(
                bunch_of_params
            )
        except Exception as ex:
            exception_message = str(ex)
            exception_name = ex.__class__.__name__
            stack_trace = traceback.format_exc()

            error_logger.add_error_to_list(
                error_logger.create_error_entry(
                    "ERROR", dag_id,
                    "ingest_data_task",
                    error_logger_name,
                    exception_message, exception_name, stack_trace,
                    json.dumps(data),
                )
            )

        # Log all errors to Snowflake that have already been added to the
        # error logger's internal list of errors.
        error_logger.log_errors_to_snowflake(core_database, secret_name)

    return {
        "status": "complete"
    }


get_work_items = get_work_items_task()
# One mapped task instance is created per chunk returned by get_work_items_task.
ingest_data = ingest_data_task.partial().expand(batch=get_work_items)
```

When I open the Details tab of the mapped task, it is blank. In 2.7, I did see the stats for the job.
Hope this helps.
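
For readers following the snippet, the `platform_code` step can be tried in isolation. This is a minimal sketch of the same string operations; the function name `extract_platform_code` and the example URLs are made up for illustration and do not come from the author's project.

```python
def extract_platform_code(platform_url: str) -> str:
    """Return the last path segment of an Instagram URL, else an empty string."""
    if "instagram.com" not in platform_url:
        return ""
    # Drop any trailing slash, then take the final path segment.
    return platform_url.rstrip("/").split("/")[-1]


print(extract_platform_code("https://www.instagram.com/p/EXAMPLECODE/"))
print(extract_platform_code("https://example.com/posts/123"))
```

Because the split is purely on `/`, a URL that ends with a query string would yield that query string as the last segment, so such URLs may need extra handling.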

eladkal (Contributor) commented Jan 9, 2024

Duplicate of #36481.
It's already solved in main, and the fix will be released in 2.8.1.
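
A quick way to check whether a given installation falls in the affected range is sketched below. The range is inferred only from this thread (2.7 worked, 2.8.0 is blank, the fix is expected in 2.8.1), and the helper names are hypothetical. It assumes plain `X.Y.Z` version strings without pre-release suffixes.

```python
def parse_version(version: str) -> tuple:
    """Turn a plain 'X.Y.Z' string into a tuple of ints for comparison."""
    return tuple(int(part) for part in version.split("."))


def is_affected(version: str) -> bool:
    """True for versions from 2.8.0 up to, but not including, 2.8.1."""
    return (2, 8, 0) <= parse_version(version) < (2, 8, 1)


for candidate in ["2.7.3", "2.8.0", "2.8.1"]:
    print(candidate, is_affected(candidate))
```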

eladkal closed this as not planned (duplicate) Jan 9, 2024