From 196cb8bdda4065df92ac9ecbff62e8c38831a42a Mon Sep 17 00:00:00 2001 From: Gustavo Montamat Date: Mon, 23 Sep 2024 16:21:37 -0400 Subject: [PATCH 1/7] Adapt to train-test split --- examples/sample_worker.py | 6 +++--- src/gentun/services.py | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/examples/sample_worker.py b/examples/sample_worker.py index c0f3651..b0406c1 100644 --- a/examples/sample_worker.py +++ b/examples/sample_worker.py @@ -13,8 +13,8 @@ # docker run -d --rm --name gentun-redis -p 6379:6379 redis worker = RedisWorker("test", Dummy, host="localhost", port=6379) - x_train = [] - y_train = [] + x_train, y_train = [], [] + x_test, y_test = [], [] # Start worker process - worker.run(x_train, y_train) + worker.run(x_train, y_train, x_test, y_test) diff --git a/src/gentun/services.py b/src/gentun/services.py index 7268869..780c190 100644 --- a/src/gentun/services.py +++ b/src/gentun/services.py @@ -25,8 +25,8 @@ from gentun.services import RedisWorker worker = RedisWorker("{name}", {handler}, host="{host}", port={port}) -x_train, y_train = ... # get data -worker.run(x_train, y_train) +x_train, y_train, x_test, y_test = ... # get data +worker.run(x_train, y_train, x_test, y_test) ``` """ @@ -116,11 +116,11 @@ def __init__( self.results_queue = results_queue self.timeout = timeout - def process_job(self, x_train: Any, y_train: Any, **kwargs) -> float: + def process_job(self, x_train: Any, y_train: Any, x_test: Any, y_test: Any, **kwargs) -> float: """Call model handler, return fitness.""" - return self.handler(**kwargs).evaluate(x_train, y_train) + return self.handler(**kwargs)(x_train, y_train) - def run(self, x_train: Any, y_train: Any): + def run(self, x_train: Any, y_train: Any, x_test: Any = None, y_test: Any = None): """Read jobs from queue, call handler, and return fitness.""" logging.info("Worker started (Ctrl+C to stop), waiting for jobs...") try: @@ -130,7 +130,7 @@ def run(self, x_train: Any, y_train: Any): data = json.loads(job_data) if data["name"] == self.name and data["handler"] == self.handler.__name__: logging.info("Working on job %s", data["id"]) - fitness = self.process_job(x_train, y_train, **data["kwargs"]) + fitness = self.process_job(x_train, y_train, x_test, y_test, **data["kwargs"]) result = {"id": data["id"], "name": self.name, "fitness": fitness} self.client.rpush(self.results_queue, json.dumps(result)) else: From ab2081b0ba910f7ea8be1a8f168bc8f110b2fe8d Mon Sep 17 00:00:00 2001 From: Gustavo Montamat Date: Mon, 23 Sep 2024 16:31:02 -0400 Subject: [PATCH 2/7] Add missing test parameters --- src/gentun/services.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gentun/services.py b/src/gentun/services.py index 780c190..4388c4d 100644 --- a/src/gentun/services.py +++ b/src/gentun/services.py @@ -118,7 +118,7 @@ def __init__( def process_job(self, x_train: Any, y_train: Any, x_test: Any, y_test: Any, **kwargs) -> float: """Call model handler, return fitness.""" - return self.handler(**kwargs)(x_train, y_train) + return self.handler(**kwargs)(x_train, y_train, x_test, y_test) def run(self, x_train: Any, y_train: Any, x_test: Any = None, y_test: Any = None): """Read jobs from queue, call handler, and return fitness.""" From 149c496ba3d13b80170b0397573d7fd656e4d24f Mon Sep 17 00:00:00 2001 From: Gustavo Montamat Date: Wed, 25 Sep 2024 14:52:05 -0400 Subject: [PATCH 3/7] Fix lag issue with several workers --- src/gentun/services.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/src/gentun/services.py b/src/gentun/services.py index 4388c4d..55027a2 100644 --- a/src/gentun/services.py +++ b/src/gentun/services.py @@ -79,19 +79,22 @@ def send_job(self, handler: Type[Handler], **kwargs) -> str: "handler": handler.__name__, "kwargs": kwargs, } - self.client.rpush(self.job_queue, json.dumps(job)) + self.client.lpush(self.job_queue, json.dumps(job)) return job_id def wait_for_result(self, job_id) -> float: """Retrieve fitness from the results queue.""" start_time = time.time() while time.time() - start_time < self.timeout: - result = self.client.lpop(self.results_queue) - if result: - result = json.loads(result) + data = self.client.rpop(self.results_queue) + if data: + result = json.loads(data) if result["name"] == self.name and result["id"] == job_id: return result["fitness"] - time.sleep(1) + # Leave data back in queue + self.client.lpush(self.results_queue, data) + else: + time.sleep(1) raise TimeoutError(f"Could not get job with id {job_id}") @@ -125,16 +128,21 @@ def run(self, x_train: Any, y_train: Any, x_test: Any = None, y_test: Any = None logging.info("Worker started (Ctrl+C to stop), waiting for jobs...") try: while True: - job_data = self.client.lpop(self.job_queue) + job_data = self.client.rpop(self.job_queue) if job_data: data = json.loads(job_data) if data["name"] == self.name and data["handler"] == self.handler.__name__: logging.info("Working on job %s", data["id"]) fitness = self.process_job(x_train, y_train, x_test, y_test, **data["kwargs"]) result = {"id": data["id"], "name": self.name, "fitness": fitness} - self.client.rpush(self.results_queue, json.dumps(result)) + self.client.lpush(self.results_queue, json.dumps(result)) + else: + # Job not used, do not dump + self.client.lpush(self.job_queue, job_data) else: logging.debug("No jobs in queue, sleeping for a while...") time.sleep(1) except KeyboardInterrupt: + if job_data: + self.client.lpush(self.job_queue, job_data) logging.info("Bye!") From ed013a4d6a18e8b5c5352152ef238c43f469a08d Mon Sep 17 00:00:00 2001 From: Gustavo Montamat Date: Wed, 25 Sep 2024 14:52:18 -0400 Subject: [PATCH 4/7] Bump release version --- src/gentun/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gentun/__init__.py b/src/gentun/__init__.py index 9bea2fd..79c6bbd 100644 --- a/src/gentun/__init__.py +++ b/src/gentun/__init__.py @@ -4,6 +4,6 @@ from .config import setup_logging -__version__ = "0.0.2" +__version__ = "0.0.3" setup_logging() From ee200c831a16ce5c2bc90c698b19a1e4a521c890 Mon Sep 17 00:00:00 2001 From: Gustavo Montamat Date: Wed, 25 Sep 2024 16:54:56 -0400 Subject: [PATCH 5/7] Update redis tests --- tests/test_services.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/tests/test_services.py b/tests/test_services.py index c16e2e8..2f061bd 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -12,7 +12,7 @@ def __init__(self, param1: int, param2: str = "default"): self.param1 = param1 self.param2 = param2 - def evaluate(self, x_train, y_train): + def create_train_evaluate(self, x_train, y_train, x_test, y_test): return 0.9 @@ -47,14 +47,14 @@ def test_redis_controller_send_job(mock_redis): # Send first job job_id = controller.send_job(MockHandler, param1=1, param2="value") assert isinstance(job_id, str) - job = json.loads(mock_redis.return_value.rpush.call_args[0][1]) + job = json.loads(mock_redis.return_value.lpush.call_args[0][1]) assert job["name"] == "test" assert job["handler"] == "MockHandler" assert job["kwargs"] == {"param1": 1, "param2": "value"} # Send a second job job_id = controller.send_job(MockHandler, param1=2, param2="value2") assert isinstance(job_id, str) - job = json.loads(mock_redis.return_value.rpush.call_args[0][1]) + job = json.loads(mock_redis.return_value.lpush.call_args[0][1]) assert job["name"] == "test" assert job["handler"] == "MockHandler" assert job["kwargs"] == {"param1": 2, "param2": "value2"} @@ -66,7 +66,7 @@ def test_redis_controller_wait_for_result(mock_redis): job_id = "test_job_id" result = {"id": job_id, "name": "test", "fitness": 0.9} ignore_result = {"id": "not_test_job_id", "name": "test", "fitness": 0.9} - mock_redis.return_value.lpop.side_effect = [None, json.dumps(ignore_result), json.dumps(result)] + mock_redis.return_value.rpop.side_effect = [None, json.dumps(ignore_result), json.dumps(result)] fitness = controller.wait_for_result(job_id) assert fitness == 0.9 @@ -75,7 +75,7 @@ def test_redis_controller_wait_for_result(mock_redis): def test_redis_controller_wait_for_result_timeout(mock_redis): controller = RedisController("test", timeout=1) job_id = "test_job_id" - mock_redis.return_value.lpop.return_value = None + mock_redis.return_value.rpop.return_value = None with pytest.raises(TimeoutError): controller.wait_for_result(job_id) @@ -94,7 +94,7 @@ def test_redis_worker_init(mock_redis): @patch("src.gentun.services.redis.StrictRedis") def test_redis_worker_process_job(mock_redis): worker = RedisWorker("test", MockHandler) - fitness = worker.process_job([1, 2, 3], [4, 5, 6], param1=1, param2="value") + fitness = worker.process_job([1, 2, 3], [4, 5, 6], [7, 8, 9], [0, 1, 2], param1=1, param2="value") assert fitness == 0.9 @@ -113,12 +113,18 @@ def test_redis_worker_run(mock_redis): "handler": "NotMockHandler", "kwargs": {"param1": 1, "param2": "value"}, } - mock_redis.return_value.lpop.side_effect = [json.dumps(ignore_job_data)] + [json.dumps(job_data)] + [None] + mock_redis.return_value.rpop.side_effect = [json.dumps(ignore_job_data), json.dumps(job_data), None] with patch.object(worker, "process_job", return_value=0.9) as mock_process_job: with patch("time.sleep", side_effect=KeyboardInterrupt): - worker.run([1, 2, 3], [4, 5, 6]) - mock_process_job.assert_called_once_with([1, 2, 3], [4, 5, 6], param1=1, param2="value") - result = json.loads(mock_redis.return_value.rpush.call_args[0][1]) + worker.run([1, 2, 3], [4, 5, 6], [7, 8, 9], [0, 1, 2]) + mock_process_job.assert_called_once_with( + [1, 2, 3], [4, 5, 6], [7, 8, 9], [0, 1, 2], param1=1, param2="value" + ) + result = json.loads(mock_redis.return_value.lpush.call_args[0][1]) assert result["id"] == "test_job_id" assert result["name"] == "test" assert result["fitness"] == 0.9 + mock_redis.return_value.rpop.side_effect = [json.dumps(ignore_job_data)] + with patch("json.loads", side_effect=KeyboardInterrupt): + worker.run([1, 2, 3], [4, 5, 6], [7, 8, 9], [0, 1, 2]) + mock_redis.return_value.lpush.assert_any_call(worker.job_queue, json.dumps(ignore_job_data)) From 5caf481371da8b074888801069191e39d3df4ffe Mon Sep 17 00:00:00 2001 From: Gustavo Montamat Date: Wed, 25 Sep 2024 18:20:37 -0400 Subject: [PATCH 6/7] Define black-style imports in isort --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index e6a90cb..e8de12c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -75,6 +75,7 @@ line-length = 120 fast = true [tool.isort] +profile = "black" line_length = 120 [tool.coverage.run] From 360956dcad52c9e189302c0f0bd8af31748e150d Mon Sep 17 00:00:00 2001 From: Gustavo Montamat Date: Wed, 25 Sep 2024 18:28:12 -0400 Subject: [PATCH 7/7] Add missing BatchNormalization layer --- src/gentun/models/tensorflow.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/gentun/models/tensorflow.py b/src/gentun/models/tensorflow.py index e030450..1d746a8 100644 --- a/src/gentun/models/tensorflow.py +++ b/src/gentun/models/tensorflow.py @@ -9,7 +9,17 @@ import numpy as np import tensorflow as tf from tensorflow.keras import backend as K -from tensorflow.keras.layers import Activation, Add, Conv2D, Dense, Dropout, Flatten, Input, MaxPool2D +from tensorflow.keras.layers import ( + Activation, + Add, + BatchNormalization, + Conv2D, + Dense, + Dropout, + Flatten, + Input, + MaxPool2D, +) from tensorflow.keras.models import Model from tensorflow.keras.optimizers import Adam from tensorflow.keras.utils import plot_model @@ -124,6 +134,7 @@ def build_dag(x: Any, nodes: int, connections: str, kernels: int): else: tmp = add_vars[0] tmp = Conv2D(kernels, kernel_size=(3, 3), strides=(1, 1), padding="same")(tmp) + tmp = BatchNormalization()(tmp) tmp = Activation("relu")(tmp) all_vars[i] = tmp if not outs: @@ -150,6 +161,7 @@ def build_model( for layer, kernels in enumerate(kernels_per_layer): # Default input node x = Conv2D(kernels, kernel_size=kernel_sizes[layer], strides=(1, 1), padding="same")(x) + x = BatchNormalization()(x) x = Activation("relu")(x) # Decode internal connections # If at least one bit is 1, then we need to construct the Directed Acyclic Graph @@ -157,6 +169,7 @@ def build_model( x = self.build_dag(x, nodes[layer], connections[layer], kernels) # Output node x = Conv2D(kernels, kernel_size=(3, 3), strides=(1, 1), padding="same")(x) + x = BatchNormalization()(x) x = Activation("relu")(x) x = MaxPool2D(pool_size=pool_sizes[layer], strides=(2, 2))(x) x = Flatten()(x)