[RLlib] Remove framework_iterator from codebase. #47259

Merged: 3 commits, Aug 22, 2024
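A minimal sketch of the refactoring pattern applied throughout this PR, simplified from the APPO test changes below rather than copied from them (the class and test names here are made up for illustration). The old tests wrapped their bodies in a framework_iterator(config) loop to repeat the run once per deep-learning framework; with the loop removed, each test builds and trains the algorithm a single time with the config's default framework.

import unittest

import ray
import ray.rllib.algorithms.appo as appo
from ray.rllib.utils.test_utils import check_train_results


class TestFrameworkIteratorRemoval(unittest.TestCase):
    """Illustrative sketch only -- not code from this PR."""

    @classmethod
    def setUpClass(cls):
        ray.init()

    @classmethod
    def tearDownClass(cls):
        ray.shutdown()

    def test_appo_compilation_single_framework(self):
        config = appo.APPOConfig().env_runners(num_env_runners=1)

        # Old pattern (removed by this PR): repeat the body per framework.
        #   for _ in framework_iterator(config):
        #       algo = config.build(env="CartPole-v1")
        #       ...
        #       algo.stop()

        # New pattern: build once with the config's default framework.
        algo = config.build(env="CartPole-v1")
        results = algo.train()
        check_train_results(results)
        algo.stop()


if __name__ == "__main__":
    unittest.main()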
7 changes: 0 additions & 7 deletions rllib/BUILD
@@ -1898,13 +1898,6 @@ py_test(
srcs = ["utils/postprocessing/tests/test_value_predictions.py"]
)

py_test(
name = "test_random_encoder",
tags = ["team:rllib", "utils"],
size = "large",
srcs = ["utils/exploration/tests/test_random_encoder.py"]
)

py_test(
name = "test_torch_utils",
tags = ["team:rllib", "utils", "gpu"],
94 changes: 42 additions & 52 deletions rllib/algorithms/appo/tests/test_appo.py
@@ -4,11 +4,7 @@
import ray.rllib.algorithms.appo as appo
from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID
from ray.rllib.utils.metrics.learner_info import LEARNER_INFO, LEARNER_STATS_KEY
from ray.rllib.utils.test_utils import (
check_compute_single_action,
check_train_results,
framework_iterator,
)
from ray.rllib.utils.test_utils import check_compute_single_action, check_train_results


class TestAPPO(unittest.TestCase):
@@ -25,15 +21,14 @@ def test_appo_compilation(self):
config = appo.APPOConfig().env_runners(num_env_runners=1)
num_iterations = 2

for _ in framework_iterator(config):
algo = config.build(env="CartPole-v1")
for i in range(num_iterations):
results = algo.train()
print(results)
check_train_results(results)
algo = config.build(env="CartPole-v1")
for i in range(num_iterations):
results = algo.train()
print(results)
check_train_results(results)

check_compute_single_action(algo)
algo.stop()
check_compute_single_action(algo)
algo.stop()

def test_appo_compilation_use_kl_loss(self):
"""Test whether APPO can be built with kl_loss enabled."""
@@ -42,14 +37,13 @@ def test_appo_compilation_use_kl_loss(self):
)
num_iterations = 2

for _ in framework_iterator(config):
algo = config.build(env="CartPole-v1")
for i in range(num_iterations):
results = algo.train()
check_train_results(results)
print(results)
check_compute_single_action(algo)
algo.stop()
algo = config.build(env="CartPole-v1")
for i in range(num_iterations):
results = algo.train()
check_train_results(results)
print(results)
check_compute_single_action(algo)
algo.stop()

def test_appo_two_optimizers_two_lrs(self):
# Not explicitly setting this should cause a warning, but not fail.
@@ -71,14 +65,13 @@ def test_appo_two_optimizers_two_lrs(self):
num_iterations = 2

# Only supported for tf so far.
for _ in framework_iterator(config, frameworks=("torch", "tf2", "tf")):
algo = config.build(env="CartPole-v1")
for i in range(num_iterations):
results = algo.train()
check_train_results(results)
print(results)
check_compute_single_action(algo)
algo.stop()
algo = config.build(env="CartPole-v1")
for i in range(num_iterations):
results = algo.train()
check_train_results(results)
print(results)
check_compute_single_action(algo)
algo.stop()

def test_appo_entropy_coeff_schedule(self):
# Initial lr, doesn't really matter because of the schedule below.
@@ -122,19 +115,18 @@ def _step_n_times(algo, n: int):
"entropy_coeff"
]

for _ in framework_iterator(config, frameworks=("torch", "tf")):
algo = config.build(env="CartPole-v1")
algo = config.build(env="CartPole-v1")

coeff = _step_n_times(algo, 10) # 200 timesteps
# Should be close to the starting coeff of 0.01.
self.assertLessEqual(coeff, 0.01)
self.assertGreaterEqual(coeff, 0.001)
coeff = _step_n_times(algo, 10) # 200 timesteps
# Should be close to the starting coeff of 0.01.
self.assertLessEqual(coeff, 0.01)
self.assertGreaterEqual(coeff, 0.001)

coeff = _step_n_times(algo, 20) # 400 timesteps
# Should have annealed to the final coeff of 0.0001.
self.assertLessEqual(coeff, 0.001)
coeff = _step_n_times(algo, 20) # 400 timesteps
# Should have annealed to the final coeff of 0.0001.
self.assertLessEqual(coeff, 0.001)

algo.stop()
algo.stop()

def test_appo_learning_rate_schedule(self):
config = (
@@ -173,15 +165,14 @@ def _step_n_times(algo, n: int):
"cur_lr"
]

for _ in framework_iterator(config):
algo = config.build(env="CartPole-v1")
algo = config.build(env="CartPole-v1")

lr1 = _step_n_times(algo, 10) # 200 timesteps
lr2 = _step_n_times(algo, 10) # 200 timesteps
lr1 = _step_n_times(algo, 10) # 200 timesteps
lr2 = _step_n_times(algo, 10) # 200 timesteps

self.assertGreater(lr1, lr2)
self.assertGreater(lr1, lr2)

algo.stop()
algo.stop()

def test_appo_model_variables(self):
config = (
@@ -202,13 +193,12 @@ def test_appo_model_variables(self):
)
)

for _ in framework_iterator(config, frameworks=["tf2", "torch"]):
algo = config.build(env="CartPole-v1")
state = algo.get_policy(DEFAULT_POLICY_ID).get_state()
# Weights and biases for the single hidden layer and the output layer
# of the policy and value networks, so 6 tensors in total.
# We should not get the tensors from the target model here.
self.assertEqual(len(state["weights"]), 6)
algo = config.build(env="CartPole-v1")
state = algo.get_policy(DEFAULT_POLICY_ID).get_state()
# Weights and biases for the single hidden layer and the output layer
# of the policy and value networks, so 6 tensors in total.
# We should not get the tensors from the target model here.
self.assertEqual(len(state["weights"]), 6)


if __name__ == "__main__":
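With framework_iterator gone, the tests above run only on the config's default framework. The sketch below is a hypothetical illustration, not part of this PR, of how a test could still pin a specific framework explicitly via AlgorithmConfig.framework().

import ray.rllib.algorithms.appo as appo

# Hypothetical illustration: this PR simply drops the per-framework loop;
# it does not add explicit framework pinning like this.
config = (
    appo.APPOConfig()
    .framework("torch")  # pin the deep-learning framework for this run
    .env_runners(num_env_runners=1)
)

algo = config.build(env="CartPole-v1")
print(algo.train())
algo.stop()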
79 changes: 36 additions & 43 deletions rllib/algorithms/bc/tests/test_bc_old_api_stack.py
@@ -11,7 +11,6 @@
from ray.rllib.utils.test_utils import (
check_compute_single_action,
check_train_results,
framework_iterator,
)


@@ -50,48 +49,42 @@ def test_bc_compilation_and_learning_from_offline_file(self):
num_iterations = 350
min_return_to_reach = 75.0

# Test for the following frameworks.
frameworks_to_test = ("torch", "tf")

for _ in framework_iterator(config, frameworks=frameworks_to_test):
for recurrent in [True, False]:
# We only test recurrent networks with RLModules.
if recurrent:
# TODO (Artur): We read input data without a time-dimension.
# In order for a recurrent offline-learning RL Module to
# work, the input data needs to be transformed to add a
# time-dimension.
continue

config.training(model={"use_lstm": recurrent})
algo = config.build(env="CartPole-v1")
learnt = False
for i in range(num_iterations):
results = algo.train()
check_train_results(results)
print(results)

eval_results = results.get("evaluation")
if eval_results:
mean_return = eval_results[ENV_RUNNER_RESULTS][
EPISODE_RETURN_MEAN
]
print("iter={} R={}".format(i, mean_return))
# Learn until good reward is reached in the actual env.
if mean_return > min_return_to_reach:
print("learnt!")
learnt = True
break

if not learnt:
raise ValueError(
"`BC` did not reach {} reward from expert offline "
"data!".format(min_return_to_reach)
)

check_compute_single_action(algo, include_prev_action_reward=True)

algo.stop()
for recurrent in [True, False]:
# We only test recurrent networks with RLModules.
if recurrent:
# TODO (Artur): We read input data without a time-dimension.
# In order for a recurrent offline-learning RL Module to
# work, the input data needs to be transformed to add a
# time-dimension.
continue

config.training(model={"use_lstm": recurrent})
algo = config.build(env="CartPole-v1")
learnt = False
for i in range(num_iterations):
results = algo.train()
check_train_results(results)
print(results)

eval_results = results.get("evaluation")
if eval_results:
mean_return = eval_results[ENV_RUNNER_RESULTS][EPISODE_RETURN_MEAN]
print("iter={} R={}".format(i, mean_return))
# Learn until good reward is reached in the actual env.
if mean_return > min_return_to_reach:
print("learnt!")
learnt = True
break

if not learnt:
raise ValueError(
"`BC` did not reach {} reward from expert offline "
"data!".format(min_return_to_reach)
)

check_compute_single_action(algo, include_prev_action_reward=True)

algo.stop()


if __name__ == "__main__":
114 changes: 41 additions & 73 deletions rllib/algorithms/cql/tests/test_cql.py
@@ -1,23 +1,17 @@
import numpy as np
from pathlib import Path
import os
import unittest

import ray
from ray.rllib.algorithms import cql
from ray.rllib.utils.framework import try_import_tf, try_import_torch
from ray.rllib.utils.framework import try_import_torch
from ray.rllib.utils.metrics import (
ENV_RUNNER_RESULTS,
EPISODE_RETURN_MEAN,
EVALUATION_RESULTS,
)
from ray.rllib.utils.test_utils import (
check_compute_single_action,
check_train_results,
framework_iterator,
)
from ray.rllib.utils.test_utils import check_compute_single_action, check_train_results

tf1, tf, tfv = try_import_tf()
torch, _ = try_import_torch()


@@ -75,77 +69,51 @@ def test_cql_compilation(self):
)
num_iterations = 4

# Test for tf/torch frameworks.
for fw in framework_iterator(config):
algo = config.build()
for i in range(num_iterations):
results = algo.train()
check_train_results(results)
print(results)
eval_results = results.get(EVALUATION_RESULTS)
if eval_results:
print(
f"iter={algo.iteration} "
f"R={eval_results[ENV_RUNNER_RESULTS][EPISODE_RETURN_MEAN]}"
)
check_compute_single_action(algo)

# Get policy and model.
pol = algo.get_policy()
cql_model = pol.model
if fw == "tf":
pol.get_session().__enter__()

# Example on how to do evaluation on the trained Algorithm
# using the data from CQL's global replay buffer.
# Get a sample (MultiAgentBatch).

batch = algo.env_runner.input_reader.next()
multi_agent_batch = batch.as_multi_agent()
# All experiences have been buffered for `default_policy`
batch = multi_agent_batch.policy_batches["default_policy"]

if fw == "torch":
obs = torch.from_numpy(batch["obs"])
else:
obs = batch["obs"]
batch["actions"] = batch["actions"].astype(np.float32)

# Pass the observations through our model to get the
# features, which we then pass through the Q-head.
model_out, _ = cql_model({"obs": obs})
# The estimated Q-values from the (historic) actions in the batch.
if fw == "torch":
q_values_old = cql_model.get_q_values(
model_out, torch.from_numpy(batch["actions"])
)
else:
q_values_old = cql_model.get_q_values(
tf.convert_to_tensor(model_out), batch["actions"]
algo = config.build()
for i in range(num_iterations):
results = algo.train()
check_train_results(results)
print(results)
eval_results = results.get(EVALUATION_RESULTS)
if eval_results:
print(
f"iter={algo.iteration} "
f"R={eval_results[ENV_RUNNER_RESULTS][EPISODE_RETURN_MEAN]}"
)
check_compute_single_action(algo)

# The estimated Q-values for the new actions computed
# by our policy.
actions_new = pol.compute_actions_from_input_dict({"obs": obs})[0]
if fw == "torch":
q_values_new = cql_model.get_q_values(
model_out, torch.from_numpy(actions_new)
)
else:
q_values_new = cql_model.get_q_values(model_out, actions_new)
# Get policy and model.
pol = algo.get_policy()
cql_model = pol.model

if fw == "tf":
q_values_old, q_values_new = pol.get_session().run(
[q_values_old, q_values_new]
)
# Example on how to do evaluation on the trained Algorithm
# using the data from CQL's global replay buffer.
# Get a sample (MultiAgentBatch).

batch = algo.env_runner.input_reader.next()
multi_agent_batch = batch.as_multi_agent()
# All experiences have been buffered for `default_policy`
batch = multi_agent_batch.policy_batches["default_policy"]

obs = torch.from_numpy(batch["obs"])

# Pass the observations through our model to get the
# features, which we then pass through the Q-head.
model_out, _ = cql_model({"obs": obs})
# The estimated Q-values from the (historic) actions in the batch.
q_values_old = cql_model.get_q_values(
model_out, torch.from_numpy(batch["actions"])
)

print(f"Q-val batch={q_values_old}")
print(f"Q-val policy={q_values_new}")
# The estimated Q-values for the new actions computed
# by our policy.
actions_new = pol.compute_actions_from_input_dict({"obs": obs})[0]
q_values_new = cql_model.get_q_values(model_out, torch.from_numpy(actions_new))

if fw == "tf":
pol.get_session().__exit__(None, None, None)
print(f"Q-val batch={q_values_old}")
print(f"Q-val policy={q_values_new}")

algo.stop()
algo.stop()


if __name__ == "__main__":