From cadc6f02b9cc5d6229be4cc0fd91da02bd353a5d Mon Sep 17 00:00:00 2001 From: Cuong Nguyen <128072568+can-anyscale@users.noreply.github.com> Date: Wed, 10 Jul 2024 13:53:08 -0700 Subject: [PATCH 01/15] [doc][api] recursively get all rsts from the head file (#46530) Currently `api_policy_check` only obtains APIs in the head rst file, and rsts included in the head file. However, we now have rsts including other rsts, etc. This PR updates the logic to recursively obtain all rsts from the head file. Test: - CI --------- Signed-off-by: can --- ci/ray_ci/doc/autodoc.py | 58 +++++++++++++++++++++++++++++------ ci/ray_ci/doc/test_autodoc.py | 40 ++++++++++++++++++------ 2 files changed, 78 insertions(+), 20 deletions(-) diff --git a/ci/ray_ci/doc/autodoc.py b/ci/ray_ci/doc/autodoc.py index fc922ca1c2346..88bd3a40e9757 100644 --- a/ci/ray_ci/doc/autodoc.py +++ b/ci/ray_ci/doc/autodoc.py @@ -1,6 +1,6 @@ import os import re -from typing import List +from typing import List, Set from ci.ray_ci.doc.api import ( API, @@ -11,6 +11,7 @@ _SPHINX_CURRENTMODULE_HEADER = ".. currentmodule::" _SPHINX_TOCTREE_HEADER = ".. toctree::" +_SPHINX_INCLUDE_HEADER = ".. 
include::" class Autodoc: @@ -42,26 +43,60 @@ def walk(self) -> None: for rst in rsts: self._apis.extend(self._parse_autodoc_rst(rst)) - def _get_autodoc_rsts(self) -> List[str]: + def _get_autodoc_rsts(self) -> Set[str]: + """ + Recursively parse the head_rst_file to find all the autodoc rsts + """ + if self._autodoc_rsts is not None: + return self._autodoc_rsts + + self._autodoc_rsts = {self._head_rst_file} + visit_current = {self._head_rst_file} + while visit_current: + visit_next = set() + for rst in visit_current: + for child_rst in self._get_autodoc_rsts_in_file(rst): + if child_rst not in self._autodoc_rsts: + self._autodoc_rsts.add(child_rst) + visit_next.add(child_rst) + visit_current = visit_next + + return self._autodoc_rsts + + def _get_autodoc_rsts_in_file(self, rst_file: str) -> Set[str]: """ Parse the list of rst declared in the head_rst_file, for example: + .. include:: area_00.rst + .. toctree:: :option area_01.rst area_02.rst """ - if self._autodoc_rsts is not None: - return self._autodoc_rsts + if not os.path.exists(rst_file): + return set() - dir = os.path.dirname(self._head_rst_file) - self._autodoc_rsts = [self._head_rst_file] - with open(self._head_rst_file, "r") as f: + rsts = set() + dir = os.path.dirname(rst_file) + with open(rst_file, "r") as f: line = f.readline() while line: + line = line.strip() + + # look for the include block + if line.startswith(_SPHINX_INCLUDE_HEADER): + rsts.add( + os.path.join( + dir, line.removeprefix(_SPHINX_INCLUDE_HEADER).strip() + ) + ) + line = f.readline() + continue + # look for the toctree block - if not line.strip() == _SPHINX_TOCTREE_HEADER: + if not line == _SPHINX_TOCTREE_HEADER: line = f.readline() continue @@ -73,10 +108,10 @@ def _get_autodoc_rsts(self) -> List[str]: # the line is not empty and not starting with empty space break if line.strip().endswith(".rst"): - self._autodoc_rsts.append(os.path.join(dir, line.strip())) + rsts.add(os.path.join(dir, line.strip())) line = f.readline() - return 
self._autodoc_rsts + return rsts def _parse_autodoc_rst(self, rst_file: str) -> List[API]: """ @@ -92,6 +127,9 @@ def _parse_autodoc_rst(self, rst_file: str) -> List[API]: myclass.myfunc_01 myclass.myfunc_02 """ + if not os.path.exists(rst_file): + return [] + apis = [] module = None with open(rst_file, "r") as f: diff --git a/ci/ray_ci/doc/test_autodoc.py b/ci/ray_ci/doc/test_autodoc.py index 90ac60aed121b..e340889e82551 100644 --- a/ci/ray_ci/doc/test_autodoc.py +++ b/ci/ray_ci/doc/test_autodoc.py @@ -15,48 +15,68 @@ def test_walk(): f.write("\tapi_01.rst\n") f.write("\tapi_02.rst\n") with open(os.path.join(tmp, "api_01.rst"), "w") as f: + f.write(".. include:: api_03.rst\n") f.write(".. currentmodule:: ci.ray_ci.doc\n") f.write(".. autosummary::\n") - f.write("\t~mock.mock_function\n") f.write("\tmock.mock_module.mock_w00t\n") with open(os.path.join(tmp, "api_02.rst"), "w") as f: f.write(".. currentmodule:: ci.ray_ci.doc.mock\n") f.write(".. autoclass:: MockClass\n") + with open(os.path.join(tmp, "api_03.rst"), "w") as f: + f.write(".. currentmodule:: ci.ray_ci.doc\n") + f.write(".. 
autosummary::\n") + f.write("\t~mock.mock_function\n") autodoc = Autodoc(os.path.join(tmp, "head.rst")) - apis = autodoc.get_apis() + apis = sorted(autodoc.get_apis(), key=lambda x: x.name) assert str(apis) == str( [ API( - name="ci.ray_ci.doc.mock.mock_function", + name="ci.ray_ci.doc.mock.MockClass", annotation_type=AnnotationType.PUBLIC_API, - code_type=CodeType.FUNCTION, + code_type=CodeType.CLASS, ), API( - name="ci.ray_ci.doc.mock.mock_module.mock_w00t", + name="ci.ray_ci.doc.mock.mock_function", annotation_type=AnnotationType.PUBLIC_API, code_type=CodeType.FUNCTION, ), API( - name="ci.ray_ci.doc.mock.MockClass", + name="ci.ray_ci.doc.mock.mock_module.mock_w00t", annotation_type=AnnotationType.PUBLIC_API, - code_type=CodeType.CLASS, + code_type=CodeType.FUNCTION, ), ] ) assert ( apis[0].get_canonical_name() - == f"{mock_function.__module__}.{mock_function.__qualname__}" + == f"{MockClass.__module__}.{MockClass.__qualname__}" ) assert ( apis[1].get_canonical_name() - == f"{mock_w00t.__module__}.{mock_w00t.__qualname__}" + == f"{mock_function.__module__}.{mock_function.__qualname__}" ) assert ( apis[2].get_canonical_name() - == f"{MockClass.__module__}.{MockClass.__qualname__}" + == f"{mock_w00t.__module__}.{mock_w00t.__qualname__}" ) +def test_get_autodoc_rsts_in_file(): + with tempfile.TemporaryDirectory() as tmp: + with open(os.path.join(tmp, "head.rst"), "w") as f: + f.write(".. include:: api_00.rst\n") + f.write(".. 
toctree::\n\n") + f.write("\tapi_01.rst\n") + f.write("\tapi_02.rst\n") + + autodoc = Autodoc("head.rst") + sorted(autodoc._get_autodoc_rsts_in_file(os.path.join(tmp, "head.rst"))) == { + os.path.join(tmp, "api_00.rst"), + os.path.join(tmp, "api_01.rst"), + os.path.join(tmp, "api_02.rst"), + } + + if __name__ == "__main__": sys.exit(pytest.main(["-vv", __file__])) From 152422370d08e9a76381692c1d91e2cd23f5e709 Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Thu, 11 Jul 2024 05:22:34 +0800 Subject: [PATCH 02/15] [Doc] Fix security doc link (#46352) closes #46350 Signed-off-by: Superskyyy --- SECURITY.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/SECURITY.md b/SECURITY.md index 3f82fba76bdc9..bccd5c7779434 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -1,6 +1,6 @@ # Security Policy -If you are deploying Ray, read [Security](ray-security/index.html). +If you are deploying Ray, read [Security](doc/source/ray-security/index.md). ## Reporting a vulnerability From d250007feebd339032d510413ebcd3ff3d6ce49b Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Wed, 10 Jul 2024 14:27:25 -0700 Subject: [PATCH 03/15] [Data] Add `snowflake-connector-python` to test requirements (#46544) Signed-off-by: Balaji Veeramani --- .../ml/data-test-requirements.txt | 1 + python/requirements_compiled.txt | 25 +++++++++++++++++-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/python/requirements/ml/data-test-requirements.txt b/python/requirements/ml/data-test-requirements.txt index 2cf48e8e2f3c0..b18bf68fadd76 100644 --- a/python/requirements/ml/data-test-requirements.txt +++ b/python/requirements/ml/data-test-requirements.txt @@ -15,3 +15,4 @@ webdataset raydp==1.7.0b20231020.dev0 pylance decord +snowflake-connector-python diff --git a/python/requirements_compiled.txt b/python/requirements_compiled.txt index afceecc05b5a7..3ac9df36594b6 100644 --- a/python/requirements_compiled.txt +++ b/python/requirements_compiled.txt @@ -123,6 +123,8 @@ 
array-record==0.5.0 ; sys_platform != "darwin" and platform_system != "Windows" # tensorflow-datasets arrow==1.3.0 # via isoduration +asn1crypto==1.5.1 + # via snowflake-connector-python asttokens==2.4.1 # via stack-data astunparse==1.6.3 @@ -261,12 +263,14 @@ certifi==2023.11.17 # msrest # requests # sentry-sdk + # snowflake-connector-python cffi==1.16.0 # via # argon2-cffi-bindings # cryptography # pymunk # pynacl + # snowflake-connector-python # soundfile cfn-lint==0.83.3 # via moto @@ -274,6 +278,7 @@ charset-normalizer==3.3.2 # via # aiohttp # requests + # snowflake-connector-python chess==1.7.0 # via -r /ray/ci/../python/requirements/ml/rllib-test-requirements.txt chex==0.1.7 @@ -371,6 +376,7 @@ cryptography==38.0.1 # pyjwt # pyopenssl # python-jose + # snowflake-connector-python # sshpubkeys # trustme cupy-cuda11x==13.1.0 ; sys_platform != "darwin" @@ -513,6 +519,7 @@ filelock==3.13.1 # aim # huggingface-hub # ray + # snowflake-connector-python # torch # transformers # virtualenv @@ -778,6 +785,7 @@ idna==3.7 # httpx # jsonschema # requests + # snowflake-connector-python # trustme # yarl imageio==2.31.1 @@ -1433,6 +1441,7 @@ packaging==21.3 # ray # scikit-image # semgrep + # snowflake-connector-python # sphinx # statsmodels # tensorboardx @@ -1519,6 +1528,7 @@ platformdirs==3.11.0 # via # black # jupyter-core + # snowflake-connector-python # virtualenv plotly==5.18.0 # via @@ -1678,6 +1688,7 @@ pyjwt==2.8.0 # azure-cli-core # databricks-cli # msal + # snowflake-connector-python pylance==0.10.18 # via -r /ray/ci/../python/requirements/ml/data-test-requirements.txt pymars==0.10.0 @@ -1703,6 +1714,7 @@ pyopenssl==23.0.0 # azure-cli-core # gcs-oauth2-boto-plugin # gsutil + # snowflake-connector-python # vapi-runtime pyparsing==3.1.1 # via @@ -1805,6 +1817,7 @@ pytz==2022.7.1 # aim # mlflow # pandas + # snowflake-connector-python pyu2f==0.1.5 # via google-reauth pyvmomi==8.0.2.0.1 @@ -1908,6 +1921,7 @@ requests==2.31.0 # responses # segment-analytics-python # 
semgrep + # snowflake-connector-python # sphinx # tensorboard # tensorflow-datasets @@ -2112,8 +2126,12 @@ sniffio==1.3.0 # httpx snowballstemmer==2.2.0 # via sphinx +snowflake-connector-python==3.11.0 + # via -r /ray/ci/../python/requirements/ml/data-test-requirements.txt sortedcontainers==2.4.0 - # via distributed + # via + # distributed + # snowflake-connector-python soundfile==0.12.1 # via -r /ray/ci/../python/requirements/ml/data-test-requirements.txt soupsieve==2.5 @@ -2270,7 +2288,9 @@ tomli==2.0.1 # pytest # semgrep tomlkit==0.12.3 - # via yq + # via + # snowflake-connector-python + # yq toolz==0.12.0 # via # altair @@ -2417,6 +2437,7 @@ typing-extensions==4.8.0 # pydantic-core # pytorch-lightning # semgrep + # snowflake-connector-python # tensorflow # torch # typer From cdb35859167beee778430ccd167ac450a1c370b0 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Wed, 10 Jul 2024 14:57:30 -0700 Subject: [PATCH 04/15] [Data] Remove dead `InputDataBuffer._set_num_output_blocks` (#46546) Signed-off-by: Balaji Veeramani --- .../data/_internal/execution/operators/input_data_buffer.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/python/ray/data/_internal/execution/operators/input_data_buffer.py b/python/ray/data/_internal/execution/operators/input_data_buffer.py index f55df049a4111..96d27c74134e2 100644 --- a/python/ray/data/_internal/execution/operators/input_data_buffer.py +++ b/python/ray/data/_internal/execution/operators/input_data_buffer.py @@ -40,7 +40,6 @@ def __init__( assert input_data_factory is not None self._input_data_factory = input_data_factory self._is_input_initialized = False - self._num_output_blocks = num_output_blocks self._input_data_index = 0 super().__init__("Input", [], target_max_block_size=None) @@ -67,11 +66,8 @@ def _get_next_inner(self) -> RefBundle: self._input_data_index += 1 return bundle - def _set_num_output_blocks(self, num_output_blocks): - self._num_output_blocks = num_output_blocks - def 
num_outputs_total(self) -> int: - return self._num_output_blocks or self._num_output_bundles + return self._num_output_bundles def get_stats(self) -> StatsDict: return {} From 7c8a326512ba7f4dfa833778558880bdc43b0ed3 Mon Sep 17 00:00:00 2001 From: Lonnie Liu <95255098+aslonnie@users.noreply.github.com> Date: Wed, 10 Jul 2024 16:01:07 -0700 Subject: [PATCH 05/15] [data] lint fixes on unnecessary comprehension (#46463) simplify things Signed-off-by: Lonnie Liu --- python/ray/data/dataset.py | 2 +- python/ray/data/tests/util.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index eb5915e5f0f75..0b93bc26cb4c6 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -1408,7 +1408,7 @@ def train(self, data_iterator): ) return split_datasets - metadata_mapping = {b: m for b, m in zip(block_refs, metadata)} + metadata_mapping = dict(zip(block_refs, metadata)) # If the locality_hints is set, we use a two-round greedy algorithm # to co-locate the blocks with the actors based on block diff --git a/python/ray/data/tests/util.py b/python/ray/data/tests/util.py index 85c8dd408d370..1af172b659add 100644 --- a/python/ray/data/tests/util.py +++ b/python/ray/data/tests/util.py @@ -61,10 +61,10 @@ def named_values(col_names, tuples): output = [] if isinstance(col_names, list): for t in tuples: - output.append({name: value for (name, value) in zip(col_names, t)}) + output.append(dict(zip(col_names, t))) else: for t in tuples: - output.append({name: value for (name, value) in zip((col_names,), (t,))}) + output.append({col_names: t}) return output From fc674962bb3e716cd01f834730fb0542a49716fa Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Wed, 10 Jul 2024 16:12:32 -0700 Subject: [PATCH 06/15] [Data] Rename `_estimated_output_blocks` to `_estimated_num_output_bundles` (#46547) The name is misleading. The value represents bundles, not blocks. 
Signed-off-by: Balaji Veeramani --- .../execution/interfaces/physical_operator.py | 6 ++--- .../execution/operators/limit_operator.py | 6 ++--- .../execution/operators/map_operator.py | 2 +- python/ray/data/tests/test_operators.py | 24 +++++++++---------- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/python/ray/data/_internal/execution/interfaces/physical_operator.py b/python/ray/data/_internal/execution/interfaces/physical_operator.py index 1f06bdb164ee9..36dae10663be5 100644 --- a/python/ray/data/_internal/execution/interfaces/physical_operator.py +++ b/python/ray/data/_internal/execution/interfaces/physical_operator.py @@ -184,7 +184,7 @@ def __init__( self._started = False self._in_task_submission_backpressure = False self._metrics = OpRuntimeMetrics(self) - self._estimated_output_blocks = None + self._estimated_num_output_bundles = None self._execution_completed = False def __reduce__(self): @@ -269,8 +269,8 @@ def num_outputs_total(self) -> int: The value returned may be an estimate based off the consumption so far. This is useful for reporting progress. 
""" - if self._estimated_output_blocks is not None: - return self._estimated_output_blocks + if self._estimated_num_output_bundles is not None: + return self._estimated_num_output_bundles if len(self.input_dependencies) == 1: return self.input_dependencies[0].num_outputs_total() raise AttributeError diff --git a/python/ray/data/_internal/execution/operators/limit_operator.py b/python/ray/data/_internal/execution/operators/limit_operator.py index 7db0590960da1..da4b009a21a6f 100644 --- a/python/ray/data/_internal/execution/operators/limit_operator.py +++ b/python/ray/data/_internal/execution/operators/limit_operator.py @@ -91,7 +91,7 @@ def slice_fn(block, metadata, num_rows) -> Tuple[Block, BlockMetadata]: ) # _consumed_rows / _limit is roughly equal to # _cur_output_bundles / total output blocks - self._estimated_output_blocks = round( + self._estimated_num_output_bundles = round( estimated_total_output_rows / self._consumed_rows * self._cur_output_bundles @@ -113,8 +113,8 @@ def num_outputs_total(self) -> int: # bundles we will have. We estimate based off the consumption so far. 
if self._execution_completed: return self._cur_output_bundles - elif self._estimated_output_blocks is not None: - return self._estimated_output_blocks + elif self._estimated_num_output_bundles is not None: + return self._estimated_num_output_bundles else: return self.input_dependencies[0].num_outputs_total() diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index 11998c9c1e76e..aa52d9cfdea60 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -322,7 +322,7 @@ def _task_done_callback(task_index: int, exception: Optional[Exception]): / self._metrics.num_inputs_received * self._next_data_task_idx ) - self._estimated_output_blocks = round( + self._estimated_num_output_bundles = round( estimated_num_tasks * self._metrics.num_outputs_of_finished_tasks / self._metrics.num_tasks_finished diff --git a/python/ray/data/tests/test_operators.py b/python/ray/data/tests/test_operators.py index ad141b8d1d455..07d0c4c7eb41f 100644 --- a/python/ray/data/tests/test_operators.py +++ b/python/ray/data/tests/test_operators.py @@ -884,7 +884,7 @@ def map_fn(block_iter: Iterable[Block], ctx) -> Iterable[Block]: assert metrics.obj_store_mem_freed == metrics.bytes_task_inputs_processed, i -def test_map_estimated_output_blocks(): +def test_map_estimated_num_output_bundles(): # Test map operator estimation input_op = InputDataBuffer(make_ref_bundles([[i] for i in range(100)])) @@ -906,12 +906,12 @@ def yield_five(block_iter: Iterable[Block], ctx) -> Iterable[Block]: if op.metrics.num_inputs_received % min_rows_per_bundle == 0: # enough inputs for a task bundle run_op_tasks_sync(op) - assert op._estimated_output_blocks == 50 + assert op._estimated_num_output_bundles == 50 op.all_inputs_done() # 100 inputs -> 100 / 10 = 10 tasks -> 10 * 5 = 50 output blocks - assert op._estimated_output_blocks == 50 + assert 
op._estimated_num_output_bundles == 50 def test_map_estimated_blocks_split(): @@ -936,14 +936,14 @@ def yield_five(block_iter: Iterable[Block], ctx) -> Iterable[Block]: if op.metrics.num_inputs_received % min_rows_per_bundle == 0: # enough inputs for a task bundle run_op_tasks_sync(op) - assert op._estimated_output_blocks == 100 + assert op._estimated_num_output_bundles == 100 op.all_inputs_done() # Each output block is split in 2, so the number of blocks double. - assert op._estimated_output_blocks == 100 + assert op._estimated_num_output_bundles == 100 -def test_limit_estimated_output_blocks(): +def test_limit_estimated_num_output_bundles(): # Test limit operator estimation input_op = InputDataBuffer(make_ref_bundles([[i, i] for i in range(100)])) op = LimitOperator(100, input_op) @@ -951,12 +951,12 @@ def test_limit_estimated_output_blocks(): while input_op.has_next(): op.add_input(input_op.get_next(), 0) run_op_tasks_sync(op) - assert op._estimated_output_blocks == 50 + assert op._estimated_num_output_bundles == 50 op.all_inputs_done() # 2 rows per bundle, 100 / 2 = 50 blocks output - assert op._estimated_output_blocks == 50 + assert op._estimated_num_output_bundles == 50 # Test limit operator estimation where: limit > # of rows input_op = InputDataBuffer(make_ref_bundles([[i, i] for i in range(100)])) @@ -965,15 +965,15 @@ def test_limit_estimated_output_blocks(): while input_op.has_next(): op.add_input(input_op.get_next(), 0) run_op_tasks_sync(op) - assert op._estimated_output_blocks == 100 + assert op._estimated_num_output_bundles == 100 op.all_inputs_done() # all blocks are outputted - assert op._estimated_output_blocks == 100 + assert op._estimated_num_output_bundles == 100 -def test_all_to_all_estimated_output_blocks(): +def test_all_to_all_estimated_num_output_bundles(): # Test all to all operator input_op = InputDataBuffer(make_ref_bundles([[i] for i in range(100)])) @@ -1002,7 +1002,7 @@ def all_transform(bundles: List[RefBundle], ctx): 
run_op_tasks_sync(op2) # estimated output blocks for op2 should fallback to op1 - assert op2._estimated_output_blocks is None + assert op2._estimated_num_output_bundles is None assert op2.num_outputs_total() == estimated_output_blocks From 73307a4c7da15a3871633e1b7a5a624f98c3d5e2 Mon Sep 17 00:00:00 2001 From: Cuong Nguyen <128072568+can-anyscale@users.noreply.github.com> Date: Wed, 10 Jul 2024 17:40:42 -0700 Subject: [PATCH 07/15] [doc][api] fix check for auto-generated api docs (#46543) Fix api policy check for auto-generated API docs. For the check to work properly, we first need to compile ray docs to generate all API docs. Test: - CI Signed-off-by: can --- .buildkite/lint.rayci.yml | 5 ++--- ci/lint/lint.sh | 3 +++ ci/ray_ci/doc/cmd_check_api_discrepancy.py | 8 -------- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/.buildkite/lint.rayci.yml b/.buildkite/lint.rayci.yml index 64176969c509c..f9dfdd8e306bf 100644 --- a/.buildkite/lint.rayci.yml +++ b/.buildkite/lint.rayci.yml @@ -26,11 +26,10 @@ steps: - lint key: lint-medium instance_type: medium - depends_on: - - oss-ci-base_build + depends_on: docbuild + job_env: docbuild commands: - ./ci/lint/lint.sh {{matrix}} - job_env: oss-ci-base_build matrix: - api_annotations - api_policy_check data diff --git a/ci/lint/lint.sh b/ci/lint/lint.sh index cdba91decdd74..f67799945f4c4 100755 --- a/ci/lint/lint.sh +++ b/ci/lint/lint.sh @@ -63,7 +63,10 @@ api_annotations() { } api_policy_check() { + # install ray and compile doc to generate API files RAY_DISABLE_EXTRA_CPP=1 pip install -e "python[all]" + make -C doc/ html + # validate the API files bazel run //ci/ray_ci/doc:cmd_check_api_discrepancy -- /ray "$@" } diff --git a/ci/ray_ci/doc/cmd_check_api_discrepancy.py b/ci/ray_ci/doc/cmd_check_api_discrepancy.py index 2350955aee231..ce29ca8ccc7a1 100644 --- a/ci/ray_ci/doc/cmd_check_api_discrepancy.py +++ b/ci/ray_ci/doc/cmd_check_api_discrepancy.py @@ -16,14 +16,6 @@ "ray.data.dataset.MaterializedDataset", # 
special case where we cannot deprecate although we want to "ray.data.random_access_dataset.RandomAccessDataset", - # TODO(can): apis that are auto-generated so falsely classified as not - # documented with the current logic - "ray.data.iterator.DataIterator", - "ray.data.iterator.DataIterator.iter_batches", - "ray.data.iterator.DataIterator.iter_torch_batches", - "ray.data.iterator.DataIterator.to_tf", - "ray.data.iterator.DataIterator.materialize", - "ray.data.iterator.DataIterator.stats", }, }, } From 170d1084fadaa129e1c28579d2d51ea4ef78192a Mon Sep 17 00:00:00 2001 From: zhilong <121425509+Bye-legumes@users.noreply.github.com> Date: Wed, 10 Jul 2024 21:29:43 -0400 Subject: [PATCH 08/15] [Data] Set for better performance in loop (#46541) close https://github.com/ray-project/ray/issues/46482 Signed-off-by: zhilong --- python/ray/data/_internal/arrow_block.py | 5 +++-- python/ray/data/_internal/pandas_block.py | 3 ++- python/ray/data/_internal/planner/exchange/sort_task_spec.py | 3 ++- python/ray/data/iterator.py | 2 +- python/ray/data/preprocessors/encoder.py | 5 +++-- 5 files changed, 11 insertions(+), 7 deletions(-) diff --git a/python/ray/data/_internal/arrow_block.py b/python/ray/data/_internal/arrow_block.py index dba739f75644a..85f1bba58407a 100644 --- a/python/ray/data/_internal/arrow_block.py +++ b/python/ray/data/_internal/arrow_block.py @@ -278,11 +278,12 @@ def to_numpy( columns = [columns] should_be_single_ndarray = True + column_names_set = set(self._table.column_names) for column in columns: - if column not in self._table.column_names: + if column not in column_names_set: raise ValueError( f"Cannot find column {column}, available columns: " - f"{self._table.column_names}" + f"{column_names_set}" ) arrays = [] diff --git a/python/ray/data/_internal/pandas_block.py b/python/ray/data/_internal/pandas_block.py index 7b46df62b9cee..214aa19b78c1e 100644 --- a/python/ray/data/_internal/pandas_block.py +++ b/python/ray/data/_internal/pandas_block.py @@ 
-251,8 +251,9 @@ def to_numpy( columns = [columns] should_be_single_ndarray = True + column_names_set = set(self._table.columns) for column in columns: - if column not in self._table.columns: + if column not in column_names_set: raise ValueError( f"Cannot find column {column}, available columns: " f"{self._table.columns.tolist()}" diff --git a/python/ray/data/_internal/planner/exchange/sort_task_spec.py b/python/ray/data/_internal/planner/exchange/sort_task_spec.py index d4b8f0b043917..bc9c2a97ae815 100644 --- a/python/ray/data/_internal/planner/exchange/sort_task_spec.py +++ b/python/ray/data/_internal/planner/exchange/sort_task_spec.py @@ -75,8 +75,9 @@ def validate_schema(self, schema: Optional[Union[type, "pyarrow.lib.Schema"]]): return if self._columns and len(schema.names) > 0: + schema_names_set = set(schema.names) for column in self._columns: - if column not in schema.names: + if column not in schema_names_set: raise ValueError( "The column '{}' does not exist in the " "schema '{}'.".format(column, schema) diff --git a/python/ray/data/iterator.py b/python/ray/data/iterator.py index f0ec5f7f5ee35..281a0c43388b7 100644 --- a/python/ray/data/iterator.py +++ b/python/ray/data/iterator.py @@ -843,7 +843,7 @@ def generator(): if feature_type_spec is None or label_type_spec is None: schema = self.schema() - valid_columns = schema.names + valid_columns = set(schema.names) validate_columns(feature_columns) validate_columns(label_columns) feature_type_spec = get_type_spec(schema, columns=feature_columns) diff --git a/python/ray/data/preprocessors/encoder.py b/python/ray/data/preprocessors/encoder.py index d40930f1a4e23..6201c905cf2a7 100644 --- a/python/ray/data/preprocessors/encoder.py +++ b/python/ray/data/preprocessors/encoder.py @@ -522,7 +522,7 @@ def __init__( def _fit(self, dataset: Dataset) -> Preprocessor: columns_to_get = [ - column for column in self.columns if column not in self.dtypes + column for column in self.columns if column not in set(self.dtypes) 
] if columns_to_get: unique_indices = _get_unique_value_indices( @@ -561,8 +561,9 @@ def _get_unique_value_indices( if max_categories is None: max_categories = {} + columns_set = set(columns) for column in max_categories: - if column not in columns: + if column not in columns_set: raise ValueError( f"You set `max_categories` for {column}, which is not present in " f"{columns}." From 26b9464582d3b1af63d0c28bc5f64daca3ce2d88 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang <56065503+rynewang@users.noreply.github.com> Date: Wed, 10 Jul 2024 21:32:23 -0700 Subject: [PATCH 09/15] [core] Add 1s timeout in RPC to CoreWorkerService.NumPendingTasks in GcsJobManager::HandleGetAllJobInfo (#46335) Signed-off-by: Ruiyang Wang --- python/ray/tests/test_state_api.py | 28 +++++++++++++++++--- src/mock/ray/rpc/worker/core_worker_client.h | 6 +++-- src/ray/gcs/gcs_server/gcs_job_manager.cc | 25 ++++++++++------- src/ray/protobuf/gcs.proto | 3 ++- src/ray/rpc/worker/core_worker_client.h | 14 +++++----- 5 files changed, 53 insertions(+), 23 deletions(-) diff --git a/python/ray/tests/test_state_api.py b/python/ray/tests/test_state_api.py index 9554f5cab52ea..c5c076281e7db 100644 --- a/python/ray/tests/test_state_api.py +++ b/python/ray/tests/test_state_api.py @@ -3598,12 +3598,34 @@ def f(signal): all_job_info = client.get_all_job_info() assert len(all_job_info) == 1 assert job_id in all_job_info - assert client.get_all_job_info()[job_id].is_running_tasks is True + assert all_job_info[job_id].is_running_tasks is True -if __name__ == "__main__": - import sys +def test_hang_driver_has_no_is_running_task(monkeypatch, ray_start_cluster): + """ + When there's a call to JobInfoGcsService.GetAllJobInfo, GCS sends RPC + CoreWorkerService.NumPendingTasks to all drivers for "is_running_task". Our driver + however has trouble serving such RPC, and GCS should time out that RPC and unset the + field. 
+ """ + cluster = ray_start_cluster + cluster.add_node(num_cpus=10) + address = cluster.address + + monkeypatch.setenv( + "RAY_testing_asio_delay_us", + "CoreWorkerService.grpc_server.NumPendingTasks=2000000:2000000", + ) + ray.init(address=address) + client = ray.worker.global_worker.gcs_client + my_job_id = ray.worker.global_worker.current_job_id + all_job_info = client.get_all_job_info() + assert list(all_job_info.keys()) == [my_job_id] + assert not all_job_info[my_job_id].HasField("is_running_tasks") + + +if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) else: diff --git a/src/mock/ray/rpc/worker/core_worker_client.h b/src/mock/ray/rpc/worker/core_worker_client.h index a3fc6377deb8c..3a99a800b2e1a 100644 --- a/src/mock/ray/rpc/worker/core_worker_client.h +++ b/src/mock/ray/rpc/worker/core_worker_client.h @@ -32,7 +32,8 @@ class MockCoreWorkerClientInterface : public ray::pubsub::MockSubscriberClientIn MOCK_METHOD(void, NumPendingTasks, (std::unique_ptr request, - const ClientCallback &callback), + const ClientCallback &callback, + int64_t timeout_ms), (override)); MOCK_METHOD(void, DirectActorCallArgWaitComplete, @@ -133,7 +134,8 @@ class MockCoreWorkerClientConfigurableRunningTasks : num_running_tasks_(num_running_tasks) {} void NumPendingTasks(std::unique_ptr request, - const ClientCallback &callback) override { + const ClientCallback &callback, + int64_t timeout_ms = -1) override { NumPendingTasksReply reply; reply.set_num_pending_tasks(num_running_tasks_); callback(Status::OK(), reply); diff --git a/src/ray/gcs/gcs_server/gcs_job_manager.cc b/src/ray/gcs/gcs_server/gcs_job_manager.cc index 09a10644c15e7..5c8b956259707 100644 --- a/src/ray/gcs/gcs_server/gcs_job_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_job_manager.cc @@ -204,6 +204,7 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request, job_data_key_to_indices[job_data_key].push_back(i); } + JobID job_id = 
data.first; WorkerID worker_id = WorkerID::FromBinary(data.second.driver_address().worker_id()); // If job is not dead, get is_running_tasks from the core worker for the driver. @@ -217,23 +218,29 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request, // Get is_running_tasks from the core worker for the driver. auto client = core_worker_clients_.GetOrConnect(data.second.driver_address()); auto request = std::make_unique(); - RAY_LOG(DEBUG) << "Send NumPendingTasksRequest to worker " << worker_id; + constexpr int64_t kNumPendingTasksRequestTimeoutMs = 1000; + RAY_LOG(DEBUG) << "Send NumPendingTasksRequest to worker " << worker_id + << ", timeout " << kNumPendingTasksRequestTimeoutMs << " ms."; client->NumPendingTasks( std::move(request), - [worker_id, reply, i, num_processed_jobs, try_send_reply]( + [job_id, worker_id, reply, i, num_processed_jobs, try_send_reply]( const Status &status, const rpc::NumPendingTasksReply &num_pending_tasks_reply) { - RAY_LOG(DEBUG) << "Received NumPendingTasksReply from worker " << worker_id; + RAY_LOG(DEBUG).WithField(worker_id) + << "Received NumPendingTasksReply from worker."; if (!status.ok()) { - RAY_LOG(WARNING) << "Failed to get is_running_tasks from core worker: " - << status.ToString(); + RAY_LOG(WARNING).WithField(job_id).WithField(worker_id) + << "Failed to get num_pending_tasks from core worker: " << status + << ", is_running_tasks is unset."; + reply->mutable_job_info_list(i)->clear_is_running_tasks(); + } else { + bool is_running_tasks = num_pending_tasks_reply.num_pending_tasks() > 0; + reply->mutable_job_info_list(i)->set_is_running_tasks(is_running_tasks); } - bool is_running_tasks = num_pending_tasks_reply.num_pending_tasks() > 0; - reply->mutable_job_info_list(i)->set_is_running_tasks(is_running_tasks); (*num_processed_jobs)++; - ; try_send_reply(); - }); + }, + kNumPendingTasksRequestTimeoutMs); } i++; } diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 
c4600018ec6a4..1910f4713dd0d 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -703,7 +703,8 @@ message JobTableData { // The optional JobInfo from the Ray Job API. optional JobsAPIInfo job_info = 10; // Whether this job has running tasks. - bool is_running_tasks = 11; + // In GetAllJobInfo, if GCS can't reach the driver, it will be unset. + optional bool is_running_tasks = 11; // Address of the driver that started this job. Address driver_address = 12; } diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index 58070260b7f05..2d451758eae2d 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -106,7 +106,8 @@ class CoreWorkerClientInterface : public pubsub::SubscriberClientInterface { /// \param[in] callback The callback function that handles reply. /// \return if the rpc call succeeds virtual void NumPendingTasks(std::unique_ptr request, - const ClientCallback &callback) {} + const ClientCallback &callback, + int64_t timeout_ms = -1) {} /// Notify a wait has completed for direct actor call arguments. /// @@ -392,13 +393,10 @@ class CoreWorkerClient : public std::enable_shared_from_this, } void NumPendingTasks(std::unique_ptr request, - const ClientCallback &callback) override { - INVOKE_RPC_CALL(CoreWorkerService, - NumPendingTasks, - *request, - callback, - grpc_client_, - /*method_timeout_ms*/ -1); + const ClientCallback &callback, + int64_t timeout_ms = -1) override { + INVOKE_RPC_CALL( + CoreWorkerService, NumPendingTasks, *request, callback, grpc_client_, timeout_ms); } /// Send as many pending tasks as possible. This method is thread-safe. 
From bb1759aaa745a37bc4a5053ce12d2115206ccd11 Mon Sep 17 00:00:00 2001 From: Saihajpreet Singh Date: Thu, 11 Jul 2024 09:39:31 -0400 Subject: [PATCH 10/15] docs: update GTM tag (#46549) --- doc/source/_templates/extrahead.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/_templates/extrahead.html b/doc/source/_templates/extrahead.html index 4ebed61c764c5..6ef938c38ce78 100644 --- a/doc/source/_templates/extrahead.html +++ b/doc/source/_templates/extrahead.html @@ -53,7 +53,7 @@ j.async = true; j.src = 'https://www.googletagmanager.com/gtm.js?id=' + i + dl; f.parentNode.insertBefore(j, f); - })(window, document, 'script', 'dataLayer', 'GTM-TQX5VVP7'); + })(window, document, 'script', 'dataLayer', 'GTM-P8H6KQG'); From a92e27a89cebdd368449302ff18c1de83fce55af Mon Sep 17 00:00:00 2001 From: Gene Der Su Date: Thu, 11 Jul 2024 11:16:47 -0700 Subject: [PATCH 11/15] [Serve] Add status code to retry when request timed out (#46527) ## Why are these changes needed? It's not immediately apparent what status code Serve returns when a request times out. Added a note section on the timeout setup so users know what status code to expect and can retry. ## Related issue number ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing.
Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Gene Su Signed-off-by: Gene Der Su Co-authored-by: shrekris-anyscale <92341594+shrekris-anyscale@users.noreply.github.com> Co-authored-by: angelinalg <122562471+angelinalg@users.noreply.github.com> --- doc/source/serve/advanced-guides/performance.md | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/doc/source/serve/advanced-guides/performance.md b/doc/source/serve/advanced-guides/performance.md index 6f5c599c3a225..569de9554c3b2 100644 --- a/doc/source/serve/advanced-guides/performance.md +++ b/doc/source/serve/advanced-guides/performance.md @@ -55,7 +55,15 @@ proper backpressure. You can increase the value in the deployment decorator; e.g By default, Serve lets client HTTP requests run to completion no matter how long they take. However, slow requests could bottleneck the replica processing, blocking other requests that are waiting. Set an end-to-end timeout, so slow requests can be terminated and retried. -You can set an end-to-end timeout for HTTP requests by setting the `request_timeout_s` in the `http_options` field of the Serve config. HTTP Proxies wait for that many seconds before terminating an HTTP request. This config is global to your Ray cluster, and it can't be updated during runtime. Use [client-side retries](serve-best-practices-http-requests) to retry requests that time out due to transient failures. +You can set an end-to-end timeout for HTTP requests by setting the `request_timeout_s` parameter +in the `http_options` field of the Serve config. HTTP Proxies wait for that many +seconds before terminating an HTTP request. This config is global to your Ray cluster, +and you can't update it during runtime. 
Use [client-side retries](serve-best-practices-http-requests) +to retry requests that time out due to transient failures. + +:::{note} +Serve returns a response with status code `408` when a request times out. Clients can retry when they receive this `408` response. +::: ### Give the Serve Controller more time to process requests From e6887260f848572704a46b2c8fbd4257b87947d1 Mon Sep 17 00:00:00 2001 From: Vicki Boykis Date: Thu, 11 Jul 2024 14:27:11 -0400 Subject: [PATCH 12/15] Update starting-ray.rst to fix broken link (#46475) Updating broken link in cluster README --------- Signed-off-by: Vicki Boykis Signed-off-by: Philipp Moritz Co-authored-by: Philipp Moritz --- doc/source/ray-core/starting-ray.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/ray-core/starting-ray.rst b/doc/source/ray-core/starting-ray.rst index 711049d940f5a..396b29a64f139 100644 --- a/doc/source/ray-core/starting-ray.rst +++ b/doc/source/ray-core/starting-ray.rst @@ -260,4 +260,4 @@ Note that the machine calling ``ray up`` will not be considered as part of the R What's next? ------------ -Check out our `Deployment section `_ for more information on deploying Ray in different settings, including Kubernetes, YARN, and SLURM. +Check out our `Deployment section <../cluster/getting-started.html>`_ for more information on deploying Ray in different settings, including `Kubernetes <../cluster/kubernetes/index.html>`_, `YARN <../cluster/vms/user-guides/community/yarn.html>`_, and `SLURM <../cluster/vms/user-guides/community/slurm.html>`_. 
From a3ff8b291c4211d08819610e8262ee06a0def37e Mon Sep 17 00:00:00 2001 From: Peyton Murray Date: Thu, 11 Jul 2024 11:42:06 -0700 Subject: [PATCH 13/15] [Core] Replace lambda default arguments (#46554) Signed-off-by: pdmurray --- python/ray/_private/runtime_env/uri_cache.py | 10 +++++++--- python/ray/autoscaler/_private/load_metrics.py | 7 ++++--- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/python/ray/_private/runtime_env/uri_cache.py b/python/ray/_private/runtime_env/uri_cache.py index 97871baba1cbb..c749cc63c0415 100644 --- a/python/ray/_private/runtime_env/uri_cache.py +++ b/python/ray/_private/runtime_env/uri_cache.py @@ -1,5 +1,5 @@ import logging -from typing import Set, Callable +from typing import Set, Callable, Optional default_logger = logging.getLogger(__name__) @@ -29,14 +29,18 @@ class URICache: def __init__( self, - delete_fn: Callable[[str, logging.Logger], int] = lambda uri, logger: 0, + delete_fn: Optional[Callable[[str, logging.Logger], int]] = None, max_total_size_bytes: int = DEFAULT_MAX_URI_CACHE_SIZE_BYTES, debug_mode: bool = False, ): # Maps URIs to the size in bytes of their corresponding disk contents. self._used_uris: Set[str] = set() self._unused_uris: Set[str] = set() - self._delete_fn = delete_fn + + if delete_fn is None: + self._delete_fn = lambda uri, logger: 0 + else: + self._delete_fn = delete_fn # Total size of both used and unused URIs in the cache. 
self._total_size_bytes = 0 diff --git a/python/ray/autoscaler/_private/load_metrics.py b/python/ray/autoscaler/_private/load_metrics.py index 103f9b3389d5b..9714901987f72 100644 --- a/python/ray/autoscaler/_private/load_metrics.py +++ b/python/ray/autoscaler/_private/load_metrics.py @@ -32,9 +32,7 @@ def add_resources(dict1: Dict[str, float], dict2: Dict[str, float]) -> Dict[str, return new_dict -def freq_of_dicts( - dicts: List[Dict], serializer=lambda d: frozenset(d.items()), deserializer=dict -) -> DictCount: +def freq_of_dicts(dicts: List[Dict], serializer=None, deserializer=dict) -> DictCount: """Count a list of dictionaries (or unhashable types). This is somewhat annoying because mutable data structures aren't hashable, @@ -53,6 +51,9 @@ def freq_of_dicts( is a tuple containing a unique entry from `dicts` and its corresponding frequency count. """ + if serializer is None: + serializer = lambda d: frozenset(d.items()) # noqa: E731 + freqs = Counter(serializer(d) for d in dicts) as_list = [] for as_set, count in freqs.items(): From e5798d6942097c95ecc4cc73af1849b7b601557a Mon Sep 17 00:00:00 2001 From: Lonnie Liu <95255098+aslonnie@users.noreply.github.com> Date: Thu, 11 Jul 2024 12:10:54 -0700 Subject: [PATCH 14/15] [release auto] block pre-release jobs when RAY_VERSION is specified (#46568) when `RAY_VERSION` is set, it is often for wheel download, verification and uploading, so the pre-release tests do not need to run. running them might actually rebuild the artifacts and lead to issues for the release process. 
Signed-off-by: Lonnie Liu --- .buildkite/release-automation/pre_release.rayci.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.buildkite/release-automation/pre_release.rayci.yml b/.buildkite/release-automation/pre_release.rayci.yml index c00b0d14d7ca4..1e645182d4701 100644 --- a/.buildkite/release-automation/pre_release.rayci.yml +++ b/.buildkite/release-automation/pre_release.rayci.yml @@ -2,6 +2,11 @@ group: Pre-release checks depends_on: - forge steps: + - block: "Run pre-release checks" + if: build.env("RAY_VERSION") != null + key: block-pre-release-checks + depends_on: [] + - label: "Check release blockers" key: check-release-blockers instance_type: small_branch From 306cd9e46401cf12bf1ae91f0b9d3464ddba647d Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Thu, 11 Jul 2024 14:12:12 -0500 Subject: [PATCH 15/15] [Serve] Fix return type of `_DeploymentHandleBase._get_or_create_router` (#46480) ## Why are these changes needed? I'm definitely *not* using this private method in a hack in our code, and `mypy` is complaining about it. ## Related issue number ## Checks - [x] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [x] I've run `scripts/format.sh` to lint the changes in this PR. - [x] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [x] I've made sure the tests are passing. 
Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [x] Unit tests - [ ] Release tests - [ ] This PR is not tested :( Signed-off-by: Josh Karpel --- python/ray/serve/handle.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/serve/handle.py b/python/ray/serve/handle.py index e4b726ccb647d..148673c1308ad 100644 --- a/python/ray/serve/handle.py +++ b/python/ray/serve/handle.py @@ -145,7 +145,7 @@ def _set_request_protocol(self, request_protocol: RequestProtocol): _request_protocol=request_protocol ) - def _get_or_create_router(self) -> Union[Router, asyncio.AbstractEventLoop]: + def _get_or_create_router(self) -> Tuple[Router, asyncio.AbstractEventLoop]: if self._router is None: node_id = ray.get_runtime_context().get_node_id()