Skip to content
This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

Commit

Permalink
chore: cleanup from code review
Browse files Browse the repository at this point in the history
  • Loading branch information
Shawn Sarwar committed May 4, 2018
1 parent a7e1334 commit 5434093
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 23 deletions.
1 change: 0 additions & 1 deletion aether-producer/conf/pip/primary-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ aether.client
kafka
psycopg2-binary
requests
spavro

# test libraries
coverage
Expand Down
2 changes: 1 addition & 1 deletion aether-producer/conf/pip/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ pycodestyle==2.3.1
pyflakes==1.6.0
requests==2.18.4
six==1.11.0
spavro==1.1.10
spavro==1.1.15
urllib3==1.22
33 changes: 15 additions & 18 deletions aether-producer/producer/aether_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@
# specific language governing permissions and limitations
# under the License.

import json
import psycopg2
import avro.schema
import io
import ast
import io
import json
import os
import psycopg2
import signal
import avro.schema
import sys

from avro.io import Validate
from avro.io import DatumWriter
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
from avro.io import Validate


from kafka import KafkaProducer, KafkaConsumer
Expand Down Expand Up @@ -111,6 +111,9 @@ def count_since(offset=None, topic=None):
offset = ""
with psycopg2.connect(**_settings.postgres_connection_info) as conn:
cursor = conn.cursor(cursor_factory=DictCursor)
# We'd originally used a 'count(CASE WHEN e.modified >'
# That broke mysteriously and borked everything so we're using a less optimal call.
# Benchmarks show it good enough for now but probably should be fixed # TODO
count_str = '''
SELECT
e.id,
Expand All @@ -119,7 +122,7 @@ def count_since(offset=None, topic=None):
ps.id as project_schema_id,
s.name as schema_name,
s.id as schema_id
FROM kernel_entity e
FROM kernel_entity e
inner join kernel_projectschema ps on e.projectschema_id = ps.id
inner join kernel_schema s on ps.schema_id = s.id
WHERE e.modified > '%s'
Expand All @@ -133,8 +136,6 @@ def count_since(offset=None, topic=None):
return sum([1 for row in cursor])




def get_entities(offset = None, max_size=1000): # TODO implement batch pull by topic in Stream
if not offset:
offset = ""
Expand All @@ -152,21 +153,18 @@ def get_entities(offset = None, max_size=1000): # TODO implement batch pull by
s.name as schema_name,
s.id as schema_id,
s.revision as schema_revision
from kernel_entity e
FROM kernel_entity e
inner join kernel_projectschema ps on e.projectschema_id = ps.id
inner join kernel_schema s on ps.schema_id = s.id
WHERE e.modified > '%s'
ORDER BY e.modified ASC;
''' % (offset)
ORDER BY e.modified ASC
LIMIT %d;
''' % (offset, max_size)
cursor.execute(query_str)

for x, row in enumerate(cursor):
if x >= max_size - 1:
raise StopIteration
for row in cursor:
yield {key : row[key] for key in row.keys()}



class KafkaStream(object):
def __init__(self, topic, kernel):
self.topic = topic
Expand Down Expand Up @@ -210,7 +208,6 @@ def get_avro(self):
#Gets avro schema used for encoding messages
#TODO Fix issue with json coming from API Client being single quoted
definition = ast.literal_eval(str(self.kernel.Resource.Schema.get(self.topic).definition))
#self.schema = spavro.schema.parse(json.dumps(definition))
self.schema = avro.schema.Parse(json.dumps(definition))


Expand Down
19 changes: 19 additions & 0 deletions aether-utils/aether-saladbar/aether/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
# Copyright (C) 2018 by eHealth Africa : http://www.eHealthAfrica.org
#
# See the NOTICE file distributed with this work for additional information
# regarding copyright ownership.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on anx
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


try:
__import__('pkg_resources').declare_namespace(__name__)
except ImportError:
Expand Down
17 changes: 17 additions & 0 deletions aether-utils/aether-saladbar/aether/saladbar/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright (C) 2018 by eHealth Africa : http://www.eHealthAfrica.org
#
# See the NOTICE file distributed with this work for additional information
# regarding copyright ownership.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on anx
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
17 changes: 17 additions & 0 deletions aether-utils/aether-saladbar/aether/saladbar/saladbar/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright (C) 2018 by eHealth Africa : http://www.eHealthAfrica.org
#
# See the NOTICE file distributed with this work for additional information
# regarding copyright ownership.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on anx
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
3 changes: 0 additions & 3 deletions aether-utils/aether-saladbar/aether/saladbar/wizard.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@
from aether.client import KernelClient

import aether.saladbar.library.library_utils as library_utils
# from library import library_utils
import aether.saladbar.saladbar.parsers as Parsers
# from saladbar import parsers as Parsers
# from saladbar import salad_handler as salad
import aether.saladbar.saladbar.salad_handler as salad

HERE = (os.path.dirname(os.path.realpath(__file__)))
Expand Down

0 comments on commit 5434093

Please sign in to comment.