Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix heap chunks cmd for multiple heaps per arena #716

Merged
merged 11 commits into from
Sep 20, 2021
24 changes: 14 additions & 10 deletions docs/commands/heap.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,28 @@ gef➤ heap <sub_commands>

### `heap chunks` command ###

Displays all the chunks from the `heap` section.
Displays all the chunks from the `heap` section of the current arena.

```
gef➤ heap chunks
```

In some cases, the allocation will start immediately from start of the page. If
so, specify the base address of the first chunk as follows:
![heap-chunks](https://i.imgur.com/y90SfKH.png)

To select from which arena to display chunks either use the `heap set-arena`
command or provide the base address of the other arena like this:

```
gef➤ heap chunks [address]
gef➤ heap chunks [arena_address]
```

![heap-chunks](https://i.imgur.com/2Ew2fA6.png)
![heap-chunks-arena](https://i.imgur.com/y1fybRx.png)
theguy147 marked this conversation as resolved.
Show resolved Hide resolved

Because usually the heap chunks are aligned to a certain number of bytes in
memory GEF automatically re-aligns the chunks data start addresses to match
Glibc's behavior. To be able to view unaligned chunks as well, you can
disable this with the `--allow-unaligned` flag.
Glibc's behavior. To be able to view unaligned chunks as well, you can disable
theguy147 marked this conversation as resolved.
Show resolved Hide resolved
this with the `--allow-unaligned` flag. Note that this might result in
incorrect output.

### `heap chunk` command ###

Expand All @@ -41,12 +44,13 @@ information related to a specific chunk:
gef➤ heap chunk [address]
```

![heap-chunk](https://i.imgur.com/SAWNptW.png)
![heap-chunk](https://i.imgur.com/WXpHR58.png)

Because usually the heap chunks are aligned to a certain number of bytes in
memory GEF automatically re-aligns the chunks data start addresses to match
Glibc's behavior. To be able to view unaligned chunks as well, you can
disable this with the `--allow-unaligned` flag.
Glibc's behavior. To be able to view unaligned chunks as well, you can disable
this with the `--allow-unaligned` flag. Note that this might result in
incorrect output.

### `heap arenas` command ###

Expand Down
183 changes: 140 additions & 43 deletions gef.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,8 @@ def __init__(self, addr):
try:
self.__addr = to_unsigned_long(gdb.parse_and_eval("&{}".format(addr)))
except gdb.error:
warn("Could not parse address '&{}' when searching malloc_state struct, "
"using '&main_arena' instead".format(addr))
self.__addr = search_for_main_arena()

self.num_fastbins = 10
Expand Down Expand Up @@ -741,6 +743,62 @@ def __getitem__(self, item):
return getattr(self, item)


class GlibcHeapInfo:
"""Glibc heap_info struct
See https://github.com/bminor/glibc/blob/glibc-2.34/malloc/arena.c#L64"""

def __init__(self, addr):
self.__addr = addr if type(addr) is int else parse_address(addr)
self.size_t = cached_lookup_type("size_t")
if not self.size_t:
ptr_type = "unsigned long" if current_arch.ptrsize == 8 else "unsigned int"
self.size_t = cached_lookup_type(ptr_type)

@property
def addr(self):
return self.__addr

@property
def ar_ptr_addr(self):
return self.addr

@property
def prev_addr(self):
return self.ar_ptr_addr + current_arch.ptrsize

@property
def size_addr(self):
return self.prev_addr + current_arch.ptrsize

@property
def mprotect_size_addr(self):
return self.size_addr + self.size_t.sizeof

@property
def ar_ptr(self):
return self._get_size_t_pointer(self.ar_ptr_addr)

@property
def prev(self):
return self._get_size_t_pointer(self.prev_addr)

@property
def size(self):
return self._get_size_t(self.size_addr)

@property
def mprotect_size(self):
return self._get_size_t(self.mprotect_size_addr)

# helper methods
def _get_size_t_pointer(self, addr):
size_t_pointer = self.size_t.pointer()
return dereference(addr).cast(size_t_pointer)

def _get_size_t(self, addr):
return dereference(addr).cast(self.size_t)


class GlibcArena:
"""Glibc arena class
Ref: https://github.com/sploitfun/lsploits/blob/master/glibc/malloc/malloc.c#L1671"""
Expand Down Expand Up @@ -795,17 +853,43 @@ def get_next(self):
return None
return GlibcArena("*{:#x} ".format(addr_next))

def heap_addr(self):
main_arena_addr = to_unsigned_long(gdb.parse_and_eval("&main_arena"))
if int(self) == main_arena_addr:
def is_main_arena(self):
return int(self) == parse_address("&main_arena")

def heap_addr(self, allow_unaligned=False):
if self.is_main_arena():
heap_section = HeapBaseFunction.heap_base()
if not heap_section:
err("Heap not initialized")
return None
return heap_section
_addr = int(self) + self.struct_size
if allow_unaligned:
return _addr
return malloc_align_address(_addr)

def get_heap_info_list(self):
if self.is_main_arena():
return None
heap_addr = self.get_heap_for_ptr(self.top)
heap_infos = [GlibcHeapInfo(heap_addr)]
while heap_infos[-1].prev != 0:
prev = int(heap_infos[-1].prev)
heap_info = GlibcHeapInfo(prev)
heap_infos.append(heap_info)
return heap_infos[::-1]

@staticmethod
def get_heap_for_ptr(ptr):
"""Find the corresponding heap for a given pointer (int).
See https://github.com/bminor/glibc/blob/glibc-2.34/malloc/arena.c#L129"""
if is_32bit():
default_mmap_threshold_max = 512 * 1024
else: # 64bit
default_mmap_threshold_max = 4 * 1024 * 1024 * cached_lookup_type("long").sizeof
heap_max_size = 2 * default_mmap_threshold_max
return ptr & ~(heap_max_size - 1)

def __str__(self):
fmt = "Arena (base={:#x}, top={:#x}, last_remainder={:#x}, next={:#x}, next_free={:#x}, system_mem={:#x})"
return fmt.format(self.__addr, self.top, self.last_remainder, self.n, self.nfree, self.sysmem)
Expand Down Expand Up @@ -851,9 +935,12 @@ def usable_size(self):
def get_prev_chunk_size(self):
return read_int_from_memory(self.prev_size_addr)

def get_next_chunk(self):
addr = self.data_address + self.get_chunk_size()
return GlibcChunk(addr)
def get_next_chunk(self, allow_unaligned=False):
addr = self.get_next_chunk_addr()
return GlibcChunk(addr, allow_unaligned=allow_unaligned)

def get_next_chunk_addr(self):
return self.data_address + self.get_chunk_size()

# if free-ed functions
def get_fwd_ptr(self, sll):
Expand Down Expand Up @@ -1001,15 +1088,13 @@ def get_libc_version():
return 0, 0


def get_main_arena():
@lru_cache()
def get_glibc_arena(addr=None):
Grazfather marked this conversation as resolved.
Show resolved Hide resolved
try:
return GlibcArena(__gef_current_arena__)
addr = "*{}".format(addr) if addr else __gef_current_arena__
return GlibcArena(addr)
except Exception as e:
err(
"Failed to get the main arena, heap commands may not work properly: {}".format(
e
)
)
err("Failed to get the glibc arena, heap commands may not work properly: {}".format(e))
return None


Expand Down Expand Up @@ -6682,7 +6767,7 @@ def do_invoke(self, *args, **kwargs):
self.usage()
return

if get_main_arena() is None:
if get_glibc_arena() is None:
return

addr = to_unsigned_long(gdb.parse_and_eval(args.address))
Expand All @@ -6693,61 +6778,73 @@ def do_invoke(self, *args, **kwargs):

@register_command
class GlibcHeapChunksCommand(GenericCommand):
"""Display information all chunks from main_arena heap. If a location is
passed, it must correspond to the base address of the first chunk."""
"""Display all heap chunks for the current arena. As an optional argument
the base address of a different arena can be passed"""

_cmdline_ = "heap chunks"
_syntax_ = "{0} [-h] [--allow-unaligned] [address]".format(_cmdline_)
_syntax_ = "{0} [-h] [--allow-unaligned] [arena_address]".format(_cmdline_)
_example_ = "\n{0}\n{0} 0x555555775000".format(_cmdline_)

def __init__(self):
super().__init__(complete=gdb.COMPLETE_LOCATION)
self.add_setting("peek_nb_byte", 16, "Hexdump N first byte(s) inside the chunk data (0 to disable)")
return

@parse_arguments({"address": ""}, {"--allow-unaligned": True})
theguy147 marked this conversation as resolved.
Show resolved Hide resolved
@parse_arguments({"arena_address": ""}, {"--allow-unaligned": True})
@only_if_gdb_running
def do_invoke(self, *args, **kwargs):
args = kwargs["arguments"]

arena = get_main_arena()
arena = get_glibc_arena(addr=args.arena_address)
if arena is None:
err("No valid arena")
return
self.dump_chunks_arena(arena, allow_unaligned=args.allow_unaligned)

if not args.address:
heap_addr = arena.heap_addr()
if heap_addr is None:
return
def dump_chunks_arena(self, arena, print_arena=False, allow_unaligned=False):
top_chunk_addr = arena.top
heap_addr = arena.heap_addr(allow_unaligned=allow_unaligned)
if heap_addr is None:
err("Could not find heap for arena")
return
if print_arena:
gef_print(str(arena))
if arena.is_main_arena():
self.dump_chunks_heap(heap_addr, top=top_chunk_addr, allow_unaligned=allow_unaligned)
else:
heap_addr = parse_address(args.address)

heap_info_structs = arena.get_heap_info_list()
first_heap_info = heap_info_structs.pop(0)
heap_info_t_size = int(arena) - first_heap_info.addr
until = first_heap_info.addr + first_heap_info.size
self.dump_chunks_heap(heap_addr, until=until, top=top_chunk_addr, allow_unaligned=allow_unaligned)
for heap_info in heap_info_structs:
start = heap_info.addr + heap_info_t_size
until = heap_info.addr + heap_info.size
self.dump_chunks_heap(start, until=until, top=top_chunk_addr, allow_unaligned=allow_unaligned)
return

def dump_chunks_heap(self, start, until=None, top=None, allow_unaligned=False):
nb = self.get_setting("peek_nb_byte")
current_chunk = GlibcChunk(heap_addr, from_base=True, allow_unaligned=args.allow_unaligned)
current_chunk = GlibcChunk(start, from_base=True, allow_unaligned=allow_unaligned)
while True:
if current_chunk.base_address == arena.top:
if current_chunk.base_address == top:
gef_print("{} {} {}".format(str(current_chunk), LEFT_ARROW, Color.greenify("top chunk")))
break

if current_chunk.base_address > arena.top:
break

if current_chunk.size == 0:
# EOF
break

line = str(current_chunk)
if nb:
line += "\n [" + hexdump(read_memory(current_chunk.data_address, nb), nb, base=current_chunk.data_address) + "]"
line += "\n [{}]".format(hexdump(read_memory(current_chunk.data_address, nb), nb, base=current_chunk.data_address))
gef_print(line)

next_chunk = current_chunk.get_next_chunk()
if next_chunk is None:
next_chunk_addr = current_chunk.get_next_chunk_addr()
if until and next_chunk_addr >= until:
break
if not Address(value=next_chunk_addr).valid:
break

next_chunk_addr = Address(value=next_chunk.data_address)
if not next_chunk_addr.valid:
# corrupted
next_chunk = current_chunk.get_next_chunk()
if next_chunk is None:
break

current_chunk = next_chunk
Expand Down Expand Up @@ -6986,7 +7083,7 @@ def fastbin_index(sz):
MAX_FAST_SIZE = 80 * SIZE_SZ // 4
NFASTBINS = fastbin_index(MAX_FAST_SIZE) - 1

arena = GlibcArena("*{:s}".format(argv[0])) if len(argv) == 1 else get_main_arena()
arena = GlibcArena("*{:s}".format(argv[0])) if len(argv) == 1 else get_glibc_arena()

if arena is None:
err("Invalid Glibc arena")
Expand Down Expand Up @@ -7039,7 +7136,7 @@ def __init__(self):

@only_if_gdb_running
def do_invoke(self, argv):
if get_main_arena() is None:
if get_glibc_arena() is None:
err("Invalid Glibc arena")
return

Expand All @@ -7063,7 +7160,7 @@ def __init__(self):

@only_if_gdb_running
def do_invoke(self, argv):
if get_main_arena() is None:
if get_glibc_arena() is None:
err("Invalid Glibc arena")
return

Expand Down Expand Up @@ -7092,7 +7189,7 @@ def __init__(self):

@only_if_gdb_running
def do_invoke(self, argv):
if get_main_arena() is None:
if get_glibc_arena() is None:
err("Invalid Glibc arena")
return

Expand Down
10 changes: 9 additions & 1 deletion tests/runtests.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ def test_cmd_heap_chunks(self):
self.assertNoException(res)
self.assertIn("Chunk(addr=", res)
self.assertIn("top chunk", res)

cmd = "python gdb.execute('heap chunks {}'.format(get_glibc_arena().next))"
target = "/tmp/heap-non-main.out"
res = gdb_run_silent_cmd(cmd, target=target)
self.assertNoException(res)
self.assertNotIn("using '&main_arena' instead", res)
self.assertIn("Chunk(addr=", res)
self.assertIn("top chunk", res)
return

def test_cmd_heap_bins_fast(self):
Expand All @@ -236,7 +244,7 @@ def test_cmd_heap_bins_fast(self):
return

def test_cmd_heap_bins_non_main(self):
cmd = "python gdb.execute('heap bins fast {}'.format(get_main_arena().next))"
cmd = "python gdb.execute('heap bins fast {}'.format(get_glibc_arena().next))"
before = ["set environment GLIBC_TUNABLES glibc.malloc.tcache_count=0"]
target = "/tmp/heap-non-main.out"
res = gdb_run_silent_cmd(cmd, before=before, target=target)
Expand Down