Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

more mypy v1.1.1 fixes #1423

Merged
merged 15 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 80 additions & 47 deletions capa/features/extractors/elf.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,23 @@ def needed(self) -> Iterator[str]:

yield read_cstr(strtab, d_val)

@property
def symtab(self) -> Optional[Tuple[Shdr, Shdr]]:
    """
    find the symbol table section and the string table linked to it.

    returns:
      the pair (symtab Shdr, strtab Shdr) for the first section whose type is
      SHT_SYMTAB, or None when no such section is present.
    """
    SHT_SYMTAB = 0x2

    for candidate in self.section_headers:
        if candidate.type == SHT_SYMTAB:
            # the section referenced by the symtab's link field holds the
            # strings that the symtab structures refer to.
            return candidate, self.parse_section_header(candidate.link)

    return None


@dataclass
class ABITag:
Expand Down Expand Up @@ -604,40 +621,63 @@ def abi_tag(self) -> Optional[ABITag]:
return ABITag(os, kmajor, kminor, kpatch)


@dataclass
class Symbol:
    """
    one unpacked entry of an ELF symbol table.
    """

    # offset of the symbol's name inside the associated string table
    name_offset: int
    value: int
    size: int
    # remaining fields are stored exactly as unpacked from the entry
    info: int
    other: int
    shndx: int


class SymTab:
def __init__(
    self,
    endian: str,
    bitness: int,
    symtab: Shdr,
    strtab: Shdr,
) -> None:
    """
    keep the symtab/strtab section headers and parse all symbols up front.

    args:
      endian: byte-order prefix forwarded to struct by _parse.
      bitness: 32 or 64; selects the symbol entry layout in _parse.
      symtab: section header of the symbol table; its buf is parsed here.
      strtab: section header of the string table used to resolve symbol names.
    """
    self.symtab, self.strtab = symtab, strtab
    self.symbols: List[Symbol] = []

    self._parse(endian, bitness, symtab.buf)

def _parse(self, endian: str, bitness: int, symtab_buf: bytes) -> None:
    """
    unpack every entry of the symbol table and append it to self.symbols.

    whatever the on-disk layout, each Symbol is built in the order
    specified by sys/elf32.h: (name, value, size, info, other, shndx).

    args:
      endian: struct byte-order prefix.
      bitness: 32 or 64; selects the entry layout.
      symtab_buf: raw bytes of the symbol table section.

    raises:
      ValueError: if bitness is neither 32 nor 64.
    """
    if bitness == 32:
        # Elf32_Sym: st_name, st_value, st_size, st_info, st_other, st_shndx
        layout = struct.Struct(endian + "IIIBBH")
    elif bitness == 64:
        # Elf64_Sym: st_name, st_info, st_other, st_shndx, st_value, st_size
        # st_shndx is a 16-bit field, so it must be unpacked with "H";
        # a one-byte code would also shift st_value and st_size.
        layout = struct.Struct(endian + "IBBHQQ")
    else:
        raise ValueError("unsupported bitness: %s" % (bitness,))

    entsize = self.symtab.entsize
    # floor division keeps the entry count an exact integer
    for index in range(len(self.symtab.buf) // entsize):
        if bitness == 32:
            name_offset, value, size, info, other, shndx = layout.unpack_from(symtab_buf, index * entsize)
        else:
            name_offset, info, other, shndx, value, size = layout.unpack_from(symtab_buf, index * entsize)

        self.symbols.append(Symbol(name_offset, value, size, info, other, shndx))

def get_name(self, symbol: Symbol) -> str:
    """
    fetch a symbol's name from symtab's
    associated strings' section (SHT_STRTAB)

    args:
      symbol: the symbol whose name_offset locates the name in the string table.

    returns:
      the bytes from symbol.name_offset up to (not including) the next
      NUL byte, decoded as utf-8.

    raises:
      ValueError: if there is no string table, or no NUL terminator is found
        at or after name_offset within the string table. decoding may also
        raise UnicodeDecodeError, which is a ValueError subclass.
    """
    if not self.strtab:
        raise ValueError("no strings found")

    strings = self.strtab.buf
    start = symbol.name_offset
    # search neither past the declared section size nor past the bytes
    # that are actually available in the buffer.
    limit = min(self.strtab.size, len(strings))

    terminator = strings.find(b"\x00", start, limit)
    if terminator == -1:
        raise ValueError("symbol name not found")

    return strings[start:terminator].decode("utf-8")

def get_symbols(self) -> Iterator[Symbol]:
"""
return a tuple: (name, value, size, info, other, shndx)
for each symbol contained in the symbol table
Expand All @@ -646,11 +686,11 @@ def get_symbols(self) -> Iterator[Tuple[int, int, int, int, int, int]]:
yield symbol


def guess_os_from_osabi(elf: ELF) -> Optional[OS]:
    """
    report the OS exposed by the ELF object's ei_osabi attribute.
    """
    return elf.ei_osabi


def guess_os_from_ph_notes(elf) -> Optional[OS]:
def guess_os_from_ph_notes(elf: ELF) -> Optional[OS]:
# search for PT_NOTE sections that specify an OS
# for example, on Linux there is a GNU section with minimum kernel version
PT_NOTE = 0x4
Expand Down Expand Up @@ -689,7 +729,7 @@ def guess_os_from_ph_notes(elf) -> Optional[OS]:
return None


def guess_os_from_sh_notes(elf) -> Optional[OS]:
def guess_os_from_sh_notes(elf: ELF) -> Optional[OS]:
# search for notes stored in sections that aren't visible in program headers.
# e.g. .note.Linux in Linux kernel modules.
SHT_NOTE = 0x7
Expand Down Expand Up @@ -722,7 +762,7 @@ def guess_os_from_sh_notes(elf) -> Optional[OS]:
return None


def guess_os_from_linker(elf) -> Optional[OS]:
def guess_os_from_linker(elf: ELF) -> Optional[OS]:
# search for recognizable dynamic linkers (interpreters)
# for example, on linux, we see file paths like: /lib64/ld-linux-x86-64.so.2
linker = elf.linker
Expand All @@ -732,7 +772,7 @@ def guess_os_from_linker(elf) -> Optional[OS]:
return None


def guess_os_from_abi_versions_needed(elf) -> Optional[OS]:
def guess_os_from_abi_versions_needed(elf: ELF) -> Optional[OS]:
# then lets look for GLIBC symbol versioning requirements.
# this will let us guess about linux/hurd in some cases.

Expand Down Expand Up @@ -763,7 +803,7 @@ def guess_os_from_abi_versions_needed(elf) -> Optional[OS]:
return None


def guess_os_from_needed_dependencies(elf) -> Optional[OS]:
def guess_os_from_needed_dependencies(elf: ELF) -> Optional[OS]:
for needed in elf.needed:
if needed.startswith("libmachuser.so"):
return OS.HURD
Expand All @@ -773,38 +813,31 @@ def guess_os_from_needed_dependencies(elf) -> Optional[OS]:
return None


def guess_os_from_symtab(elf) -> Optional[OS]:
SHT_SYMTAB = 0x2
SHT_STRTAB = 0x3
strtab_buf = symtab_buf = None

for shdr in elf.section_headers:
if shdr.type == SHT_STRTAB:
strtab_buf, strtab_sz= shdr.buf, shdr.size

elif shdr.type == SHT_SYMTAB:
symtab_buf, symtab_entsize, symtab_sz = shdr.buf, shdr.entsize, shdr.size

if None in (strtab_buf, symtab_buf):
Comment on lines -777 to -788
Copy link
Collaborator Author

@williballenthin williballenthin Apr 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yelhamer FYI, ELF files may have multiple strtabs, so we need to read the correct one using the sh_link field of the symtab section header, rather than just taking the last one found. I learned this from here: https://stackoverflow.com/a/69888949/87207

I noticed the bug because the ELF sample bf7a9c8bdfa6d47e01ad2b056264acc3fd90cf43fe0ed8deec93ab46b47d76cb has extra string tables that didn't line up with what the symtab entries expected.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: I've also done a little refactoring to make use of existing code to parse section headers and their associated data. No major changes to your logic.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I missed using sh_link to get strtab's index, that's pretty nifty!

I also really like the refactoring you did, specifically, adding symtab() to the ELF section, I think that's much better than my original solution (having it inside the guess_os_from_symtab() function).

Thank you!

def guess_os_from_symtab(elf: ELF) -> Optional[OS]:
    """
    guess the OS from substrings that appear in the ELF's symbol names.

    returns:
      the OS whose hint is first found in a symbol name, or None when the
      file has no symbol table or no symbol name contains a known hint.
    """
    shdrs = elf.symtab
    if not shdrs:
        # executable does not contain a symbol table
        # or the symbol's names are stripped
        return None

    symtab_shdr, strtab_shdr = shdrs
    symtab = SymTab(elf.endian, elf.bitness, symtab_shdr, strtab_shdr)

    keywords = {
        OS.LINUX: [
            "linux",
            "/linux/",
        ],
    }

    for symbol in symtab.get_symbols():
        sym_name = symtab.get_name(symbol)

        # loop variable is not named `os` so it cannot shadow the os module
        for guess, hints in keywords.items():
            if any(hint in sym_name for hint in hints):
                return guess

    return None


Expand Down Expand Up @@ -832,7 +865,7 @@ def detect_elf_os(f) -> str:
needed_dependencies_guess = guess_os_from_needed_dependencies(elf)
logger.debug("guess: needed dependencies: %s", needed_dependencies_guess)

symtab_guess = guess_os_from_symtab(elf)
symtab_guess = guess_os_from_symtab(elf)
logger.debug("guess: pertinent symbol name: %s", symtab_guess)

ret = None
Expand Down
2 changes: 1 addition & 1 deletion capa/features/extractors/ida/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def get_file_imports() -> Dict[int, Tuple[str, str, int]]:

# IDA uses section names for the library of ELF imports, like ".dynsym".
# These are not useful to us, we may need to expand this list over time
# TODO: exhaust this list, see #1419
# TODO: exhaust this list, see #1419
if library == ".dynsym":
library = ""

Expand Down
84 changes: 50 additions & 34 deletions capa/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1181,46 +1181,62 @@ def main(argv=None):
if not (args.verbose or args.vverbose or args.json):
logger.debug("file limitation short circuit, won't analyze fully.")
return E_FILE_LIMITATION

# TODO: #1411 use a real type, not a dict here.
meta: Dict[str, Any]
capabilities: MatchResults
counts: Dict[str, Any]

if format_ == FORMAT_RESULT:
# result document directly parses into meta, capabilities
result_doc = capa.render.result_document.ResultDocument.parse_file(args.sample)
meta, capabilities = result_doc.to_capa()
elif format_ == FORMAT_FREEZE:
with open(args.sample, "rb") as f:
extractor = capa.features.freeze.load(f.read())

else:
try:
if format_ == FORMAT_PE:
sig_paths = get_signatures(args.signatures)
else:
sig_paths = []
logger.debug("skipping library code matching: only have native PE signatures")
except IOError as e:
logger.error("%s", str(e))
return E_INVALID_SIG
# all other formats we must create an extractor
# and use that to extract meta and capabilities

should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
if format_ == FORMAT_FREEZE:
# freeze format deserializes directly into an extractor
with open(args.sample, "rb") as f:
extractor = capa.features.freeze.load(f.read())
else:
# all other formats we must create an extractor,
# such as viv, binary ninja, etc. workspaces
# and use those for extracting.

try:
if format_ == FORMAT_PE:
sig_paths = get_signatures(args.signatures)
else:
sig_paths = []
logger.debug("skipping library code matching: only have native PE signatures")
except IOError as e:
logger.error("%s", str(e))
return E_INVALID_SIG

should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)

try:
extractor = get_extractor(
args.sample,
format_,
args.os,
args.backend,
sig_paths,
should_save_workspace,
disable_progress=args.quiet,
)
except UnsupportedFormatError:
log_unsupported_format_error()
return E_INVALID_FILE_TYPE
except UnsupportedArchError:
log_unsupported_arch_error()
return E_INVALID_FILE_ARCH
except UnsupportedOSError:
log_unsupported_os_error()
return E_INVALID_FILE_OS

try:
extractor = get_extractor(
args.sample,
format_,
args.os,
args.backend,
sig_paths,
should_save_workspace,
disable_progress=args.quiet,
)
except UnsupportedFormatError:
log_unsupported_format_error()
return E_INVALID_FILE_TYPE
except UnsupportedArchError:
log_unsupported_arch_error()
return E_INVALID_FILE_ARCH
except UnsupportedOSError:
log_unsupported_os_error()
return E_INVALID_FILE_OS

if format_ != FORMAT_RESULT:
meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor)

capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
Expand Down
Loading