Commit

Merge branch 'dev' of github.com:chaoss/augur into dev
parthsharma2 committed Jun 20, 2019
2 parents d2385ed + 60444ff commit 23f2cfb
Showing 18 changed files with 727 additions and 282 deletions.
185 changes: 89 additions & 96 deletions augur/broker/broker.py
@@ -37,7 +37,8 @@ def __init__(self, id, location=None, qualifications=None):
self.given = qualifications[0]['given'][0][0]
self.models = qualifications[0]['models']
print("Worker given: " + self.id)
self.queue = Queue()
self.user_queue = Queue()
self.maintain_queue = Queue()

class Job():
""" Defines the instructions to give to a worker about the data needed to be inserted in our database
@@ -70,9 +71,9 @@ def __init__(self):
self.connected_workers = {}
self.created_jobs = []
logging.info("Broker spawned and is ready to accept tasks.")
# process = multiprocessing.Process(target=self.main)
# process.daemon = True
# process.start()
process = multiprocessing.Process(target=self.main)
process.daemon = True
process.start()


def add_new_worker(self, worker):
@@ -95,57 +96,48 @@ def create_job(self, job_received):
# print(model in self.connected_workers['com.augurlabs.core.github_worker'].models, model, worker.models)
compatible_workers = [worker for worker in self.connected_workers.values() if worker.given == given and model in worker.models]

for worker in compatible_workers:
if compatible_workers is not None:
for worker in compatible_workers:

""" PRIORITY KEY:
0 - user/endpoint created job ("UPDATE" job type)
1 - a maintained job created by the housekeeper ("MAINTAIN" job type)
"""
if job_received['job_type'] == "UPDATE":
self.connected_workers[worker.id].user_queue.put(job_received)
elif job_received['job_type'] == "MAINTAIN":
self.connected_workers[worker.id].maintain_queue.put(job_received)

if job_received['job_type'] == "UPDATE":
self.connected_workers[worker.id].queue.put(job_received)
elif job_received['job_type'] == "MAINTAIN":
self.connected_workers[worker.id].queue.put(job_received)
print(worker.id + "'s queue after adding the job: ")
print(dump_queue(self.connected_workers[worker.id].queue))

print(worker.id + "'s queue after adding the job: ")
print(dump_queue(self.connected_workers[worker.id].queue))

requests.post(worker.location + '/AUGWOP/task', json=job_received)
requests.post(worker.location + '/AUGWOP/task', json=job_received)
# Otherwise, let the frontend know that the request can't be served
else:
logging.info(f"Augur does not have knowledge of any workers that are capable of handing the request: " + str(job_received) + ". \nPlease install a worker that can serve requests given `{given}`")
# frontend.send_multipart([client, b"", b"NO-WORKERS"])
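dump_queue() is used above to log a worker's queue contents but is not defined in this diff. Below is a minimal sketch of such a helper, assuming it should inspect the queue non-destructively; the project's real implementation may differ.

import queue

def dump_queue(q):
    # Hypothetical helper: drain the queue into a list for printing, then
    # put every item back so the queue is left unchanged.
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            break
    for item in items:
        q.put(item)
    return items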

def completed_job(self, job):
completed_job = self.connected_workers[job['worker_id']].queue.get()
logging.info("Job completed: ", job)
if job['action'] == "UPDATE":
completed_job = self.connected_workers[job['worker_id']].queue.get()
elif job['action'] == "MAINTAIN":
logging.info("Job completed: ", job)

def main(self):
"""Load balancer main loop."""

# Prepare context and sockets

context = zmq.Context.instance()
frontend = context.socket(zmq.ROUTER)
frontend.bind("ipc://frontend.ipc")
backend = context.socket(zmq.ROUTER)
backend.bind("ipc://backend.ipc")
# backend.bind("tcp://*:5558")

# Start background tasks
def start(task, *args):
process = multiprocessing.Process(target=task, args=args)
process.daemon = True
process.start()

# for i in range(NBR_WORKERS):
# start(github_worker_task, i)
# start(facade_worker_task, i)
# for i in range(NBR_CLIENTS):
# start(client_git_url_task, i)
# start(client_owner_repo_task, i)

# Initialize main loop state
count = NBR_CLIENTS
workers = []
jobs = {}
client_job_ids = {}
# workers = []
# jobs = {}
# client_job_ids = {}
poller = zmq.Poller()

# Only poll for requests from backend
poller.register(backend, zmq.POLLIN)

@@ -160,8 +152,8 @@ def start(task, *args):

# Identify the worker and the client
logging.info("Broker is waiting for a new message...\n")
worker = backend.recv()
logging.info("Broker recieved a new message.")
client_id = backend.recv().decode('ascii')
logging.info("Broker recieved a new message.\n")
logging.info("Message sender: " + str(worker))
request = backend.recv_multipart()
logging.info("Message request: " + str(request))
@@ -173,46 +165,47 @@ def start(task, *args):

# delimiter = request[0].decode('ascii')#.split(" ")
logging.info("Parsed message: " + str(message))
logging.info("Client: " + str(client))
action = message[0]
job_received = json.loads(message[1])
# logging.info("Client: " + str(client))

# If there are no workers currently available
# if not workers:
#listen on the frontend now that a worker is available (we just found one above)
# poller.register(frontend, zmq.POLLIN)

if 'HELLO' == message[0]:
spec = json.loads(message[1])

# Create a new augur worker
augur_worker = AugurWorker(ID=worker,
given=spec['qualifications'][0]['given'][0],
models=spec['qualifications'][0]['models'])
# If the client's message is UPDATE or MAINTAIN, it is either an update
# request from the frontend or a maintain request from the housekeeper
if action == "UPDATE" or action == "MAINTAIN":
job_received['action'] = action
create_job(job_received)
# Send to the client that made the request the contents of the reply
# backend.send_multipart([client, b"", reply])

# Add new worker to the list of available workers
self.connected_workers.append(augur_worker)

# If the client's message is not READY and there are more than 3 parts to it, then that means it is a reply to a request
if client != b"READY" and len(request) > 3:
# Identify the reply
delimiter, reply = request[3:]
# # If the client's message is not READY and there are more than 3 parts to it, then that means it is a reply to a request
# if client != b"READY" and len(request) > 3:
# # Identify the reply
# delimiter, reply = request[3:]

if 'DONE' == reply.decode('ascii'):
job_items = list(jobs.items())
job_id = next(job[0] for job in job_items if worker.decode('ascii') in list(job[1].keys()))
# if 'DONE' == reply.decode('ascii'):
# job_items = list(jobs.items())
# job_id = next(job[0] for job in job_items if worker.decode('ascii') in list(job[1].keys()))

jobs[job_id][worker.decode('ascii')] = 'complete'
# self.connected_workers[job['worker_id']].queue.get()

if len([job for job in jobs[job_id].values()]) == len([job for job in jobs[job_id].values() if job == 'complete']):
logging.info(f"Job {job_id} is finished")
frontend.send_multipart([client_job_ids[job_id], b"", b"DONE"])
else:
logging.info(f"Job {job_id} is still in progress")
# if len([job for job in jobs[job_id].values()]) == len([job for job in jobs[job_id].values() if job == 'complete']):
# logging.info(f"Job {job_id} is finished")
# frontend.send_multipart([client_job_ids[job_id], b"", b"DONE"])
# else:
# logging.info(f"Job {job_id} is still in progress")

# Send to the client that made the request the contents of the reply
frontend.send_multipart([client, b"", reply])
# # Send to the client that made the request the contents of the reply
# frontend.send_multipart([client, b"", reply])

# Note that one of the requests has been served
count -= 1
# # Note that one of the requests has been served
# count -= 1

# # If there are no more requests to be served, terminate
# if not count:
Expand All @@ -221,39 +214,39 @@ def start(task, *args):


# If there's activity on the frontend
if frontend in sockets:
# Get next client request
msg = frontend.recv_multipart()
client = msg[0]
delimiter = msg[1]
request = msg[2]

# Identify given from request
given = list(json.loads((request.decode('ascii')).split(" ")[1])['given'])[0]

# Find workers that can process that given
compatible_workers = [worker for worker in workers if worker.given[1] == given]

# If any exist, give each one a copy of the request
if workers is not None:
job_id = random.randint(0, 100)
jobs[job_id] = {}
client_job_ids[job_id] = client
logging.info(f"Created job {job_id} for {client.decode('ascii')}")
for worker in compatible_workers:
# Send to the backend the worker to use, which client requested, and the message from the client
#logging.info(f"{request.decode('ascii')} is being routed to {worker.ID.decode('ascii')}")
backend.send_multipart([worker.ID, b"", client, b"", request])

# Add the job to the list of current jobs
jobs[job_id][worker.ID.decode('ascii')] = 'working'

# Otherwise, let the frontend know that the request can't be served
else:
logging.warning(f"Augur does not have knowledge of any workers that are capable of handing the request {request.decode('ascii')}. \nPlease install a worker that can serve requests given `{given}`")
frontend.send_multipart([client, b"", b"NO-WORKERS"])
# else:
# logging.info("TIMEOUT")
# if frontend in sockets:
# # Get next client request
# msg = frontend.recv_multipart()
# client = msg[0]
# delimiter = msg[1]
# request = msg[2]

# # Identify given from request
# given = list(json.loads((request.decode('ascii')).split(" ")[1])['given'])[0]

# # Find workers that can process that given
# compatible_workers = [worker for worker in workers if worker.given[1] == given]

# # If any exist, give each one a copy of the request
# if workers is not None:
# job_id = random.randint(0, 100)
# jobs[job_id] = {}
# client_job_ids[job_id] = client
# logging.info(f"Created job {job_id} for {client.decode('ascii')}")
# for worker in compatible_workers:
# # Send to the backend the worker to use, which client requested, and the message from the client
# #logging.info(f"{request.decode('ascii')} is being routed to {worker.ID.decode('ascii')}")
# backend.send_multipart([worker.ID, b"", client, b"", request])

# # Add the job to the list of current jobs
# jobs[job_id][worker.ID.decode('ascii')] = 'working'

# # Otherwise, let the frontend know that the request can't be served
# else:
# logging.warning(f"Augur does not have knowledge of any workers that are capable of handing the request {request.decode('ascii')}. \nPlease install a worker that can serve requests given `{given}`")
# frontend.send_multipart([client, b"", b"NO-WORKERS"])
# # else:
# # logging.info("TIMEOUT")

# Clean up
backend.close()
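The split of the single queue into user_queue and maintain_queue above backs the priority key documented in create_job: user/endpoint UPDATE jobs (priority 0) should be served before housekeeper MAINTAIN jobs (priority 1). Below is a minimal sketch of how a dispatcher could drain the two queues in that order; it is an illustration only, since the broker in this commit pushes tasks to workers over HTTP rather than through a helper like this.

import queue

def next_job_for(worker):
    # Hypothetical helper: return the highest-priority pending job for a
    # worker, or None if both queues are empty. user_queue is always
    # checked before maintain_queue.
    for q in (worker.user_queue, worker.maintain_queue):
        try:
            return q.get_nowait()
        except queue.Empty:
            continue
    return None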
7 changes: 4 additions & 3 deletions augur/broker/routes.py
@@ -16,7 +16,7 @@ def job():
logging.info("Recieved a new job to distribute for model: " + job['models'][0])
logging.info(job['given'])
broker.create_job(job)
# return jsonify({"job": job})
return jsonify({"status": "success"})

@server.app.route('/{}/workers'.format(server.api_version), methods=['POST'])
def worker():
@@ -26,10 +26,11 @@ def worker():
worker = request.json
logging.info("Recieved HELLO message from worker: " + worker['id'])
broker.add_new_worker(worker)
# return jsonify({"status": "success"})
return jsonify({"status": "success"})

@server.app.route('/{}/completed_task'.format(server.api_version), methods=['POST'])
def sync_queue():
job = request.json
logging.info("Message recieved that worker " + job['worker_id'] + " completed task: " + str(job))
broker.completed_job(job)
broker.completed_job(job)
return jsonify({"status": "success"})
97 changes: 73 additions & 24 deletions augur/datasources/augur_db/augur_db.py
@@ -943,40 +943,89 @@ def downloaded_repos(self):

return results

@annotate(tag='rg-closed-issues-count')
def rg_open_issues_count(self, repo_group_id):
@annotate(tag='open-issues-count')
def open_issues_count(self, repo_group_id, repo_id=None):
"""
Returns the count of open issues per week for a repo group, or for a single repo when repo_id is given
:param repo_group_id: the repository group's id
:param repo_id: (optional) the repository's id
"""
openIssueCountSQL = s.sql.text("""
SELECT rg_name, count(issue_id) AS open_count, date_trunc('week', issues.created_at) AS DATE
FROM issues, repo, repo_groups
WHERE issue_state = 'open'
AND repo.repo_id = issues.repo_id
AND repo.repo_group_id = repo_groups.repo_group_id
GROUP BY date, repo_groups.rg_name
ORDER BY date
""")
results = pd.read_sql(openIssueCountSQL, self.db)#, params={"rg_id": '{}'.format(rg_id)})
return results
if not repo_id:
openIssueCountSQL = s.sql.text("""
SELECT rg_name, count(issue_id) AS open_count, date_trunc('week', issues.created_at) AS DATE
FROM issues, repo, repo_groups
WHERE issue_state = 'open'
AND issues.repo_id IN (SELECT repo_id FROM repo WHERE repo_group_id = :repo_group_id)
AND repo.repo_id = issues.repo_id
AND repo.repo_group_id = repo_groups.repo_group_id
GROUP BY date, repo_groups.rg_name
ORDER BY date
""")
results = pd.read_sql(openIssueCountSQL, self.db, params={'repo_group_id': repo_group_id})
return results
else:
openIssueCountSQL = s.sql.text("""
SELECT repo.repo_id, count(issue_id) AS open_count, date_trunc('week', issues.created_at) AS DATE
FROM issues, repo, repo_groups
WHERE issue_state = 'open'
AND issues.repo_id = :repo_id
AND repo.repo_id = issues.repo_id
AND repo.repo_group_id = repo_groups.repo_group_id
GROUP BY date, repo.repo_id
ORDER BY date
""")
results = pd.read_sql(openIssueCountSQL, self.db, params={'repo_id': repo_id})
return results


@annotate(tag='rg-closed-issues-count')
def rg_closed_issues_count(self, repo_group_id):
@annotate(tag='closed-issues-count')
def closed_issues_count(self, repo_group_id, repo_id=None):
"""
Returns the count of closed issues per week for a repo group, or for a single repo when repo_id is given
:param repo_group_id: the repository group's id
:param repo_id: (optional) the repository's id
"""
closedIssueCountSQL = s.sql.text("""
SELECT rg_name, count(issue_id) AS closed_count, date_trunc('week', issues.created_at) AS DATE
FROM issues, repo, repo_groups
WHERE issue_state = 'closed'
AND repo.repo_id = issues.repo_id
AND repo.repo_group_id = repo_groups.repo_group_id
GROUP BY date, repo_groups.rg_name
ORDER BY date
if not repo_id:
closedIssueCountSQL = s.sql.text("""
SELECT rg_name, count(issue_id) AS closed_count, date_trunc('week', issues.created_at) AS DATE
FROM issues, repo, repo_groups
WHERE issue_state = 'closed'
AND issues.repo_id IN (SELECT repo_id FROM repo WHERE repo_group_id = :repo_group_id)
AND repo.repo_id = issues.repo_id
AND repo.repo_group_id = repo_groups.repo_group_id
GROUP BY date, repo_groups.rg_name
ORDER BY date
""")
results = pd.read_sql(closedIssueCountSQL, self.db, params={'repo_group_id': repo_group_id})
return results
else:
closedIssueCountSQL = s.sql.text("""
SELECT repo.repo_id, count(issue_id) AS closed_count, date_trunc('week', issues.created_at) AS DATE
FROM issues, repo, repo_groups
WHERE issue_state = 'closed'
AND issues.repo_id = :repo_id
AND repo.repo_id = issues.repo_id
AND repo.repo_group_id = repo_groups.repo_group_id
GROUP BY date, repo.repo_id
ORDER BY date
""")
results = pd.read_sql(closedIssueCountSQL, self.db, params={'repo_id': repo_id})
return results

@annotate(tag='get-repo')
def get_repo(self, owner, repo):
"""
Returns repo id and repo group id by owner and repo
:param owner: the owner of the repo
:param repo: the name of the repo
"""
getRepoSQL = s.sql.text("""
SELECT repo_id, repo_group_id
FROM repo
WHERE repo_name = :repo AND repo_path LIKE :owner
""")
results = pd.read_sql(closedIssueCountSQL, self.db)#, params={"rg_id": '{}'.format(rg_id)})

results = pd.read_sql(getRepoSQL, self.db, params={'owner': '%{}_'.format(owner), 'repo': repo,})

return results
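A short usage sketch for the reworked metrics above: both methods now take a repo_group_id and an optional repo_id and return pandas DataFrames, and get_repo resolves ids from an owner/repo pair. The instance name and id values below are placeholders, not real data.

# Assuming `augur_db` is an instance of the datasource class patched above
weekly_open_by_group = augur_db.open_issues_count(repo_group_id=20)
weekly_closed_by_repo = augur_db.closed_issues_count(repo_group_id=20, repo_id=21000)

# Resolve repo_id and repo_group_id from an owner and repository name
ids = augur_db.get_repo(owner="chaoss", repo="augur")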
