diff --git a/src/classy_blocks/grading/autograding/grader.py b/src/classy_blocks/grading/autograding/grader.py index e1ede0bd..f56fdcbd 100644 --- a/src/classy_blocks/grading/autograding/grader.py +++ b/src/classy_blocks/grading/autograding/grader.py @@ -1,12 +1,10 @@ -from typing import get_args - -from classy_blocks.grading.autograding.params import ( - ChopParams, - FixedCountParams, - InflationGraderParams, - SimpleGraderParams, - SmoothGraderParams, -) +from typing import Tuple, get_args + +from classy_blocks.grading.autograding.params.base import ChopParams +from classy_blocks.grading.autograding.params.fixed import FixedCountGraderParams +from classy_blocks.grading.autograding.params.inflation import InflationGraderParams +from classy_blocks.grading.autograding.params.simple import SimpleGraderParams +from classy_blocks.grading.autograding.params.smooth import SmoothGraderParams from classy_blocks.grading.autograding.probe import Probe from classy_blocks.grading.autograding.row import Row from classy_blocks.grading.chop import Chop @@ -38,28 +36,33 @@ def __init__(self, mesh: Mesh, params: ChopParams): self.mesh.assemble() self.probe = Probe(self.mesh) + def check_at_wall(self, row: Row) -> Tuple[bool, bool]: + """Returns True if any block on given row has a wall patch + (at start and/or end, respectively).""" + start = False + end = False + + # Check if there are blocks at the wall; + for entry in row.entries: + for wire in entry.wires: + # TODO: cache WireInfo + info = self.probe.get_wire_info(wire, entry.block) + if info.starts_at_wall: + start = True + if info.ends_at_wall: + end = True + + return start, end + def set_counts(self, row: Row, take: ChopTakeType) -> None: if row.count > 0: # stuff, pre-defined by the user return - # at_wall: List[Entry] = [] - - # Check if there are blocks at the wall; - # for entry in row.entries: - # for wire in entry.wires: - # # TODO: cache WireInfo - # info = self.probe.get_wire_info(wire, entry.block) - # if info.starts_at_wall or info.ends_at_wall: - # at_wall.append(entry) - length = row.get_length(take) + start_at_wall, end_at_wall = self.check_at_wall(row) - # if len(at_wall) > 0: - # # find out whether one or two sides are to be counted - # pass - - row.count = self.params.get_count(length) + row.count = self.params.get_count(length, start_at_wall, end_at_wall) def grade_squeezed(self, row: Row) -> None: for entry in row.entries: @@ -118,7 +121,7 @@ class FixedCountGrader(GraderBase): useful during mesh building and some tutorial cases""" def __init__(self, mesh: Mesh, count: int = 8): - super().__init__(mesh, FixedCountParams(count)) + super().__init__(mesh, FixedCountGraderParams(count)) class SimpleGrader(GraderBase): diff --git a/src/classy_blocks/grading/autograding/params.py b/src/classy_blocks/grading/autograding/params.py deleted file mode 100644 index 9c604892..00000000 --- a/src/classy_blocks/grading/autograding/params.py +++ /dev/null @@ -1,209 +0,0 @@ -import abc -import dataclasses -import warnings -from typing import List, Optional, Tuple - -import scipy.optimize - -import classy_blocks.grading.relations as gr -from classy_blocks.grading.autograding.probe import WireInfo -from classy_blocks.grading.chop import Chop - -CellSizeType = Optional[float] - - -def sum_length(start_size: float, count: int, c2c_expansion: float) -> float: - """Returns absolute length of the chop""" - length = 0.0 - size = start_size - - for _ in range(count): - length += size - size *= c2c_expansion - - return length - - -class ChopParams(abc.ABC): - @abc.abstractmethod - def get_count(self, length: float) -> int: - """Calculates count based on given length and position""" - - @abc.abstractmethod - def is_squeezed(self, count: int, info: WireInfo) -> bool: - """Returns True if cells have to be 'squished' together (thinner than prescribed in params)""" - - @abc.abstractmethod - def get_chops(self, count: int, info: WireInfo) -> List[Chop]: - """Fixes cell count but modifies chops so that proper cell sizing will be obeyed""" - - -@dataclasses.dataclass -class FixedCountParams(ChopParams): - count: int = 8 - - def get_count(self, _length): - return self.count - - def is_squeezed(self, _count, _info) -> bool: - return True # grade everything in first pass - - def get_chops(self, count, _info) -> List[Chop]: - return [Chop(count=count)] - - -@dataclasses.dataclass -class SimpleGraderParams(ChopParams): - cell_size: float - - def get_count(self, length: float): - return int(length / self.cell_size) - - def is_squeezed(self, _count, _info) -> bool: - return True - - def get_chops(self, count, _info: WireInfo): - return [Chop(count=count)] - - -@dataclasses.dataclass -class SmoothGraderParams(ChopParams): - cell_size: float - - def get_count(self, length: float): - # the first chop defines the count; - count = int(length / self.cell_size) - # it must be divisible by 2 - if count % 2 != 0: - count += 1 - - return count - - def is_squeezed(self, count, info) -> bool: - return info.length <= self.cell_size * count - - def define_sizes(self, size_before: CellSizeType, size_after: CellSizeType) -> Tuple[float, float]: - """Defines start and end cell size. - size_before and size_after are taken from preceding/following wires; - when a size is None, this is the last/first wire.""" - if size_before == 0 or size_after == 0: - # until all counts/sizes are defined - # (the first pass with uniform grading), - # there's no point in doing anything - raise RuntimeError("Undefined grading encountered!") - - if size_before is None: - size_before = self.cell_size - - if size_after is None: - size_after = self.cell_size - - return size_before, size_after - - def get_chops(self, count, info): - halfcount = count // 2 - - size_before, size_after = self.define_sizes(info.size_before, info.size_after) - - # choose length ratio so that cells at the middle of blocks - # (between the two chops) have the same size - def fobj(lratio): - chop_1 = Chop(length_ratio=lratio, count=halfcount, start_size=size_before) - data_1 = chop_1.calculate(info.length) - - chop_2 = Chop(length_ratio=1 - lratio, count=halfcount, end_size=size_after) - data_2 = chop_2.calculate(info.length) - - ratio = (data_1.end_size - data_2.start_size) ** 2 - - return ratio, [chop_1, chop_2] - - # it's not terribly important to minimize until the last dx - tol = min(size_before, size_after, self.cell_size) * 0.1 - results = scipy.optimize.minimize_scalar(lambda r: fobj(r)[0], bounds=[0.1, 0.9], options={"xatol": tol}) - if not results.success: # type:ignore - warnings.warn("Could not determine optimal grading", stacklevel=1) - - return fobj(results.x)[1] # type:ignore - - -# INVALID! Next on list -@dataclasses.dataclass -class InflationGraderParams(ChopParams): - """See description of InflationGrader""" - - first_cell_size: float - bulk_cell_size: float - - c2c_expansion: float = 1.2 - bl_thickness_factor: int = 30 - buffer_expansion: float = 2 - - def is_squeezed(self, _count: int, _info: WireInfo) -> bool: - return False - - @property - def inflation_layer_thickness(self) -> float: - return self.first_cell_size * self.bl_thickness_factor - - def _get_inflation_chop(self, length: float) -> Tuple[Chop, float]: - """Creates a Chop for the inflation layer; returns size of the last cell""" - near_wall = Chop( - length_ratio=self.inflation_layer_thickness / length, - start_size=self.first_cell_size, - c2c_expansion=self.c2c_expansion, - ) - data = near_wall.calculate(length) - return (near_wall, data.end_size) - - def _get_buffer_chop(self, start_size: float) -> Tuple[Chop, float]: - """Creates a chop between the last cell of inflation layer - and the first cell of bulk flow; returns length of the chop""" - buffer_count = gr.get_count__total_expansion__c2c_expansion( - 1, self.bulk_cell_size / start_size, self.buffer_expansion - ) - buffer_size = sum_length(start_size, buffer_count, self.buffer_expansion) - buffer = Chop(start_size=start_size, c2c_expansion=self.buffer_expansion, count=buffer_count) - - return buffer, buffer_size - - def _get_bulk_chop(self, remaining_size: float) -> Chop: - count = max(1, int(remaining_size / self.bulk_cell_size)) - return Chop(count=count) - - def get_count(self, length: float): - chops: List[Chop] = [] - - if length < self.inflation_layer_thickness: - warnings.warn("Inflation layer is thicker than block size!", stacklevel=1) - - # near-wall sizes: - near_wall, last_bl_size = self._get_inflation_chop(length) - remaining_length = length - self.inflation_layer_thickness - chops.append(near_wall) - - if remaining_length <= 0: - warnings.warn("Stopping chops at inflation layer (not enough space)!", stacklevel=1) - # return chops - return 0 - - # buffer - buffer, buffer_size = self._get_buffer_chop(last_bl_size) - buffer.length_ratio = buffer_size / length - chops.append(buffer) - if buffer_size >= remaining_length: - warnings.warn("Stopping chops at buffer layer (not enough space)!", stacklevel=1) - # return chops - return 1 - - # bulk - remaining_length = remaining_length - buffer_size - bulk = self._get_bulk_chop(remaining_length) - bulk.length_ratio = remaining_length / length - chops.append(bulk) - - # return chops - return 1 - - def get_chops(self, count, length, size_before=0, size_after=0) -> List[Chop]: - raise NotImplementedError("TODO!") diff --git a/src/classy_blocks/grading/autograding/params/__init__.py b/src/classy_blocks/grading/autograding/params/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/classy_blocks/grading/autograding/params/base.py b/src/classy_blocks/grading/autograding/params/base.py new file mode 100644 index 00000000..54eefacb --- /dev/null +++ b/src/classy_blocks/grading/autograding/params/base.py @@ -0,0 +1,33 @@ +import abc +from typing import List, Optional + +from classy_blocks.grading.autograding.probe import WireInfo +from classy_blocks.grading.chop import Chop + +CellSizeType = Optional[float] + + +def sum_length(start_size: float, count: int, c2c_expansion: float) -> float: + """Returns absolute length of the chop""" + length = 0.0 + size = start_size + + for _ in range(count): + length += size + size *= c2c_expansion + + return length + + +class ChopParams(abc.ABC): + @abc.abstractmethod + def get_count(self, length: float, start_at_wall: bool, end_at_wall: bool) -> int: + """Calculates count based on given length and position""" + + @abc.abstractmethod + def is_squeezed(self, count: int, info: WireInfo) -> bool: + """Returns True if cells have to be 'squished' together (thinner than prescribed in params)""" + + @abc.abstractmethod + def get_chops(self, count: int, info: WireInfo) -> List[Chop]: + """Fixes cell count but modifies chops so that proper cell sizing will be obeyed""" diff --git a/src/classy_blocks/grading/autograding/params/fixed.py b/src/classy_blocks/grading/autograding/params/fixed.py new file mode 100644 index 00000000..11a6cd74 --- /dev/null +++ b/src/classy_blocks/grading/autograding/params/fixed.py @@ -0,0 +1,19 @@ +import dataclasses +from typing import List + +from classy_blocks.grading.autograding.params.base import ChopParams +from classy_blocks.grading.chop import Chop + + +@dataclasses.dataclass +class FixedCountGraderParams(ChopParams): + count: int = 8 + + def get_count(self, _length, _start_at_wall, _end_at_wall): + return self.count + + def is_squeezed(self, _count, _info) -> bool: + return True # grade everything in first pass + + def get_chops(self, count, _info) -> List[Chop]: + return [Chop(count=count)] diff --git a/src/classy_blocks/grading/autograding/params/inflation.py b/src/classy_blocks/grading/autograding/params/inflation.py new file mode 100644 index 00000000..615b6d0e --- /dev/null +++ b/src/classy_blocks/grading/autograding/params/inflation.py @@ -0,0 +1,225 @@ +import abc +from typing import List + +import classy_blocks.grading.relations as gr +from classy_blocks.grading.autograding.params.base import sum_length +from classy_blocks.grading.autograding.params.smooth import SmoothGraderParams +from classy_blocks.grading.autograding.probe import WireInfo +from classy_blocks.grading.chop import Chop + + +class Layer(abc.ABC): + """A common interface to all layers of a grading (inflation, buffer, bulk)""" + + # Defines one chop and tools to handle it + start_size: float + c2c_expansion: float + + def __init__(self, max_length: float): + self.max_length = max_length + + @property + @abc.abstractmethod + def length(self) -> float: + """Returns overall length of this layer""" + + @property + def count(self) -> int: + """Returns cell count in this layer""" + length = min(self.length, self.max_length) + return gr.get_count__start_size__c2c_expansion(length, self.start_size, self.c2c_expansion) + + @property + def end_size(self) -> float: + """Size of the last cell in this layer""" + return self.start_size * self.c2c_expansion**self.count + + # @abc.abstractmethod + # def get_chop(self, invert: bool) -> Chop: + # """Prepare a Chop for grader params""" + + @property + def is_final(self) -> bool: + """Returns True if this layer is the last (no more space for additional ones)""" + return self.length >= self.max_length + + def __repr__(self): + return f"{self.length}-{self.count}" + + +class InflationLayer(Layer): + def __init__(self, wall_size: float, c2c_expansion: float, thickness_factor: int, max_length: float): + self.start_size = wall_size + self.c2c_expansion = c2c_expansion + self.thickness_factor = thickness_factor + + super().__init__(max_length) + + @property + def length(self): + return min(self.max_length, self.start_size * self.thickness_factor) + + +class BufferLayer(Layer): + def __init__(self, start_size: float, c2c_expansion: float, bulk_size: float, max_length: float): + self.start_size = start_size # *c2c_expansion (the first cell is already bigger?) + self.c2c_expansion = c2c_expansion + self.bulk_size = bulk_size + + self.total_expansion = self.bulk_size / self.start_size + + # manually sum up those few cells that lead from start to bulk size + count = 1 + size = self.start_size + length = 0.0 + + while size <= self.bulk_size: + if length > max_length: + break + + length += size + count += 1 + size *= self.c2c_expansion + + self._count = count + self._last_size = size + + super().__init__(max_length) + + @property + def count(self): + return self._count + + @property + def length(self): + return sum_length(self.start_size, self.count, self.c2c_expansion) + + @property + def last_size(self): + return self._last_size + + +class BulkLayer(Layer): + def __init__(self, cell_size: float, remaning_length: float): + self.start_size = cell_size + + self.cell_size = cell_size + self.c2c_expansion = 1 + + super().__init__(remaning_length) + + @property + def length(self): + return self.max_length + + @property + def last_size(self): + return self.cell_size + + +class LayerStack: + """A collection of one, two or three layers (chops) for InflationGrader""" + + def __init__(self, length: float): + self.length = length + self.layers: List[Layer] = [] + + @property + def count(self) -> int: + return sum(layer.count for layer in self.layers) + + @property + def remaining_length(self) -> float: + return self.length - sum(layer.length for layer in self.layers) + + def add(self, layer: Layer) -> None: + """Returns True when the added layer was the final one""" + self.layers.append(layer) + + @property + def is_done(self) -> bool: + """Returns True if no more layers need to be added""" + if len(self.layers) == 3: + # nothing more to be added? + return True + + return self.remaining_length <= 0 + + +class InflationGraderParams(SmoothGraderParams): + """See description of InflationGrader""" + + # TODO: refactor to a reasonable number of 'if' clauses + + def __init__( + self, + first_cell_size: float, + bulk_cell_size: float, + c2c_expansion: float = 1.2, + bl_thickness_factor: int = 30, + buffer_expansion: float = 2, + ): + self.first_cell_size = first_cell_size + self.bulk_cell_size = bulk_cell_size + self.c2c_expansion = c2c_expansion + self.bl_thickness_factor = bl_thickness_factor + self.buffer_expansion = buffer_expansion + + self.cell_size = self.bulk_cell_size + + def get_inflation_layer(self, max_length: float) -> InflationLayer: + return InflationLayer(self.first_cell_size, self.c2c_expansion, self.bl_thickness_factor, max_length) + + def get_buffer_layer(self, start_size, max_length: float) -> BufferLayer: + return BufferLayer(start_size, self.buffer_expansion, self.bulk_cell_size, max_length) + + def get_bulk_layer(self, remaining_length: float) -> BulkLayer: + return BulkLayer(self.bulk_cell_size, remaining_length) + + def get_stack(self, length: float) -> LayerStack: + stack = LayerStack(length) + + inflation = self.get_inflation_layer(length) + stack.add(inflation) + if stack.is_done: + return stack + + buffer = self.get_buffer_layer(stack.layers[0].end_size, stack.remaining_length) + stack.add(buffer) + if stack.is_done: + return stack + + bulk = self.get_bulk_layer(stack.remaining_length) + stack.add(bulk) + + return stack + + def get_count(self, length: float, starts_at_wall: bool, ends_at_wall: bool): + if not (starts_at_wall or ends_at_wall): + return super().get_count(length, False, False) + + if starts_at_wall and ends_at_wall: + # this will produce 1 extra chop (the middle one could be + # common to both bulk chops) but it doesn't matter at this moment + stack = self.get_stack(length / 2) + return 2 * stack.count + + stack = self.get_stack(length) + return stack.count + + def is_squeezed(self, count: int, info: WireInfo) -> bool: + if not (info.starts_at_wall or info.ends_at_wall): + return super().is_squeezed(count, info) + + stack = self.get_stack(info.length) + + if len(stack.layers) == 3: + return stack.count < count + + return True + + def get_chops(self, count, info: WireInfo) -> List[Chop]: + if not (info.starts_at_wall or info.ends_at_wall): + return super().get_chops(count, info) + + raise NotImplementedError diff --git a/src/classy_blocks/grading/autograding/params/simple.py b/src/classy_blocks/grading/autograding/params/simple.py new file mode 100644 index 00000000..7e9de233 --- /dev/null +++ b/src/classy_blocks/grading/autograding/params/simple.py @@ -0,0 +1,19 @@ +import dataclasses + +from classy_blocks.grading.autograding.params.base import ChopParams +from classy_blocks.grading.autograding.probe import WireInfo +from classy_blocks.grading.chop import Chop + + +@dataclasses.dataclass +class SimpleGraderParams(ChopParams): + cell_size: float + + def get_count(self, length: float, _start_at_wall, _end_at_wall): + return int(length / self.cell_size) + + def is_squeezed(self, _count, _info) -> bool: + return True + + def get_chops(self, count, _info: WireInfo): + return [Chop(count=count)] diff --git a/src/classy_blocks/grading/autograding/params/smooth.py b/src/classy_blocks/grading/autograding/params/smooth.py new file mode 100644 index 00000000..c0f3da03 --- /dev/null +++ b/src/classy_blocks/grading/autograding/params/smooth.py @@ -0,0 +1,69 @@ +import dataclasses +import warnings +from typing import Tuple + +import scipy.optimize + +from classy_blocks.grading.autograding.params.base import CellSizeType, ChopParams +from classy_blocks.grading.chop import Chop + + +@dataclasses.dataclass +class SmoothGraderParams(ChopParams): + cell_size: float + + def get_count(self, length: float, _start_at_wall, _end_at_wall): + # the first chop defines the count; + count = int(length / self.cell_size) + # it must be divisible by 2 + if count % 2 != 0: + count += 1 + + return count + + def is_squeezed(self, count, info) -> bool: + return info.length <= self.cell_size * count + + def define_sizes(self, size_before: CellSizeType, size_after: CellSizeType) -> Tuple[float, float]: + """Defines start and end cell size. + size_before and size_after are taken from preceding/following wires; + when a size is None, this is the last/first wire.""" + if size_before == 0 or size_after == 0: + # until all counts/sizes are defined + # (the first pass with uniform grading), + # there's no point in doing anything + raise RuntimeError("Undefined grading encountered!") + + if size_before is None: + size_before = self.cell_size + + if size_after is None: + size_after = self.cell_size + + return size_before, size_after + + def get_chops(self, count, info): + halfcount = count // 2 + + size_before, size_after = self.define_sizes(info.size_before, info.size_after) + + # choose length ratio so that cells at the middle of blocks + # (between the two chops) have the same size + def fobj(lratio): + chop_1 = Chop(length_ratio=lratio, count=halfcount, start_size=size_before) + data_1 = chop_1.calculate(info.length) + + chop_2 = Chop(length_ratio=1 - lratio, count=halfcount, end_size=size_after) + data_2 = chop_2.calculate(info.length) + + ratio = (data_1.end_size - data_2.start_size) ** 2 + + return ratio, [chop_1, chop_2] + + # it's not terribly important to minimize until the last dx + tol = min(size_before, size_after, self.cell_size) * 0.1 + results = scipy.optimize.minimize_scalar(lambda r: fobj(r)[0], bounds=[0.1, 0.9], options={"xatol": tol}) + if not results.success: # type:ignore + warnings.warn("Could not determine optimal grading", stacklevel=1) + + return fobj(results.x)[1] # type:ignore diff --git a/tests/test_grading/test_params.py b/tests/test_grading/test_params.py new file mode 100644 index 00000000..c5608904 --- /dev/null +++ b/tests/test_grading/test_params.py @@ -0,0 +1,94 @@ +import unittest + +from parameterized import parameterized + +from classy_blocks.grading.autograding.params.base import WireInfo +from classy_blocks.grading.autograding.params.inflation import InflationGraderParams +from classy_blocks.items.vertex import Vertex +from classy_blocks.items.wires.wire import Wire + + +class InflationParamsTests(unittest.TestCase): + def setUp(self): + self.params = InflationGraderParams(1e-3, 0.1, 1.2, 30, 2) + + def get_info(self, length, starts_at_wall, ends_at_wall) -> WireInfo: + wire = Wire([Vertex([0, 0, 0], 0), Vertex([length, 0, 0], 1)], 0, 0, 1) + return WireInfo(wire, starts_at_wall, ends_at_wall) + + def test_get_count_bulk(self): + count = self.params.get_count(1, False, False) + + self.assertEqual(count, 10) + + @parameterized.expand( + ( + (1, 24), + (2, 34), + (3, 44), + (0.1, 16), + (0.05, 14), + (0.01, 7), + (0.002, 2), + (0.001, 2), + (0.0005, 1), + ) + ) + def test_get_count_wall(self, length, count): + # count calculation when one vertex of the wire is at wall; + # numbers checked with manual spreadsheet calculation + self.assertEqual(self.params.get_count(length, True, False), count) + + @parameterized.expand( + ( + (1, 38), # 0 + (2, 48), # 1 + (3, 58), # 2 + (0.1, 28), # 3 + (0.05, 20), # 4 + (0.01, 8), # 5 + (0.002, 4), # 6 + (0.001, 2), # 7 + (0.0005, 2), # 8 + ) + ) + def test_get_count_double_wall(self, length, count): + # count when both vertices are at wall + # numbers checked with manual spreadsheet calculation + half_count = self.params.get_count(length / 2, True, False) + + self.assertEqual(2 * half_count, count) + + @parameterized.expand( + ( + # not squeezed - enough room, low cell count + (1, 24, False), # 0 + (2, 34, False), # 1 + (3, 44, False), # 2 + (0.3, 16, False), # 3 + # squeezed: enough room, high cell count + (1, 25, True), # 4 + (2, 35, True), # 5 + (3, 45, True), # 6 + # squeezed: not enough room, cell count doesn't matter + (0.2, 16, True), # 7 + (0.2, 0, True), # 8 + ) + ) + def test_is_squeezed_wall(self, length, count, squeezed): + info = self.get_info(length, True, False) + + self.assertEqual(self.params.is_squeezed(count, info), squeezed) + + @parameterized.expand( + ( + (1, 9, False), + (1, 0, False), + (1, 11, True), + ) + ) + def test_is_squeezed_bulk(self, length, count, squeezed): + # a test of SmoothGrader, actually + info = self.get_info(length, False, False) + + self.assertEqual(self.params.is_squeezed(count, info), squeezed)