From 38153f16bf76d3d521ae4305a394533ac1a645e9 Mon Sep 17 00:00:00 2001 From: "jianjun.dong" Date: Wed, 15 Apr 2020 01:36:40 -0700 Subject: [PATCH 1/9] MCLAG sync FDB MAC from STATE_DB --- mclagsyncd/mclag.h | 100 ++--- mclagsyncd/mclaglink.cpp | 827 ++++++++++++++++++++------------------ mclagsyncd/mclaglink.h | 136 ++++--- mclagsyncd/mclagsyncd.cpp | 111 ++--- 4 files changed, 616 insertions(+), 558 deletions(-) diff --git a/mclagsyncd/mclag.h b/mclagsyncd/mclag.h index 066b919862..722633ff8f 100644 --- a/mclagsyncd/mclag.h +++ b/mclagsyncd/mclag.h @@ -20,14 +20,20 @@ #ifndef _MCLAG_H #define _MCLAG_H -#define MCLAG_DEFAULT_IP 0x7f000006 +#define MCLAG_DEFAULT_IP 0x7f000006 -enum MCLAG_FDB_OP_TYPE { - MCLAG_FDB_OPER_ADD =1, +#define MCLAG_ETHER_ADDR_STR_LEN 18 +#define MCLAG_MAX_L_PORT_NAME 20 + +enum MCLAG_FDB_OP_TYPE +{ + MCLAG_FDB_OPER_NONE = 0, + MCLAG_FDB_OPER_ADD = 1, MCLAG_FDB_OPER_DEL = 2, }; -enum MCLAG_FDB_TYPE { +enum MCLAG_FDB_TYPE +{ MCLAG_FDB_TYPE_STATIC = 1, MCLAG_FDB_TYPE_DYNAMIC = 2, }; @@ -40,59 +46,64 @@ enum MCLAG_FDB_TYPE { /* * Largest message that can be sent to or received from the MCLAG. */ +#define MSG_BATCH_SIZE 32 #define MCLAG_MAX_MSG_LEN 4096 #define MCLAG_MAX_SEND_MSG_LEN 4096 -typedef struct mclag_msg_hdr_t_ { - /* +typedef struct mclag_msg_hdr_t_ +{ + /* * Protocol version. */ uint8_t version; - /* + /* * Type of message, see below. */ uint8_t msg_type; - /* + /* * Length of entire message, including the header. */ uint16_t msg_len; -}mclag_msg_hdr_t; +} mclag_msg_hdr_t; #define MCLAG_PROTO_VERSION 1 -#define MCLAG_MSG_HDR_LEN (sizeof (mclag_msg_hdr_t)) +#define MCLAG_MSG_HDR_LEN (sizeof(mclag_msg_hdr_t)) -/*syncd send msg type to iccpd*/ -typedef enum mclag_syncd_msg_type_e_ { +/* syncd send msg type to iccpd */ +typedef enum mclag_syncd_msg_type_e_ +{ MCLAG_SYNCD_MSG_TYPE_NONE = 0, MCLAG_SYNCD_MSG_TYPE_FDB_OPERATION = 1 -}mclag_syncd_msg_type_e; +} mclag_syncd_msg_type_e; -/*iccpd send msg type to syncd*/ -typedef enum mclag_msg_type_e_ { +/* iccpd send msg type to syncd */ +typedef enum mclag_msg_type_e_ +{ MCLAG_MSG_TYPE_NONE = 0, MCLAG_MSG_TYPE_PORT_ISOLATE = 1, MCLAG_MSG_TYPE_PORT_MAC_LEARN_MODE = 2, MCLAG_MSG_TYPE_FLUSH_FDB = 3, MCLAG_MSG_TYPE_SET_INTF_MAC = 4, MCLAG_MSG_TYPE_SET_FDB = 5, - MCLAG_MSG_TYPE_FLUSH_FDB_BY_PORT = 6, - MCLAG_MSG_TYPE_GET_FDB_CHANGES = 20 -}mclag_msg_type_e; + MCLAG_MSG_TYPE_FLUSH_FDB_BY_PORT = 6 +} mclag_msg_type_e; -typedef struct mclag_sub_option_hdr_t_ { +typedef struct mclag_sub_option_hdr_t_ +{ uint8_t op_type; - /* + /* * Length of option value, not including the header. */ uint16_t op_len; -}mclag_sub_option_hdr_t; +} mclag_sub_option_hdr_t; -#define MCLAG_SUB_OPTION_HDR_LEN (sizeof (mclag_sub_option_hdr_t)) +#define MCLAG_SUB_OPTION_HDR_LEN (sizeof(mclag_sub_option_hdr_t)) -typedef enum mclag_sub_option_type_e_ { +typedef enum mclag_sub_option_type_e_ +{ MCLAG_SUB_OPTION_TYPE_NONE = 0, MCLAG_SUB_OPTION_TYPE_ISOLATE_SRC = 1, MCLAG_SUB_OPTION_TYPE_ISOLATE_DST = 2, @@ -102,8 +113,7 @@ typedef enum mclag_sub_option_type_e_ { MCLAG_SUB_OPTION_TYPE_SET_MAC_DST = 6 } mclag_sub_option_type_e; -static inline size_t -mclag_msg_len (const mclag_msg_hdr_t *hdr) +static inline size_t mclag_msg_len(const mclag_msg_hdr_t *hdr) { return hdr->msg_len; } @@ -111,10 +121,9 @@ mclag_msg_len (const mclag_msg_hdr_t *hdr) /* * mclag_msg_data_len */ -static inline size_t -mclag_msg_data_len (const mclag_msg_hdr_t *hdr) +static inline size_t mclag_msg_data_len(const mclag_msg_hdr_t *hdr) { - return (mclag_msg_len (hdr) - MCLAG_MSG_HDR_LEN); + return (mclag_msg_len(hdr) - MCLAG_MSG_HDR_LEN); } /* @@ -122,20 +131,19 @@ mclag_msg_data_len (const mclag_msg_hdr_t *hdr) * * Returns TRUE if a message header looks well-formed. */ -static inline int -mclag_msg_hdr_ok (const mclag_msg_hdr_t *hdr) +static inline int mclag_msg_hdr_ok(const mclag_msg_hdr_t *hdr) { - size_t msg_len; + size_t msg_len; - if (hdr->msg_type == MCLAG_MSG_TYPE_NONE) - return 0; + if (hdr->msg_type == MCLAG_MSG_TYPE_NONE) + return 0; - msg_len = mclag_msg_len (hdr); + msg_len = mclag_msg_len(hdr); - if (msg_len < MCLAG_MSG_HDR_LEN || msg_len > MCLAG_MAX_MSG_LEN) - return 0; + if (msg_len < MCLAG_MSG_HDR_LEN || msg_len > MCLAG_MAX_MSG_LEN) + return 0; - return 1; + return 1; } /* @@ -145,20 +153,18 @@ mclag_msg_hdr_ok (const mclag_msg_hdr_t *hdr) * * @param len The length in bytes from 'hdr' to the end of the buffer. */ -static inline int -mclag_msg_ok (const mclag_msg_hdr_t *hdr, size_t len) +static inline int mclag_msg_ok(const mclag_msg_hdr_t *hdr, size_t len) { - if (len < MCLAG_MSG_HDR_LEN) - return 0; + if (len < MCLAG_MSG_HDR_LEN) + return 0; - if (!mclag_msg_hdr_ok (hdr)) - return 0; + if (!mclag_msg_hdr_ok(hdr)) + return 0; - if (mclag_msg_len (hdr) > len) - return 0; + if (mclag_msg_len(hdr) > len) + return 0; - return 1; + return 1; } - #endif diff --git a/mclagsyncd/mclaglink.cpp b/mclagsyncd/mclaglink.cpp index 369dff08f0..ecd03aeb7f 100644 --- a/mclagsyncd/mclaglink.cpp +++ b/mclagsyncd/mclaglink.cpp @@ -22,208 +22,63 @@ #include #include #include +#include #include #include "logger.h" #include "netmsg.h" #include "netdispatcher.h" #include "swss/notificationproducer.h" -#include "mclagsyncd/mclaglink.h" -#include "mclagsyncd/mclag.h" -#include +#include "mclaglink.h" +#include "mclag.h" #include +#include "schema.h" +#include "tokenize.h" using namespace swss; using namespace std; -void MclagLink::getOidToPortNameMap(std::unordered_map & port_map) -{ - std::unordered_map::iterator it; - auto hash = p_redisClient_to_counters->hgetall("COUNTERS_PORT_NAME_MAP"); - - for (it = hash.begin(); it != hash.end(); ++it) - port_map.insert(pair(it->second, it->first)); - - return; -} - -void MclagLink::getBridgePortIdToAttrPortIdMap(std::map *oid_map) -{ - std::string bridge_port_id; - size_t pos1 = 0; - - std::unordered_map::iterator attr_port_id; - - auto keys = p_redisClient_to_asic->keys("ASIC_STATE:SAI_OBJECT_TYPE_BRIDGE_PORT:*"); - - for (auto& key : keys) - { - pos1 = key.find("oid:", 0); - bridge_port_id = key.substr(pos1); - - auto hash = p_redisClient_to_asic->hgetall(key); - attr_port_id = hash.find("SAI_BRIDGE_PORT_ATTR_PORT_ID"); - if (attr_port_id == hash.end()) - { - attr_port_id = hash.find("SAI_BRIDGE_PORT_ATTR_TUNNEL_ID"); - if (attr_port_id == hash.end()) - continue; - } - - oid_map->insert(pair(bridge_port_id, attr_port_id->second)); - } - - return; -} - -void MclagLink::getVidByBvid(std::string &bvid, std::string &vlanid) -{ - std::unordered_map::iterator attr_vlan_id; - std::string pre = "ASIC_STATE:SAI_OBJECT_TYPE_VLAN:"; - std::string key = pre + bvid; - - auto hash = p_redisClient_to_asic->hgetall(key.c_str()); - - attr_vlan_id = hash.find("SAI_VLAN_ATTR_VLAN_ID"); - if (attr_vlan_id == hash.end()) - return; - - vlanid = attr_vlan_id->second; - return; -} - -void MclagLink::getFdbSet(std::set *fdb_set) -{ - string bvid; - string bri_port_id; - string port_name; - string mac; - string type; - string vlanid; - int vid; - size_t pos1 = 0; - size_t pos2 = 0; - std::unordered_map oid_to_portname_map; - std::map brPortId_to_attrPortId_map; - std::unordered_map::iterator type_it; - std::unordered_map::iterator brPortId_it; - std::map::iterator brPortId_to_attrPortId_it; - std::unordered_map::iterator oid_to_portName_it; - - auto keys = p_redisClient_to_asic->keys("ASIC_STATE:SAI_OBJECT_TYPE_FDB_ENTRY:*"); - - for (auto& key : keys) - { - /*get vid*/ - pos1 = key.find("vlan", 0); - if (pos1 != key.npos) - { - pos1 = pos1 + 7; - pos2 = key.find(",", pos1) - 2; - vlanid = key.substr(pos1, pos2 - pos1 + 1); - } - else - { - pos1 = key.find("oid:", 0); - pos2 = key.find(",", 0) - 2; - bvid = key.substr(pos1, pos2 - pos1 + 1); - getVidByBvid(bvid, vlanid); - } - - vid = atoi(vlanid.c_str()); - /*get mac*/ - pos1 = key.find("mac", 0) + 6; - pos2 = key.find(",", pos1) - 2; - mac = key.substr(pos1, pos2 - pos1 + 1); - - /*get type*/ - auto hash = p_redisClient_to_asic->hgetall(key); - type_it = hash.find("SAI_FDB_ENTRY_ATTR_TYPE"); - if (type_it == hash.end()) - { - continue; - } - - if (memcmp(type_it->second.c_str(), "SAI_FDB_ENTRY_TYPE_DYNAMIC", type_it->second.length()) == 0) - type = "dynamic"; - else - type = "static"; - - /*get port name*/ - getOidToPortNameMap(oid_to_portname_map); - getBridgePortIdToAttrPortIdMap(&brPortId_to_attrPortId_map); - brPortId_it = hash.find("SAI_FDB_ENTRY_ATTR_BRIDGE_PORT_ID"); - if (brPortId_it == hash.end()) - { - continue; - } - bri_port_id = brPortId_it->second; - - brPortId_to_attrPortId_it = brPortId_to_attrPortId_map.find(bri_port_id); - if (brPortId_to_attrPortId_it == brPortId_to_attrPortId_map.end()) - { - continue; - } - - oid_to_portName_it = oid_to_portname_map.find(brPortId_to_attrPortId_it->second); - if (oid_to_portName_it == oid_to_portname_map.end()) - { - continue; - } - - port_name = oid_to_portName_it->second; - - /*insert set*/ - SWSS_LOG_DEBUG("Read one fdb entry(MAC:%s, vid:%d, port_name:%s, type:%s) from ASIC_DB and insert new_set.", mac.c_str(), vid, port_name.c_str(), type.c_str()); - fdb_set->insert(mclag_fdb(mac, vid, port_name, type)); - } - - return; -} - void MclagLink::setPortIsolate(char *msg) { mclag_sub_option_hdr_t *op_hdr = NULL; string isolate_src_port; string isolate_dst_port; - char * cur = NULL; + char *cur = NULL; string acl_name = "mclag"; string acl_rule_name = "mclag:mclag"; vector acl_attrs; vector acl_rule_attrs; - std::string acl_key = std::string("") + APP_ACL_TABLE_TABLE_NAME + ":" + acl_name; - std::string acl_rule_key = std::string("") + APP_ACL_RULE_TABLE_NAME + ":" + acl_rule_name; + string acl_key = string("") + APP_ACL_TABLE_TABLE_NAME + ":" + acl_name; + string acl_rule_key = string("") + APP_ACL_RULE_TABLE_NAME + ":" + acl_rule_name; static int acl_table_is_added = 0; cur = msg; - /*get isolate src port infor*/ + /* get isolate src port infor */ op_hdr = (mclag_sub_option_hdr_t *)cur; cur = cur + MCLAG_SUB_OPTION_HDR_LEN; - isolate_src_port.insert(0, (const char*)cur, op_hdr->op_len); + isolate_src_port.insert(0, (const char *)cur, op_hdr->op_len); cur = cur + op_hdr->op_len; - /*get isolate dst ports infor*/ + /* get isolate dst ports infor */ op_hdr = (mclag_sub_option_hdr_t *)cur; cur = cur + MCLAG_SUB_OPTION_HDR_LEN; - isolate_dst_port.insert(0, (const char*)cur, op_hdr->op_len); + isolate_dst_port.insert(0, (const char *)cur, op_hdr->op_len); if (op_hdr->op_len == 0) { /* If dst port is NULL, delete the acl table 'mclag' */ - p_acl_table_tbl->del(acl_name); + m_aclTable.del(acl_name); acl_table_is_added = 0; - SWSS_LOG_DEBUG("Disable port isolate, src port: %s, dst port is NULL", - isolate_src_port.c_str()); + SWSS_LOG_NOTICE("Set port isolate, src port: %s, dst port is NULL", isolate_src_port.c_str()); return; } - SWSS_LOG_DEBUG("Set port isolate, src port: %s, dst port: %s", - isolate_src_port.c_str(), isolate_dst_port.c_str()); + SWSS_LOG_NOTICE("Set port isolate, src port: %s, dst port: %s", isolate_src_port.c_str(), isolate_dst_port.c_str()); if (acl_table_is_added == 0) { - /*First create ACL table*/ + /* First create ACL table */ FieldValueTuple desc_attr("policy_desc", "Mclag egress port isolate acl"); acl_attrs.push_back(desc_attr); @@ -233,13 +88,12 @@ void MclagLink::setPortIsolate(char *msg) FieldValueTuple port_attr("ports", isolate_src_port); acl_attrs.push_back(port_attr); - p_acl_table_tbl->set(acl_name, acl_attrs); - + m_aclTable.set(acl_name, acl_attrs); acl_table_is_added = 1; - /*End create ACL table*/ + /* End create ACL table */ } - /*Then create ACL rule table*/ + /* Then create ACL rule table */ FieldValueTuple ip_type_attr("IP_TYPE", "ANY"); acl_rule_attrs.push_back(ip_type_attr); @@ -249,22 +103,22 @@ void MclagLink::setPortIsolate(char *msg) FieldValueTuple packet_attr("PACKET_ACTION", "DROP"); acl_rule_attrs.push_back(packet_attr); - p_acl_rule_tbl->set(acl_rule_name, acl_rule_attrs); - /*End create ACL rule table*/ + m_aclRuleTable.set(acl_rule_name, acl_rule_attrs); + /* End create ACL rule table */ return; } -void MclagLink::setPortMacLearnMode(char *msg) +void MclagLink::setPortLearnMode(char *msg) { string learn_port; string learn_mode; mclag_sub_option_hdr_t *op_hdr = NULL; - char * cur = NULL; + char *cur = NULL; cur = msg; - /*get port learning mode info*/ + /* get port learning mode info */ op_hdr = (mclag_sub_option_hdr_t *)cur; if (op_hdr->op_type == MCLAG_SUB_OPTION_TYPE_MAC_LEARN_ENABLE) { @@ -277,53 +131,51 @@ void MclagLink::setPortMacLearnMode(char *msg) cur = cur + MCLAG_SUB_OPTION_HDR_LEN; - learn_port.insert(0, (const char*)cur, op_hdr->op_len); + learn_port.insert(0, (const char *)cur, op_hdr->op_len); vector attrs; FieldValueTuple learn_attr("learn_mode", learn_mode); attrs.push_back(learn_attr); if (strncmp(learn_port.c_str(), PORTCHANNEL_PREFIX, strlen(PORTCHANNEL_PREFIX)) == 0) - p_lag_tbl->set(learn_port, attrs); - /*vxlan tunnel dont supported currently, for src_ip is the mandatory attribute*/ - /*else if(strncmp(learn_port.c_str(),VXLAN_TUNNEL_PREFIX,5)==0) - p_tnl_tbl->set(learn_port, attrs); */ + m_lagTable.set(learn_port, attrs); + /* vxlan tunnel dont supported currently, for src_ip is the mandatory attribute */ + /* else if(strncmp(learn_port.c_str(),VXLAN_TUNNEL_PREFIX,5) == 0) p_tnl_tbl->set(learn_port, attrs); */ else - p_port_tbl->set(learn_port, attrs); + m_portTable.set(learn_port, attrs); - SWSS_LOG_DEBUG("Set port mac learn mode, port: %s, learn-mode: %s", - learn_port.c_str(), learn_mode.c_str()); + SWSS_LOG_NOTICE("Set port mac learn mode, port: %s, learn-mode: %s", learn_port.c_str(), learn_mode.c_str()); return; } -void MclagLink::setFdbFlush() +void MclagLink::flushFdb() { - swss::NotificationProducer flushFdb(p_appl_db, "FLUSHFDBREQUEST"); + swss::NotificationProducer flushFdb(&m_applDb, "FLUSHFDBREQUEST"); vector values; - SWSS_LOG_DEBUG("Send fdb flush notification"); + SWSS_LOG_NOTICE("Send fdb flush notification"); flushFdb.send("ALL", "ALL", values); return; } -void MclagLink::setFdbFlushByPort(char *msg) +void MclagLink::flushFdbByPort(char *msg) { string port; char *cur = NULL; mclag_sub_option_hdr_t *op_hdr = NULL; - swss::NotificationProducer flushFdb(p_appl_db, "FLUSHFDBREQUEST"); + swss::NotificationProducer flushFdb(&m_applDb, "FLUSHFDBREQUEST"); vector values; cur = msg; - /*get port infor*/ + /* get port infor */ op_hdr = (mclag_sub_option_hdr_t *)cur; cur = cur + MCLAG_SUB_OPTION_HDR_LEN; - port.insert(0, (const char*)cur, op_hdr->op_len); + port.insert(0, (const char *)cur, op_hdr->op_len); - SWSS_LOG_DEBUG("Send fdb flush by port %s notification", port.c_str()); + SWSS_LOG_NOTICE("Send fdb flush by port %s notification", port.c_str()); flushFdb.send("ALL", port, values); @@ -339,30 +191,30 @@ void MclagLink::setIntfMac(char *msg) cur = msg; - /*get intf key name*/ + /* get intf key name */ op_hdr = (mclag_sub_option_hdr_t *)cur; cur = cur + MCLAG_SUB_OPTION_HDR_LEN; - intf_key.insert(0, (const char*)cur, op_hdr->op_len); + intf_key.insert(0, (const char *)cur, op_hdr->op_len); cur = cur + op_hdr->op_len; - /*get mac*/ + /* get mac */ op_hdr = (mclag_sub_option_hdr_t *)cur; cur = cur + MCLAG_SUB_OPTION_HDR_LEN; - mac_value.insert(0, (const char*)cur, op_hdr->op_len); + mac_value.insert(0, (const char *)cur, op_hdr->op_len); - SWSS_LOG_DEBUG("Set mac to chip, intf key name: %s, mac: %s", intf_key.c_str(), mac_value.c_str()); + SWSS_LOG_NOTICE("Set mac to chip, intf key name: %s, mac: %s", intf_key.c_str(), mac_value.c_str()); vector attrs; FieldValueTuple mac_attr("mac_addr", mac_value); attrs.push_back(mac_attr); - p_intf_tbl->set(intf_key, attrs); + m_intfTable.set(intf_key, attrs); return; } void MclagLink::setFdbEntry(char *msg, int msg_len) { - struct mclag_fdb_info * fdb_info = NULL; + struct mclag_fdb_info *fdb_info = NULL; struct mclag_fdb fdb; string fdb_key; char key[64] = { 0 }; @@ -370,7 +222,7 @@ void MclagLink::setFdbEntry(char *msg, int msg_len) short count = 0; int index = 0; int exist = 0; - set ::iterator it; + set::iterator it; cur = msg; count = (short)(msg_len / sizeof(struct mclag_fdb_info)); @@ -384,278 +236,181 @@ void MclagLink::setFdbEntry(char *msg, int msg_len) fdb.mac = fdb_info->mac; fdb.port_name = fdb_info->port_name; fdb.vid = fdb_info->vid; + fdb.op_type = MCLAG_FDB_OPER_NONE; if (fdb_info->type == MCLAG_FDB_TYPE_STATIC) fdb.type = "static"; else fdb.type = "dynamic"; - if ((it = find(p_old_fdb->begin(), p_old_fdb->end(), fdb)) == p_old_fdb->end()) + snprintf(key, 64, "%s%d:%s", "Vlan", fdb_info->vid, fdb_info->mac); + fdb_key = key; + + if ((it = find(m_pFdbSet->begin(), m_pFdbSet->end(), fdb)) == m_pFdbSet->end()) exist = 0; else exist = 1; - snprintf(key, 64, "%s%d:%s", "Vlan", fdb_info->vid, fdb_info->mac); - fdb_key = key; - if (fdb_info->op_type == MCLAG_FDB_OPER_ADD) { vector attrs; - /*set port attr*/ + /* set port attr */ FieldValueTuple port_attr("port", fdb.port_name); attrs.push_back(port_attr); - /*set type attr*/ + /* set type attr */ FieldValueTuple type_attr("type", fdb.type); attrs.push_back(type_attr); if (exist == 0) { - p_old_fdb->insert(fdb); - SWSS_LOG_DEBUG("Insert node(portname =%s, mac =%s, vid =%d, type =%s) into old_fdb_set", - fdb.port_name.c_str(), fdb.mac.c_str(), fdb.vid, fdb.type.c_str()); + m_pFdbSet->insert(fdb); + SWSS_LOG_NOTICE("Insert node(portname =%s, mac =%s, vid =%d, type =%s) into fdb_set", + fdb.port_name.c_str(), fdb.mac.c_str(), fdb.vid, fdb.type.c_str()); } else { if (it->port_name == fdb.port_name && it->type == fdb.type) { - SWSS_LOG_DEBUG("All items of mac is same (mac =%s, vid =%d, portname :%s ==> %s, type:%s ==>%s), return.", - fdb.mac.c_str(), fdb.vid, it->port_name.c_str(), fdb.port_name.c_str(), it->type.c_str(), fdb.type.c_str()); - return; + SWSS_LOG_NOTICE("Insert node(portname =%s, mac =%s, vid =%d, type =%s), all the same", + fdb.port_name.c_str(), fdb.mac.c_str(), fdb.vid, fdb.type.c_str()); + continue; } - SWSS_LOG_DEBUG("Modify node(mac =%s, vid =%d, portname :%s ==> %s, type:%s ==>%s)", - fdb.mac.c_str(), fdb.vid, it->port_name.c_str(), fdb.port_name.c_str(), it->type.c_str(), fdb.type.c_str()); - p_old_fdb->erase(it); - p_old_fdb->insert(fdb); - #if 0 - fdb_entry = &(*it); - fdb_entry->port_name = fdb.port_name; - fdb_entry->type = fdb.type; - #endif + + SWSS_LOG_NOTICE("Modify node(mac =%s, vid =%d, portname :%s==>%s, type:%s==>%s)", + fdb.mac.c_str(), fdb.vid, it->port_name.c_str(), fdb.port_name.c_str(), it->type.c_str(), fdb.type.c_str()); + m_pFdbSet->erase(it); + m_pFdbSet->insert(fdb); } - p_fdb_tbl->set(fdb_key, attrs); - SWSS_LOG_DEBUG("Add fdb entry into ASIC_DB:key =%s, type =%s", fdb_key.c_str(), fdb.type.c_str()); + m_fdbTable.set(fdb_key, attrs); + SWSS_LOG_NOTICE("Add fdb entry into ASIC_DB:key =%s, type =%s", fdb_key.c_str(), fdb.type.c_str()); } else if (fdb_info->op_type == MCLAG_FDB_OPER_DEL) { if (exist) { - SWSS_LOG_DEBUG("Erase node(portname =%s, mac =%s, vid =%d, type =%s) from old_fdb_set", - it->port_name.c_str(), it->mac.c_str(), it->vid, it->type.c_str()); - p_old_fdb->erase(it); + m_pFdbSet->erase(it); + m_fdbTable.del(fdb_key); + SWSS_LOG_NOTICE("Del fdb entry from ASIC_DB:key =%s", fdb_key.c_str()); } - p_fdb_tbl->del(fdb_key); - SWSS_LOG_DEBUG("Del fdb entry from ASIC_DB:key =%s", fdb_key.c_str()); } } return; } -ssize_t MclagLink::getFdbChange(char *msg_buf) +void MclagLink::notifyFdbChange() { - set new_fdb; - set del_fdb; - set add_fdb; struct mclag_fdb_info info; - mclag_msg_hdr_t * msg_head = NULL; + mclag_msg_hdr_t *msg_head = NULL; ssize_t write = 0; - size_t infor_len = 0; - char *infor_start = msg_buf; - set *p_new_fdb = &new_fdb; - - del_fdb.clear(); - add_fdb.clear(); - p_new_fdb->clear(); + size_t info_len = 0; + char *info_start = m_msgSndBuf; - infor_len = infor_len + sizeof(mclag_msg_hdr_t); + info_len += sizeof(mclag_msg_hdr_t); - getFdbSet(p_new_fdb); - - set_difference(p_old_fdb->begin(), p_old_fdb->end(), p_new_fdb->begin(), - p_new_fdb->end(), inserter(del_fdb, del_fdb.begin())); - set_difference(p_new_fdb->begin(), p_new_fdb->end(), p_old_fdb->begin(), - p_old_fdb->end(), inserter(add_fdb, add_fdb.begin())); - - p_old_fdb->swap(*p_new_fdb); - - /*Remove the same item from del set, this may be MAC move*/ - auto itdel = del_fdb.begin(); - while (itdel != del_fdb.end()) + for (auto it = m_pFdbEvent->begin(); it != m_pFdbEvent->end(); it++) { - auto ittmp = itdel; - itdel++; - for (auto itadd = add_fdb.begin(); itadd != add_fdb.end(); itadd++) + if (MCLAG_MAX_SEND_MSG_LEN - info_len < sizeof(struct mclag_fdb_info)) { - if (ittmp->mac == itadd->mac && ittmp->vid == itadd->vid) - { - SWSS_LOG_DEBUG("Mac move: mac %s, vid %d, portname %s, type %s", - ittmp->mac.c_str(), ittmp->vid, ittmp->port_name.c_str(), ittmp->type.c_str()); - del_fdb.erase(ittmp); - break; - } - } - } - - for (auto it = del_fdb.begin(); it != del_fdb.end(); it++) - { - if (MCLAG_MAX_SEND_MSG_LEN - infor_len < sizeof(struct mclag_fdb_info)) - { - msg_head = (mclag_msg_hdr_t *)infor_start; + msg_head = (mclag_msg_hdr_t *)info_start; msg_head->version = 1; - msg_head->msg_len = (unsigned short)infor_len; + msg_head->msg_len = (unsigned short)info_len; msg_head->msg_type = MCLAG_SYNCD_MSG_TYPE_FDB_OPERATION; - SWSS_LOG_DEBUG("Mclagsycnd send msg to iccpd, msg_len =%d, msg_type =%d", - msg_head->msg_len, msg_head->msg_type); - write = ::write(m_connection_socket, infor_start, msg_head->msg_len); - if (write <= 0) - return write; + SWSS_LOG_NOTICE("Mclagsycnd send msg to iccpd, msg_len =%d, msg_type =%d", msg_head->msg_len, msg_head->msg_type); - infor_len = sizeof(mclag_msg_hdr_t); - } - SWSS_LOG_DEBUG("Notify iccpd to del fdb_entry:mac:%s, vid:%d, portname:%s, type:%s", - it->mac.c_str(), it->vid, it->port_name.c_str(), it->type.c_str()); - memset(&info, 0, sizeof(struct mclag_fdb_info)); - info.op_type = MCLAG_FDB_OPER_DEL; - memcpy(info.mac, it->mac.c_str(), it->mac.length()); - info.vid = it->vid; - memcpy(info.port_name, it->port_name.c_str(), it->port_name.length()); - if (memcmp(it->type.c_str(), "SAI_FDB_ENTRY_TYPE_DYNAMIC", it->type.length()) == 0) - info.type = MCLAG_FDB_TYPE_DYNAMIC; - else - info.type = MCLAG_FDB_TYPE_STATIC; - - memcpy((char*)(infor_start + infor_len), (char*)&info, sizeof(struct mclag_fdb_info)); - infor_len = infor_len + sizeof(struct mclag_fdb_info); - } + write =::write(m_connectionSocket, info_start, msg_head->msg_len); + if (write < 0) + { + m_connectionState = false; + goto OUT; + } - for (auto it = add_fdb.begin(); it != add_fdb.end(); it++) - { - if (MCLAG_MAX_SEND_MSG_LEN - infor_len < sizeof(struct mclag_fdb_info)) - { - msg_head = (mclag_msg_hdr_t *)infor_start; - msg_head->version = 1; - msg_head->msg_len = (unsigned short)infor_len; - msg_head->msg_type = MCLAG_SYNCD_MSG_TYPE_FDB_OPERATION; + info_len = sizeof(mclag_msg_hdr_t); + } - /*SWSS_LOG_DEBUG("Mclagsycnd send msg to iccpd, msg_len =%d, msg_type =%d", - msg_head->msg_len, msg_head->msg_type);*/ - write = ::write(m_connection_socket, infor_start, msg_head->msg_len); - if (write <= 0) - return write; + SWSS_LOG_NOTICE("Notify fdb msg(mac:%s, vid:%d, portname:%s, type:%s, op_type:%s) to iccpd.", + it->mac.c_str(), it->vid, it->port_name.c_str(), it->type.c_str(), (it->op_type == MCLAG_FDB_OPER_ADD) ? "add" : "del"); - infor_len = sizeof(mclag_msg_hdr_t); - } - SWSS_LOG_DEBUG("Notify iccpd to add fdb_entry:mac:%s, vid:%d, portname:%s, type:%s", - it->mac.c_str(), it->vid, it->port_name.c_str(), it->type.c_str()); memset(&info, 0, sizeof(struct mclag_fdb_info)); - info.op_type = MCLAG_FDB_OPER_ADD; + info.op_type = it->op_type; memcpy(info.mac, it->mac.c_str(), it->mac.length()); info.vid = it->vid; memcpy(info.port_name, it->port_name.c_str(), it->port_name.length()); + if (memcmp(it->type.c_str(), "dynamic", it->type.length()) == 0) info.type = MCLAG_FDB_TYPE_DYNAMIC; else info.type = MCLAG_FDB_TYPE_STATIC; - memcpy((char*)(infor_start + infor_len), (char*)&info, sizeof(struct mclag_fdb_info)); - infor_len = infor_len + sizeof(struct mclag_fdb_info); + memcpy((char *)(info_start + info_len), (char *)&info, sizeof(struct mclag_fdb_info)); + info_len = info_len + sizeof(struct mclag_fdb_info); } - if (infor_len <= sizeof(mclag_msg_hdr_t)) /*no fdb entry need notifying iccpd*/ - return 1; + if (info_len <= sizeof(mclag_msg_hdr_t)) /* no fdb entry need notifying iccpd */ + goto OUT; - msg_head = (mclag_msg_hdr_t *)infor_start; + msg_head = (mclag_msg_hdr_t *)info_start; msg_head->version = 1; - msg_head->msg_len = (unsigned short)infor_len; + msg_head->msg_len = (unsigned short)info_len; msg_head->msg_type = MCLAG_SYNCD_MSG_TYPE_FDB_OPERATION; - /*SWSS_LOG_DEBUG("Mclagsycnd send msg to iccpd, msg_len =%d, msg_type =%d", - msg_head->msg_len, msg_head->msg_type);*/ - write = ::write(m_connection_socket, infor_start, msg_head->msg_len); + SWSS_LOG_NOTICE("Mclagsycnd send msg to iccpd, msg_len =%d, msg_type =%d", msg_head->msg_len, msg_head->msg_type); - return write; -} + write =::write(m_connectionSocket, info_start, msg_head->msg_len); -MclagLink::MclagLink(int port) : - MSG_BATCH_SIZE(256), - m_bufSize(MCLAG_MAX_MSG_LEN * MSG_BATCH_SIZE), - m_messageBuffer(NULL), - m_pos(0), - m_connected(false), - m_server_up(false) -{ - struct sockaddr_in addr; - int true_val = 1; - - m_server_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); - if (m_server_socket < 0) - throw system_error(errno, system_category()); - - if (setsockopt(m_server_socket, SOL_SOCKET, SO_REUSEADDR, &true_val, - sizeof(true_val)) < 0) - { - close(m_server_socket); - throw system_error(errno, system_category()); - } - - if (setsockopt(m_server_socket, SOL_SOCKET, SO_KEEPALIVE, &true_val, - sizeof(true_val)) < 0) - { - close(m_server_socket); - throw system_error(errno, system_category()); - } - - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = htonl(MCLAG_DEFAULT_IP); - - if (bind(m_server_socket, (struct sockaddr *)&addr, sizeof(addr)) < 0) + if (write < 0) { - close(m_server_socket); - throw system_error(errno, system_category()); + m_connectionState = false; } - if (listen(m_server_socket, 2) != 0) - { - close(m_server_socket); - throw system_error(errno, system_category()); - } +OUT: + m_pFdbEvent->clear(); - m_server_up = true; - m_messageBuffer = new char[m_bufSize]; - m_messageBuffer_send = new char[MCLAG_MAX_SEND_MSG_LEN]; + return; } -MclagLink::~MclagLink() +vectorMclagLink::getFdbGatherSelectables() { - delete m_messageBuffer; - delete m_messageBuffer_send; - if (m_connected) - close(m_connection_socket); - if (m_server_up) - close(m_server_socket); + return m_fdbGather.getSelectables(); } -void MclagLink::accept() +MclagLink::MclagLink(int fd) : + m_connectionState(true), + m_pos(0), + m_bufSize(MCLAG_MAX_MSG_LEN * MSG_BATCH_SIZE), + m_applDb(APPL_DB, DBConnector::DEFAULT_UNIXSOCKET, 0), + m_stateDb(STATE_DB, DBConnector::DEFAULT_UNIXSOCKET, 0), + m_portTable(&m_applDb, APP_PORT_TABLE_NAME), + m_lagTable(&m_applDb, APP_LAG_TABLE_NAME), + m_tnlTable(&m_applDb, APP_VXLAN_TUNNEL_TABLE_NAME), + m_intfTable(&m_applDb, APP_INTF_TABLE_NAME), + m_fdbTable(&m_applDb, APP_FDB_TABLE_NAME), + m_aclTable(&m_applDb, APP_ACL_TABLE_TABLE_NAME), + m_aclRuleTable(&m_applDb, APP_ACL_RULE_TABLE_NAME), + m_stateFdbTable(&m_stateDb, STATE_FDB_TABLE_NAME), + m_fdbGatherTables(1, m_stateFdbTable), + m_fdbGather(&m_stateDb, m_fdbGatherTables) { - struct sockaddr_in client_addr; - socklen_t client_len; + m_connectionSocket = fd; + m_pFdbEvent = &m_fdbGather.m_fdbEvent; + m_pFdbSet = &m_fdbGather.m_fdbSet; + m_fdbGather.getFdbFromStatedb(); +} - m_connection_socket = ::accept(m_server_socket, (struct sockaddr *)&client_addr, - &client_len); - if (m_connection_socket < 0) - throw system_error(errno, system_category()); +MclagLink::~MclagLink() +{ + m_pFdbSet->clear(); - SWSS_LOG_NOTICE("New connection accepted from: %s", inet_ntoa(client_addr.sin_addr)); + if (m_connectionState) + close(m_connectionSocket); } int MclagLink::getFd() { - return m_connection_socket; + return m_connectionSocket; } uint64_t MclagLink::readData() @@ -664,19 +419,24 @@ uint64_t MclagLink::readData() size_t msg_len = 0; size_t start = 0, left = 0; ssize_t read = 0; - ssize_t write = 0; - char * msg = NULL; + char *msg = NULL; + + read =::read(m_connectionSocket, m_msgBuf + m_pos, m_bufSize - m_pos); - read = ::read(m_connection_socket, m_messageBuffer + m_pos, m_bufSize - m_pos); if (read == 0) - throw MclagConnectionClosedException(); + { + SWSS_LOG_NOTICE("Read data error! connection lost?\n"); + return 0; + } + if (read < 0) throw system_error(errno, system_category()); + m_pos += (uint32_t)read; while (true) { - hdr = (mclag_msg_hdr_t *)(m_messageBuffer + start); + hdr = (mclag_msg_hdr_t *) (m_msgBuf + start); left = m_pos - start; if (left < MCLAG_MSG_HDR_LEN) break; @@ -688,42 +448,28 @@ uint64_t MclagLink::readData() if (!mclag_msg_ok(hdr, left)) throw system_error(make_error_code(errc::bad_message), "Malformed MCLAG message received"); - msg = ((char*)hdr) + MCLAG_MSG_HDR_LEN; + msg = ((char *)hdr) + MCLAG_MSG_HDR_LEN; switch (hdr->msg_type) { case MCLAG_MSG_TYPE_PORT_ISOLATE: setPortIsolate(msg); break; - case MCLAG_MSG_TYPE_PORT_MAC_LEARN_MODE: - setPortMacLearnMode(msg); + setPortLearnMode(msg); break; - case MCLAG_MSG_TYPE_FLUSH_FDB: - setFdbFlush(); + flushFdb(); break; - case MCLAG_MSG_TYPE_FLUSH_FDB_BY_PORT: - setFdbFlushByPort(msg); + flushFdbByPort(msg); break; - case MCLAG_MSG_TYPE_SET_INTF_MAC: setIntfMac(msg); break; - case MCLAG_MSG_TYPE_SET_FDB: setFdbEntry(msg, (int)(hdr->msg_len - sizeof(mclag_msg_hdr_t))); break; - - case MCLAG_MSG_TYPE_GET_FDB_CHANGES: - write = getFdbChange(m_messageBuffer_send); - if (write == 0) - throw MclagConnectionClosedException(); - if (write < 0) - throw system_error(errno, system_category()); - break; - default: break; } @@ -731,9 +477,286 @@ uint64_t MclagLink::readData() start += msg_len; } - memmove(m_messageBuffer, m_messageBuffer + start, m_pos - start); + memmove(m_msgBuf, m_msgBuf + start, m_pos - start); m_pos = m_pos - (uint32_t)start; return 0; } +MclagServerLink::MclagServerLink(int port) : + m_serverUp(false) +{ + struct sockaddr_in addr; + int true_val = 1; + + m_serverSocket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + if (m_serverSocket < 0) + throw system_error(errno, system_category()); + + if (setsockopt(m_serverSocket, SOL_SOCKET, SO_REUSEADDR, &true_val, sizeof(true_val)) < 0) + { + close(m_serverSocket); + throw system_error(errno, system_category()); + } + + if (setsockopt(m_serverSocket, SOL_SOCKET, SO_KEEPALIVE, &true_val, sizeof(true_val)) < 0) + { + close(m_serverSocket); + throw system_error(errno, system_category()); + } + + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = htonl(MCLAG_DEFAULT_IP); + + if (bind(m_serverSocket, (struct sockaddr *)&addr, sizeof(addr)) < 0) + { + close(m_serverSocket); + throw system_error(errno, system_category()); + } + + if (listen(m_serverSocket, 2) != 0) + { + close(m_serverSocket); + throw system_error(errno, system_category()); + } + + m_serverUp = true; +} + +MclagServerLink::~MclagServerLink() +{ + vector::iterator it; + + for (it = m_linkList.begin(); it != m_linkList.end();) + { + MclagLink *link = (*it); + delete link; + it++; + } + + if (m_serverUp) + close(m_serverSocket); +} + +int MclagServerLink::getFd() +{ + return m_serverSocket; +} + +void MclagServerLink::accept() +{ + struct sockaddr_in client_addr; + socklen_t client_len; + int connect_socket; + MclagLink *link; + + connect_socket =::accept(m_serverSocket, (struct sockaddr *)&client_addr, &client_len); + if (connect_socket < 0) + throw system_error(errno, system_category()); + + SWSS_LOG_NOTICE("New connection(fd:%d) accepted from: %s", connect_socket, inet_ntoa(client_addr.sin_addr)); + + link = new MclagLink(connect_socket); + m_linkList.push_back(link); + + m_pSelect->addSelectable(link); + m_pSelect->addSelectables(link->getFdbGatherSelectables()); + + return; +} + +uint64_t MclagServerLink::readData() +{ + accept(); + + return 0; +} + +MclagFdbGather::MclagFdbGather(DBConnector *statDb, const vector &tables) : + Orch(tables), + m_redisClient(statDb) +{ +} + +void MclagFdbGather::doTask(Consumer &consumer) +{ + SWSS_LOG_ENTER(); + + auto table = consumer.getTableName(); + + SWSS_LOG_NOTICE("Get task from table %s", table.c_str()); + + if (table == STATE_FDB_TABLE_NAME) + { + storeFdbChange(consumer); + } +} + +void MclagFdbGather::storeFdbChange(Consumer &consumer) +{ + SWSS_LOG_ENTER(); + + auto it = consumer.m_toSync.begin(); + while (it != consumer.m_toSync.end()) + { + string mac; + string vlan_name; + unsigned int vlan_id; + string port_name; + string type; + short op_type = MCLAG_FDB_OPER_NONE; + struct mclag_fdb fdb; + int exist = 0; + set::iterator fdb_it; + + KeyOpFieldsValuesTuple t = it->second; + + /* format: : */ + vector keys = tokenize(kfvKey(t), ':', 1); + string op = kfvOp(t); + + vlan_name = keys[0]; + mac = keys[1]; + + sscanf(vlan_name.c_str(), "Vlan%d", &vlan_id); + + fdb.mac = mac; + fdb.vid = vlan_id; + fdb.op_type = MCLAG_FDB_OPER_NONE; + + if ((fdb_it = find(m_fdbSet.begin(), m_fdbSet.end(), fdb)) == m_fdbSet.end()) + exist = 0; + else + exist = 1; + + if (op == SET_COMMAND) + { + op_type = MCLAG_FDB_OPER_ADD; + + for (auto i : kfvFieldsValues(t)) + { + if (fvField(i) == "port") + { + port_name = fvValue(i); + } + + if (fvField(i) == "type") + { + /* dynamic or static */ + type = fvValue(i); + } + } + + SWSS_LOG_NOTICE("Rcv msg from STATE_FDB_DB to add fdb(mac:%s, vid:%d, port_name:%s, type:%s).", + mac.c_str(), vlan_id, port_name.c_str(), type.c_str()); + + fdb.port_name = port_name; + fdb.type = type; + + if (exist == 1) + { + if (fdb_it->port_name == port_name && fdb_it->type == type) + { + it = consumer.m_toSync.erase(it); + continue; + } + else + { + m_fdbSet.erase(fdb_it); + m_fdbSet.insert(fdb); + } + } + else + { + m_fdbSet.insert(fdb); + } + } + else if (op == DEL_COMMAND) + { + SWSS_LOG_NOTICE("Rcv msg from STATE_FDB_DB to del fdb(mac:%s, vid:%d)", mac.c_str(), vlan_id); + if (exist) + { + op_type = MCLAG_FDB_OPER_DEL; + port_name = ""; + type = ""; + + m_fdbSet.erase(fdb_it); + } + else + { + it = consumer.m_toSync.erase(it); + continue; + } + } + else + { + SWSS_LOG_ERROR("Unknown operation type %s", op.c_str()); + } + + m_fdbEvent.push_back(mclag_fdb(mac, vlan_id, port_name, type, op_type)); + it = consumer.m_toSync.erase(it); + } + + return; +} + +void MclagFdbGather::getFdbFromStatedb() +{ + string port_name; + string mac; + string type; + string vlanid; + int vid; + size_t pos1 = 0; + size_t pos2 = 0; + std::unordered_map::iterator type_it; + + auto keys = m_redisClient.keys("FDB_TABLE|*"); + + for (auto& key : keys) + { + /* get vid */ + pos1 = key.find("Vlan", 0); + if (pos1 != key.npos) + { + pos1 = pos1 + 4; + pos2 = key.find(":", pos1); + vlanid = key.substr(pos1, pos2 - pos1); + } + else + continue; + + vid = atoi(vlanid.c_str()); + + /* get mac */ + pos1 = key.find_first_of(":") + 1; + pos2 = key.find_last_of(":") + 3; + mac = key.substr(pos1, pos2 - pos1); + + /* get port && type */ + auto hash = m_redisClient.hgetall(key); + type_it = hash.find("port"); + if (type_it == hash.end()) + { + continue; + } + port_name = type_it->second; + + type_it = hash.find("type"); + if (type_it == hash.end()) + { + continue; + } + type = type_it->second; + + SWSS_LOG_NOTICE("Read fdb entry(mac:%s, vid:%d, port_name:%s, type:%s) from STATE_FDB_TABLE and record it locally.", + mac.c_str(), vid, port_name.c_str(), type.c_str()); + + m_fdbEvent.push_back(mclag_fdb(mac, vid, port_name, type, MCLAG_FDB_OPER_ADD)); + m_fdbSet.insert(mclag_fdb(mac, vid, port_name, type, MCLAG_FDB_OPER_NONE)); + } + + return; +} diff --git a/mclagsyncd/mclaglink.h b/mclagsyncd/mclaglink.h index 48aee59f4c..104e4f2c8e 100644 --- a/mclagsyncd/mclaglink.h +++ b/mclagsyncd/mclaglink.h @@ -34,23 +34,22 @@ #include #include +#include "select.h" +#include "orch.h" +#include "dbconnector.h" #include "producerstatetable.h" -#include "selectable.h" #include "redisclient.h" #include "mclagsyncd/mclag.h" namespace swss { -#define ETHER_ADDR_STR_LEN 18 -#define MAX_L_PORT_NAME 20 - struct mclag_fdb_info { - char mac[ETHER_ADDR_STR_LEN]; + char mac[MCLAG_ETHER_ADDR_STR_LEN]; unsigned int vid; - char port_name[MAX_L_PORT_NAME]; - short type; /*dynamic or static*/ - short op_type; /*add or del*/ + char port_name[MCLAG_MAX_L_PORT_NAME]; + short type; /* dynamic or static */ + short op_type; /* add or del */ }; struct mclag_fdb @@ -58,15 +57,12 @@ struct mclag_fdb std::string mac; unsigned int vid; std::string port_name; - std::string type;/*dynamic or static*/ + std::string type; /* dynamic or static */ + short op_type; /* add or del */ mclag_fdb(std::string val_mac, unsigned int val_vid, std::string val_pname, - std::string val_type) : mac(val_mac), vid(val_vid), port_name(val_pname), type(val_type) - { - } - mclag_fdb() - { - } + std::string val_type, short val_op_type):mac(val_mac),vid(val_vid),port_name(val_pname),type(val_type),op_type(val_op_type) {} + mclag_fdb() {} bool operator <(const mclag_fdb &fdb) const { @@ -76,77 +72,89 @@ struct mclag_fdb return vid < fdb.vid; else return port_name < fdb.port_name; - //else if (port_name != fdb.port_name) return port_name < fdb.port_name; - //else return type &tables); + using Orch::doTask; + std::vector m_fdbEvent; + std::set m_fdbSet; + void getFdbFromStatedb(); +private: + RedisClient m_redisClient; + void doTask(Consumer &consumer); + void doFdbUpdateTask(Consumer &consumer); + void storeFdbChange(Consumer &consumer); }; -class MclagLink : public Selectable { +class MclagLink:public Selectable +{ public: - const int MSG_BATCH_SIZE; - ProducerStateTable * p_port_tbl; - ProducerStateTable * p_lag_tbl; - ProducerStateTable * p_tnl_tbl; - ProducerStateTable * p_intf_tbl; - ProducerStateTable *p_fdb_tbl; - ProducerStateTable *p_acl_table_tbl; - ProducerStateTable *p_acl_rule_tbl; - DBConnector *p_appl_db; - RedisClient *p_redisClient_to_asic;/*redis client access to ASIC_DB*/ - RedisClient *p_redisClient_to_counters;/*redis client access to COUNTERS_DB*/ - std::set *p_old_fdb; - - MclagLink(int port = MCLAG_DEFAULT_PORT); + bool m_connectionState; + MclagLink(int fd); virtual ~MclagLink(); - /* Wait for connection (blocking) */ - void accept(); - int getFd() override; - uint64_t readData() override; - - /* readMe throws MclagConnectionClosedException when connection is lost */ - class MclagConnectionClosedException : public std::exception - { - }; + void readData() override; + void notifyFdbChange(); + std::vector getFdbGatherSelectables(); private: - unsigned int m_bufSize; - char *m_messageBuffer; - char *m_messageBuffer_send; unsigned int m_pos; + unsigned int m_bufSize; + char m_msgBuf[MCLAG_MAX_MSG_LEN * MSG_BATCH_SIZE]; + char m_msgSndBuf[MCLAG_MAX_SEND_MSG_LEN]; + int m_connectionSocket; + std::vector *m_pFdbEvent; + std::set *m_pFdbSet; + DBConnector m_applDb; + DBConnector m_stateDb; + ProducerStateTable m_portTable; + ProducerStateTable m_lagTable; + ProducerStateTable m_tnlTable; + ProducerStateTable m_intfTable; + ProducerStateTable m_fdbTable; + ProducerStateTable m_aclTable; + ProducerStateTable m_aclRuleTable; + TableConnector m_stateFdbTable; + std::vector m_fdbGatherTables; + MclagFdbGather m_fdbGather; - bool m_connected; - bool m_server_up; - int m_server_socket; - int m_connection_socket; - - void getOidToPortNameMap(std::unordered_map & port_map); - void getBridgePortIdToAttrPortIdMap(std::map *oid_map); - void getVidByBvid(std::string &bvid, std::string &vlanid); - void getFdbSet(std::set *fdb_set); void setPortIsolate(char *msg); - void setPortMacLearnMode(char *msg); - void setFdbFlush(); - void setFdbFlushByPort(char *msg); + void setPortLearnMode(char *msg); + void flushFdb(); + void flushFdbByPort(char *msg); void setIntfMac(char *msg); void setFdbEntry(char *msg, int msg_len); - ssize_t getFdbChange(char *msg_buf); - void connectionLostHandlePortIsolate(); - void connectionLostHandlePortLearnMode(); - void connectionLost(); +}; + +class MclagServerLink:public Selectable +{ +public: + Select *m_pSelect; + std::vector m_linkList; + MclagServerLink(int port = MCLAG_DEFAULT_PORT); + virtual ~MclagServerLink(); + void accept(); + int getFd() override; + void readData() override; + +private: + bool m_serverUp; + int m_serverSocket; }; } #endif - diff --git a/mclagsyncd/mclagsyncd.cpp b/mclagsyncd/mclagsyncd.cpp index 2bacf35ac2..0eeddfbc7d 100644 --- a/mclagsyncd/mclagsyncd.cpp +++ b/mclagsyncd/mclagsyncd.cpp @@ -18,74 +18,95 @@ * Maintainer: Jim Jiang from nephos */ #include -#include "logger.h" +#include #include -#include "select.h" +#include "logger.h" #include "netdispatcher.h" -#include "mclagsyncd/mclaglink.h" -#include +#include "select.h" +#include "mclaglink.h" using namespace std; using namespace swss; +#define RAPID_TIMEOUT 50 +#define SLOW_TIMEOUT 2147483647 + +int gBatchSize = 0; +bool gSwssRecord = false; +bool gLogRotate = false; +ofstream gRecordOfs; +string gRecordFile; + int main(int argc, char **argv) { - swss::Logger::linkToDbNative("mclagsyncd"); - DBConnector appl_db(APPL_DB, DBConnector::DEFAULT_UNIXSOCKET, 0); - DBConnector asic_db(ASIC_DB, DBConnector::DEFAULT_UNIXSOCKET, 0); - DBConnector counters_db(COUNTERS_DB, DBConnector::DEFAULT_UNIXSOCKET, 0); - ProducerStateTable port_tbl(&appl_db, APP_PORT_TABLE_NAME); - ProducerStateTable lag_tbl(&appl_db, APP_LAG_TABLE_NAME); - ProducerStateTable tnl_tbl(&appl_db, APP_VXLAN_TUNNEL_TABLE_NAME); - ProducerStateTable intf_tbl(&appl_db, APP_INTF_TABLE_NAME); - ProducerStateTable fdb_tbl(&appl_db, APP_FDB_TABLE_NAME); - ProducerStateTable acl_table_tbl(&appl_db, APP_ACL_TABLE_TABLE_NAME); - ProducerStateTable acl_rule_tbl(&appl_db, APP_ACL_RULE_TABLE_NAME); - RedisClient redisClient_to_asicDb(&asic_db); - RedisClient redisClient_to_countersDb(&counters_db); - map isolate; - RedisPipeline pipeline(&appl_db); - set old_fdb; + Logger::linkToDbNative("mclagsyncd"); + SWSS_LOG_ENTER(); + + SWSS_LOG_NOTICE("--- Starting mclagsyncd ---"); while (1) { try { - MclagLink mclag; Select s; + int timeout = RAPID_TIMEOUT; - mclag.p_port_tbl = &port_tbl; - mclag.p_lag_tbl = &lag_tbl; - mclag.p_tnl_tbl = &tnl_tbl; - mclag.p_intf_tbl = &intf_tbl; - mclag.p_fdb_tbl = &fdb_tbl; - mclag.p_acl_table_tbl = &acl_table_tbl; - mclag.p_acl_rule_tbl = &acl_rule_tbl; - mclag.p_appl_db = &appl_db; - mclag.p_redisClient_to_asic = &redisClient_to_asicDb; - mclag.p_redisClient_to_counters = &redisClient_to_countersDb; - mclag.p_old_fdb = &old_fdb; + MclagServerLink serverlink; + serverlink.m_pSelect = &s; - cout << "Waiting for connection..." << endl; - mclag.accept(); - cout << "Connected!" << endl; - - s.addSelectable(&mclag); + s.addSelectable(&serverlink); while (true) { Selectable *temps; - /* Reading MCLAG messages forever (and calling "readData" to read them) */ - s.select(&temps); - pipeline.flush(); - SWSS_LOG_DEBUG("Pipeline flushed"); + int ret; + ret = s.select(&temps, timeout); + + if (ret == Select::ERROR) + { + SWSS_LOG_NOTICE("Error: %s!", strerror(errno)); + continue; + } + + if (ret == Select::TIMEOUT) + { + vector::iterator link_it; + for (link_it = serverlink.m_linkList.begin(); link_it != serverlink.m_linkList.end();) + { + MclagLink *link = (*link_it); + link->notifyFdbChange(); + + if (link->m_connectionState == false) + { + vector selectables; + link_it = serverlink.m_linkList.erase(link_it); + s.removeSelectable(link); + selectables = link->getFdbGatherSelectables(); + for(auto it : selectables) + { + s.removeSelectable(it); + } + + delete link; + } + else + link_it++; + } + + timeout = SLOW_TIMEOUT; + + continue; + } + + if (typeid(*temps) == typeid(MclagServerLink) || typeid(*temps) == typeid(MclagLink)) + continue; + + auto *c = (Executor *)temps; + c->execute(); + timeout = RAPID_TIMEOUT; } } - catch (MclagLink::MclagConnectionClosedException &e) - { - cout << "Connection lost, reconnecting..." << endl; - } catch (const exception& e) { cout << "Exception \"" << e.what() << "\" had been thrown in deamon" << endl; From 0c89f09990eaa16b4d5b21d49428e23e04a0ecb3 Mon Sep 17 00:00:00 2001 From: "jianjun.dong" Date: Sat, 18 Apr 2020 04:08:39 -0700 Subject: [PATCH 2/9] For compile error --- mclagsyncd/mclaglink.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mclagsyncd/mclaglink.h b/mclagsyncd/mclaglink.h index 104e4f2c8e..5f4c40eb45 100644 --- a/mclagsyncd/mclaglink.h +++ b/mclagsyncd/mclaglink.h @@ -35,7 +35,7 @@ #include #include "select.h" -#include "orch.h" +#include "../orchagent/orch.h" #include "dbconnector.h" #include "producerstatetable.h" #include "redisclient.h" From 8019712ccfef1edd15cfdee8a89d674eb5ee8410 Mon Sep 17 00:00:00 2001 From: "jianjun.dong" Date: Sat, 18 Apr 2020 05:14:39 -0700 Subject: [PATCH 3/9] For compile error --- mclagsyncd/mclaglink.cpp | 2 +- mclagsyncd/mclaglink.h | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/mclagsyncd/mclaglink.cpp b/mclagsyncd/mclaglink.cpp index ecd03aeb7f..7e79a40759 100644 --- a/mclagsyncd/mclaglink.cpp +++ b/mclagsyncd/mclaglink.cpp @@ -571,7 +571,7 @@ uint64_t MclagServerLink::readData() { accept(); - return 0; + return 0; } MclagFdbGather::MclagFdbGather(DBConnector *statDb, const vector &tables) : diff --git a/mclagsyncd/mclaglink.h b/mclagsyncd/mclaglink.h index 5f4c40eb45..f24c03fd01 100644 --- a/mclagsyncd/mclaglink.h +++ b/mclagsyncd/mclaglink.h @@ -107,7 +107,7 @@ class MclagLink:public Selectable virtual ~MclagLink(); int getFd() override; - void readData() override; + uint64_t readData() override; void notifyFdbChange(); std::vector getFdbGatherSelectables(); @@ -149,7 +149,7 @@ class MclagServerLink:public Selectable virtual ~MclagServerLink(); void accept(); int getFd() override; - void readData() override; + uint64_t readData() override; private: bool m_serverUp; From 27ec43751bf26ad6d909d02456fe4741236b009d Mon Sep 17 00:00:00 2001 From: "jianjun.dong" Date: Sun, 19 Apr 2020 23:59:11 -0700 Subject: [PATCH 4/9] For compile error --- mclagsyncd/mclaglink.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/mclagsyncd/mclaglink.h b/mclagsyncd/mclaglink.h index f24c03fd01..d4c3888a31 100644 --- a/mclagsyncd/mclaglink.h +++ b/mclagsyncd/mclaglink.h @@ -41,7 +41,7 @@ #include "redisclient.h" #include "mclagsyncd/mclag.h" -namespace swss { +//namespace swss { struct mclag_fdb_info { @@ -99,7 +99,7 @@ class MclagFdbGather:public Orch void storeFdbChange(Consumer &consumer); }; -class MclagLink:public Selectable +class MclagLink:public swss::Selectable { public: bool m_connectionState; @@ -109,7 +109,7 @@ class MclagLink:public Selectable int getFd() override; uint64_t readData() override; void notifyFdbChange(); - std::vector getFdbGatherSelectables(); + std::vector getFdbGatherSelectables(); private: unsigned int m_pos; @@ -140,7 +140,7 @@ class MclagLink:public Selectable void setFdbEntry(char *msg, int msg_len); }; -class MclagServerLink:public Selectable +class MclagServerLink:public swss::Selectable { public: Select *m_pSelect; @@ -156,5 +156,5 @@ class MclagServerLink:public Selectable int m_serverSocket; }; -} +//} #endif From 55c777f6f97b3d25a027aab0022bb687cb20f5f7 Mon Sep 17 00:00:00 2001 From: "jianjun.dong" Date: Mon, 20 Apr 2020 00:22:08 -0700 Subject: [PATCH 5/9] For compile error --- mclagsyncd/mclaglink.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mclagsyncd/mclaglink.h b/mclagsyncd/mclaglink.h index d4c3888a31..e869920768 100644 --- a/mclagsyncd/mclaglink.h +++ b/mclagsyncd/mclaglink.h @@ -35,7 +35,7 @@ #include #include "select.h" -#include "../orchagent/orch.h" +#include "orch.h" #include "dbconnector.h" #include "producerstatetable.h" #include "redisclient.h" From fc37e06dfa0f94ba2890953acdc6a5d26f1437e8 Mon Sep 17 00:00:00 2001 From: "jianjun.dong" Date: Mon, 20 Apr 2020 01:52:25 -0700 Subject: [PATCH 6/9] For compile error --- mclagsyncd/mclaglink.h | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/mclagsyncd/mclaglink.h b/mclagsyncd/mclaglink.h index e869920768..d5d71619c7 100644 --- a/mclagsyncd/mclaglink.h +++ b/mclagsyncd/mclaglink.h @@ -35,13 +35,14 @@ #include #include "select.h" -#include "orch.h" +//#include "../orchagent/orch.h" #include "dbconnector.h" #include "producerstatetable.h" +#include "orch.h" #include "redisclient.h" #include "mclagsyncd/mclag.h" -//namespace swss { +namespace swss { struct mclag_fdb_info { @@ -156,5 +157,5 @@ class MclagServerLink:public swss::Selectable int m_serverSocket; }; -//} +} #endif From b2ceca51cff05c6b4e0644aaa3fc4a6df639215c Mon Sep 17 00:00:00 2001 From: "jianjun.dong" Date: Mon, 20 Apr 2020 02:13:13 -0700 Subject: [PATCH 7/9] For compile error --- mclagsyncd/Makefile.am | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mclagsyncd/Makefile.am b/mclagsyncd/Makefile.am index 4a72f86638..8607d95339 100644 --- a/mclagsyncd/Makefile.am +++ b/mclagsyncd/Makefile.am @@ -1,4 +1,4 @@ -INCLUDES = -I $(top_srcdir) +INCLUDES = -I $(top_srcdir) -I $(top_srcdir)/orchagent bin_PROGRAMS = mclagsyncd From 4bdc202d0442fde7cb46cf5e6f6a295ec8501770 Mon Sep 17 00:00:00 2001 From: "jianjun.dong" Date: Mon, 20 Apr 2020 02:34:09 -0700 Subject: [PATCH 8/9] For compile error --- mclagsyncd/Makefile.am | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/mclagsyncd/Makefile.am b/mclagsyncd/Makefile.am index 8607d95339..3867ab23b9 100644 --- a/mclagsyncd/Makefile.am +++ b/mclagsyncd/Makefile.am @@ -1,5 +1,7 @@ INCLUDES = -I $(top_srcdir) -I $(top_srcdir)/orchagent +CFLAGS_SAI = -I /usr/include/sai + bin_PROGRAMS = mclagsyncd if DEBUG @@ -10,6 +12,6 @@ endif mclagsyncd_SOURCES = mclagsyncd.cpp mclaglink.cpp -mclagsyncd_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) +mclagsyncd_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_SAI) mclagsyncd_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) mclagsyncd_LDADD = -lnl-3 -lnl-route-3 -lswsscommon From 9b54c5cab52e39fd609d7349b20c40344b29fca5 Mon Sep 17 00:00:00 2001 From: "jianjun.dong" Date: Tue, 21 Apr 2020 00:12:16 -0700 Subject: [PATCH 9/9] For compile error --- mclagsyncd/Makefile.am | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mclagsyncd/Makefile.am b/mclagsyncd/Makefile.am index 3867ab23b9..c30b62c906 100644 --- a/mclagsyncd/Makefile.am +++ b/mclagsyncd/Makefile.am @@ -1,4 +1,4 @@ -INCLUDES = -I $(top_srcdir) -I $(top_srcdir)/orchagent +INCLUDES = -I $(top_srcdir) -I $(top_srcdir)/orchagent -I $(top_srcdir)/orchagent/flex_counter CFLAGS_SAI = -I /usr/include/sai @@ -10,8 +10,8 @@ else DBGFLAGS = -g endif -mclagsyncd_SOURCES = mclagsyncd.cpp mclaglink.cpp +mclagsyncd_SOURCES = mclagsyncd.cpp mclaglink.cpp $(top_srcdir)/orchagent/orch.cpp $(top_srcdir)/orchagent/request_parser.cpp mclagsyncd_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_SAI) -mclagsyncd_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) +mclagsyncd_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_SAI) mclagsyncd_LDADD = -lnl-3 -lnl-route-3 -lswsscommon