Skip to content

Commit

Permalink
Merge pull request #17 from MythicMeta/fix_output_error
Browse files Browse the repository at this point in the history
Fixing GraphQL Outputs and TCP Already connected bugs
  • Loading branch information
chrismaddalena authored Jun 27, 2024
2 parents 92cda1b + 84c9b4e commit 496f1f4
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 61 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [3.0.7] - 27 June 2024

### Changed

* fixed null entries in oplog inserts for output and command values to be explicit or removed
* updated query execute function to use self.client.connect_async reconnecting AIOHTTP transport instead of making a new session each time

## [3.0.6] - 8 April 2024

### Changed
Expand Down
123 changes: 62 additions & 61 deletions sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
# Mythic Sync Libraries
from mythic import mythic, mythic_classes

VERSION = "3.0.4"
VERSION = "3.0.7"

# Logging configuration
# Level applies to all loggers, including ``gql`` Transport and Client loggers
Expand Down Expand Up @@ -95,7 +95,7 @@ class MythicSync:
mutation InsertMythicSyncLog (
$oplog: bigint!, $startDate: timestamptz, $endDate: timestamptz, $sourceIp: String, $destIp: String,
$tool: String, $userContext: String, $command: String, $description: String,
$output: String, $comments: String, $operatorName: String, $entry_identifier: String!, $extraFields: jsonb!
$comments: String, $operatorName: String, $entry_identifier: String!, $extraFields: jsonb!
) {
insert_oplogEntry(objects: {
oplog: $oplog,
Expand All @@ -107,7 +107,6 @@ class MythicSync:
userContext: $userContext,
command: $command,
description: $description,
output: $output,
comments: $comments,
operatorName: $operatorName,
entryIdentifier: $entry_identifier
Expand All @@ -125,7 +124,7 @@ class MythicSync:
mutation UpdateMythicSyncLog (
$id: bigint!, $oplog: bigint!, $startDate: timestamptz, $endDate: timestamptz, $sourceIp: String,
$destIp: String, $tool: String, $userContext: String, $command: String,
$description: String, $output: String, $comments: String, $operatorName: String,
$description: String, $comments: String, $operatorName: String,
$entry_identifier: String, $extraFields: jsonb
) {
update_oplogEntry(where: {
Expand All @@ -140,7 +139,6 @@ class MythicSync:
userContext: $userContext,
command: $command,
description: $description,
output: $output,
comments: $comments,
operatorName: $operatorName,
entryIdentifier: $entry_identifier,
Expand Down Expand Up @@ -200,6 +198,10 @@ class MythicSync:
"Authorization": f"Bearer {GHOSTWRITER_API_KEY}",
"Content-Type": "application/json"
}
last_error_timestamp = datetime.utcnow() - timedelta(hours=1)
last_error_delta = timedelta(minutes=30)
session = None
client = None
transport = AIOHTTPTransport(url=GRAPHQL_URL, timeout=10, headers=headers)

def __init__(self):
Expand All @@ -210,6 +212,8 @@ async def initialize(self) -> None:
Function to initialize necessary connections with Mythic services. This must
always be run before anything else.
"""
self.client = Client(transport=self.transport, fetch_schema_from_transport=False, )
self.session = await self.client.connect_async(reconnecting=True)
await self._wait_for_redis()
mythic_sync_log.info("Successfully connected to Redis")

Expand Down Expand Up @@ -250,51 +254,50 @@ async def _execute_query(self, query: DocumentNode, variable_values: dict = None
"""
while True:
try:
async with Client(transport=self.transport, fetch_schema_from_transport=False, ) as session:
try:
result = await session.execute(query, variable_values=variable_values)
mythic_sync_log.debug("Successfully executed query with result: %s", result)
return result
except TimeoutError:
mythic_sync_log.error(
"Timeout occurred while trying to connect to Ghostwriter at %s",
self.GHOSTWRITER_URL
)
await self._post_error_notification(f"MythicSync:\nTimeout occurred while trying to connect to Ghostwriter at {self.GHOSTWRITER_URL}",)
await asyncio.sleep(self.wait_timeout)
continue
except TransportQueryError as e:
mythic_sync_log.exception("Error encountered while fetching GraphQL schema: %s", e)
await self._post_error_notification(f"MythicSync:\nError encountered while fetching GraphQL schema: {e}")
payload = e.errors[0]
if "extensions" in payload:
if "code" in payload["extensions"]:
if payload["extensions"]["code"] == "access-denied":
mythic_sync_log.error(
"Access denied for the provided Ghostwriter API token! Check if it is valid, update your configuration, and restart")
await mythic.send_event_log_message(
mythic=self.mythic_instance,
message=f"Access denied for the provided Ghostwriter API token! Check if it is valid, update your Mythic Sync configuration, and restart the service.",
source="mythic_sync_reject",
level="warning"
)
exit(1)
if payload["extensions"]["code"] == "postgres-error":
mythic_sync_log.error(
"Ghostwriter's database rejected the query! Check if your configured log ID is correct.")
await mythic.send_event_log_message(
mythic=self.mythic_instance,
message=f"Ghostwriter's database rejected the query! Check if your configured log ID ({self.GHOSTWRITER_OPLOG_ID}) is correct.",
source="mythic_sync_reject",
level="warning"
)
await asyncio.sleep(self.wait_timeout)
continue
except GraphQLError as e:
mythic_sync_log.exception("Error with GraphQL query: %s", e)
await self._post_error_notification(f"MythicSync:\nError with GraphQL query: {e}")
await asyncio.sleep(self.wait_timeout)
continue
try:
result = await self.session.execute(query, variable_values=variable_values)
mythic_sync_log.debug("Successfully executed query with result: %s", result)
return result
except TimeoutError:
mythic_sync_log.error(
"Timeout occurred while trying to connect to Ghostwriter at %s",
self.GHOSTWRITER_URL
)
await self._post_error_notification(f"MythicSync:\nTimeout occurred while trying to connect to Ghostwriter at {self.GHOSTWRITER_URL}",)
await asyncio.sleep(self.wait_timeout)
continue
except TransportQueryError as e:
mythic_sync_log.exception("Error encountered while fetching GraphQL schema: %s", e)
await self._post_error_notification(f"MythicSync:\nError encountered while fetching GraphQL schema: {e}")
payload = e.errors[0]
if "extensions" in payload:
if "code" in payload["extensions"]:
if payload["extensions"]["code"] == "access-denied":
mythic_sync_log.error(
"Access denied for the provided Ghostwriter API token! Check if it is valid, update your configuration, and restart")
await mythic.send_event_log_message(
mythic=self.mythic_instance,
message=f"Access denied for the provided Ghostwriter API token! Check if it is valid, update your Mythic Sync configuration, and restart the service.",
source="mythic_sync_reject",
level="warning"
)
exit(1)
if payload["extensions"]["code"] == "postgres-error":
mythic_sync_log.error(
"Ghostwriter's database rejected the query! Check if your configured log ID is correct.")
await mythic.send_event_log_message(
mythic=self.mythic_instance,
message=f"Ghostwriter's database rejected the query! Check if your configured log ID ({self.GHOSTWRITER_OPLOG_ID}) is correct.",
source="mythic_sync_reject",
level="warning"
)
await asyncio.sleep(self.wait_timeout)
continue
except GraphQLError as e:
mythic_sync_log.exception("Error with GraphQL query: %s", e)
await self._post_error_notification(f"MythicSync:\nError with GraphQL query: {e}")
await asyncio.sleep(self.wait_timeout)
continue
except Exception as exc:
mythic_sync_log.exception(
"Exception occurred while trying to post the query to Ghostwriter! Trying again in %s seconds...",
Expand All @@ -307,28 +310,25 @@ async def _execute_query(self, query: DocumentNode, variable_values: dict = None
async def _check_token(self) -> None:
"""Send a `whoami` query to Ghostwriter to check authentication and token expiration."""
whoami = await self._execute_query(self.whoami_query)
try:
expiry = datetime.fromisoformat(whoami["whoami"]["expires"])
except Exception:
expiry = whoami["whoami"]["expires"]

await mythic.send_event_log_message(
mythic=self.mythic_instance,
message=f"Mythic Sync has successfully authenticated to Ghostwriter. Your configured token expires at: {expiry}",
source="mythic_sync",
level="info"
)

# Check if the token will expire within 24 hours
now = datetime.now(timezone.utc)
if isinstance(expiry, datetime) and expiry - now < timedelta(hours=24):
expiry = datetime.fromisoformat(whoami["whoami"]["expires"])
if expiry - now < timedelta(hours=24):
mythic_sync_log.debug(f"The provided Ghostwriter API token expires in less than 24 hours ({expiry})!")
await mythic.send_event_log_message(
mythic=self.mythic_instance,
message=f"The provided Ghostwriter API token expires in less than 24 hours ({expiry})!",
source="mythic_sync",
level="warning"
)
await mythic.send_event_log_message(
mythic=self.mythic_instance,
message=f"Mythic Sync has successfully authenticated to Ghostwriter. Your configured token expires at: {expiry}",
source="mythic_sync",
level="info"
)

async def _create_initial_entry(self) -> None:
"""Send the initial log entry to Ghostwriter's Oplog."""
Expand Down Expand Up @@ -424,6 +424,7 @@ async def _mythic_callback_to_ghostwriter_message(self, message: dict) -> dict:
gw_message["oplog"] = self.GHOSTWRITER_OPLOG_ID
gw_message['entry_identifier'] = message["agent_callback_id"]
gw_message['extraFields'] = {}
gw_message["command"] = ""
except Exception:
mythic_sync_log.exception(
"Encountered an exception while processing Mythic's message into a message for Ghostwriter! Received message: %s",
Expand Down

0 comments on commit 496f1f4

Please sign in to comment.