diff --git a/generator/examples/simple/application_description.json b/generator/examples/simple/application_description.json index 2a6db2fd..2e8cfe36 100644 --- a/generator/examples/simple/application_description.json +++ b/generator/examples/simple/application_description.json @@ -1,8 +1,8 @@ { "cluster_latencies": [ { - "src" : "cluster-1", - "dest": "cluster-2", + "src" : "cluster1", + "dest": "cluster2", "latency": 0.002 } ], @@ -11,41 +11,123 @@ "name": "service1", "clusters": [ { - "cluster": "cluster-1", + "cluster": "cluster1", "namespace": "default", - "node": "node-1" + "node": "cluster1-control-plane" } ], "resources": { "limits": { "memory": "1024M", - "cpu": "1000m" + "cpu": "2000m" }, "requests": { - "memory": "512M", - "cpu": "500m" + "memory": "1024M", + "cpu": "2000m" + } + }, + "processes": 2, + "threads": 2, + "readiness_probe": 1, + "endpoints": [ + { + "name": "endpoint1", + "protocol": "http", + "execution_mode": "sequential", + "network_complexity": { + "forward_requests": "asynchronous", + "response_payload_size": 512, + "called_services": [ + { + "service": "service2", + "port": "80", + "endpoint": "endpoint1", + "protocol": "http", + "traffic_forward_ratio": 1, + "request_payload_size": 256 + }, + { + "service": "service2", + "port": "80", + "endpoint": "endpoint2", + "protocol": "http", + "traffic_forward_ratio": 1, + "request_payload_size": 256 + } + ] + } + }, + { + "name": "endpoint2", + "protocol": "http", + "execution_mode": "parallel", + "network_complexity": { + "forward_requests": "asynchronous", + "response_payload_size": 512, + "called_services": [ + ] + } + } + ] + }, + { + "name": "service2", + "clusters": [ + { + "cluster": "cluster1", + "namespace": "default", + "node": "cluster1-control-plane" + } + ], + "resources": { + "limits": { + "memory": "1024M", + "cpu": "2000m" + }, + "requests": { + "memory": "1024M", + "cpu": "2000m" } }, "processes": 2, "threads": 2, - "readiness_probe": 5, + "readiness_probe": 1, "endpoints": [ { - "name": "end1", + "name": "endpoint1", + "protocol": "http", + "execution_mode": "parallel", + "cpu_complexity": { + "execution_time": "1s", + "method": "fibonacci", + "workers": 2, + "cpu_affinity": [ + 1, + 2 + ], + "cpu_load": "10%" + }, + "memory_complexity": { + "execution_time": "1s", + "method": "swap", + "workers": 24, + "bytes_load": "10%" + }, + "network_complexity": { + "forward_requests": "asynchronous", + "response_payload_size": 512, + "called_services": [] + } + }, + { + "name": "endpoint2", "protocol": "http", - "cpu_consumption": 0.003, - "network_consumption": 0.002, - "memory_consumption": 0.003, - "forward_requests": "asynchronous", - "called_services": [ - { - "service": "service2", - "port": "80", - "endpoint": "end2", - "protocol": "http", - "traffic_forward_ratio": 1 - } - ] + "execution_mode": "parallel", + "network_complexity": { + "forward_requests": "asynchronous", + "response_payload_size": 512, + "called_services": [] + } } ] } diff --git a/generator/examples/simple/cluster-1/service1.yaml b/generator/examples/simple/cluster1/service1.yaml similarity index 56% rename from generator/examples/simple/cluster-1/service1.yaml rename to generator/examples/simple/cluster1/service1.yaml index 9b2c8dc3..70a34fbe 100644 --- a/generator/examples/simple/cluster-1/service1.yaml +++ b/generator/examples/simple/cluster1/service1.yaml @@ -4,11 +4,11 @@ metadata: name: config-service1 labels: name: config-service1 - version: cluster-1 + version: cluster1 namespace: default data: - conf.json: 
'{"processes":2,"threads":2,"endpoints":[{"name":"end1","protocol":"http","cpu_consumption":0.003,"network_consumption":0.002,"memory_consumption":0.003,"forward_requests":"asynchronous","called_services":[{"service":"service2","port":"80","endpoint":"end2","protocol":"http","traffic_forward_ratio":1}]}]}' - service.proto: "syntax = \"proto3\";\n\n\nservice service1 {\n \n rpc end1 (Request) returns (Response) {}\n \n}\n\n\nmessage Request {\n string data = 1;\n}\n\nmessage Response {\n string data = 1;\n}" + conf.json: '{"processes":2,"threads":2,"endpoints":[{"name":"endpoint1","protocol":"http","execution_mode":"sequential","network_complexity":{"forward_requests":"asynchronous","response_payload_size":512,"called_services":[{"service":"service2","port":"80","endpoint":"endpoint1","protocol":"http","traffic_forward_ratio":1,"request_payload_size":256},{"service":"service2","port":"80","endpoint":"endpoint2","protocol":"http","traffic_forward_ratio":1,"request_payload_size":256}]}},{"name":"endpoint2","protocol":"http","execution_mode":"parallel","network_complexity":{"forward_requests":"asynchronous","response_payload_size":512,"called_services":[]}}]}' + service.proto: "syntax = \"proto3\";\n\n\nservice service1 {\n \n rpc endpoint1 (Request) returns (Response) {}\n \n rpc endpoint2 (Request) returns (Response) {}\n \n}\n\nservice service2 {\n \n rpc endpoint1 (Request) returns (Response) {}\n \n rpc endpoint2 (Request) returns (Response) {}\n \n}\n\n\nmessage Request {\n string data = 1;\n}\n\nmessage Response {\n string data = 1;\n}" --- apiVersion: apps/v1 kind: Deployment @@ -16,20 +16,20 @@ metadata: name: service1 namespace: default labels: - version: cluster-1 + version: cluster1 spec: selector: matchLabels: app: service1 - version: cluster-1 + version: cluster1 replicas: 1 template: metadata: labels: app: service1 - version: cluster-1 + version: cluster1 spec: - nodeName: node-1 + nodeName: cluster1-control-plane containers: - name: app image: app-demo:latest @@ -46,15 +46,15 @@ spec: httpGet: path: / port: 5000 - initialDelaySeconds: 5 + initialDelaySeconds: 1 periodSeconds: 1 resources: limits: - cpu: 1000m + cpu: 2000m memory: 1024M requests: - cpu: 500m - memory: 512M + cpu: 2000m + memory: 1024M volumes: - name: config-data-volume configMap: @@ -66,7 +66,7 @@ metadata: name: service1 namespace: default labels: - version: cluster-1 + version: cluster1 annotations: http: / spec: diff --git a/generator/examples/simple/cluster1/service2.yaml b/generator/examples/simple/cluster1/service2.yaml new file mode 100644 index 00000000..1af87453 --- /dev/null +++ b/generator/examples/simple/cluster1/service2.yaml @@ -0,0 +1,78 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-service2 + labels: + name: config-service2 + version: cluster1 + namespace: default +data: + conf.json: '{"processes":2,"threads":2,"endpoints":[{"name":"endpoint1","protocol":"http","execution_mode":"parallel","cpu_complexity":{"execution_time":"1s","method":"fibonacci","workers":2,"cpu_affinity":[1,2],"cpu_load":"10%"},"memory_complexity":{"execution_time":"1s","method":"swap","workers":24,"bytes_load":"10%"},"network_complexity":{"forward_requests":"asynchronous","response_payload_size":512,"called_services":[]}},{"name":"endpoint2","protocol":"http","execution_mode":"parallel","network_complexity":{"forward_requests":"asynchronous","response_payload_size":512,"called_services":[]}}]}' + service.proto: "syntax = \"proto3\";\n\n\nservice service1 {\n \n rpc endpoint1 (Request) returns (Response) {}\n 
\n rpc endpoint2 (Request) returns (Response) {}\n \n}\n\nservice service2 {\n \n rpc endpoint1 (Request) returns (Response) {}\n \n rpc endpoint2 (Request) returns (Response) {}\n \n}\n\n\nmessage Request {\n string data = 1;\n}\n\nmessage Response {\n string data = 1;\n}" +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: service2 + namespace: default + labels: + version: cluster1 +spec: + selector: + matchLabels: + app: service2 + version: cluster1 + replicas: 1 + template: + metadata: + labels: + app: service2 + version: cluster1 + spec: + nodeName: cluster1-control-plane + containers: + - name: app + image: app-demo:latest + imagePullPolicy: Never + env: + - name: SERVICE_NAME + value: service2 + ports: + - containerPort: 5000 + volumeMounts: + - mountPath: /usr/src/app/config + name: config-data-volume + readinessProbe: + httpGet: + path: / + port: 5000 + initialDelaySeconds: 1 + periodSeconds: 1 + resources: + limits: + cpu: 2000m + memory: 1024M + requests: + cpu: 2000m + memory: 1024M + volumes: + - name: config-data-volume + configMap: + name: config-service2 +--- +apiVersion: v1 +kind: Service +metadata: + name: service2 + namespace: default + labels: + version: cluster1 + annotations: + http: / +spec: + selector: + app: service2 + ports: + - name: http + port: 80 + targetPort: 5000 diff --git a/generator/input/new_description.json b/generator/input/new_description.json index 2a6db2fd..a80e23bc 100644 --- a/generator/input/new_description.json +++ b/generator/input/new_description.json @@ -33,19 +33,37 @@ { "name": "end1", "protocol": "http", - "cpu_consumption": 0.003, - "network_consumption": 0.002, - "memory_consumption": 0.003, - "forward_requests": "asynchronous", - "called_services": [ - { - "service": "service2", - "port": "80", - "endpoint": "end2", - "protocol": "http", - "traffic_forward_ratio": 1 - } - ] + "execution_mode": "sequential", + "cpu_complexity": { + "execution_time": "1s", + "method": "fibonacci", + "workers": 2, + "cpu_affinity": [ + 0, + 2 + ], + "cpu_load": "10%" + }, + "memory_complexity": { + "execution_time": "1s", + "method": "swap", + "workers": 2, + "bytes_load": "10%" + }, + "network_complexity": { + "forward_requests": "asynchronous", + "response_payload_size": 256, + "called_services": [ + { + "service": "service2", + "port": "80", + "endpoint": "end2", + "protocol": "http", + "traffic_forward_ratio": 1, + "request_payload_size": 512 + } + ] + } } ] } diff --git a/generator/src/pkg/generate/generate.go b/generator/src/pkg/generate/generate.go index 5e5dc5a4..eb875346 100644 --- a/generator/src/pkg/generate/generate.go +++ b/generator/src/pkg/generate/generate.go @@ -141,7 +141,7 @@ func CreateK8sYaml(config model.FileConfig, clusters []string) { for j := 0; j < len(config.Services[i].Clusters); j++ { directory := config.Services[i].Clusters[j].Cluster directory_path := fmt.Sprintf(path+"/%s", directory) - c_id := fmt.Sprintf("%s", config.Services[i].Clusters[j].Cluster) + c_id := config.Services[i].Clusters[j].Cluster nodeAffinity := config.Services[i].Clusters[j].Node namespace := config.Services[i].Clusters[j].Namespace manifestFilePath := fmt.Sprintf(directory_path+"/%s.yaml", serv) @@ -234,7 +234,7 @@ func CreateJsonInput(userConfig model.UserConfig) string { // NOTE: Always calling the first endpoint of the called service calledService.Endpoint = s.EpNamePrefix + "1" - endpoint.CalledServices = append(endpoint.CalledServices, calledService) + endpoint.NetworkComplexity.CalledServices = 
append(endpoint.NetworkComplexity.CalledServices, calledService) } } diff --git a/generator/src/pkg/model/input.go b/generator/src/pkg/model/input.go index 15a31384..7b31e49d 100644 --- a/generator/src/pkg/model/input.go +++ b/generator/src/pkg/model/input.go @@ -17,21 +17,42 @@ limitations under the License. package model type CalledService struct { - Service string `json:"service"` - Port string `json:"port"` - Endpoint string `json:"endpoint"` - Protocol string `json:"protocol"` - TrafficForwardRatio float32 `json:"traffic_forward_ratio"` + Service string `json:"service"` + Port string `json:"port"` + Endpoint string `json:"endpoint"` + Protocol string `json:"protocol"` + TrafficForwardRatio int `json:"traffic_forward_ratio"` + RequestPayloadSize int `json:"request_payload_size"` +} + +type CpuComplexity struct { + ExecutionTime string `json:"execution_time"` + Method string `json:"method"` + Workers int `json:"workers"` + CpuAffinity []int `json:"cpu_affinity"` + CpuLoad string `json:"cpu_load"` +} + +type MemoryComplexity struct { + ExecutionTime string `json:"execution_time"` + Method string `json:"method"` + Workers int `json:"workers"` + BytesLoad string `json:"bytes_load"` +} + +type NetworkComplexity struct { + ForwardRequests string `json:"forward_requests"` + ResponsePayloadSize int `json:"response_payload_size"` + CalledServices []CalledService `json:"called_services"` } type Endpoint struct { - Name string `json:"name"` - Protocol string `json:"protocol"` - CpuConsumption float64 `json:"cpu_consumption"` - NetworkConsumption float64 `json:"network_consumption"` - MemoryConsumption float64 `json:"memory_consumption"` - ForwardRequests string `json:"forward_requests"` - CalledServices []CalledService `json:"called_services"` + Name string `json:"name"` + Protocol string `json:"protocol"` + ExecutionMode string `json:"execution_mode"` + CpuComplexity *CpuComplexity `json:"cpu_complexity,omitempty"` + MemoryComplexity *MemoryComplexity `json:"memory_complexity,omitempty"` + NetworkComplexity NetworkComplexity `json:"network_complexity"` } type ResourceLimits struct { diff --git a/generator/src/pkg/service/util.go b/generator/src/pkg/service/util.go index 10c8b439..5f3fc057 100644 --- a/generator/src/pkg/service/util.go +++ b/generator/src/pkg/service/util.go @@ -28,9 +28,9 @@ const ( ImageName = "app" ImageURL = "app-demo:latest" - DefaultExtPort = 80 // + DefaultExtPort = 80 DefaultPort = 5000 - defaultProtocol = "http" // + defaultProtocol = "http" Uri = "/" @@ -42,17 +42,23 @@ const ( LimitsMemoryDefault = "1024M" SvcNamePrefix = "service" - SvcProcessesDefault = 2 - SvcThreadsDefault = 2 - SvcReadinessProbeDefault = 5 + SvcProcessesDefault = 1 + SvcThreadsDefault = 1 + SvcReadinessProbeDefault = 2 - EpNamePrefix = "/end" - EpCPUConsumptionDefault = 0.003 - EpNetworkConsumptionDefault = 0.002 - EpMemoryConsumptionDefault = 0.003 - EpForwardRequests = "asynchronous" + EpNamePrefix = "end" + EpExecModeDefault = "sequential" + EpNwResponseSizeDefault = 512 + + EpExecTimeDefault = "1s" + EpMethodDefault = "all" + EpWorkersDefault = 1 + EpLoadDefault = "5%" + + EpNwForwardRequests = "asynchronous" CsTrafficForwardRatio = 1 + CsRequestSizeDefault = 256 ) func CreateDeployment(metadataName, selectorAppName, selectorClusterName string, numberOfReplicas int, @@ -312,10 +318,23 @@ func CreateInputEndpoint() model.Endpoint { var ep model.Endpoint ep.Protocol = defaultProtocol - ep.CpuConsumption = EpCPUConsumptionDefault - ep.NetworkConsumption = EpNetworkConsumptionDefault - 
ep.MemoryConsumption = EpMemoryConsumptionDefault - ep.ForwardRequests = EpForwardRequests + + ep.ExecutionMode = EpExecModeDefault + + ep.CpuComplexity.ExecutionTime = EpExecTimeDefault + ep.CpuComplexity.Method = EpMethodDefault + ep.CpuComplexity.Workers = EpWorkersDefault + ep.CpuComplexity.CpuAffinity = []int{} + ep.CpuComplexity.CpuLoad = EpLoadDefault + + ep.MemoryComplexity.ExecutionTime = EpExecTimeDefault + ep.MemoryComplexity.Method = EpMethodDefault + ep.MemoryComplexity.Workers = EpWorkersDefault + ep.MemoryComplexity.BytesLoad = EpLoadDefault + + ep.NetworkComplexity.ForwardRequests = EpNwForwardRequests + ep.NetworkComplexity.ResponsePayloadSize = EpNwResponseSizeDefault + ep.NetworkComplexity.CalledServices = []model.CalledService{} return ep } @@ -327,6 +346,7 @@ func CreateInputCalledSvc() model.CalledService { calledSvc.Port = strconv.Itoa(DefaultExtPort) calledSvc.Protocol = defaultProtocol calledSvc.TrafficForwardRatio = CsTrafficForwardRatio + calledSvc.RequestPayloadSize = CsRequestSizeDefault return calledSvc } diff --git a/model/Dockerfile b/model/Dockerfile index 55a00925..7c460a8d 100644 --- a/model/Dockerfile +++ b/model/Dockerfile @@ -20,6 +20,7 @@ RUN mkdir -p /usr/src/app RUN apt update RUN apt install -y jq \ wget \ + stress-ng \ 2to3 WORKDIR /usr/src/app diff --git a/model/path.py b/model/path.py index f78f01e7..4e7985db 100644 --- a/model/path.py +++ b/model/path.py @@ -29,4 +29,5 @@ def process_configfile(): config_data = process_configfile() -SERVICE_CONFIG = config_data \ No newline at end of file +SERVICE_CONFIG = config_data +SERVICE_NAME = os.environ['SERVICE_NAME'] \ No newline at end of file diff --git a/model/restful/resources/endpoint.py b/model/restful/resources/endpoint.py index 81a6ff5e..0206bb32 100644 --- a/model/restful/resources/endpoint.py +++ b/model/restful/resources/endpoint.py @@ -15,7 +15,7 @@ """ from flask_restful import Resource, abort -from path import SERVICE_CONFIG +from path import SERVICE_CONFIG, SERVICE_NAME from restful.utils import task @@ -31,7 +31,7 @@ def get(self, endpoint=None): else: for ep in SERVICE_CONFIG['endpoints']: if ep['name'] == endpoint: - res = task.run_task(service_endpoint=ep) + res = task.run_task(service_name=SERVICE_NAME, service_endpoint=ep) return res not_found(endpoint) @@ -42,9 +42,6 @@ def post(self, endpoint=None): else: for ep in SERVICE_CONFIG['endpoints']: if ep['name'] == endpoint: - res = task.run_task(service_endpoint=ep) + res = task.run_task(service_name=SERVICE_NAME, service_endpoint=ep) return res - not_found(endpoint) - - - + not_found(endpoint) \ No newline at end of file diff --git a/model/restful/utils/task.py b/model/restful/utils/task.py index 56937b0b..b8f2f468 100644 --- a/model/restful/utils/task.py +++ b/model/restful/utils/task.py @@ -14,15 +14,14 @@ limitations under the License. """ -import logging from flask import Blueprint, jsonify, request -import path from aiohttp import ClientSession import asyncio -import uuid +import subprocess +import sys +from concurrent.futures import ThreadPoolExecutor, as_completed +import requests -# TODO: So far, we only support a hard-coded namespace. 
For more flexible support of namespaces we will need to pass that info as part of the config map -# TODO: So far, we only support http client FORMATTED_REMOTE_URL = "http://{0}:{1}/{2}" @@ -43,105 +42,233 @@ def getForwardHeaders(request): return headers -def run_task(service_endpoint): +def run_task(service_name, service_endpoint): headers = getForwardHeaders(request) - if service_endpoint["forward_requests"] == "asynchronous": + res_payload = create_payload(service_endpoint["network_complexity"]["response_payload_size"]) + + source_svc = {} + source_svc["service"] = service_name + source_svc["endpoint"] = service_endpoint["name"] + + response = create_response() + + execution_mode = service_endpoint["execution_mode"] + if execution_mode == "sequential": + # Network task + if ("network_complexity" in service_endpoint) and (len(service_endpoint["network_complexity"]["called_services"]) > 0): + nw_response, _ = run_network_task(source_svc, service_endpoint, headers, res_payload) + response = concatenate_response_simple(response, nw_response) + + # CPU task + if ("cpu_complexity" in service_endpoint) and len(service_endpoint["cpu_complexity"]["execution_time"]) > 0: + cpu_response, _ = execute_cpu_bounded_task(conf=service_endpoint["cpu_complexity"]) + response["cpu_task"]["services"].append(source_svc["service"]+"/"+source_svc["endpoint"]) + response["cpu_task"]["statuses"].append(cpu_response) + + # Memory task + if ("memory_complexity" in service_endpoint) and len(service_endpoint["memory_complexity"]["execution_time"]) > 0: + mem_response, _ = execute_memory_bounded_task(conf=service_endpoint["memory_complexity"]) + response["memory_task"]["services"].append(source_svc["service"]+"/"+source_svc["endpoint"]) + response["memory_task"]["statuses"].append(mem_response) + + else: # "parallel" + executor = ThreadPoolExecutor(max_workers=3) + task_futures = [] + + # Network task + if ("network_complexity" in service_endpoint) and (len(service_endpoint["network_complexity"]["called_services"]) > 0): + nw_future = executor.submit(run_network_task, source_svc, service_endpoint, headers, res_payload) + task_futures.append(nw_future) + + # CPU task + if ("cpu_complexity" in service_endpoint) and len(service_endpoint["cpu_complexity"]["execution_time"]) > 0: + cpu_future = executor.submit(execute_cpu_bounded_task, service_endpoint["cpu_complexity"]) + task_futures.append(cpu_future) + + # Memory task + if ("memory_complexity" in service_endpoint) and len(service_endpoint["memory_complexity"]["execution_time"]) > 0: + mem_future = executor.submit(execute_memory_bounded_task, service_endpoint["memory_complexity"]) + task_futures.append(mem_future) + + # Wait until all threads are done with their tasks + for future in as_completed(task_futures): + r, task_type = future.result() + if task_type == "network": + response = concatenate_response_simple(response, r) + elif task_type == "cpu": + response["cpu_task"]["services"].append(source_svc["service"]+"/"+source_svc["endpoint"]) + response["cpu_task"]["statuses"].append(r) + elif task_type == "memory": + response["memory_task"]["services"].append(source_svc["service"]+"/"+source_svc["endpoint"]) + response["memory_task"]["statuses"].append(r) + + executor.shutdown() + + return response + + +def execute_cpu_bounded_task(conf): + if len(conf["cpu_affinity"]) > 0: + res = subprocess.run(['stress-ng --class cpu --cpu %s --cpu-method %s --taskset %s --cpu-load %s --timeout %s --metrics-brief' % (conf["workers"], conf["method"], ",".join(str(cpu_id) for cpu_id 
in conf["cpu_affinity"]), conf["cpu_load"], conf["execution_time"])], capture_output=True, shell=True) + else: + res = subprocess.run(['stress-ng --class cpu --cpu %s --cpu-method %s --cpu-load %s --timeout %s --metrics-brief' % (conf["workers"], conf["method"], conf["cpu_load"], conf["execution_time"])], capture_output=True, shell=True) + + return res.stderr.decode("utf-8"), "cpu" + + +def execute_memory_bounded_task(conf): + res = subprocess.run(['stress-ng --class memory --vm %s --vm-method %s --vm-bytes %s --timeout %s --metrics-brief' % (conf["workers"], conf["method"], conf["bytes_load"], conf["execution_time"])], capture_output=True, shell=True) + + return res.stderr.decode("utf-8"), "memory" + + +def run_network_task(source_svc, service_endpoint, headers, res_payload): + + if service_endpoint["network_complexity"]["forward_requests"] == "asynchronous": asyncio.set_event_loop(asyncio.new_event_loop()) loop = asyncio.get_event_loop() - response = loop.run_until_complete(async_tasks(service_endpoint, headers)) - return response + nw_response = loop.run_until_complete(async_network_task(source_svc, service_endpoint, headers, res_payload)) else: # "synchronous" - asyncio.set_event_loop(asyncio.new_event_loop()) - loop = asyncio.get_event_loop() - response = loop.run_until_complete(sync_tasks(service_endpoint, headers)) + nw_response = sync_network_task(source_svc, service_endpoint, headers, res_payload) + + return nw_response, "network" - return response +async def async_network_task(source_svc, service_endpoint, headers, res_payload): -async def async_tasks(service_endpoint, headers): async with ClientSession() as session: - # TODO: CPU-bounded tasks not supported yet + response = create_response() + response["network_task"]["payload"] = res_payload + io_tasks = [] - if request.get_data() == bytes("", "utf-8"): - json_data = {} - else: - json_data = request.json - if len(service_endpoint["called_services"]) > 0: - for svc in service_endpoint["called_services"]: - io_task = asyncio.create_task(execute_io_bounded_task(session=session, target_service=svc, - json_data=json_data, forward_headers=headers)) + if len(service_endpoint["network_complexity"]["called_services"]) > 0: + for target_svc in service_endpoint["network_complexity"]["called_services"]: + io_task = asyncio.create_task(async_execute_io_bounded_task(session=session, source_service=source_svc, target_service=target_svc, forward_headers=headers)) io_tasks.append(io_task) services = await asyncio.gather(*io_tasks) - # Concatenate json responses - response = {} - response["services"] = [] - response["statuses"] = [] + if len(service_endpoint["network_complexity"]["called_services"]) > 0: + for svc in services: + response = concatenate_response_simple(response, svc) - if len(service_endpoint["called_services"]) > 0: - for svc in services: - response["services"] += svc["services"] - response["statuses"] += svc["statuses"] return response -async def sync_tasks(service_endpoint, headers): - async with ClientSession() as session: - # TODO: CPU-bounded tasks not supported yet - response = {} - response["services"] = [] - response["statuses"] = [] - if request.get_data() == bytes("", "utf-8"): - json_data = {} - else: - json_data = request.json - if len(service_endpoint["called_services"]) > 0: - for svc in service_endpoint["called_services"]: - res = await execute_io_bounded_task(session=session, target_service=svc, json_data=json_data, forward_headers=headers) - response["services"] += res["services"] - response["statuses"] += 
res["statuses"] +def sync_network_task(source_svc, service_endpoint, headers, res_payload): + + response = create_response() + response["network_task"]["payload"] = res_payload + + if len(service_endpoint["network_complexity"]["called_services"]) > 0: + for target_svc in service_endpoint["network_complexity"]["called_services"]: + res = sync_execute_io_bounded_task(source_service=source_svc, target_service=target_svc, forward_headers=headers) + response = concatenate_response_simple(response, res) return response -def execute_cpu_bounded_task(origin_service_name, target_service, headers): - task_id = str(uuid.uuid4()) - task_config = {} - task_config["task_id"] = task_id - task_config["cpu_consumption"] = target_service["cpu_consumption"] - task_config["network_consumption"] = target_service["network_consumption"] - task_config["memory_consumption"] = target_service["memory_consumption"] +async def async_execute_io_bounded_task(session, source_service, target_service, forward_headers={}): - # TODO: Implement resource stress emulation... + dst = FORMATTED_REMOTE_URL.format(target_service["service"], target_service["port"], target_service["endpoint"]) + forward_headers.update({'Content-type' : 'application/json'}) - response_object = { - "status": "CPU-bounded task executed", - "data": { - "svc_name": origin_service_name, - "task_id": task_config["task_id"] - } - } - return response_object + response = create_response() + json_data = {} + json_data["payload"] = create_payload(target_service["request_payload_size"]) -async def execute_io_bounded_task(session, target_service, json_data, forward_headers={}): - # TODO: Request forwarding to a service on a particular cluster is not supported yet - # TODO: Requests for other protocols than html are not supported yet + forward_ratio = target_service["traffic_forward_ratio"] + if forward_ratio > 0: + async with ClientSession() as session: + io_tasks = [] + + for i in range(forward_ratio): + io_task = asyncio.create_task(session.post(dst, data=json_data, headers=forward_headers)) + io_tasks.append(io_task) + calls = await asyncio.gather(*io_tasks) + + for res in calls: + res_payload = await res.json() + response = concatenate_response(response, res_payload, source_service, target_service) + response["network_task"]['statuses'].append(res.status) + + return response + + +def sync_execute_io_bounded_task(source_service, target_service, forward_headers={}): dst = FORMATTED_REMOTE_URL.format(target_service["service"], target_service["port"], target_service["endpoint"]) forward_headers.update({'Content-type' : 'application/json'}) - # TODO: traffic_forward_ratio not supported yet - traffic_forward_ratio = target_service["traffic_forward_ratio"] + response = create_response() - res = await session.post(dst, data=json_data, headers=forward_headers) - res_payload = await res.json() + json_data = {} + json_data["payload"] = create_payload(target_service["request_payload_size"]) + + forward_ratio = target_service["traffic_forward_ratio"] + if forward_ratio > 0: + for i in range(forward_ratio): + res = requests.post(dst, data=json_data, headers=forward_headers) + response = concatenate_response(response, res.json(), source_service, target_service) + response["network_task"]['statuses'].append(res.status_code) + + return response + + +def create_payload(payload_size): + + request_payload = subprocess.run(['cat /dev/urandom | tr -dc "[:alnum:]" | head -c${1:-%s}' % payload_size], capture_output=True, shell=True) + + return request_payload.stdout.decode("utf-8") + 
+ +def create_response(): response = {} - response["services"] = res_payload["services"] - response['services'].append(str(res.url)) - response["statuses"] = res_payload["statuses"] - response['statuses'].append(res.status) + response["cpu_task"] = {} + response["cpu_task"]["services"] = [] + response["cpu_task"]["statuses"] = [] + + response["memory_task"] = {} + response["memory_task"]["services"] = [] + response["memory_task"]["statuses"] = [] + + response["network_task"] = {} + response["network_task"]["services"] = [] + response["network_task"]["statuses"] = [] + response["network_task"]["payload"] = "" + + return response + + +def concatenate_response_simple(response, res): + + response["cpu_task"]["services"] += res["cpu_task"]["services"] + response["cpu_task"]["statuses"] += res["cpu_task"]["statuses"] + + response["memory_task"]["services"] += res["memory_task"]["services"] + response["memory_task"]["statuses"] += res["memory_task"]["statuses"] + + response["network_task"]["services"] += res["network_task"]["services"] + response["network_task"]["statuses"] += res["network_task"]["statuses"] + response["network_task"]["payload"] += res["network_task"]["payload"] + return response + + +def concatenate_response(response, res_payload, source_service, target_service): + + response["cpu_task"]["services"] += res_payload["cpu_task"]["services"] + response["cpu_task"]["statuses"] += res_payload["cpu_task"]["statuses"] + + response["memory_task"]["services"] += res_payload["memory_task"]["services"] + response["memory_task"]["statuses"] += res_payload["memory_task"]["statuses"] + + response["network_task"]["services"] += res_payload["network_task"]["services"] + response["network_task"]['services'].append("("+source_service["service"]+"/"+source_service["endpoint"]+", "+target_service["service"]+"/"+target_service["endpoint"]+")") + response["network_task"]["statuses"] += res_payload["network_task"]["statuses"] + response["network_task"]["payload"] = res_payload["network_task"]["payload"] + + return response \ No newline at end of file diff --git a/model/run.sh b/model/run.sh index 96e03ece..c2162156 100755 --- a/model/run.sh +++ b/model/run.sh @@ -18,7 +18,7 @@ PROTOCOL=$(jq '.endpoints[0].protocol' config/conf.json -r) PROCESSES=$(jq '.processes' config/conf.json -r) if [ $PROTOCOL = "http" ]; then - $(gunicorn --chdir restful -w $PROCESSES app:app -b 0.0.0.0:5000 ); + $(gunicorn --chdir restful -w $PROCESSES app:app -b 0.0.0.0:5000 --capture-output --log-level debug); elif [ $PROTOCOL = "grpc" ]; then $(cat config/service.proto > service.proto) $(python -m grpc_tools.protoc -I. --python_out=./common --grpc_python_out=./common service.proto); @@ -27,4 +27,4 @@ elif [ $PROTOCOL = "grpc" ]; then # Uninstall the extra apps apt remove -y 2to3 wget $(cd grpc && python app.py) -fi \ No newline at end of file +fi
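
A minimal client sketch (illustrative only, not part of the patch) for exercising the new aggregated response format introduced in model/restful/utils/task.py: it calls one generated endpoint and prints the cpu_task, memory_task, and network_task sections that the reworked run_task() now returns via create_response() and the concatenate_* helpers. The localhost address assumes something like "kubectl port-forward svc/service1 80:80" against the example deployment; the host, port, and endpoint name are assumptions taken from the example manifests, not values mandated by the patch.

# Illustrative sketch: call a generated endpoint and inspect the new
# three-section response built by create_response()/concatenate_response().
# Assumes service1 is reachable on localhost:80 (e.g. via port-forward);
# the endpoint name "endpoint1" comes from the example application description.
import requests

resp = requests.post("http://localhost:80/endpoint1", json={})
body = resp.json()

# Each task type carries parallel "services" and "statuses" lists;
# network_task additionally carries the generated response payload.
for section in ("cpu_task", "memory_task", "network_task"):
    print(section, body[section]["services"], body[section]["statuses"])
print("network payload length:", len(body["network_task"]["payload"]))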