Skip to content
This repository has been archived by the owner on Jan 16, 2024. It is now read-only.

Commit

Permalink
merged performance version into master
Browse files Browse the repository at this point in the history
  • Loading branch information
NiclasHaderer committed Dec 15, 2023
2 parents a28cb0a + 269cce7 commit caf867d
Show file tree
Hide file tree
Showing 15 changed files with 158 additions and 51 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,5 @@ results/*
# FlameGraph
perf.data*

datasets/*
datasets/*
/utils/GRCh38_latest_genomic.fna
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ The project uses Python, Golang, and (nightly) Rust.

We've used the following versions in our testing. Nightly rust is currently being used for the [std::simd](https://doc.rust-lang.org/nightly/std/simd/index.html) module. Once the module is stabilized then stable rust can be used.

| Dependency | Version |
|-------|-------|
| Python | 3.11.5 |
| poetry | 1.7.1 |
| Go | go1.21.4 linux/amd64 |
| rustc | rustc 1.76.0-nightly (eeff92ad3 2023-12-13) |
| cargo | cargo 1.76.0-nightly (1aa9df1a5 2023-12-12) |
| Dependency | Version |
|------------|---------------------------------------------|
| Python | 3.11.5 |
| poetry | 1.7.1 |
| Go | go1.21.4 linux/amd64 |
| rustc | rustc 1.76.0-nightly (eeff92ad3 2023-12-13) |
| cargo | cargo 1.76.0-nightly (1aa9df1a5 2023-12-12) |

Python dependencies are managed by [Poetry](https://python-poetry.org/) ([installation instructions](https://python-poetry.org/docs/#installation)). After installing Poetry, you can install the project's dependencies from the root folder using `poetry install`.

Expand Down
41 changes: 33 additions & 8 deletions internal/worker/master_rest_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@ type Alignment struct {
}

// WorkPackage describes a unit of alignment work handed out by the master.
// Sequence payloads are no longer embedded here; they are fetched separately
// (see RequestSequence) and carried by CompleteWorkPackage.
//
// NOTE(review): the scraped diff interleaved the pre- and post-change field
// lists, duplicating every field; this is the post-change struct only.
type WorkPackage struct {
	ID              *string           `json:"id"`
	JobID           *string           `json:"job_id"`
	Queries         []QueryTargetType `json:"queries"`
	MatchScore      int               `json:"match_score"`
	MismatchPenalty int               `json:"mismatch_penalty"`
	GapPenalty      int               `json:"gap_penalty"`
}

// CompleteWorkPackage is a WorkPackage together with the sequence data needed
// to execute it. The sequences are fetched per ID after the raw package is
// received (see Worker.GetSequencesForWork).
type CompleteWorkPackage struct {
	*WorkPackage
	Sequences map[SequenceId]Sequence `json:"sequences"`
}

type MachineSpecsRequest struct {
Expand Down Expand Up @@ -101,7 +105,7 @@ func (c *RestClient) RequestWork(workerId string) (*WorkPackage, error) {
return nil, err
}

resp, err := c.client.Post(c.baseURL+"/work/", "application/json", bytes.NewReader(jsonData))
resp, err := c.client.Post(c.baseURL+"/work/raw", "application/json", bytes.NewReader(jsonData))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -174,3 +178,24 @@ func (c *RestClient) SendHeartbeat(workerId string) {
}
_ = resp.Body.Close()
}

// RequestSequence fetches a single sequence belonging to a work package from
// the master at /work/{work_id}/sequence/{sequence_id}. The response body is
// the raw sequence text.
func (c *RestClient) RequestSequence(workPackageId string, query SequenceId) (*Sequence, error) {
	resp, err := c.client.Get(c.baseURL + "/work/" + workPackageId + "/sequence/" + string(query))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Drain the whole body into a buffer before converting it.
	var body bytes.Buffer
	if _, err := body.ReadFrom(resp.Body); err != nil {
		return nil, err
	}

	sequence := Sequence(body.String())
	return &sequence, nil
}
49 changes: 45 additions & 4 deletions internal/worker/worker_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,54 @@ func (w *Worker) RegisterWorker() (*string, error) {
}

// GetWork Function to request work from the master, returns the work package if successful
func (w *Worker) GetWork() (*WorkPackage, error) {
return w.client.RequestWork(*w.workerId)
func (w *Worker) GetWork() (*CompleteWorkPackage, error) {
	// Ask the master for a raw package first, then resolve its sequences.
	pkg, err := w.client.RequestWork(*w.workerId)
	if err != nil {
		return nil, err
	}
	return w.GetSequencesForWork(pkg)
}

// GetSequencesForWork resolves every sequence referenced by the work
// package's queries and bundles them with the package. Each distinct
// sequence ID is fetched from the master exactly once.
func (w *Worker) GetSequencesForWork(workPackage *WorkPackage) (*CompleteWorkPackage, error) {
	sequences := make(map[SequenceId]Sequence)

	// fetch downloads a sequence unless it is already cached locally.
	fetch := func(id SequenceId) error {
		if _, cached := sequences[id]; cached {
			return nil
		}
		seq, err := w.client.RequestSequence(*workPackage.ID, id)
		if err != nil {
			return err
		}
		sequences[id] = *seq
		return nil
	}

	for _, pair := range workPackage.Queries {
		if err := fetch(pair.Query); err != nil {
			return nil, err
		}
		if err := fetch(pair.Target); err != nil {
			return nil, err
		}
	}

	return &CompleteWorkPackage{
		WorkPackage: workPackage,
		Sequences:   sequences,
	}, nil
}

// ExecuteWork Function to execute the work package, returns the work result for every pair if successful
// For now we just execute the work sequentially and send the result back for every seq,tar pair
func (w *Worker) ExecuteWork(work *WorkPackage, queries []QueryTargetType) {
func (w *Worker) ExecuteWork(work *CompleteWorkPackage, queries []QueryTargetType) {
w.status = Working

// A list of results that are then sent batched to the master
Expand Down Expand Up @@ -147,7 +188,7 @@ func findAlignmentWithFallback(query, target string, alignmentScore smithwaterma

}

func (w *Worker) ExecuteWorkInParallel(work *WorkPackage) {
func (w *Worker) ExecuteWorkInParallel(work *CompleteWorkPackage) {
var cpuCount = runtime.NumCPU()
var workPackages = work.Queries
var numWorkPackages = len(workPackages)
Expand Down
3 changes: 2 additions & 1 deletion master/api_models/job.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from __future__ import annotations

import json
from typing import Literal, Annotated
from uuid import UUID
Expand All @@ -22,7 +23,7 @@ def __hash__(self):

# noinspection PyNestedDecorators
class MultipartJobRequest(BaseModel):
queries: list[TargetQueryCombination]
queries: set[TargetQueryCombination]
match_score: int
mismatch_penalty: int
gap_penalty: int
Expand Down
12 changes: 7 additions & 5 deletions master/job_queue/queued_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from uuid import UUID

from master.api_models import JobRequest, TargetQueryCombination, JobState, Alignment
from master.utils.log_time import log_time


@dataclass
Expand Down Expand Up @@ -30,10 +31,11 @@ def percentage_done(self) -> float:
def done(self) -> bool:
    """Return True once every requested query has a completed result."""
    return len(self.completed_sequences) == len(self.request.queries)

@log_time
def missing_sequences(self) -> set[TargetQueryCombination]:
    """Return the queries that are neither completed nor currently in progress.

    Uses set arithmetic instead of the previous per-query membership loop,
    keeping the cost linear in the number of queries.

    NOTE(review): the scraped diff interleaved the old list-based version with
    this one; only the post-change set-based version is kept here.
    """
    completed_set = self.completed_sequences.keys()
    in_progress_set = self.sequences_in_progress

    # Everything requested, minus what is done or already handed to a worker.
    missing = self.request.queries - (completed_set | in_progress_set)

    return missing
2 changes: 2 additions & 0 deletions master/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
from fastapi import FastAPI

from master.routers import worker_router, job_router
from master.trace_time import TraceTimeMiddleware

logging.basicConfig(level=logging.INFO)
uvicorn_access = logging.getLogger("uvicorn.access")
uvicorn_access.disabled = True

app = FastAPI(title="DLSA Master")
app.add_middleware(TraceTimeMiddleware)
app.include_router(job_router)
app.include_router(worker_router)
15 changes: 15 additions & 0 deletions master/trace_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import time

from starlette.middleware.base import BaseHTTPMiddleware


class TraceTimeMiddleware(BaseHTTPMiddleware):
    """Starlette middleware that prints any request slower than 0.1 seconds."""

    async def dispatch(self, request, call_next):
        # perf_counter is a monotonic clock; time.time() can jump backwards or
        # forwards (NTP adjustments) and is unsuitable for measuring durations.
        start_time = time.perf_counter()
        response = await call_next(request)
        process_time = time.perf_counter() - start_time

        if process_time > 0.1:
            print(f"{request.method} {request.url} {process_time:.2f}s")

        return response
14 changes: 14 additions & 0 deletions master/utils/log_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import time


def log_time(func):
    """Decorator that prints a message when ``func`` takes longer than 0.1 s.

    Fixes two defects in the original:
      * ``functools.wraps`` preserves the wrapped function's name, docstring,
        and other metadata (the bare wrapper clobbered them, which also broke
        the printed function name for stacked decorators).
      * ``time.perf_counter`` is a monotonic clock suitable for measuring
        elapsed time; ``time.time`` is wall-clock and can jump.
    """
    import functools  # local import keeps the module's import block untouched

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        if elapsed > 0.1:
            print(f"{func.__name__} took {elapsed:.2f} seconds to execute")
        return result

    return wrapper
10 changes: 5 additions & 5 deletions master/work_package/_scheduler/proportional_work_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import itertools
import logging
import math
from uuid import uuid4

from master.worker.worker import Worker
from .scheduled_work_package import ScheduledWorkPackage, InternalWorkPackage
from .scheduled_work_package import ScheduledWorkPackage
from .utils import work_packages_from_queries
from .work_scheduler import WorkPackageScheduler
from ...api_models import TargetQueryCombination
Expand All @@ -29,12 +29,12 @@ def schedule_work_for(self, worker: Worker) -> None | ScheduledWorkPackage:

def _get_proportional_work_packages(
job: QueuedJob, worker: Worker, idle_workers: list[Worker]
) -> list[TargetQueryCombination]:
) -> set[TargetQueryCombination]:
# Get all missing sequences for the job
queries = job.missing_sequences()
if len(queries) == 0:
logger.error(f"Job {job.id} has no sequences to schedule")
return []
return set()

# Get all workers that are currently NOT working on a job (this includes the worker requesting work)
total_processing_power = sum([worker.resources.benchmark_result for worker in idle_workers])
Expand All @@ -52,4 +52,4 @@ def _get_proportional_work_packages(
amount_of_sequences = min(amount_of_sequences, len(queries))

# Assign the queries to the current worker
return queries[:amount_of_sequences]
return set(itertools.islice(queries, amount_of_sequences))
12 changes: 5 additions & 7 deletions master/work_package/_scheduler/scheduled_work_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from master.api_models import TargetQueryCombination, Sequence, SequenceId
from master.job_queue.queued_job import QueuedJob
from master.utils.log_time import log_time
from master.worker.worker import Worker


Expand All @@ -12,7 +13,7 @@ class InternalWorkPackage:
id: UUID
job: QueuedJob
sequences: dict[SequenceId, Sequence]
queries: list[TargetQueryCombination]
queries: set[TargetQueryCombination]
match_score: int
mismatch_penalty: int
gap_penalty: int
Expand All @@ -24,15 +25,12 @@ class ScheduledWorkPackage:
worker: Worker

@property
@log_time
def percentage_done(self) -> float:
    """Fraction (0.0 - 1.0) of this package's queries the job has completed.

    NOTE(review): the scraped diff interleaved the old per-query loop with the
    new set-intersection version; only the post-change version is kept here.
    Assumes the package always has at least one query - TODO confirm, since an
    empty query set would divide by zero.
    """
    # Total number of queries assigned to this work package.
    sequence_length = len(self.package.queries)

    # Set intersection replaces the previous O(n) membership loop.
    completed_sequences_set = set(self.package.job.completed_sequences.keys())
    completed_sequences = len(self.package.queries & completed_sequences_set)
    return completed_sequences / sequence_length

def done(self) -> bool:
Expand Down
6 changes: 3 additions & 3 deletions master/work_package/_scheduler/time_work_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def _get_n_seconds_of_work(
job: QueuedJob,
seconds: int,
worker: Worker,
) -> list[TargetQueryCombination]:
unfinished_queries = job.missing_sequences()
) -> set[TargetQueryCombination]:
unfinished_queries = [*job.missing_sequences()]
# Shuffle the unfinished queries to get a more even distribution
random.shuffle(unfinished_queries)

Expand All @@ -56,4 +56,4 @@ def _get_n_seconds_of_work(
if total_time > seconds * 0.9:
break

return queries
return set(queries)
2 changes: 1 addition & 1 deletion master/work_package/_scheduler/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def estimate_work_in_seconds(target: Sequence, query: Sequence, cups: int) -> fl


def work_packages_from_queries(
job: QueuedJob, queries: list[TargetQueryCombination], worker: Worker
job: QueuedJob, queries: set[TargetQueryCombination], worker: Worker
) -> ScheduledWorkPackage | None:
if not queries:
return None
Expand Down
6 changes: 5 additions & 1 deletion master/work_package/work_package_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from master.utils.singleton import Singleton
from master.worker.worker_collector import WorkerCollector
from ._scheduler.work_scheduler import WorkPackageScheduler, ScheduledWorkPackage
from ..utils.log_time import log_time

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -43,8 +44,10 @@ def update_work_result(self, work_id: UUID, result: WorkResult) -> None:

completed_sequences[res.combination].append(res.alignment)
# Remove the sequence from the in progress list if it is in there
if res.combination in work_package.package.job.sequences_in_progress:
try:
work_package.package.job.sequences_in_progress.remove(res.combination)
except ValueError:
pass

# Check if the work package is done
if work_package.done():
Expand All @@ -65,6 +68,7 @@ def get_new_work_package(self, worker_id: WorkerId) -> None | WorkPackage:
sequences={str(uuid): sequence for uuid, sequence in scheduled_package.package.sequences.items()},
)

@log_time
def get_new_raw_work_package(self, worker_id: WorkerId) -> None | Tuple[RawWorkPackage, ScheduledWorkPackage]:
worker = self._worker_collector.get_worker_by_id(worker_id.id)
scheduled_package = self._work_scheduler.schedule_work_for(worker)
Expand Down
Loading

0 comments on commit caf867d

Please sign in to comment.