diff --git a/Solutions/Proofpoint On demand(POD) Email Security/Data Connectors/ProofpointSentinelConn.zip b/Solutions/Proofpoint On demand(POD) Email Security/Data Connectors/ProofpointSentinelConn.zip
index 2453a889be9..e653127d442 100644
Binary files a/Solutions/Proofpoint On demand(POD) Email Security/Data Connectors/ProofpointSentinelConn.zip and b/Solutions/Proofpoint On demand(POD) Email Security/Data Connectors/ProofpointSentinelConn.zip differ
diff --git a/Solutions/Proofpoint On demand(POD) Email Security/Data Connectors/ProofpointSentinelConnector/__init__.py b/Solutions/Proofpoint On demand(POD) Email Security/Data Connectors/ProofpointSentinelConnector/__init__.py
index 74bccfd07c4..8702ec89714 100644
--- a/Solutions/Proofpoint On demand(POD) Email Security/Data Connectors/ProofpointSentinelConnector/__init__.py
+++ b/Solutions/Proofpoint On demand(POD) Email Security/Data Connectors/ProofpointSentinelConnector/__init__.py
@@ -21,15 +21,15 @@
 cluster_id = os.environ['ProofpointClusterID']
 _token = os.environ['ProofpointToken']
 time_delay_minutes = 60
-event_types = ["maillog","message"]
+event_types = ["maillog", "message"]
 logAnalyticsUri = os.environ.get('logAnalyticsUri')
 
 if ((logAnalyticsUri in (None, '') or str(logAnalyticsUri).isspace())):
     logAnalyticsUri = 'https://' + customer_id + '.ods.opinsights.azure.com'
 
 pattern = r'https:\/\/([\w\-]+)\.ods\.opinsights\.azure.([a-zA-Z\.]+)$'
-match = re.match(pattern,str(logAnalyticsUri))
-if(not match):
+match = re.match(pattern, str(logAnalyticsUri))
+if not match:
     raise Exception("ProofpointPOD: Invalid Log Analytics Uri.")
 
 def main(mytimer: func.TimerRequest) -> None:
@@ -55,10 +55,33 @@ def gen_timeframe(self, time_delay_minutes):
         before_time = datetime.datetime.utcnow() - datetime.timedelta(minutes=time_delay_minutes)
         self.before_time = before_time.strftime("%Y-%m-%dT%H:59:59.999999")
         self.after_time = before_time.strftime("%Y-%m-%dT%H:00:00.000000")
+
+    def check_and_split_msgParts(self, msg_parts, max_size=32000):
+        # If msg_parts is a list or dictionary, convert it to a string (JSON format)
+        if isinstance(msg_parts, (dict, list)):
+            msg_parts = json.dumps(msg_parts)
+
+        # Calculate the length of the message in bytes
+        msglen = len(msg_parts.encode('utf-8'))
+
+        # If the message size exceeds the max size, split it
+        if msglen > max_size:
+            split_point = len(msg_parts) // 2
+            part1 = msg_parts[:split_point]
+            part2 = msg_parts[split_point:]
+
+            # Recursively split both parts if they are still too large
+            split_parts = []
+            split_parts.extend(self.check_and_split_msgParts(part1, max_size))  # Corrected
+            split_parts.extend(self.check_and_split_msgParts(part2, max_size))  # Corrected
+
+            return split_parts
+        else:
+            return [msg_parts]
 
     def set_websocket_conn(self, event_type):
+        max_retries = 3
         url = f"wss://logstream.proofpoint.com:443/v1/stream?cid={self.cluster_id}&type={event_type}&sinceTime={self.after_time}&toTime={self.before_time}"
-        logging.info('Opening Websocket logstream {}'.format(url))
         # defining headers for websocket connection (do not change this)
         header = {
             "Connection": "Upgrade",
@@ -72,21 +95,26 @@ def set_websocket_conn(self, event_type):
             'ca_certs': certifi.where(),
             'check_hostname': True
         }
-        try:
-            ws = websocket.create_connection(url, header=header, sslopt=sslopt)
-            ws.settimeout(20)
-            time.sleep(2)
-            logging.info(
-                'Websocket connection established to cluster_id={}, event_type={}'.format(self.cluster_id, event_type))
-            print(
-                'Websocket connection established to cluster_id={}, event_type={}'.format(self.cluster_id, event_type))
-            return ws
-        except Exception as err:
-            logging.error('Error while connectiong to websocket {}'.format(err))
-            print('Error while connectiong to websocket {}'.format(err))
-            return None
-
-    def gen_chunks_to_object(self,data,chunksize=100):
+        for attempt in range(max_retries):
+            try:
+                logging.info('Opening Websocket logstream {}'.format(url))
+                ws = websocket.create_connection(url, header=header, sslopt=sslopt)
+                ws.settimeout(20)
+                time.sleep(2)
+                logging.info(
+                    'Websocket connection established to cluster_id={}, event_type={}'.format(self.cluster_id, event_type))
+                print('Websocket connection established to cluster_id={}, event_type={}'.format(self.cluster_id, event_type))
+                return ws
+            except Exception as err:
+                logging.error('Error while connectiong to websocket {}'.format(err))
+                print('Error while connectiong to websocket {}'.format(err))
+                if attempt < max_retries - 1:
+                    logging.info('Retrying connection in 5 seconds...')
+                    time.sleep(5)  # Wait for a while before retrying
+                else:
+                    return None
+
+    def gen_chunks_to_object(self, data, chunksize=100):
         chunk = []
         for index, line in enumerate(data):
             if (index % chunksize == 0 and index > 0):
@@ -95,15 +123,26 @@ def gen_chunks_to_object(self,data,chunksize=100):
             chunk.append(line)
         yield chunk
 
-    def gen_chunks(self,data,event_type):
+    def gen_chunks(self, data, event_type):
         for chunk in self.gen_chunks_to_object(data, chunksize=10000):
             print(len(chunk))
             obj_array = []
             for row in chunk:
                 if row != None and row != '':
                     y = json.loads(row)
+                    #logging.info(f'json row : {y}')
                     y.update({'event_type': event_type})
+                    if 'msgParts' in y:
+                        msg_parts = y['msgParts']
+                        split_parts = self.check_and_split_msgParts(msg_parts)
+                        if len(split_parts) == 1:  # No splitting required
+                            y["msgParts"] = split_parts[0]
+                        else:  # Splitting required
+                            for i, part in enumerate(split_parts, start=1):
+                                y[f"msgParts{i}"] = part
+                            del y["msgParts"]
                     obj_array.append(y)
+            #logging.info(f'Response Object array : {obj_array}')
 
             sentinel = AzureSentinelConnector(
                 log_analytics_uri=logAnalyticsUri,
@@ -113,6 +152,7 @@ def gen_chunks(self,data,event_type):
                 queue_size=5000
             )
             for event in obj_array:
+                #logging.info(f'Response event : {event}')
                 sentinel.send(event)
             sentinel.flush()
 
@@ -128,7 +168,7 @@ def get_data(self, event_type=None):
                 events.append(data)
                 sent_events += 1
                 if len(events) > 500:
-                    self.gen_chunks(events,event_type)
+                    self.gen_chunks(events, event_type)
                     events = []
             except websocket._exceptions.WebSocketTimeoutException:
                 break
@@ -142,10 +182,10 @@ def get_data(self, event_type=None):
             logging.error('Error while closing socket: {}'.format(err))
             print('Error while closing socket: {}'.format(err))
         if sent_events > 0:
-            self.gen_chunks(events,event_type)
+            self.gen_chunks(events, event_type)
         logging.info('Total events sent: {}. Type: {}. Period(UTC): {} - {}'.format(sent_events, event_type,
                                                                                     self.after_time,
                                                                                     self.before_time))
         print('Total events sent: {}. Type: {}. Period(UTC): {} - {}'.format(sent_events, event_type,
                                                                              self.after_time,
-                                                                             self.before_time))
\ No newline at end of file
+                                                                             self.before_time))
diff --git a/Solutions/Proofpoint On demand(POD) Email Security/Data Connectors/host.json b/Solutions/Proofpoint On demand(POD) Email Security/Data Connectors/host.json
index a9b836982df..f8f3efaf0e7 100644
--- a/Solutions/Proofpoint On demand(POD) Email Security/Data Connectors/host.json
+++ b/Solutions/Proofpoint On demand(POD) Email Security/Data Connectors/host.json
@@ -1,15 +1,16 @@
 {
-    "version": "2.0",
-    "logging": {
-        "applicationInsights": {
-            "samplingSettings": {
-                "isEnabled": true,
-                "excludedTypes": "Request"
-            }
+  "version": "2.0",
+  "functionTimeout": "00:10:00",
+  "logging": {
+    "applicationInsights": {
+      "samplingSettings": {
+        "isEnabled": true,
+        "excludedTypes": "Request"
       }
-    },
-    "extensionBundle": {
-        "id": "Microsoft.Azure.Functions.ExtensionBundle",
-        "version": "[3.*, 4.0.0)"
     }
-  }
\ No newline at end of file
+  },
+  "extensionBundle": {
+    "id": "Microsoft.Azure.Functions.ExtensionBundle",
+    "version": "[3.*, 4.0.0)"
+  }
+}
\ No newline at end of file
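
Reviewer note (not part of the patch): a minimal standalone sketch of the msgParts splitting behaviour that check_and_split_msgParts and gen_chunks introduce above. The function below is an adapted, module-level re-statement of the patched method (the patch defines it on the connector class), and the sample event with a 70,000-character body is invented for illustration:

import json

def check_and_split_msg_parts(msg_parts, max_size=32000):
    """Recursively halve a msgParts payload until every piece fits within max_size bytes."""
    if isinstance(msg_parts, (dict, list)):
        msg_parts = json.dumps(msg_parts)
    if len(msg_parts.encode('utf-8')) > max_size:
        mid = len(msg_parts) // 2
        return (check_and_split_msg_parts(msg_parts[:mid], max_size)
                + check_and_split_msg_parts(msg_parts[mid:], max_size))
    return [msg_parts]

# Hypothetical oversized event: the pieces are fanned out into msgParts1..msgPartsN,
# mirroring what the patched gen_chunks does after splitting.
event = {'event_type': 'message', 'msgParts': {'body': 'x' * 70000}}
parts = check_and_split_msg_parts(event['msgParts'])
if len(parts) > 1:
    for i, part in enumerate(parts, start=1):
        event[f'msgParts{i}'] = part
    del event['msgParts']

print(len(parts))                      # 4 pieces for this sample payload
print(sorted(k for k in event if k.startswith('msgParts')))

One design observation: the split point is chosen by character count while the size check is in bytes, which is fine for the mostly ASCII payloads here but could leave a piece slightly over the limit for heavily multi-byte content.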