Skip to content

Commit

Permalink
Updated order funct for SQLHistorian brought over base. #68
Browse files Browse the repository at this point in the history
  • Loading branch information
craig8 committed Jun 19, 2015
1 parent c1c0d12 commit b4d8a36
Show file tree
Hide file tree
Showing 5 changed files with 752 additions and 29 deletions.
7 changes: 7 additions & 0 deletions Agents/PlatformHistorianAgent/sqlite-historian.agent
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"agent": {
"exec": "sqlite_historianagent-0.1-py2.7.egg --config \"%c\" --sub \"%s\" --pub \"%p\""
},
"agentid": "sqlite_historian",
"db": "~/.volttron/data/platform.historian.sqlite"
}
11 changes: 11 additions & 0 deletions Agents/SQLHistorianAgent/config.sqlite.platform.historian
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"agentid": "sqlhistorian-sqlite",
"identity": "platform.historian",
"connection": {
"type": "sqlite",
"params": {
"database": "~/.volttron/data/historian.sqlite",
"detect_types": "sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES"
}
}
}
64 changes: 37 additions & 27 deletions Agents/SQLHistorianAgent/sqlhistorian/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ def sqlhistorian(config_path, **kwargs):
identity = config.get('identity', None)

if databaseType == 'sqlite':
from sqlitefuncts import prepare, connect, query_topics
from sqlitefuncts import (prepare, connect, query_topics, insert_topic,
insert_data)

class SQLHistorian(BaseHistorianAgent, BaseQueryHistorianAgent):
'''This is a simple example of a historian agent that writes stuff
Expand All @@ -102,52 +103,52 @@ class SQLHistorian(BaseHistorianAgent, BaseQueryHistorianAgent):
@Core.receiver("onstart")
def starting(self, sender, **kwargs):

# Check to see if the platform agent is available, if it isn't then
# subscribe to the /platform topic to be notified when the platform
# agent becomes available.
try:
ping = self.vip.ping('platform.agent', 'awake?').get(timeout=3)
self.vip.rpc.call('platform.agent', 'register_service',
self.core.identity).get(timeout=3)
except Unreachable:
_log.debug('Could not register historian service')
finally:
self.vip.pubsub.subscribe('pubsub', '/platform', self.__platform)
if self.core.identity == 'platform.historian':
# Check to see if the platform agent is available, if it isn't then
# subscribe to the /platform topic to be notified when the platform
# agent becomes available.
try:
ping = self.vip.ping('platform.agent',
'awake?').get(timeout=3)
self.vip.rpc.call('platform.agent', 'register_service',
self.core.identity).get(timeout=3)
except Unreachable:
_log.debug('Could not register historian service')
finally:
self.vip.pubsub.subscribe('pubsub', '/platform',
self.__platform)
_log.debug("Listening to /platform")

def __platform(self, peer, sender, bus, topic, headers, message):
_log.debug('Platform is now: ', message)
if message == 'available':
if message == 'available' and \
self.core.identity == 'platform.historian':
gevent.spawn(self.vip.rpc.call, 'platform.agent', 'register_service',
self.core.identity)
gevent.sleep(0)

def publish_to_historian(self, to_publish_list):
#self.report_all_published()
c = self.conn.cursor()
#print 'Publish info'
_log.debug("publish_to_historian number of items: {}"
.format(len(to_publish_list)))
for x in to_publish_list:
ts = x['timestamp']
topic = x['topic']
value = x['value']

topic_id = self.topics.get(topic)
topic_id = self._topic_map.get(topic, None)

if topic_id is None:
c.execute('''INSERT INTO topics values (?,?)''', (None, topic))
c.execute('''SELECT last_insert_rowid()''')
row = c.fetchone()
row = insert_topic = ins
row = insert_topic(topic, self.conn, False)
topic_id = row[0]
self.topics[topic] = topic_id

c.execute('''INSERT OR REPLACE INTO data values(?, ?, ?)''',
(ts,topic_id,jsonapi.dumps(value)))
insert_data(ts,topic_id, value, self.conn)

#pprint(x)
print('published {} data values:'.format(len(to_publish_list)))

self.conn.commit()
c.close()

self.report_all_published()

def query_topic_list(self):
Expand All @@ -157,7 +158,8 @@ def query_topic_list(self):
# do quer on db and return results.
return []

def query_historian(self, topic, start=None, end=None, skip=0, count=None):
def query_historian(self, topic, start=None, end=None, skip=0,
count=None, order="FIRST_TO_LAST"):
"""This function should return the results of a query in the form:
{"values": [(timestamp1, value1), (timestamp2, value2), ...],
"metadata": {"key1": value1, "key2": value2, ...}}
Expand All @@ -167,7 +169,7 @@ def query_historian(self, topic, start=None, end=None, skip=0, count=None):
query = '''SELECT data.ts, data.value_string
FROM data, topics
{where}
ORDER BY data.ts
{order_by}
{limit}
{offset}'''

Expand All @@ -184,6 +186,10 @@ def query_historian(self, topic, start=None, end=None, skip=0, count=None):

where_statement = ' AND '.join(where_clauses)

order_by = 'ORDER BY data.ts ASC'
if order == 'LAST_TO_FIRST':
order_by = ' ORDER BY data.ts DESC'

#can't have an offset without a limit
# -1 = no limit and allows the user to
# provied just an offset
Expand All @@ -198,9 +204,12 @@ def query_historian(self, topic, start=None, end=None, skip=0, count=None):
offset_statement = 'OFFSET ?'
args.append(skip)

_log.debug("About to do real_query")

real_query = query.format(where=where_statement,
limit=limit_statement,
offset=offset_statement)
offset=offset_statement,
order_by=order_by)

print(real_query)
print(args)
Expand All @@ -211,6 +220,7 @@ def query_historian(self, topic, start=None, end=None, skip=0, count=None):

return {'values':values}


def historian_setup(self):
prepare(**connection['params'])
self.conn = connect()
Expand Down
Loading

0 comments on commit b4d8a36

Please sign in to comment.