Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

infra for snmp support and linkUp/Down and Config change traps #225

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/ax_interface/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

from .mib import MIBTable, MIBMeta
from .socket_io import SocketManager
from .trap import TrapInfra

# how long to wait before forcibly killing background task(s) during the shutdown procedure.
BACKGROUND_WAIT_TIMEOUT = 10 # seconds


class Agent:
def __init__(self, mib_cls, update_frequency, loop):
def __init__(self, mib_cls, update_frequency, loop, trap_handlers):
if not type(mib_cls) is MIBMeta:
raise ValueError("Expected a class with type: {}".format(MIBMeta))

Expand All @@ -19,8 +20,11 @@ def __init__(self, mib_cls, update_frequency, loop):
self.oid_updaters_enabled = asyncio.Event(loop=loop)
self.stopped = asyncio.Event(loop=loop)

# Initialize our Trap infra
self.trap_infra = TrapInfra(loop,trap_handlers)

# Initialize our MIB
self.mib_table = MIBTable(mib_cls, update_frequency)
self.mib_table = MIBTable(mib_cls, update_frequency, self.trap_infra)

# containers
self.socket_mgr = SocketManager(self.mib_table, self.run_enabled, self.loop)
Expand Down Expand Up @@ -54,6 +58,8 @@ async def run_in_event_loop(self):
async def shutdown(self):
# allow the agent to quit
self.run_enabled.clear()
# shutdown trap infra
await self.trap_infra.shutdown()
# close the socket
self.socket_mgr.close()
# wait for the agent to signal back
Expand Down
7 changes: 6 additions & 1 deletion src/ax_interface/mib.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,14 @@ class MIBTable(dict):
Simplistic LUT for Get/GetNext OID. Interprets iterables as keys and implements the same interfaces as dict's.
"""

def __init__(self, mib_cls, update_frequency=DEFAULT_UPDATE_FREQUENCY):
def __init__(self, mib_cls, update_frequency=DEFAULT_UPDATE_FREQUENCY, trap_infra_obj=None):
if type(mib_cls) is not MIBMeta:
raise ValueError("Supplied object is not a MIB class instance.")
super().__init__(getattr(mib_cls, MIBMeta.KEYSTORE))
self.update_frequency = update_frequency
self.updater_instances = getattr(mib_cls, MIBMeta.UPDATERS)
self.prefixes = getattr(mib_cls, MIBMeta.PREFIXES)
self.trap_infra_obj = trap_infra_obj

@staticmethod
def _done_background_task_callback(fut):
Expand All @@ -282,6 +283,10 @@ def start_background_tasks(self, event):
fut = asyncio.ensure_future(updater.start())
fut.add_done_callback(MIBTable._done_background_task_callback)
tasks.append(fut)

if self.trap_infra_obj is not None:
fut = asyncio.ensure_future(self.trap_infra_obj.db_listener())
tasks.append(fut)
return asyncio.gather(*tasks, loop=event._loop)

def _find_parent_prefix(self, item):
Expand Down
23 changes: 17 additions & 6 deletions src/ax_interface/pdu_implementations.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,15 +362,26 @@ def __init__(self, *args, **kwargs):
# header only.


# class NotifyPDU(ContextOptionalPDU):
# """
# https://tools.ietf.org/html/rfc2741#section-6.2.10
# """
# header_type_ = PduTypes.NOTIFY
# TODO: Impl
class NotifyPDU(ContextOptionalPDU):
"""
https://tools.ietf.org/html/rfc2741#section-6.2.10
"""
header_type_ = PduTypes.NOTIFY

def __init__(self, header=None, context=None, payload=None, varBinds=[]):
super().__init__(header=header, context=context, payload=payload)

self.varBinds = varBinds
# Reducing the header length as per RFC 2741
# https://tools.ietf.org/html/rfc2741#section-6.1
payload_len = len(self.encode()) - constants.AGENTX_HEADER_LENGTH
self.header = self.header._replace(payload_length=payload_len)

def encode(self):
ret = super().encode()
for value in self.varBinds:
ret += value.to_bytes(self.header.endianness)
return ret

class PingPDU(ContextOptionalPDU):
"""
Expand Down
3 changes: 2 additions & 1 deletion src/ax_interface/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from .encodings import ObjectIdentifier
from .pdu import PDUHeader, PDUStream
from .pdu_implementations import RegisterPDU, ResponsePDU, OpenPDU

from .trap import TrapInfra

class AgentX(asyncio.Protocol):
"""
Expand Down Expand Up @@ -49,6 +49,7 @@ def opening_handshake(self):

def register_subtrees(self, pdu):
self.session_id = pdu.header.session_id
TrapInfra.protocol_obj = self
logger.info("AgentX session starting with ID: {}".format(self.session_id))

for idx, subtree in enumerate(self.mib_table.prefixes):
Expand Down
179 changes: 179 additions & 0 deletions src/ax_interface/trap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import asyncio
from . import logger, constants
from .mib import ValueType
from .encodings import ObjectIdentifier, ValueRepresentation
from .pdu import PDUHeader
from .pdu_implementations import NotifyPDU
import re
import json
import os

class TrapInfra:
"""
Trap infrastructure's core services are define here.
"""
protocol_obj = None # this will be set in the AgentX protocol class
def __init__(self, loop, trap_handlers):
logger.debug("Init begins for Trap infra")
self.loop = loop
self.redis_instances = dict()
self.db_to_redis_dict = dict()
if trap_handlers is None:
return
self.dbKeyToHandler = dict()
trap_handlers_set = set(trap_handlers)
for t_handler in trap_handlers_set:
t_instance = t_handler()
dbKeys = t_instance.dbKeys
for dbkey in dbKeys:
if dbkey not in self.dbKeyToHandler:
self.dbKeyToHandler[dbkey] = list()
self.dbKeyToHandler[dbkey].append(t_instance)
else:
self.dbKeyToHandler[dbkey].append(t_instance)
t_instance.trap_init()
logger.debug("Init successful for Trap infra")

async def db_listener(self):
"""
Co routine which listens for DB notification events
"""
import aioredis
from aioredis.pubsub import Receiver

logger.debug("starting redis co routine")
logger.info("starting redis DB listener routine")
# Read Config File and setup DB instances
CONFIG_FILE = os.getenv('DB_CONFIG_FILE', "/var/run/redis/sonic-db/database_config.json")
if not os.path.exists(CONFIG_FILE):
raise RuntimeError("[Trap:db_listener - DB config file not found " + str(CONFIG_FILE))
else:
with open(CONFIG_FILE, "r") as config_file:
db_config_data = json.load(config_file)
if not 'INSTANCES' in db_config_data:
raise RuntimeError("[Trap:db_listener - No DB instances found in DB config file")
for instance in db_config_data['INSTANCES']:
entry = db_config_data['INSTANCES'][instance]
if instance not in self.redis_instances:
self.redis_instances[instance] = {"host": entry["hostname"], \
"port": entry["port"], "keyPatterns": [], \
"patternObjs": [], "receiver_handle": None, \
"connection_obj": None \
}
for db in db_config_data['DATABASES']:
entry = db_config_data['DATABASES'][db]
db_id = int(entry["id"])
if db_id not in self.db_to_redis_dict:
if entry["instance"] not in self.redis_instances:
raise RuntimeError("[Trap:db_listener - No DB instance found for " + str(entry["instance"]))
self.db_to_redis_dict[db_id] = self.redis_instances[entry["instance"]]

async def reader(receiver_handle):
logger.debug("Listening for notifications")
async for channel, msg in receiver_handle.iter():
logger.debug("Got {!r} in channel {!r}".format(msg, channel))
self.process_trap(channel,msg)

for instance in self.redis_instances:
address_tuple = (self.redis_instances[instance]['host'], self.redis_instances[instance]['port'])
self.redis_instances[instance]["connection_obj"] = await aioredis.create_redis_pool(address_tuple)
receiver_handle = Receiver(loop=self.loop)
self.redis_instances[instance]["receiver_handle"] = receiver_handle
asyncio.ensure_future(reader(receiver_handle))

for pat in self.dbKeyToHandler.keys():
#Get DB number
db_num = re.match(r'__keyspace@(\d+)__:',pat).group(1)
if db_num is None or db_num == "":
raise RuntimeError("[Trap:db_listener - DB number cannot be determined for key " + str(pat))

db_num = int(db_num)
db_instance = self.db_to_redis_dict[db_num]
db_instance["patternObjs"].append(db_instance["receiver_handle"].pattern(pat))
db_instance["keyPatterns"].append(pat)

for instance in self.redis_instances:
if len(self.redis_instances[instance]["patternObjs"]) == 0:
continue
await self.redis_instances[instance]["connection_obj"].psubscribe(*self.redis_instances[instance]["patternObjs"])

def dispatch_trap(self, varBinds):
"""
Prepare Notify PDU and sends to Master using AgentX protocol
"""
logger.debug("dispatch_trap invoked")
if TrapInfra.protocol_obj is not None:
notifyPDU = NotifyPDU(header=PDUHeader(1, \
constants.PduTypes.NOTIFY, \
PDUHeader.MASK_NEWORK_BYTE_ORDER, 0, \
TrapInfra.protocol_obj.session_id, \
0, 0, 0), varBinds=varBinds)
TrapInfra.protocol_obj.send_pdu(notifyPDU)
logger.debug("processed trap successfully")
else:
logger.warning("Protocol Object is None, cannot process traps")

def process_trap(self, channel, msg):
"""
Invokes trap handlers
"""
db_pattern = channel.name.decode('utf-8')
changed_key = msg[0].decode('utf-8')

for t_instance in self.dbKeyToHandler[db_pattern]:
varbindsDict = t_instance.trap_process(msg, changed_key)
if varbindsDict is None:
continue # no process
assert isinstance(varbindsDict, dict)
assert 'TrapOid' in varbindsDict
assert 'varBinds' in varbindsDict
varbinds = varbindsDict['varBinds']
TrapOid = varbindsDict['TrapOid']
assert isinstance(TrapOid, ObjectIdentifier)
varbindsList = []
# Insert standard SNMP trap
snmpTrapOid = ObjectIdentifier(11, 0, 0, 0, (1, 3, 6, 1, 6, 3, 1, 1, 4, 1, 0))
snmpTrapVarBind = ValueRepresentation(ValueType.OBJECT_IDENTIFIER, 0, snmpTrapOid, TrapOid)
varbindsList.append(snmpTrapVarBind)
if len(varbinds) > 0:
for vb in varbinds:
if not isinstance(vb, ValueRepresentation):
raise RuntimeError("list entry is not of type ValueRepresentation")
else:
varbindsList.append(vb)
else:
raise RuntimeError("Return value must contain atleast one VarBind")
self.dispatch_trap(varbindsList)

async def shutdown(self):
for instance in self.redis_instances:
if len(self.redis_instances[instance]["keyPatterns"]) > 0:
await self.redis_instances[instance]["connection_obj"].punsubscribe(*self.redis_instances[instance]["keyPatterns"])
self.redis_instances[instance]["receiver_handle"].stop()
self.redis_instances[instance]["connection_obj"].close()
await self.redis_instances[instance]["connection_obj"].wait_closed()

class Trap:
"""
Interface for developing Trap handlers
"""
def __init__(self, **kwargs):
self.run_event = asyncio.Event()
assert isinstance(kwargs["dbKeys"], list)
self.dbKeys = kwargs["dbKeys"]

def trap_init(self):
"""
Children may override this method.
"""
logger.info("I am trap_init from infra")

def trap_process(self, dbMessage, changedKey):
"""
Children may override this method.
"""
logger.info("I am trap_process from infra")




11 changes: 7 additions & 4 deletions src/sonic_ax_impl/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import ax_interface
from sonic_ax_impl.mibs import ieee802_1ab, Namespace
from . import logger
from .mibs.ietf import rfc1213, rfc2737, rfc2863, rfc3433, rfc4292, rfc4363
from .mibs.vendor import dell, cisco
from .mibs.ietf import rfc1213, rfc2737, rfc2863, rfc3433, rfc4292, rfc4363, link_up_down_trap
from .mibs.vendor import dell, cisco, broadcom

# Background task update frequency ( in seconds )
DEFAULT_UPDATE_FREQUENCY = 5
Expand Down Expand Up @@ -46,6 +46,8 @@ class SonicMIB(
If SONiC was to create custom MIBEntries, they may be specified here.
"""

# Register Trap handlers
trap_handlers = [link_up_down_trap.linkUpDownTrap, broadcom.enterprise_trap.configChangeTrap]

def shutdown(signame, agent):
# FIXME: If the Agent dies, the background tasks will zombie.
Expand All @@ -61,7 +63,7 @@ def main(update_frequency=None):
Namespace.init_sonic_db_config()

# initialize handler and set update frequency (or use the default)
agent = ax_interface.Agent(SonicMIB, update_frequency or DEFAULT_UPDATE_FREQUENCY, event_loop)
agent = ax_interface.Agent(SonicMIB, update_frequency or DEFAULT_UPDATE_FREQUENCY, event_loop, trap_handlers)

# add "shutdown" signal handlers
# https://docs.python.org/3.5/library/asyncio-eventloop.html#set-signal-handlers-for-sigint-and-sigterm
Expand All @@ -74,7 +76,8 @@ def main(update_frequency=None):
event_loop.run_until_complete(agent.run_in_event_loop())

except Exception:
logger.exception("Uncaught exception in {}".format(__name__))
if shutdown_task is None:
logger.exception("Uncaught exception in {}".format(__name__))
sys.exit(1)
finally:
if shutdown_task is not None:
Expand Down
Loading