
[data] Basic structured logging #47210

Open · wants to merge 8 commits into base: master

Conversation

@omatthew98 (Contributor) commented Aug 19, 2024

Why are these changes needed?

Adds structured logging to Ray Data. This will allow users to configure logging to use any of the following:

  • A user's custom logging file (existing functionality)
  • A default TEXT logger (existing functionality)
  • A default JSON logger (new functionality)
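The encoding is selected with the RAY_DATA_LOG_ENCODING environment variable, as the console examples below show. For completeness, a minimal sketch of selecting it from within a script (assumption: the variable must be set before Ray Data configures its loggers, i.e., before the first `ray.data` import):

```python
import os

# Assumption: the variable is read when Ray Data sets up logging, so it must
# be set before importing ray.data. Equivalent to running:
#   RAY_DATA_LOG_ENCODING="JSON" python log.py
os.environ["RAY_DATA_LOG_ENCODING"] = "JSON"  # or "TEXT" (the default)
```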

Examples:

Code snippet:

import ray
import time

def f(x):
    time.sleep(0.1)
    return x

def g(x):
    time.sleep(1)
    return x

ray.data.range(100).map(f).map(g, num_cpus=0.1).materialize()

JSON logging (new)

Console output:

❯ RAY_DATA_LOG_ENCODING="JSON" python log.py
2024-09-10 14:00:26,672	INFO worker.py:1777 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
2024-09-10 14:00:27,688	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-09-10_14-00-25_720845_96934/logs/ray-data
2024-09-10 14:00:27,688	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange->Map(f)] -> TaskPoolMapOperator[Map(g)]
✔️  Dataset execution finished in 9.96 seconds: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100/100 [00:09<00:00, 10.1 row/s]
- ReadRange->Map(f): 0 active, 0 queued, [cpu: 0.0, objects: 0.0B]: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100/100 [00:09<00:00, 10.1 row/s]
- Map(g): 0 active, 0 queued, [cpu: 0.0, objects: 64.0B]: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100/100 [00:09<00:00, 10.1 row/s]

ray-data.log:

{"asctime": "2024-09-10 14:00:27,167", "levelname": "DEBUG", "message": "Autodetected parallelism=24 based on estimated_available_cpus=12 and estimated_data_size=800.", "filename": "util.py", "lineno": 203}
{"asctime": "2024-09-10 14:00:27,687", "levelname": "DEBUG", "message": "Autodetected parallelism=24 based on estimated_available_cpus=12 and estimated_data_size=800.", "filename": "util.py", "lineno": 203}
{"asctime": "2024-09-10 14:00:27,688", "levelname": "DEBUG", "message": "Expected in-memory size 800, block size 32.0", "filename": "set_read_parallelism.py", "lineno": 34}
{"asctime": "2024-09-10 14:00:27,688", "levelname": "DEBUG", "message": "Size based split factor 1", "filename": "set_read_parallelism.py", "lineno": 42}
{"asctime": "2024-09-10 14:00:27,688", "levelname": "DEBUG", "message": "Blocks after size splits 25", "filename": "set_read_parallelism.py", "lineno": 44}
{"asctime": "2024-09-10 14:00:27,688", "levelname": "DEBUG", "message": "Using autodetected parallelism=24 for operator ReadRange to satisfy parallelism at least twice the available number of CPUs (12).", "filename": "set_read_parallelism.py", "lineno": 117}
{"asctime": "2024-09-10 14:00:27,688", "levelname": "DEBUG", "message": "Estimated num output blocks 25", "filename": "set_read_parallelism.py", "lineno": 132}
{"asctime": "2024-09-10 14:00:27,688", "levelname": "INFO", "message": "Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-09-10_14-00-25_720845_96934/logs/ray-data", "filename": "streaming_executor.py", "lineno": 108}
{"asctime": "2024-09-10 14:00:27,688", "levelname": "INFO", "message": "Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange->Map(f)] -> TaskPoolMapOperator[Map(g)]", "filename": "streaming_executor.py", "lineno": 109}
{"asctime": "2024-09-10 14:00:27,688", "levelname": "DEBUG", "message": "Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=inf, gpu=inf, object_store_memory=inf), exclude_resources=ExecutionResources(cpu=0.0, gpu=0.0, object_store_memory=0.0B), locality_with_output=False, preserve_order=False, actor_locality_enabled=False, verbose_progress=True)", "filename": "streaming_executor.py", "lineno": 111}
{"asctime": "2024-09-10 14:00:27,717", "levelname": "DEBUG", "message": "ConcurrencyCapBackpressurePolicy initialized with: {InputDataBuffer[Input]: inf, InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange->Map(f)]: inf, InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange->Map(f)] -> TaskPoolMapOperator[Map(g)]: inf}", "filename": "concurrency_cap_backpressure_policy.py", "lineno": 37}
{"asctime": "2024-09-10 14:00:27,728", "levelname": "DEBUG", "message": "Operator Metrics:\nInput: {'num_inputs_received': 25, 'bytes_inputs_received': 66350, 'num_outputs_taken': 25, 'bytes_outputs_taken': 66350, 'task_submission_backpressure_time': 0, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_used': 0, 'cpu_usage': 0, 'gpu_usage': 0}\nReadRange->Map(f): {'num_inputs_received': 1, 'bytes_inputs_received': 2654, 'num_task_inputs_processed': 0, 'bytes_task_inputs_processed': 0, 'bytes_inputs_of_submitted_tasks': 2654, 'num_task_outputs_generated': 0, 'bytes_task_outputs_generated': 0, 'rows_task_outputs_generated': 0, 'num_outputs_taken': 0, 'bytes_outputs_taken': 0, 'num_outputs_of_finished_tasks': 0, 'bytes_outputs_of_finished_tasks': 0, 'num_tasks_submitted': 1, 'num_tasks_running': 1, 'num_tasks_have_outputs': 0, 'num_tasks_finished': 0, 'num_tasks_failed': 0, 'block_generation_time': 0, 'task_submission_backpressure_time': 0, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_pending_task_inputs': 2654, 'obj_store_mem_freed': 0, 'obj_store_mem_spilled': 0, 'obj_store_mem_used': 268435456, 'cpu_usage': 1, 'gpu_usage': 0, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}\nMap(g): {'num_inputs_received': 0, 'bytes_inputs_received': 0, 'num_task_inputs_processed': 0, 'bytes_task_inputs_processed': 0, 'bytes_inputs_of_submitted_tasks': 0, 'num_task_outputs_generated': 0, 'bytes_task_outputs_generated': 0, 'rows_task_outputs_generated': 0, 'num_outputs_taken': 0, 'bytes_outputs_taken': 0, 'num_outputs_of_finished_tasks': 0, 'bytes_outputs_of_finished_tasks': 0, 'num_tasks_submitted': 0, 'num_tasks_running': 0, 'num_tasks_have_outputs': 0, 'num_tasks_finished': 0, 'num_tasks_failed': 0, 
'block_generation_time': 0, 'task_submission_backpressure_time': 0, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_pending_task_inputs': 0, 'obj_store_mem_freed': 0, 'obj_store_mem_spilled': 0, 'obj_store_mem_used': 0, 'cpu_usage': 0, 'gpu_usage': 0, 'ray_remote_args': {'num_cpus': 0.1}}\n", "filename": "streaming_executor.py", "lineno": 478}
{"asctime": "2024-09-10 14:00:27,728", "levelname": "DEBUG", "message": "Execution Progress:", "filename": "streaming_executor.py", "lineno": 461}
{"asctime": "2024-09-10 14:00:27,728", "levelname": "DEBUG", "message": "0: - Input: 0 active, 0 queued, [cpu: 0.0, objects: 0.0B], Blocks Outputted: 25/25", "filename": "streaming_executor.py", "lineno": 463}
{"asctime": "2024-09-10 14:00:27,728", "levelname": "DEBUG", "message": "1: - ReadRange->Map(f): 1 active, 24 queued \ud83d\udea7, [cpu: 1.0, objects: 256.0MB], Blocks Outputted: 0/None", "filename": "streaming_executor.py", "lineno": 463}
{"asctime": "2024-09-10 14:00:27,728", "levelname": "DEBUG", "message": "2: - Map(g): 0 active, 0 queued, [cpu: 0.0, objects: 0.0B], Blocks Outputted: 0/None", "filename": "streaming_executor.py", "lineno": 463}
{"asctime": "2024-09-10 14:00:27,728", "levelname": "DEBUG", "message": "Operator InputDataBuffer[Input] completed. Operator Metrics:\n{'num_inputs_received': 25, 'bytes_inputs_received': 66350, 'num_outputs_taken': 25, 'bytes_outputs_taken': 66350, 'task_submission_backpressure_time': 0, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_used': 0, 'cpu_usage': 0, 'gpu_usage': 0}", "filename": "streaming_executor.py", "lineno": 338}
{"asctime": "2024-09-10 14:00:30,214", "levelname": "DEBUG", "message": "Operator InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange->Map(f)] completed. Operator Metrics:\n{'num_inputs_received': 25, 'bytes_inputs_received': 66350, 'num_task_inputs_processed': 25, 'bytes_task_inputs_processed': 66350, 'bytes_inputs_of_submitted_tasks': 66350, 'num_task_outputs_generated': 25, 'bytes_task_outputs_generated': 800, 'rows_task_outputs_generated': 100, 'num_outputs_taken': 25, 'bytes_outputs_taken': 800, 'num_outputs_of_finished_tasks': 25, 'bytes_outputs_of_finished_tasks': 800, 'num_tasks_submitted': 25, 'num_tasks_running': 0, 'num_tasks_have_outputs': 25, 'num_tasks_finished': 25, 'num_tasks_failed': 0, 'block_generation_time': 10.366568419999997, 'task_submission_backpressure_time': 2.263392420000001, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_pending_task_inputs': 0, 'obj_store_mem_freed': 66350, 'obj_store_mem_spilled': 0, 'obj_store_mem_used': 800, 'cpu_usage': 0, 'gpu_usage': 0, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}", "filename": "streaming_executor.py", "lineno": 338}
{"asctime": "2024-09-10 14:00:32,757", "levelname": "DEBUG", "message": "Operator Metrics:\nInput: {'num_inputs_received': 25, 'bytes_inputs_received': 66350, 'num_outputs_taken': 25, 'bytes_outputs_taken': 66350, 'task_submission_backpressure_time': 0, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_used': 0, 'cpu_usage': 0, 'gpu_usage': 0}\nReadRange->Map(f): {'num_inputs_received': 25, 'bytes_inputs_received': 66350, 'num_task_inputs_processed': 25, 'bytes_task_inputs_processed': 66350, 'bytes_inputs_of_submitted_tasks': 66350, 'num_task_outputs_generated': 25, 'bytes_task_outputs_generated': 800, 'rows_task_outputs_generated': 100, 'num_outputs_taken': 25, 'bytes_outputs_taken': 800, 'num_outputs_of_finished_tasks': 25, 'bytes_outputs_of_finished_tasks': 800, 'num_tasks_submitted': 25, 'num_tasks_running': 0, 'num_tasks_have_outputs': 25, 'num_tasks_finished': 25, 'num_tasks_failed': 0, 'block_generation_time': 10.366568419999997, 'task_submission_backpressure_time': 2.263392420000001, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_pending_task_inputs': 0, 'obj_store_mem_freed': 66350, 'obj_store_mem_spilled': 0, 'obj_store_mem_used': 768, 'cpu_usage': 0, 'gpu_usage': 0, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}\nMap(g): {'num_inputs_received': 25, 'bytes_inputs_received': 800, 'num_task_inputs_processed': 1, 'bytes_task_inputs_processed': 32, 'bytes_inputs_of_submitted_tasks': 800, 'num_task_outputs_generated': 1, 'bytes_task_outputs_generated': 32, 'rows_task_outputs_generated': 4, 'num_outputs_taken': 1, 'bytes_outputs_taken': 32, 'num_outputs_of_finished_tasks': 1, 'bytes_outputs_of_finished_tasks': 32, 'num_tasks_submitted': 25, 'num_tasks_running': 24, 
'num_tasks_have_outputs': 1, 'num_tasks_finished': 1, 'num_tasks_failed': 0, 'block_generation_time': 4.012629125, 'task_submission_backpressure_time': 3.374947209, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_pending_task_inputs': 768, 'obj_store_mem_freed': 32, 'obj_store_mem_spilled': 0, 'obj_store_mem_used': 768.0, 'cpu_usage': 2.4000000000000004, 'gpu_usage': 0, 'ray_remote_args': {'num_cpus': 0.1, 'scheduling_strategy': 'SPREAD'}}\n", "filename": "streaming_executor.py", "lineno": 478}
{"asctime": "2024-09-10 14:00:32,758", "levelname": "DEBUG", "message": "Execution Progress:", "filename": "streaming_executor.py", "lineno": 461}
{"asctime": "2024-09-10 14:00:32,758", "levelname": "DEBUG", "message": "0: - Input: 0 active, 0 queued, [cpu: 0.0, objects: 0.0B], Blocks Outputted: 25/25", "filename": "streaming_executor.py", "lineno": 463}
{"asctime": "2024-09-10 14:00:32,758", "levelname": "DEBUG", "message": "1: - ReadRange->Map(f): 0 active, 0 queued, [cpu: 0.0, objects: 768.0B], Blocks Outputted: 25/25", "filename": "streaming_executor.py", "lineno": 463}
{"asctime": "2024-09-10 14:00:32,758", "levelname": "DEBUG", "message": "2: - Map(g): 24 active, 0 queued, [cpu: 2.4, objects: 768.0B], Blocks Outputted: 1/25", "filename": "streaming_executor.py", "lineno": 463}
{"asctime": "2024-09-10 14:00:37,636", "levelname": "DEBUG", "message": "Operator InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange->Map(f)] -> TaskPoolMapOperator[Map(g)] completed. Operator Metrics:\n{'num_inputs_received': 25, 'bytes_inputs_received': 800, 'num_task_inputs_processed': 25, 'bytes_task_inputs_processed': 800, 'bytes_inputs_of_submitted_tasks': 800, 'num_task_outputs_generated': 25, 'bytes_task_outputs_generated': 800, 'rows_task_outputs_generated': 100, 'num_outputs_taken': 25, 'bytes_outputs_taken': 800, 'num_outputs_of_finished_tasks': 25, 'bytes_outputs_of_finished_tasks': 800, 'num_tasks_submitted': 25, 'num_tasks_running': 0, 'num_tasks_have_outputs': 25, 'num_tasks_finished': 25, 'num_tasks_failed': 0, 'block_generation_time': 100.43680124699999, 'task_submission_backpressure_time': 3.374947209, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_pending_task_inputs': 0, 'obj_store_mem_freed': 800, 'obj_store_mem_spilled': 0, 'obj_store_mem_used': 64, 'cpu_usage': 0, 'gpu_usage': 0, 'ray_remote_args': {'num_cpus': 0.1, 'scheduling_strategy': 'SPREAD'}}", "filename": "streaming_executor.py", "lineno": 338}
{"asctime": "2024-09-10 14:00:37,644", "levelname": "DEBUG", "message": "Shutting down <StreamingExecutor(StreamingExecutor-b43b55dc880c4a2e95530c5a190e510d, stopped daemon 12952186880)>.", "filename": "streaming_executor.py", "lineno": 182}
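Records with the shape shown above (asctime, levelname, message, filename, lineno) can be produced by a small `logging.Formatter` subclass. The sketch below is illustrative only, not Ray Data's actual formatter implementation:

```python
import io
import json
import logging

class JSONFormatter(logging.Formatter):
    """Render each record as one JSON object with the same fields as ray-data.log."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "asctime": self.formatTime(record),
            "levelname": record.levelname,
            "message": record.getMessage(),
            "filename": record.filename,
            "lineno": record.lineno,
        })

# Wire the formatter to an in-memory stream to demonstrate the output shape.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JSONFormatter())
logger = logging.getLogger("json_demo")
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
logger.propagate = False

logger.debug("Size based split factor 1")
entry = json.loads(stream.getvalue())
```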

TEXT logging (unchanged)

Console output:

❯ RAY_DATA_LOG_ENCODING="TEXT" python log.py
2024-09-10 14:02:09,407	INFO worker.py:1777 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
2024-09-10 14:02:10,353	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-09-10_14-02-08_613994_98769/logs/ray-data
2024-09-10 14:02:10,353	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange->Map(f)] -> TaskPoolMapOperator[Map(g)]
✔️  Dataset execution finished in 9.86 seconds: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100/100 [00:09<00:00, 10.1 row/s]
- ReadRange->Map(f): 0 active, 0 queued, [cpu: 0.0, objects: 0.0B]: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100/100 [00:09<00:00, 10.2 row/s]
- Map(g): 0 active, 0 queued, [cpu: 0.0, objects: 96.0B]: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100/100 [00:09<00:00, 10.2 row/s]

ray-data.log:

2024-09-10 14:02:09,843	DEBUG util.py:203 -- Autodetected parallelism=24 based on estimated_available_cpus=12 and estimated_data_size=800.
2024-09-10 14:02:10,353	DEBUG util.py:203 -- Autodetected parallelism=24 based on estimated_available_cpus=12 and estimated_data_size=800.
2024-09-10 14:02:10,353	DEBUG set_read_parallelism.py:34 -- Expected in-memory size 800, block size 32.0
2024-09-10 14:02:10,353	DEBUG set_read_parallelism.py:42 -- Size based split factor 1
2024-09-10 14:02:10,353	DEBUG set_read_parallelism.py:44 -- Blocks after size splits 25
2024-09-10 14:02:10,353	DEBUG set_read_parallelism.py:117 -- Using autodetected parallelism=24 for operator ReadRange to satisfy parallelism at least twice the available number of CPUs (12).
2024-09-10 14:02:10,353	DEBUG set_read_parallelism.py:132 -- Estimated num output blocks 25
2024-09-10 14:02:10,353	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-09-10_14-02-08_613994_98769/logs/ray-data
2024-09-10 14:02:10,353	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange->Map(f)] -> TaskPoolMapOperator[Map(g)]
2024-09-10 14:02:10,353	DEBUG streaming_executor.py:111 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=inf, gpu=inf, object_store_memory=inf), exclude_resources=ExecutionResources(cpu=0.0, gpu=0.0, object_store_memory=0.0B), locality_with_output=False, preserve_order=False, actor_locality_enabled=False, verbose_progress=True)
2024-09-10 14:02:10,383	DEBUG concurrency_cap_backpressure_policy.py:37 -- ConcurrencyCapBackpressurePolicy initialized with: {InputDataBuffer[Input]: inf, InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange->Map(f)]: inf, InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange->Map(f)] -> TaskPoolMapOperator[Map(g)]: inf}
2024-09-10 14:02:10,393	DEBUG streaming_executor.py:478 -- Operator Metrics:
Input: {'num_inputs_received': 25, 'bytes_inputs_received': 66350, 'num_outputs_taken': 25, 'bytes_outputs_taken': 66350, 'task_submission_backpressure_time': 0, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_used': 0, 'cpu_usage': 0, 'gpu_usage': 0}
ReadRange->Map(f): {'num_inputs_received': 1, 'bytes_inputs_received': 2654, 'num_task_inputs_processed': 0, 'bytes_task_inputs_processed': 0, 'bytes_inputs_of_submitted_tasks': 2654, 'num_task_outputs_generated': 0, 'bytes_task_outputs_generated': 0, 'rows_task_outputs_generated': 0, 'num_outputs_taken': 0, 'bytes_outputs_taken': 0, 'num_outputs_of_finished_tasks': 0, 'bytes_outputs_of_finished_tasks': 0, 'num_tasks_submitted': 1, 'num_tasks_running': 1, 'num_tasks_have_outputs': 0, 'num_tasks_finished': 0, 'num_tasks_failed': 0, 'block_generation_time': 0, 'task_submission_backpressure_time': 0, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_pending_task_inputs': 2654, 'obj_store_mem_freed': 0, 'obj_store_mem_spilled': 0, 'obj_store_mem_used': 268435456, 'cpu_usage': 1, 'gpu_usage': 0, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}
Map(g): {'num_inputs_received': 0, 'bytes_inputs_received': 0, 'num_task_inputs_processed': 0, 'bytes_task_inputs_processed': 0, 'bytes_inputs_of_submitted_tasks': 0, 'num_task_outputs_generated': 0, 'bytes_task_outputs_generated': 0, 'rows_task_outputs_generated': 0, 'num_outputs_taken': 0, 'bytes_outputs_taken': 0, 'num_outputs_of_finished_tasks': 0, 'bytes_outputs_of_finished_tasks': 0, 'num_tasks_submitted': 0, 'num_tasks_running': 0, 'num_tasks_have_outputs': 0, 'num_tasks_finished': 0, 'num_tasks_failed': 0, 'block_generation_time': 0, 'task_submission_backpressure_time': 0, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_pending_task_inputs': 0, 'obj_store_mem_freed': 0, 'obj_store_mem_spilled': 0, 'obj_store_mem_used': 0, 'cpu_usage': 0, 'gpu_usage': 0, 'ray_remote_args': {'num_cpus': 0.1}}

2024-09-10 14:02:10,393	DEBUG streaming_executor.py:461 -- Execution Progress:
2024-09-10 14:02:10,393	DEBUG streaming_executor.py:463 -- 0: - Input: 0 active, 0 queued, [cpu: 0.0, objects: 0.0B], Blocks Outputted: 25/25
2024-09-10 14:02:10,393	DEBUG streaming_executor.py:463 -- 1: - ReadRange->Map(f): 1 active, 24 queued 🚧, [cpu: 1.0, objects: 256.0MB], Blocks Outputted: 0/None
2024-09-10 14:02:10,393	DEBUG streaming_executor.py:463 -- 2: - Map(g): 0 active, 0 queued, [cpu: 0.0, objects: 0.0B], Blocks Outputted: 0/None
2024-09-10 14:02:10,393	DEBUG streaming_executor.py:338 -- Operator InputDataBuffer[Input] completed. Operator Metrics:
{'num_inputs_received': 25, 'bytes_inputs_received': 66350, 'num_outputs_taken': 25, 'bytes_outputs_taken': 66350, 'task_submission_backpressure_time': 0, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_used': 0, 'cpu_usage': 0, 'gpu_usage': 0}
2024-09-10 14:02:13,165	DEBUG streaming_executor.py:338 -- Operator InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange->Map(f)] completed. Operator Metrics:
{'num_inputs_received': 25, 'bytes_inputs_received': 66350, 'num_task_inputs_processed': 25, 'bytes_task_inputs_processed': 66350, 'bytes_inputs_of_submitted_tasks': 66350, 'num_task_outputs_generated': 25, 'bytes_task_outputs_generated': 800, 'rows_task_outputs_generated': 100, 'num_outputs_taken': 25, 'bytes_outputs_taken': 800, 'num_outputs_of_finished_tasks': 25, 'bytes_outputs_of_finished_tasks': 800, 'num_tasks_submitted': 25, 'num_tasks_running': 0, 'num_tasks_have_outputs': 25, 'num_tasks_finished': 25, 'num_tasks_failed': 0, 'block_generation_time': 10.35352417, 'task_submission_backpressure_time': 2.240482626999999, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_pending_task_inputs': 0, 'obj_store_mem_freed': 66350, 'obj_store_mem_spilled': 0, 'obj_store_mem_used': 800, 'cpu_usage': 0, 'gpu_usage': 0, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}
2024-09-10 14:02:15,483	DEBUG streaming_executor.py:478 -- Operator Metrics:
Input: {'num_inputs_received': 25, 'bytes_inputs_received': 66350, 'num_outputs_taken': 25, 'bytes_outputs_taken': 66350, 'task_submission_backpressure_time': 0, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_used': 0, 'cpu_usage': 0, 'gpu_usage': 0}
ReadRange->Map(f): {'num_inputs_received': 25, 'bytes_inputs_received': 66350, 'num_task_inputs_processed': 25, 'bytes_task_inputs_processed': 66350, 'bytes_inputs_of_submitted_tasks': 66350, 'num_task_outputs_generated': 25, 'bytes_task_outputs_generated': 800, 'rows_task_outputs_generated': 100, 'num_outputs_taken': 25, 'bytes_outputs_taken': 800, 'num_outputs_of_finished_tasks': 25, 'bytes_outputs_of_finished_tasks': 800, 'num_tasks_submitted': 25, 'num_tasks_running': 0, 'num_tasks_have_outputs': 25, 'num_tasks_finished': 25, 'num_tasks_failed': 0, 'block_generation_time': 10.35352417, 'task_submission_backpressure_time': 2.240482626999999, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_pending_task_inputs': 0, 'obj_store_mem_freed': 66350, 'obj_store_mem_spilled': 0, 'obj_store_mem_used': 768, 'cpu_usage': 0, 'gpu_usage': 0, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}
Map(g): {'num_inputs_received': 25, 'bytes_inputs_received': 800, 'num_task_inputs_processed': 1, 'bytes_task_inputs_processed': 32, 'bytes_inputs_of_submitted_tasks': 800, 'num_task_outputs_generated': 1, 'bytes_task_outputs_generated': 32, 'rows_task_outputs_generated': 4, 'num_outputs_taken': 1, 'bytes_outputs_taken': 32, 'num_outputs_of_finished_tasks': 1, 'bytes_outputs_of_finished_tasks': 32, 'num_tasks_submitted': 25, 'num_tasks_running': 24, 'num_tasks_have_outputs': 1, 'num_tasks_finished': 1, 'num_tasks_failed': 0, 'block_generation_time': 4.0134345, 'task_submission_backpressure_time': 3.350887582999999, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_pending_task_inputs': 768, 'obj_store_mem_freed': 32, 'obj_store_mem_spilled': 0, 'obj_store_mem_used': 768.0, 'cpu_usage': 2.4000000000000004, 'gpu_usage': 0, 'ray_remote_args': {'num_cpus': 0.1, 'scheduling_strategy': 'SPREAD'}}

2024-09-10 14:02:15,483	DEBUG streaming_executor.py:461 -- Execution Progress:
2024-09-10 14:02:15,483	DEBUG streaming_executor.py:463 -- 0: - Input: 0 active, 0 queued, [cpu: 0.0, objects: 0.0B], Blocks Outputted: 25/25
2024-09-10 14:02:15,483	DEBUG streaming_executor.py:463 -- 1: - ReadRange->Map(f): 0 active, 0 queued, [cpu: 0.0, objects: 768.0B], Blocks Outputted: 25/25
2024-09-10 14:02:15,483	DEBUG streaming_executor.py:463 -- 2: - Map(g): 24 active, 0 queued, [cpu: 2.4, objects: 768.0B], Blocks Outputted: 1/25
2024-09-10 14:02:20,214	DEBUG streaming_executor.py:338 -- Operator InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange->Map(f)] -> TaskPoolMapOperator[Map(g)] completed. Operator Metrics:
{'num_inputs_received': 25, 'bytes_inputs_received': 800, 'num_task_inputs_processed': 25, 'bytes_task_inputs_processed': 800, 'bytes_inputs_of_submitted_tasks': 800, 'num_task_outputs_generated': 25, 'bytes_task_outputs_generated': 800, 'rows_task_outputs_generated': 100, 'num_outputs_taken': 25, 'bytes_outputs_taken': 800, 'num_outputs_of_finished_tasks': 25, 'bytes_outputs_of_finished_tasks': 800, 'num_tasks_submitted': 25, 'num_tasks_running': 0, 'num_tasks_have_outputs': 25, 'num_tasks_finished': 25, 'num_tasks_failed': 0, 'block_generation_time': 100.446355003, 'task_submission_backpressure_time': 3.350887582999999, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_pending_task_inputs': 0, 'obj_store_mem_freed': 800, 'obj_store_mem_spilled': 0, 'obj_store_mem_used': 96, 'cpu_usage': 0, 'gpu_usage': 0, 'ray_remote_args': {'num_cpus': 0.1, 'scheduling_strategy': 'SPREAD'}}
2024-09-10 14:02:20,217	DEBUG streaming_executor.py:182 -- Shutting down <StreamingExecutor(StreamingExecutor-9091586451d54141b2ef511d6f302bde, stopped daemon 13086404608)>.

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Resolved review threads: python/ray/data/_internal/logging.py (3), python/ray/data/_internal/logging_json.yaml (2)
omatthew98 and others added 4 commits September 9, 2024 15:39
Signed-off-by: Matthew Owen <mowen@anyscale.com>
Signed-off-by: Matthew Owen <mowen@anyscale.com>
Co-authored-by: Scott Lee <scottjlee@users.noreply.github.com>
Signed-off-by: Matthew Owen <omatthew98@berkeley.edu>
Co-authored-by: Hao Chen <chenh1024@gmail.com>
Signed-off-by: Matthew Owen <omatthew98@berkeley.edu>
@omatthew98 force-pushed the mowen/ray-data-structured-logging branch from 18a921c to 7c6313d on September 9, 2024 at 22:40
@@ -0,0 +1,34 @@
version: 1
Contributor:

What is this YAML file format? Is this something specific to Ray Data? Is there documentation about this?

Contributor Author (@omatthew98):

The YAML file format is Python's standard logging dictConfig schema (documented under logging.config). We just directly load the YAML with something like:

    import logging.config
    import yaml  # PyYAML

    with open(config_path) as file:  # config_path points at the logging YAML
        config = yaml.safe_load(file)
    logging.config.dictConfig(config)

We could probably do a better job of explaining that this can be used, in the brief logging docs.
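To make the mechanism concrete, the dict below is the kind of structure such a YAML file deserializes into before being handed to logging.config.dictConfig. The handler, formatter, and logger names here are hypothetical stand-ins; Ray Data's actual logging_json.yaml differs.

```python
import logging
import logging.config

# Hypothetical dictConfig-style config, roughly what a logging YAML file
# would yaml.safe_load into (names chosen for illustration only).
config = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "text": {
            "format": "%(asctime)s\t%(levelname)s %(filename)s:%(lineno)d -- %(message)s",
        },
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "text"},
    },
    "loggers": {
        "ray.data": {"level": "DEBUG", "handlers": ["console"], "propagate": False},
    },
}

logging.config.dictConfig(config)
logging.getLogger("ray.data").info("logger configured from dict")
```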

Contributor:

Maybe add a comment in this file explaining the origin/format of this YAML file?

@GeneDer (Contributor) left a comment:

No blocker for me, but we should probably add some tests to ensure the behavior, and follow up with product and doc changes.

Also, maybe out of scope, but right now everything is configured through env var. Does it make sense to create a logging config for users to do other configurations such as log level or log file locations?

Resolved review thread: python/ray/data/_internal/logging_json.yaml
@omatthew98 (Contributor Author) replied:

No blocker for me, but we should probably add some tests to ensure the behavior, and follow up with product and doc changes.

Also, maybe out of scope, but right now everything is configured through env var. Does it make sense to create a logging config for users to do other configurations such as log level or log file locations?

Users could create their own logging.yaml configuration (which could be forked off either of the default examples we have for TEXT or JSON), and specify the path to be used instead of the default. Does that make sense as a workflow or is there something that would be preferred?

@GeneDer (Contributor) commented Sep 9, 2024:

No blocker for me, but we should probably add some tests to ensure the behavior, and follow up with product and doc changes.
Also, maybe out of scope, but right now everything is configured through env var. Does it make sense to create a logging config for users to do other configurations such as log level or log file locations?

Users could create their own logging.yaml configuration (which could be forked off either of the default examples we have for TEXT or JSON), and specify the path to be used instead of the default. Does that make sense as a workflow or is there something that would be preferred?

Yep, I understand there are essentially unlimited degrees of freedom for users to define their own YAML, but then they would have to specify everything on their own, right? Just wondering if there is something in between for users who only need a small tweak. But I totally understand if there is no such use case for Ray Data; no need to over-engineer this.

Signed-off-by: Matthew Owen <mowen@anyscale.com>
@GeneDer (Contributor) left a comment:

LGTM!

@omatthew98 (Contributor Author) replied:

Yep, I understand there are essentially unlimited degrees of freedom for users to define their own YAML, but then they would have to specify everything on their own, right? Just wondering if there is something in between for users who only need a small tweak. But I totally understand if there is no such use case for Ray Data; no need to over-engineer this.

I think to keep things simple we will leave it as is. My understanding is that the vast majority of users will use the default settings (maybe not even JSON file logging), and those who do need something custom will likely have the expertise to set things up in whatever way they desire. We can revisit this in the future if it becomes a pain point.

@omatthew98 omatthew98 added the go add ONLY when ready to merge, run all tests label Sep 10, 2024
os.path.join(os.path.dirname(__file__), "logging_json.yaml")
)

# Environment variable to specify the encoding of the log messages
Contributor:

Could you also comment on what options are available?

Comment on lines 103 to 104
environment variable. If the variable isn't set, this function loads the
"logging.yaml" file that is adjacent to this module.
Member:

The docstring is slightly out of date with the code.

