Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix visualize_heap #77

Merged
merged 5 commits into from
Jul 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/commands/visualize_heap.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ Currently only the glibc heap support is implemented. The command doesn't take a
gef➤ visualize-libc-heap-chunks
```

![img](https://i.imgur.com/IKHlLlp.png)
![img](https://i.imgur.com/jQYaiyB.png)
12 changes: 6 additions & 6 deletions docs/commands/windbg.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ This plugin is a set of commands, aliases and extensions to mimic some of the mo
- `ea` : `patch string`
- `dps` : `dereference`
- `bp` : `break`
- `bl` : "info`breakpoints`
- `bd` : "disable`breakpoints`
- `bc` : "delete`breakpoints`
- `be` : "enable`breakpoints`
- `bl` : `info breakpoints`
- `bd` : `disable breakpoints`
- `bc` : `delete breakpoints`
- `be` : `enable breakpoints`
- `tbp` : `tbreak`
- `s` : `grep`
- `pa` : `advance`
- `kp` : "info`stack`
- `kp` : `info stack`
- `ptc` : `finish`
- `uf` : `disassemble`
- `uf` : `disassemble`
3 changes: 2 additions & 1 deletion scripts/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1627,6 +1627,7 @@ class GefMemoryManager(GefManager):
class GefHeapManager(GefManager):
def __init__(self) -> None: ...
def reset_caches(self) -> None: ...
@property
def main_arena(self) -> Optional[GlibcArena]: ...
@property
def selected_arena(self) -> Optional[GlibcArena]: ...
Expand All @@ -1637,7 +1638,7 @@ class GefHeapManager(GefManager):
@property
def base_address(self) -> Optional[int]: ...
@property
def chunks(self) -> Union[List, Iterator]: ...
def chunks(self) -> Union[List[GlibcChunk], Iterator[GlibcChunk]]: ...
def min_chunk_size(self) -> int: ...
def malloc_alignment(self) -> int: ...
def csize2tidx(self, size: int) -> int: ...
Expand Down
163 changes: 86 additions & 77 deletions scripts/visualize_heap.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
"""
Provide an ascii-based graphical representation of the heap layout.

Note: Mostly done for x64, other architectures were not throughly tested.
"""

__AUTHOR__ = "hugsy"
__VERSION__ = 0.3
__VERSION__ = 0.4
__LICENSE__ = "MIT"

import os
Expand Down Expand Up @@ -43,11 +45,14 @@ def get_tcache_count():
@lru_cache(128)
def collect_known_values() -> Dict[int, str]:
arena = gef.heap.main_arena
result: Dict[int, str] = {} # format is { 0xaddress : "name" ,}
if not arena:
raise RuntimeError

version = gef.libc.version
if not version:
return result
raise RuntimeError

result: Dict[int, str] = {} # format is { 0xaddress : "name" ,}

# tcache
if version >= (2, 27):
Expand Down Expand Up @@ -84,26 +89,28 @@ def collect_known_values() -> Dict[int, str]:

# other bins
for name in ["unorderedbins", "smallbins", "largebins"]:

fw, bk = arena.bin(j)
if bk == 0x00 and fw == 0x00:
continue
head = GlibcChunk(bk, from_base=True).fwd
if head == fw:
continue

chunk = GlibcChunk(head, from_base=True)
j = 0
i = 0
while True:
if chunk is None:
(fw, bk) = arena.bin(i)
if (fw, bk) == (0, 0):
break
result[chunk.data_address] = f"{name}[{i}/{j}]"
next_chunk_address = chunk.get_fwd_ptr(True)
if not next_chunk_address:
break
next_chunk = GlibcChunk(next_chunk_address, from_base=True)
j += 1
chunk = next_chunk

head = GlibcChunk(bk, from_base=True).fwd
if head == fw:
continue

chunk = GlibcChunk(head, from_base=True)
j = 0
while True:
if chunk is None:
break
result[chunk.data_address] = f"{name}[{i}/{j}]"
next_chunk_address = chunk.get_fwd_ptr(True)
if not next_chunk_address:
break
next_chunk = GlibcChunk(next_chunk_address, from_base=True)
j += 1
chunk = next_chunk

return result

Expand All @@ -119,6 +126,21 @@ def collect_known_ranges() -> List[Tuple[range, str]]:
return result


def is_corrupted(chunk: GlibcChunk, arena: GlibcArena) -> bool:
"""Various checks to see if a chunk is corrupted"""

if chunk.base_address > chunk.data_address:
return False

if chunk.base_address > arena.top:
return True

if chunk.size == 0:
return True

return False


@register
class VisualizeHeapChunksCommand(GenericCommand):
"""Visual helper for glibc heap chunks"""
Expand All @@ -134,92 +156,79 @@ def __init__(self):

@only_if_gdb_running
def do_invoke(self, _):
ptrsize = gef.arch.ptrsize
heap_base_address = gef.heap.base_address
arena = gef.heap.main_arena
if not arena.top or not heap_base_address:
if not gef.heap.main_arena or not gef.heap.base_address:
err("The heap has not been initialized")
return

top = align_address(int(arena.top))
base = align_address(heap_base_address)
ptrsize = gef.arch.ptrsize
arena = gef.heap.main_arena

colors = ["cyan", "red", "yellow", "blue", "green"]
cur = GlibcChunk(base, from_base=True)
idx = 0
colors = ("cyan", "red", "yellow", "blue", "green")
color_idx = 0
chunk_idx = 0

known_ranges = collect_known_ranges()
known_values = collect_known_values()
known_values = [] # collect_known_values()

for chunk in gef.heap.chunks:
if is_corrupted(chunk, arena):
err("Corrupted heap, cannot continue.")
return

while True:
base = cur.base_address
addr = cur.data_address
aggregate_nuls = 0
base = chunk.base_address

if base == top:
if base == arena.top:
gef_print(
f"{format_address(addr)} {format_address(gef.memory.read_integer(addr))} {Color.colorify(LEFT_ARROW + 'Top Chunk', 'red bold')}\n"
f"{format_address(addr+ptrsize)} {format_address(gef.memory.read_integer(addr+ptrsize))} {Color.colorify(LEFT_ARROW + 'Top Chunk Size', 'red bold')}"
f"{format_address(base)} {format_address(gef.memory.read_integer(base))} {Color.colorify(LEFT_ARROW + 'Top Chunk', 'red bold')}\n"
f"{format_address(base+ptrsize)} {format_address(gef.memory.read_integer(base+ptrsize))} {Color.colorify(LEFT_ARROW + 'Top Chunk Size', 'red bold')}"
)
break

if cur.size == 0:
warn("Unexpected size for chunk, cannot pursue. Corrupted heap?")
break

for off in range(0, cur.size, ptrsize):
addr = base + off
value = gef.memory.read_integer(addr)
for current in range(base, base + chunk.size, ptrsize):
value = gef.memory.read_integer(current)
if value == 0:
if off != 0 and off != cur.size - ptrsize:
if current != base and current != (base + chunk.size - ptrsize):
# Only aggregate null bytes that are not starting/finishing the chunk
aggregate_nuls += 1
if aggregate_nuls > 1:
continue

if aggregate_nuls > 2:
gef_print(
" ↓",
" [...]",
" ↓"
)
# If here, we have some aggregated null bytes, print a small thing to mention that
gef_print(" ↓ [...] ↓")
aggregate_nuls = 0

text = "".join(
[chr(b) if 0x20 <= b < 0x7F else "." for b in gef.memory.read(addr, ptrsize)])
line = f"{format_address(addr)} {Color.colorify(format_address(value), colors[idx % len(colors)])}"
line += f" {text}"
derefs = dereference_from(addr)
# Read the context in a hexdump-like format
hexdump = "".join(map(lambda b: chr(b) if 0x20 <= b < 0x7F else ".",
gef.memory.read(current, ptrsize)))

if gef.arch.endianness == Endianness.LITTLE_ENDIAN:
hexdump = hexdump[::-1]

line = f"{format_address(current)} {Color.colorify(format_address(value), colors[color_idx])}"
line += f" {hexdump}"
derefs = dereference_from(current)
if len(derefs) > 2:
line += f" [{LEFT_ARROW}{derefs[-1]}]"

if off == 0:
line += f" Chunk[{idx}]"
if off == ptrsize:
line += f" {value&~7 }" \
f"{'|NON_MAIN_ARENA' if value&4 else ''}" \
f"{'|IS_MMAPED' if value&2 else ''}" \
f"{'|PREV_INUSE' if value&1 else ''}"
# The first entry of the chunk gets added some extra info about the chunk itself
if current == base:
line += f" Chunk[{chunk_idx}], Flag={chunk.flags!s}"
chunk_idx += 1

# look in mapping
for x in known_ranges:
if value in x[0]:
line += f" (in {Color.redify(x[1])})"
# Populate information for known ranges, if any
for _range, _value in known_ranges:
if value in _range:
line += f" (in {Color.redify(_value)})"

# look in known values
# Populate information from other chunks/bins, if any
if value in known_values:
line += f"{RIGHT_ARROW}{Color.cyanify(known_values[value])}"

# All good, print it out
gef_print(line)

next_chunk = cur.get_next_chunk()
if next_chunk is None:
break

next_chunk_addr = Address(value=next_chunk.data_address)
if not next_chunk_addr.valid:
warn("next chunk probably corrupted")
break
color_idx = (color_idx + 1) % len(colors)

cur = next_chunk
idx += 1
return
2 changes: 1 addition & 1 deletion tests/commands/visualize_heap.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ def test_cmd_heap_view(self):

for i in range(4):
self.assertIn(
f"0x0000000000000000 ........ Chunk[{i}]", res)
f"0x0000000000000000 ........ Chunk[{i}]", res)