diff --git a/CHANGELOG.md b/CHANGELOG.md index 64bc426..a01eb51 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [3.0.7] - 27 June 2024 + +### Changed + +* fixed null entries in oplog inserts for output and command values to be explicit or removed +* updated query execute function to use self.client.connect_async reconnecting AIOHTTP transport instead of making a new session each time + ## [3.0.6] - 8 April 2024 ### Changed diff --git a/sync.py b/sync.py index d90c7bd..2dc95c7 100644 --- a/sync.py +++ b/sync.py @@ -20,7 +20,7 @@ # Mythic Sync Libraries from mythic import mythic, mythic_classes -VERSION = "3.0.4" +VERSION = "3.0.7" # Logging configuration # Level applies to all loggers, including ``gql`` Transport and Client loggers @@ -95,7 +95,7 @@ class MythicSync: mutation InsertMythicSyncLog ( $oplog: bigint!, $startDate: timestamptz, $endDate: timestamptz, $sourceIp: String, $destIp: String, $tool: String, $userContext: String, $command: String, $description: String, - $output: String, $comments: String, $operatorName: String, $entry_identifier: String!, $extraFields: jsonb! + $comments: String, $operatorName: String, $entry_identifier: String!, $extraFields: jsonb! ) { insert_oplogEntry(objects: { oplog: $oplog, @@ -107,7 +107,6 @@ class MythicSync: userContext: $userContext, command: $command, description: $description, - output: $output, comments: $comments, operatorName: $operatorName, entryIdentifier: $entry_identifier @@ -125,7 +124,7 @@ class MythicSync: mutation UpdateMythicSyncLog ( $id: bigint!, $oplog: bigint!, $startDate: timestamptz, $endDate: timestamptz, $sourceIp: String, $destIp: String, $tool: String, $userContext: String, $command: String, - $description: String, $output: String, $comments: String, $operatorName: String, + $description: String, $comments: String, $operatorName: String, $entry_identifier: String, $extraFields: jsonb ) { update_oplogEntry(where: { @@ -140,7 +139,6 @@ class MythicSync: userContext: $userContext, command: $command, description: $description, - output: $output, comments: $comments, operatorName: $operatorName, entryIdentifier: $entry_identifier, @@ -200,6 +198,10 @@ class MythicSync: "Authorization": f"Bearer {GHOSTWRITER_API_KEY}", "Content-Type": "application/json" } + last_error_timestamp = datetime.utcnow() - timedelta(hours=1) + last_error_delta = timedelta(minutes=30) + session = None + client = None transport = AIOHTTPTransport(url=GRAPHQL_URL, timeout=10, headers=headers) def __init__(self): @@ -210,6 +212,8 @@ async def initialize(self) -> None: Function to initialize necessary connections with Mythic services. This must always be run before anything else. """ + self.client = Client(transport=self.transport, fetch_schema_from_transport=False, ) + self.session = await self.client.connect_async(reconnecting=True) await self._wait_for_redis() mythic_sync_log.info("Successfully connected to Redis") @@ -250,51 +254,50 @@ async def _execute_query(self, query: DocumentNode, variable_values: dict = None """ while True: try: - async with Client(transport=self.transport, fetch_schema_from_transport=False, ) as session: - try: - result = await session.execute(query, variable_values=variable_values) - mythic_sync_log.debug("Successfully executed query with result: %s", result) - return result - except TimeoutError: - mythic_sync_log.error( - "Timeout occurred while trying to connect to Ghostwriter at %s", - self.GHOSTWRITER_URL - ) - await self._post_error_notification(f"MythicSync:\nTimeout occurred while trying to connect to Ghostwriter at {self.GHOSTWRITER_URL}",) - await asyncio.sleep(self.wait_timeout) - continue - except TransportQueryError as e: - mythic_sync_log.exception("Error encountered while fetching GraphQL schema: %s", e) - await self._post_error_notification(f"MythicSync:\nError encountered while fetching GraphQL schema: {e}") - payload = e.errors[0] - if "extensions" in payload: - if "code" in payload["extensions"]: - if payload["extensions"]["code"] == "access-denied": - mythic_sync_log.error( - "Access denied for the provided Ghostwriter API token! Check if it is valid, update your configuration, and restart") - await mythic.send_event_log_message( - mythic=self.mythic_instance, - message=f"Access denied for the provided Ghostwriter API token! Check if it is valid, update your Mythic Sync configuration, and restart the service.", - source="mythic_sync_reject", - level="warning" - ) - exit(1) - if payload["extensions"]["code"] == "postgres-error": - mythic_sync_log.error( - "Ghostwriter's database rejected the query! Check if your configured log ID is correct.") - await mythic.send_event_log_message( - mythic=self.mythic_instance, - message=f"Ghostwriter's database rejected the query! Check if your configured log ID ({self.GHOSTWRITER_OPLOG_ID}) is correct.", - source="mythic_sync_reject", - level="warning" - ) - await asyncio.sleep(self.wait_timeout) - continue - except GraphQLError as e: - mythic_sync_log.exception("Error with GraphQL query: %s", e) - await self._post_error_notification(f"MythicSync:\nError with GraphQL query: {e}") - await asyncio.sleep(self.wait_timeout) - continue + try: + result = await self.session.execute(query, variable_values=variable_values) + mythic_sync_log.debug("Successfully executed query with result: %s", result) + return result + except TimeoutError: + mythic_sync_log.error( + "Timeout occurred while trying to connect to Ghostwriter at %s", + self.GHOSTWRITER_URL + ) + await self._post_error_notification(f"MythicSync:\nTimeout occurred while trying to connect to Ghostwriter at {self.GHOSTWRITER_URL}",) + await asyncio.sleep(self.wait_timeout) + continue + except TransportQueryError as e: + mythic_sync_log.exception("Error encountered while fetching GraphQL schema: %s", e) + await self._post_error_notification(f"MythicSync:\nError encountered while fetching GraphQL schema: {e}") + payload = e.errors[0] + if "extensions" in payload: + if "code" in payload["extensions"]: + if payload["extensions"]["code"] == "access-denied": + mythic_sync_log.error( + "Access denied for the provided Ghostwriter API token! Check if it is valid, update your configuration, and restart") + await mythic.send_event_log_message( + mythic=self.mythic_instance, + message=f"Access denied for the provided Ghostwriter API token! Check if it is valid, update your Mythic Sync configuration, and restart the service.", + source="mythic_sync_reject", + level="warning" + ) + exit(1) + if payload["extensions"]["code"] == "postgres-error": + mythic_sync_log.error( + "Ghostwriter's database rejected the query! Check if your configured log ID is correct.") + await mythic.send_event_log_message( + mythic=self.mythic_instance, + message=f"Ghostwriter's database rejected the query! Check if your configured log ID ({self.GHOSTWRITER_OPLOG_ID}) is correct.", + source="mythic_sync_reject", + level="warning" + ) + await asyncio.sleep(self.wait_timeout) + continue + except GraphQLError as e: + mythic_sync_log.exception("Error with GraphQL query: %s", e) + await self._post_error_notification(f"MythicSync:\nError with GraphQL query: {e}") + await asyncio.sleep(self.wait_timeout) + continue except Exception as exc: mythic_sync_log.exception( "Exception occurred while trying to post the query to Ghostwriter! Trying again in %s seconds...", @@ -307,21 +310,12 @@ async def _execute_query(self, query: DocumentNode, variable_values: dict = None async def _check_token(self) -> None: """Send a `whoami` query to Ghostwriter to check authentication and token expiration.""" whoami = await self._execute_query(self.whoami_query) - try: - expiry = datetime.fromisoformat(whoami["whoami"]["expires"]) - except Exception: - expiry = whoami["whoami"]["expires"] - await mythic.send_event_log_message( - mythic=self.mythic_instance, - message=f"Mythic Sync has successfully authenticated to Ghostwriter. Your configured token expires at: {expiry}", - source="mythic_sync", - level="info" - ) # Check if the token will expire within 24 hours now = datetime.now(timezone.utc) - if isinstance(expiry, datetime) and expiry - now < timedelta(hours=24): + expiry = datetime.fromisoformat(whoami["whoami"]["expires"]) + if expiry - now < timedelta(hours=24): mythic_sync_log.debug(f"The provided Ghostwriter API token expires in less than 24 hours ({expiry})!") await mythic.send_event_log_message( mythic=self.mythic_instance, @@ -329,6 +323,12 @@ async def _check_token(self) -> None: source="mythic_sync", level="warning" ) + await mythic.send_event_log_message( + mythic=self.mythic_instance, + message=f"Mythic Sync has successfully authenticated to Ghostwriter. Your configured token expires at: {expiry}", + source="mythic_sync", + level="info" + ) async def _create_initial_entry(self) -> None: """Send the initial log entry to Ghostwriter's Oplog.""" @@ -424,6 +424,7 @@ async def _mythic_callback_to_ghostwriter_message(self, message: dict) -> dict: gw_message["oplog"] = self.GHOSTWRITER_OPLOG_ID gw_message['entry_identifier'] = message["agent_callback_id"] gw_message['extraFields'] = {} + gw_message["command"] = "" except Exception: mythic_sync_log.exception( "Encountered an exception while processing Mythic's message into a message for Ghostwriter! Received message: %s",