Skip to content

Commit

Permalink
pw_tokenizer: Concatenate duplicate sections in elf_reader.py
Browse files Browse the repository at this point in the history
When processing an archive file, multiple object files may have sections
with the same name. Have the dump_sections() and dump_section_contents()
commands concatenate sections with the same name across object files.

Bug: b/254925614
Change-Id: I03dcd9c801afb8c944229d53570427d921327243
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/116211
Reviewed-by: Anthony Stange <stange@google.com>
Reviewed-by: Armando Montanez <amontanez@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
  • Loading branch information
255 authored and CQ Bot Account committed Oct 28, 2022
1 parent bfb6462 commit 567b398
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 32 deletions.
61 changes: 36 additions & 25 deletions pw_tokenizer/py/elf_reader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,19 @@

class ElfReaderTest(unittest.TestCase):
"""Tests the elf_reader.Elf class."""
def setUp(self):
def setUp(self) -> None:
super().setUp()
self._elf_file = open(TEST_ELF_PATH, 'rb')
self._elf = elf_reader.Elf(self._elf_file)

def tearDown(self):
def tearDown(self) -> None:
super().tearDown()
self._elf_file.close()

def _section(self, name):
return next(self._elf.sections_with_name(name))
def _section(self, name) -> elf_reader.Elf.Section:
return next(iter(self._elf.sections_with_name(name)))

def test_readelf_comparison_using_the_readelf_binary(self):
def test_readelf_comparison_using_the_readelf_binary(self) -> None:
"""Compares elf_reader to readelf's output."""

parse_readelf_output = re.compile(r'\s+'
Expand Down Expand Up @@ -124,13 +124,13 @@ def test_readelf_comparison_using_the_readelf_binary(self):
self.assertEqual(section.offset, offset)
self.assertEqual(section.size, size)

def test_dump_single_section(self):
def test_dump_single_section(self) -> None:
self.assertEqual(self._elf.dump_section_contents(r'\.test_section_1'),
b'You cannot pass\0')
self.assertEqual(self._elf.dump_section_contents(r'\.test_section_2'),
b'\xef\xbe\xed\xfe')

def test_dump_multiple_sections(self):
def test_dump_multiple_sections(self) -> None:
if (self._section('.test_section_1').address <
self._section('.test_section_2').address):
contents = b'You cannot pass\0\xef\xbe\xed\xfe'
Expand All @@ -140,32 +140,32 @@ def test_dump_multiple_sections(self):
self.assertIn(self._elf.dump_section_contents(r'.test_section_\d'),
contents)

def test_read_values(self):
def test_read_values(self) -> None:
address = self._section('.test_section_1').address
self.assertEqual(self._elf.read_value(address), b'You cannot pass')

int32_address = self._section('.test_section_2').address
self.assertEqual(self._elf.read_value(int32_address, 4),
b'\xef\xbe\xed\xfe')

def test_read_string(self):
def test_read_string(self) -> None:
bytes_io = io.BytesIO(
b'This is a null-terminated string\0No terminator!')
self.assertEqual(elf_reader.read_c_string(bytes_io),
b'This is a null-terminated string')
self.assertEqual(elf_reader.read_c_string(bytes_io), b'No terminator!')
self.assertEqual(elf_reader.read_c_string(bytes_io), b'')

def test_compatible_file_for_elf(self):
def test_compatible_file_for_elf(self) -> None:
self.assertTrue(elf_reader.compatible_file(self._elf_file))
self.assertTrue(elf_reader.compatible_file(io.BytesIO(b'\x7fELF')))

def test_compatible_file_for_elf_start_at_offset(self):
def test_compatible_file_for_elf_start_at_offset(self) -> None:
self._elf_file.seek(13) # Seek ahead to get out of sync
self.assertTrue(elf_reader.compatible_file(self._elf_file))
self.assertEqual(13, self._elf_file.tell())

def test_compatible_file_for_invalid_elf(self):
def test_compatible_file_for_invalid_elf(self) -> None:
self.assertFalse(elf_reader.compatible_file(io.BytesIO(b'\x7fELVESF')))


Expand All @@ -181,7 +181,7 @@ def _archive_file(data: bytes) -> bytes:

class ArchiveTest(unittest.TestCase):
"""Tests reading from archive files."""
def setUp(self):
def setUp(self) -> None:
super().setUp()

with open(TEST_ELF_PATH, 'rb') as fd:
Expand All @@ -193,37 +193,37 @@ def setUp(self):
_archive_file(f) for f in self._archive_entries)
self._archive = io.BytesIO(self._archive_data)

def test_compatible_file_for_archive(self):
def test_compatible_file_for_archive(self) -> None:
self.assertTrue(elf_reader.compatible_file(io.BytesIO(b'!<arch>\n')))
self.assertTrue(elf_reader.compatible_file(self._archive))

def test_compatible_file_for_invalid_archive(self):
def test_compatible_file_for_invalid_archive(self) -> None:
self.assertFalse(elf_reader.compatible_file(io.BytesIO(b'!<arch>')))

def test_iterate_over_files(self):
def test_iterate_over_files(self) -> None:
for expected, size in zip(self._archive_entries,
elf_reader.files_in_archive(self._archive)):
self.assertEqual(expected, self._archive.read(size))

def test_iterate_over_empty_archive(self):
def test_iterate_over_empty_archive(self) -> None:
with self.assertRaises(StopIteration):
next(iter(elf_reader.files_in_archive(io.BytesIO(b'!<arch>\n'))))

def test_iterate_over_invalid_archive(self):
def test_iterate_over_invalid_archive(self) -> None:
with self.assertRaises(elf_reader.FileDecodeError):
for _ in elf_reader.files_in_archive(
io.BytesIO(b'!<arch>blah blahblah')):
pass

def test_extra_newline_after_entry_is_ignored(self):
def test_extra_newline_after_entry_is_ignored(self) -> None:
archive = io.BytesIO(elf_reader.ARCHIVE_MAGIC +
_archive_file(self._elf_data) + b'\n' +
_archive_file(self._elf_data))

for size in elf_reader.files_in_archive(archive):
self.assertEqual(self._elf_data, archive.read(size))

def test_two_extra_newlines_parsing_fails(self):
def test_two_extra_newlines_parsing_fails(self) -> None:
archive = io.BytesIO(elf_reader.ARCHIVE_MAGIC +
_archive_file(self._elf_data) + b'\n\n' +
_archive_file(self._elf_data))
Expand All @@ -232,7 +232,7 @@ def test_two_extra_newlines_parsing_fails(self):
for size in elf_reader.files_in_archive(archive):
self.assertEqual(self._elf_data, archive.read(size))

def test_iterate_over_archive_with_invalid_size(self):
def test_iterate_over_archive_with_invalid_size(self) -> None:
data = elf_reader.ARCHIVE_MAGIC + _archive_file(b'$' * 3210)
file = io.BytesIO(data)

Expand All @@ -246,21 +246,32 @@ def test_iterate_over_archive_with_invalid_size(self):
io.BytesIO(data.replace(b'3210', b'0x99'))):
pass

def test_elf_reader_dump_single_section(self):
def test_elf_reader_dump_single_section(self) -> None:
elf = elf_reader.Elf(self._archive)
self.assertEqual(elf.dump_section_contents(r'\.test_section_1'),
b'You cannot pass\0')
self.assertEqual(elf.dump_section_contents(r'\.test_section_2'),
b'\xef\xbe\xed\xfe')

def test_elf_reader_read_values(self):
def test_elf_reader_read_values(self) -> None:
elf = elf_reader.Elf(self._archive)
address = next(elf.sections_with_name('.test_section_1')).address
address = next(iter(elf.sections_with_name('.test_section_1'))).address
self.assertEqual(elf.read_value(address), b'You cannot pass')

int32_address = next(elf.sections_with_name('.test_section_2')).address
int32_address = next(iter(
elf.sections_with_name('.test_section_2'))).address
self.assertEqual(elf.read_value(int32_address, 4), b'\xef\xbe\xed\xfe')

def test_elf_reader_duplicate_sections_are_concatenated(self) -> None:
archive_data = elf_reader.ARCHIVE_MAGIC + b''.join(
_archive_file(f) for f in [self._elf_data, self._elf_data])
elf = elf_reader.Elf(io.BytesIO(archive_data))

self.assertEqual(elf.dump_section_contents(r'\.test_section_1'),
b'You cannot pass\0You cannot pass\0')
self.assertEqual(elf.dump_section_contents(r'\.test_section_2'),
b'\xef\xbe\xed\xfe' * 2)


if __name__ == '__main__':
unittest.main()
26 changes: 19 additions & 7 deletions pw_tokenizer/py/pw_tokenizer/elf_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@
"""

import argparse
import collections
from pathlib import Path
import re
import struct
import sys
from typing import BinaryIO, Dict, Iterable, NamedTuple, Optional
from typing import Pattern, Tuple, Union
from typing import (BinaryIO, Iterable, Mapping, NamedTuple, Optional, Pattern,
Tuple, Union)

ARCHIVE_MAGIC = b'!<arch>\n'
ELF_MAGIC = b'\x7fELF'
Expand Down Expand Up @@ -195,7 +196,7 @@ def __init__(self, elf: BinaryIO):
else:
raise FileDecodeError('Unknown size {!r}'.format(size_field))

def _determine_integer_format(self) -> Dict[int, struct.Struct]:
def _determine_integer_format(self) -> Mapping[int, struct.Struct]:
"""Returns a dict of structs used for converting bytes to integers."""
endianness_byte = self._elf.read(1) # e_ident[EI_DATA] (endianness)
if endianness_byte == b'\x01':
Expand Down Expand Up @@ -305,20 +306,31 @@ def read_value(self,
return self._elf.read(size)

def dump_sections(self, name: Union[str,
Pattern[str]]) -> Dict[str, bytes]:
"""Dumps a binary string containing the sections matching the regex."""
Pattern[str]]) -> Mapping[str, bytes]:
"""Returns a mapping of section names to section contents.
If processing an archive with multiple object files, the contents of
sections with duplicate names are concatenated in the order they appear
in the archive.
"""
name_regex = re.compile(name)

sections: Dict[str, bytes] = {}
sections: Mapping[str, bytearray] = collections.defaultdict(bytearray)
for section in self.sections:
if name_regex.match(section.name):
self._elf.seek(section.file_offset + section.offset)
sections[section.name] = self._elf.read(section.size)
sections[section.name].extend(self._elf.read(section.size))

return sections

def dump_section_contents(
self, name: Union[str, Pattern[str]]) -> Optional[bytes]:
"""Dumps a binary string containing the sections matching the regex.
If processing an archive with multiple object files, the contents of
sections with duplicate names are concatenated in the order they appear
in the archive.
"""
sections = self.dump_sections(name)
return b''.join(sections.values()) if sections else None

Expand Down

0 comments on commit 567b398

Please sign in to comment.