Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixing GraphQL Outputs and TCP Already connected bugs #17

Merged
merged 1 commit into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [3.0.7] - 27 June 2024

### Changed

* fixed null entries in oplog inserts for output and command values to be explicit or removed
* updated query execute function to use self.client.connect_async reconnecting AIOHTTP transport instead of making a new session each time

## [3.0.6] - 8 April 2024

### Changed
Expand Down
123 changes: 62 additions & 61 deletions sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
# Mythic Sync Libraries
from mythic import mythic, mythic_classes

VERSION = "3.0.4"
VERSION = "3.0.7"

# Logging configuration
# Level applies to all loggers, including ``gql`` Transport and Client loggers
Expand Down Expand Up @@ -95,7 +95,7 @@ class MythicSync:
mutation InsertMythicSyncLog (
$oplog: bigint!, $startDate: timestamptz, $endDate: timestamptz, $sourceIp: String, $destIp: String,
$tool: String, $userContext: String, $command: String, $description: String,
$output: String, $comments: String, $operatorName: String, $entry_identifier: String!, $extraFields: jsonb!
$comments: String, $operatorName: String, $entry_identifier: String!, $extraFields: jsonb!
) {
insert_oplogEntry(objects: {
oplog: $oplog,
Expand All @@ -107,7 +107,6 @@ class MythicSync:
userContext: $userContext,
command: $command,
description: $description,
output: $output,
comments: $comments,
operatorName: $operatorName,
entryIdentifier: $entry_identifier
Expand All @@ -125,7 +124,7 @@ class MythicSync:
mutation UpdateMythicSyncLog (
$id: bigint!, $oplog: bigint!, $startDate: timestamptz, $endDate: timestamptz, $sourceIp: String,
$destIp: String, $tool: String, $userContext: String, $command: String,
$description: String, $output: String, $comments: String, $operatorName: String,
$description: String, $comments: String, $operatorName: String,
$entry_identifier: String, $extraFields: jsonb
) {
update_oplogEntry(where: {
Expand All @@ -140,7 +139,6 @@ class MythicSync:
userContext: $userContext,
command: $command,
description: $description,
output: $output,
comments: $comments,
operatorName: $operatorName,
entryIdentifier: $entry_identifier,
Expand Down Expand Up @@ -200,6 +198,10 @@ class MythicSync:
"Authorization": f"Bearer {GHOSTWRITER_API_KEY}",
"Content-Type": "application/json"
}
last_error_timestamp = datetime.utcnow() - timedelta(hours=1)
last_error_delta = timedelta(minutes=30)
session = None
client = None
transport = AIOHTTPTransport(url=GRAPHQL_URL, timeout=10, headers=headers)

def __init__(self):
Expand All @@ -210,6 +212,8 @@ async def initialize(self) -> None:
Function to initialize necessary connections with Mythic services. This must
always be run before anything else.
"""
self.client = Client(transport=self.transport, fetch_schema_from_transport=False, )
self.session = await self.client.connect_async(reconnecting=True)
await self._wait_for_redis()
mythic_sync_log.info("Successfully connected to Redis")

Expand Down Expand Up @@ -250,51 +254,50 @@ async def _execute_query(self, query: DocumentNode, variable_values: dict = None
"""
while True:
try:
async with Client(transport=self.transport, fetch_schema_from_transport=False, ) as session:
try:
result = await session.execute(query, variable_values=variable_values)
mythic_sync_log.debug("Successfully executed query with result: %s", result)
return result
except TimeoutError:
mythic_sync_log.error(
"Timeout occurred while trying to connect to Ghostwriter at %s",
self.GHOSTWRITER_URL
)
await self._post_error_notification(f"MythicSync:\nTimeout occurred while trying to connect to Ghostwriter at {self.GHOSTWRITER_URL}",)
await asyncio.sleep(self.wait_timeout)
continue
except TransportQueryError as e:
mythic_sync_log.exception("Error encountered while fetching GraphQL schema: %s", e)
await self._post_error_notification(f"MythicSync:\nError encountered while fetching GraphQL schema: {e}")
payload = e.errors[0]
if "extensions" in payload:
if "code" in payload["extensions"]:
if payload["extensions"]["code"] == "access-denied":
mythic_sync_log.error(
"Access denied for the provided Ghostwriter API token! Check if it is valid, update your configuration, and restart")
await mythic.send_event_log_message(
mythic=self.mythic_instance,
message=f"Access denied for the provided Ghostwriter API token! Check if it is valid, update your Mythic Sync configuration, and restart the service.",
source="mythic_sync_reject",
level="warning"
)
exit(1)
if payload["extensions"]["code"] == "postgres-error":
mythic_sync_log.error(
"Ghostwriter's database rejected the query! Check if your configured log ID is correct.")
await mythic.send_event_log_message(
mythic=self.mythic_instance,
message=f"Ghostwriter's database rejected the query! Check if your configured log ID ({self.GHOSTWRITER_OPLOG_ID}) is correct.",
source="mythic_sync_reject",
level="warning"
)
await asyncio.sleep(self.wait_timeout)
continue
except GraphQLError as e:
mythic_sync_log.exception("Error with GraphQL query: %s", e)
await self._post_error_notification(f"MythicSync:\nError with GraphQL query: {e}")
await asyncio.sleep(self.wait_timeout)
continue
try:
result = await self.session.execute(query, variable_values=variable_values)
mythic_sync_log.debug("Successfully executed query with result: %s", result)
return result
except TimeoutError:
mythic_sync_log.error(
"Timeout occurred while trying to connect to Ghostwriter at %s",
self.GHOSTWRITER_URL
)
await self._post_error_notification(f"MythicSync:\nTimeout occurred while trying to connect to Ghostwriter at {self.GHOSTWRITER_URL}",)
await asyncio.sleep(self.wait_timeout)
continue
except TransportQueryError as e:
mythic_sync_log.exception("Error encountered while fetching GraphQL schema: %s", e)
await self._post_error_notification(f"MythicSync:\nError encountered while fetching GraphQL schema: {e}")
payload = e.errors[0]
if "extensions" in payload:
if "code" in payload["extensions"]:
if payload["extensions"]["code"] == "access-denied":
mythic_sync_log.error(
"Access denied for the provided Ghostwriter API token! Check if it is valid, update your configuration, and restart")
await mythic.send_event_log_message(
mythic=self.mythic_instance,
message=f"Access denied for the provided Ghostwriter API token! Check if it is valid, update your Mythic Sync configuration, and restart the service.",
source="mythic_sync_reject",
level="warning"
)
exit(1)
if payload["extensions"]["code"] == "postgres-error":
mythic_sync_log.error(
"Ghostwriter's database rejected the query! Check if your configured log ID is correct.")
await mythic.send_event_log_message(
mythic=self.mythic_instance,
message=f"Ghostwriter's database rejected the query! Check if your configured log ID ({self.GHOSTWRITER_OPLOG_ID}) is correct.",
source="mythic_sync_reject",
level="warning"
)
await asyncio.sleep(self.wait_timeout)
continue
except GraphQLError as e:
mythic_sync_log.exception("Error with GraphQL query: %s", e)
await self._post_error_notification(f"MythicSync:\nError with GraphQL query: {e}")
await asyncio.sleep(self.wait_timeout)
continue
except Exception as exc:
mythic_sync_log.exception(
"Exception occurred while trying to post the query to Ghostwriter! Trying again in %s seconds...",
Expand All @@ -307,28 +310,25 @@ async def _execute_query(self, query: DocumentNode, variable_values: dict = None
async def _check_token(self) -> None:
"""Send a `whoami` query to Ghostwriter to check authentication and token expiration."""
whoami = await self._execute_query(self.whoami_query)
try:
expiry = datetime.fromisoformat(whoami["whoami"]["expires"])
except Exception:
expiry = whoami["whoami"]["expires"]

await mythic.send_event_log_message(
mythic=self.mythic_instance,
message=f"Mythic Sync has successfully authenticated to Ghostwriter. Your configured token expires at: {expiry}",
source="mythic_sync",
level="info"
)

# Check if the token will expire within 24 hours
now = datetime.now(timezone.utc)
if isinstance(expiry, datetime) and expiry - now < timedelta(hours=24):
expiry = datetime.fromisoformat(whoami["whoami"]["expires"])
if expiry - now < timedelta(hours=24):
mythic_sync_log.debug(f"The provided Ghostwriter API token expires in less than 24 hours ({expiry})!")
await mythic.send_event_log_message(
mythic=self.mythic_instance,
message=f"The provided Ghostwriter API token expires in less than 24 hours ({expiry})!",
source="mythic_sync",
level="warning"
)
await mythic.send_event_log_message(
mythic=self.mythic_instance,
message=f"Mythic Sync has successfully authenticated to Ghostwriter. Your configured token expires at: {expiry}",
source="mythic_sync",
level="info"
)

async def _create_initial_entry(self) -> None:
"""Send the initial log entry to Ghostwriter's Oplog."""
Expand Down Expand Up @@ -424,6 +424,7 @@ async def _mythic_callback_to_ghostwriter_message(self, message: dict) -> dict:
gw_message["oplog"] = self.GHOSTWRITER_OPLOG_ID
gw_message['entry_identifier'] = message["agent_callback_id"]
gw_message['extraFields'] = {}
gw_message["command"] = ""
except Exception:
mythic_sync_log.exception(
"Encountered an exception while processing Mythic's message into a message for Ghostwriter! Received message: %s",
Expand Down