Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Service API suspend / resume + a modified webapp example #1137

Merged
merged 7 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
294 changes: 294 additions & 0 deletions examples/webapp/webapp_suspend_resume.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,294 @@
#!/usr/bin/env python3
import asyncio
import json
import pathlib
import sys
from datetime import datetime, timedelta
from decimal import Decimal

from yapapi import Golem
from yapapi.contrib.service.http_proxy import HttpProxyService, LocalHttpProxy
from yapapi.network import Network
from yapapi.payload import vm
from yapapi.props import com
from yapapi.services import Service, ServiceState
from yapapi.strategy import (
PROP_DEBIT_NOTE_INTERVAL_SEC,
DecreaseScoreForUnconfirmedAgreement,
LeastExpensiveLinearPayuMS,
PropValueRange,
)

examples_dir = pathlib.Path(__file__).resolve().parent.parent
sys.path.append(str(examples_dir))

from utils import (
TEXT_COLOR_CYAN,
TEXT_COLOR_DEFAULT,
build_parser,
print_env_info,
run_golem_example,
)

HTTP_IMAGE_HASH = "c37c1364f637c199fe710ca62241ff486db92c875b786814c6030aa1"
DB_IMAGE_HASH = "85021afecf51687ecae8bdc21e10f3b11b82d2e3b169ba44e177340c"

STARTING_TIMEOUT = timedelta(minutes=4)


DEBIT_NOTE_INTERVAL_SEC = 3600


class HttpService(HttpProxyService):
def __init__(self, db_address: str, db_port: int = 4001):
super().__init__(remote_port=5000)
self._db_address = db_address
self._db_port = db_port

@staticmethod
async def get_payload():
return await vm.repo(
image_hash=HTTP_IMAGE_HASH,
capabilities=[vm.VM_CAPS_VPN],
)

async def start(self):
# perform the initialization of the Service
# (which includes sending the network details within the `deploy` command)
async for script in super().start():
yield script

script = self._ctx.new_script(timeout=timedelta(seconds=20))

script.run(
"/bin/bash",
"-c",
f"cd /webapp && python app.py "
f"--db-address {self._db_address} "
f"--db-port {self._db_port}"
f" initdb",
)
script.run(
"/bin/bash",
"-c",
f"cd /webapp && python app.py "
f"--db-address {self._db_address} "
f"--db-port {self._db_port} "
f"run > /webapp/out 2> /webapp/err &",
)
yield script

def _serialize_init_params(self):
return {
"db_address": self._db_address,
"db_port": self._db_port,
}


class DbService(Service):
@staticmethod
async def get_payload():
return await vm.repo(
image_hash=DB_IMAGE_HASH,
capabilities=[vm.VM_CAPS_VPN],
)

async def start(self):
# perform the initialization of the Service
# (which includes sending the network details within the `deploy` command)
async for script in super().start():
yield script

script = self._ctx.new_script(timeout=timedelta(seconds=30))
script.run("/bin/run_rqlite.sh")
yield script


class MyMarketStrategy(LeastExpensiveLinearPayuMS):
acceptable_prop_value_range_overrides = {
PROP_DEBIT_NOTE_INTERVAL_SEC: PropValueRange(DEBIT_NOTE_INTERVAL_SEC, None),
}


async def main(subnet_tag, payment_driver, payment_network, port):
base_strategy = MyMarketStrategy(
max_fixed_price=Decimal("1.0"),
max_price_for={com.Counter.CPU: Decimal("0.2"), com.Counter.TIME: Decimal("0.1")},
)
strategy = DecreaseScoreForUnconfirmedAgreement(base_strategy, 0.5)

golem = Golem(
budget=1.0,
subnet_tag=subnet_tag,
payment_driver=payment_driver,
payment_network=payment_network,
strategy=strategy,
)

golem.add_event_consumer(strategy.on_event)

await golem.start()

print_env_info(golem)

network = await golem.create_network("192.168.0.1/24")
db_cluster = await golem.run_service(DbService, network=network)
db_instance = db_cluster.instances[0]

def still_starting(cluster):
return any(
i.state in (ServiceState.pending, ServiceState.starting) for i in cluster.instances
)

def raise_exception_if_still_starting(cluster):
if still_starting(cluster):
raise Exception(
f"Failed to start {cluster} instances "
f"after {STARTING_TIMEOUT.total_seconds()} seconds"
)

commissioning_time = datetime.now()

while still_starting(db_cluster) and datetime.now() < commissioning_time + STARTING_TIMEOUT:
print(db_cluster.instances)
await asyncio.sleep(5)

raise_exception_if_still_starting(db_cluster)

print(f"{TEXT_COLOR_CYAN}DB instance started, spawning the web server{TEXT_COLOR_DEFAULT}")

web_cluster = await golem.run_service(
HttpService,
network=network,
instance_params=[{"db_address": db_instance.network_node.ip}],
)

# wait until all remote http instances are started

while still_starting(web_cluster) and datetime.now() < commissioning_time + STARTING_TIMEOUT:
print(web_cluster.instances + db_cluster.instances)
await asyncio.sleep(5)

raise_exception_if_still_starting(web_cluster)

# service instances started, start the local HTTP server

proxy = LocalHttpProxy(web_cluster, port)
await proxy.run()

print(
f"{TEXT_COLOR_CYAN}Local HTTP server listening on:\n"
f"http://localhost:{port}{TEXT_COLOR_DEFAULT}"
)

secs = 7
print(f"{TEXT_COLOR_CYAN}waiting {secs} seconds...{TEXT_COLOR_DEFAULT}")
await asyncio.sleep(secs)

await proxy.stop()
print(f"{TEXT_COLOR_CYAN}HTTP server stopped{TEXT_COLOR_DEFAULT}")

print("=============================================== SERIALIZING AND DROPPING CURRENT STATE")

network_serialized = network.serialize()
db_serialized = db_cluster.serialize_instances()
web_serialized = web_cluster.serialize_instances()

web_cluster.suspend()
db_cluster.suspend()

print(f"{TEXT_COLOR_CYAN}waiting {secs} seconds...{TEXT_COLOR_DEFAULT}")
await asyncio.sleep(secs)

print("=============================================== STOPPING GOLEM ENGINE")

await golem.stop(wait_for_payments=False)

print(f"{TEXT_COLOR_CYAN}waiting {secs} seconds...{TEXT_COLOR_DEFAULT}")
await asyncio.sleep(secs)

print("=============================================== SERIALIZED STATE: ")

print(json.dumps([network_serialized, db_serialized, web_serialized], indent=4))

print(f"{TEXT_COLOR_CYAN}waiting {secs} seconds...{TEXT_COLOR_DEFAULT}")
await asyncio.sleep(secs)

print("=============================================== RESTARTING THE ENGINE AND THE SERVICES")

golem = Golem(
budget=1.0,
subnet_tag=subnet_tag,
payment_driver=payment_driver,
payment_network=payment_network,
)

await golem.start()

print_env_info(golem)

network = Network.deserialize(golem._engine._net_api, network_serialized)

db_cluster = await golem.resume_service(DbService, instances=db_serialized, network=network)
web_cluster = await golem.resume_service(HttpService, instances=web_serialized, network=network)

print([i.state for i in web_cluster.instances])

raise_exception_if_still_starting(web_cluster)

proxy = LocalHttpProxy(web_cluster, port)
await proxy.run()

print(
f"{TEXT_COLOR_CYAN}Local HTTP server listening on:\n"
f"http://localhost:{port}{TEXT_COLOR_DEFAULT}"
)

# wait until Ctrl-C

while True:
print(web_cluster.instances + db_cluster.instances)
try:
await asyncio.sleep(10)
except (KeyboardInterrupt, asyncio.CancelledError):
break

# perform the shutdown of the local http server and the service cluster

await proxy.stop()
print(f"{TEXT_COLOR_CYAN}HTTP server stopped{TEXT_COLOR_DEFAULT}")

web_cluster.stop()
db_cluster.stop()

cnt = 0
while cnt < 3 and any(s.is_available for s in web_cluster.instances + db_cluster.instances):
print(web_cluster.instances + db_cluster.instances)
await asyncio.sleep(5)
cnt += 1

await network.remove()
await golem.stop()


if __name__ == "__main__":
parser = build_parser("Golem simple Web app example")
parser.add_argument(
"--port",
type=int,
default=8080,
help="The local port to listen on",
)
now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S")
parser.set_defaults(log_file=f"webapp-yapapi-{now}.log")
args = parser.parse_args()

run_golem_example(
main(
subnet_tag=args.subnet_tag,
payment_driver=args.payment_driver,
payment_network=args.payment_network,
port=args.port,
),
log_file=args.log_file,
)
3 changes: 1 addition & 2 deletions tests/goth_tests/test_power_outage.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from goth.runner.log import configure_logging
from goth.runner.probe import ProviderProbe, RequestorProbe

from .assertions import assert_no_errors, assert_tasks_processed
from .assertions import assert_tasks_processed

logger = logging.getLogger("goth.test.power_outage")

Expand Down Expand Up @@ -67,7 +67,6 @@ async def test_power_outage(
env=os.environ,
) as (_cmd_task, cmd_monitor, _process_monitor):
# Add assertions to the command output monitor `cmd_monitor`:
cmd_monitor.add_assertion(assert_no_errors)
all_sent = cmd_monitor.add_assertion(assert_all_tasks_started)
all_computed = cmd_monitor.add_assertion(assert_all_tasks_computed)

Expand Down
14 changes: 10 additions & 4 deletions tests/test_agreements_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ async def create_agreement():
return create_agreement


def get_agreements_pool() -> agreements_pool.AgreementsPool:
return agreements_pool.AgreementsPool(
lambda _event, **kwargs: None, lambda _offer: None, mock.Mock()
)


@pytest.mark.asyncio
async def test_use_agreement_chooses_max_score():
"""Test that a proposal with the largest score is chosen in AgreementsPool.use_agreement()."""
Expand All @@ -40,7 +46,7 @@ async def test_use_agreement_chooses_max_score():
mock_score = random.random()
proposals[n] = (mock_score, mock_proposal)

pool = agreements_pool.AgreementsPool(lambda _event, **kwargs: None, lambda _offer: None)
pool = get_agreements_pool()

for score, proposal in proposals.values():
await pool.add_proposal(score, proposal)
Expand Down Expand Up @@ -76,7 +82,7 @@ async def test_use_agreement_shuffles_proposals():
mock_score = 42.0 if n != 0 else 41.0
proposals.append((mock_score, mock_proposal))

pool = agreements_pool.AgreementsPool(lambda _event, **kwargs: None, lambda _offer: None)
pool = get_agreements_pool()

for score, proposal in proposals:
await pool.add_proposal(score, proposal)
Expand All @@ -95,7 +101,7 @@ def use_agreement_cb(agreement):
async def test_use_agreement_no_proposals():
"""Test that `AgreementPool.use_agreement()` returns `None` when there are no proposals."""

pool = agreements_pool.AgreementsPool(lambda _event, **kwargs: None, lambda _offer: None)
pool = get_agreements_pool()

def use_agreement_cb(_agreement):
assert False, "use_agreement callback called"
Expand All @@ -120,7 +126,7 @@ async def test_terminate_agreement(multi_activity, simulate_race, event_emitted)
events = []

pool = agreements_pool.AgreementsPool(
lambda event, **kwargs: events.append(event), lambda _offer: None # noqa
lambda event, **kwargs: events.append(event), lambda _offer: None, mock.Mock() # noqa
)
agreement: BufferedAgreement = BufferedAgreementFactory(has_multi_activity=multi_activity)
pool._agreements[agreement.agreement.id] = agreement
Expand Down
Loading
Loading