Skip to content

Commit

Permalink
Fix searching when connected to qemu-system instance (#906)
Browse files Browse the repository at this point in the history
* Fix searching when connected to `qemu-system` instance

* Fix Qemu user/kernel mode detection

* Properly save offset as int

* Update styling

* Fix single quotes

* Change remote initializing check

* Style changes and parsing unit test
  • Loading branch information
clubby789 authored Nov 23, 2022
1 parent 4e89034 commit d1833d3
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 13 deletions.
70 changes: 57 additions & 13 deletions gef.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,23 +627,32 @@ def __str__(self) -> str:
perm_str += "x" if self & Permission.EXECUTE else "-"
return perm_str

@staticmethod
def from_info_sections(*args: str) -> "Permission":
perm = Permission(0)
@classmethod
def from_info_sections(cls, *args: str) -> "Permission":
perm = cls(0)
for arg in args:
if "READONLY" in arg: perm |= Permission.READ
if "DATA" in arg: perm |= Permission.WRITE
if "CODE" in arg: perm |= Permission.EXECUTE
return perm

@staticmethod
def from_process_maps(perm_str: str) -> "Permission":
perm = Permission(0)
@classmethod
def from_process_maps(cls, perm_str: str) -> "Permission":
perm = cls(0)
if perm_str[0] == "r": perm |= Permission.READ
if perm_str[1] == "w": perm |= Permission.WRITE
if perm_str[2] == "x": perm |= Permission.EXECUTE
return perm

@classmethod
def from_info_mem(cls, perm_str: str) -> "Permission":
perm = cls(0)
# perm_str[0] shows if this is a user page, which
# we don't track
if perm_str[1] == "r": perm |= Permission.READ
if perm_str[2] == "w": perm |= Permission.WRITE
return perm


class Section:
"""GEF representation of process memory sections."""
Expand Down Expand Up @@ -3343,24 +3352,24 @@ def get_os() -> str:
def is_qemu() -> bool:
if not is_remote_debug():
return False
response = gdb.execute('maintenance packet Qqemu.sstepbits', to_string=True, from_tty=False)
return 'ENABLE=' in response
response = gdb.execute("maintenance packet Qqemu.sstepbits", to_string=True, from_tty=False)
return "ENABLE=" in response


@lru_cache()
def is_qemu_usermode() -> bool:
if not is_qemu():
return False
response = gdb.execute('maintenance packet QOffsets', to_string=True, from_tty=False)
response = gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False)
return "Text=" in response


@lru_cache()
def is_qemu_system() -> bool:
if not is_qemu():
return False
response = gdb.execute('maintenance packet QOffsets', to_string=True, from_tty=False)
return 'received: ""' in response
response = gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False)
return "received: \"\"" in response


def get_filepath() -> Optional[str]:
Expand Down Expand Up @@ -3535,6 +3544,7 @@ def exit_handler(_: "gdb.ExitedEvent") -> None:
gef.session.remote.close()
del gef.session.remote
gef.session.remote = None
gef.session.remote_initializing = False
return


Expand Down Expand Up @@ -3748,7 +3758,7 @@ def is_in_x86_kernel(address: int) -> bool:

def is_remote_debug() -> bool:
""""Return True is the current debugging session is running through GDB remote session."""
return gef.session.remote is not None
return gef.session.remote_initializing or gef.session.remote is not None


def de_bruijn(alphabet: bytes, n: int) -> Generator[str, None, None]:
Expand Down Expand Up @@ -5948,7 +5958,12 @@ def do_invoke(self, _: List[str], **kwargs: Any) -> None:
return

# try to establish the remote session, throw on error
# Set `.remote_initializing` to True here - `GefRemoteSessionManager` invokes code which
# calls `is_remote_debug` which checks if `remote_initializing` is True or `.remote` is None
# This prevents some spurious errors being thrown during startup
gef.session.remote_initializing = True
gef.session.remote = GefRemoteSessionManager(args.host, args.port, args.pid, qemu_binary)
gef.session.remote_initializing = False
reset_all_caches()
gdb.execute("context")
return
Expand Down Expand Up @@ -10187,6 +10202,12 @@ def maps(self) -> List[Section]:

def __parse_maps(self) -> List[Section]:
"""Return the mapped memory sections"""
try:
if is_qemu_system():
return list(self.__parse_info_mem())
except gdb.error:
# Target may not support this command
pass
try:
return list(self.__parse_procfs_maps())
except FileNotFoundError:
Expand Down Expand Up @@ -10251,6 +10272,27 @@ def __parse_gdb_info_sections(self) -> Generator[Section, None, None]:
continue
return

def __parse_info_mem(self) -> Generator[Section, None, None]:
"""Get the memory mapping from GDB's command `monitor info mem`"""
for line in StringIO(gdb.execute("monitor info mem", to_string=True)):
if not line:
break
try:
ranges, off, perms = line.split()
off = int(off, 16)
start, end = [int(s, 16) for s in ranges.split("-")]
except ValueError as e:
continue

perm = Permission.from_info_mem(perms)
yield Section(
page_start=start,
page_end=end,
offset=off,
permission=perm,
inode="",
)


class GefHeapManager(GefManager):
"""Class managing session heap."""
Expand Down Expand Up @@ -10429,6 +10471,7 @@ class GefSessionManager(GefManager):
def __init__(self) -> None:
self.reset_caches()
self.remote: Optional["GefRemoteSessionManager"] = None
self.remote_initializing: bool = False
self.qemu_mode: bool = False
self.convenience_vars_index: int = 0
self.heap_allocated_chunks: List[Tuple[int, int]] = []
Expand Down Expand Up @@ -10461,7 +10504,8 @@ def __str__(self) -> str:
def auxiliary_vector(self) -> Optional[Dict[str, int]]:
if not is_alive():
return None

if is_qemu_system():
return None
if not self._auxiliary_vector:
auxiliary_vector = {}
auxv_info = gdb.execute("info auxv", to_string=True)
Expand Down
12 changes: 12 additions & 0 deletions tests/api/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ def test_func_parse_address(self):
res = gdb_test_python_method(func)
self.assertException(res)

def test_func_parse_maps(self):
func = "Permission.from_info_sections(' [10] 0x555555574000->0x55555557401b at 0x00020000: .init ALLOC LOAD READONLY CODE HAS_CONTENTS')"
res = gdb_test_python_method(func)
self.assertNoException(res)

func = "Permission.from_process_maps('0x0000555555554000 0x0000555555574000 0x0000000000000000 r-- /usr/bin/bash')"
res = gdb_test_python_method(func)
self.assertNoException(res)

func = "Permission.from_info_mem('ffffff2a65e0b000-ffffff2a65e0c000 0000000000001000 -r-')"
res = gdb_test_python_method(func)
self.assertNoException(res)

@pytest.mark.slow
@pytest.mark.online
Expand Down

0 comments on commit d1833d3

Please sign in to comment.