Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge CTerm #4621

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions pyk/src/pyk/cterm/cterm.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ def is_bottom(self) -> bool:
"""Check if a given `CTerm` is trivially empty."""
return is_bottom(self.config, weak=True) or any(is_bottom(cterm, weak=True) for cterm in self.constraints)

@property
def constraint(self) -> KInner:
"""Return the set of constraints as a single flattened constraint using `mlAnd`."""
return mlAnd(self.constraints)

@staticmethod
def _constraint_sort_key(term: KInner) -> tuple[int, str]:
term_str = str(term)
Expand Down Expand Up @@ -253,6 +258,46 @@ def remove_useless_constraints(self, keep_vars: Iterable[str] = ()) -> CTerm:
return CTerm(self.config, new_constraints)


def merge_cterms(t1: CTerm, t2: CTerm) -> CTerm | None:
"""Return a `CTerm` which is the merge of the two input `CTerm` instances.

Args:
t1: First `CTerm` to merge.
t2: Second `CTerm` to merge.

Returns:
A `CTerm` which is the merge of the two input `CTerm` instances.
"""
# check all cells in t1 and t1, if they are the same, keep them, otherwise, create a new free variable for them
t1_config, t1_subst = split_config_from(t1.config)
t2_config, t2_subst = split_config_from(t2.config)

if t1_config != t2_config:
# cannot merge two configurations with different structure
return None

new_subst = Subst({})
new_t1_subst = Subst({})
new_t2_subst = Subst({})

for cell in t1_subst:
if t1_subst[cell] == t2_subst[cell]:
# keep the cell if it is the same
new_subst = new_subst * Subst({cell: t1_subst[cell]})
else:
# create a new free variable for the cell
new_t1_subst = new_t1_subst * Subst({cell: t1_subst[cell]})
new_t2_subst = new_t2_subst * Subst({cell: t2_subst[cell]})
new_config = new_subst(t1_config)

new_constraints: list[KInner] = []
for new_subst, t in [(new_t1_subst, t1), (new_t2_subst, t2)]:
if new_subst:
new_constraints.append(mlImplies(new_subst.ml_pred, t.constraint))

return CTerm(new_config, new_constraints)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main implementation of merge_cterms.

  1. Return None if the configuration structures are different.
  2. Replace cell content into a new free variable if there are free variables or different between them.
  3. The constraints will be CELL_NAME == SUBST /\ ... -> source.constraint /\ ...

Comment on lines +261 to +298
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a less powerful version of CTerm.anti_unify. Basically, instead of merge_cterms(cterm1, cterm2), you can do:

cterm, csubst1, csubst2 = cterm1.anti_unify(cterm2)

You'll have that csubst1(cterm) == cterm1, and csubst2(cterm) == cterm2, and you can use the data returned there to compute what you need for the merged nodes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here are some tests of antiunification:

def test_anti_unify_keep_values(

def test_anti_unify_forget_values(

Copy link
Contributor Author

@Stevengre Stevengre Sep 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! I looked at the code and tests of anti_unify and feel that anti_unify can indeed be directly used. It's just that the generated csubst I can't use. Perhaps I can close this PR.



def anti_unify(state1: KInner, state2: KInner, kdef: KDefinition | None = None) -> tuple[KInner, Subst, Subst]:
"""Return a generalized state over the two input states.

Expand Down
104 changes: 103 additions & 1 deletion pyk/src/tests/unit/test_cterm.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
import pytest

from pyk.cterm import CTerm, cterm_build_claim, cterm_build_rule
from pyk.cterm.cterm import merge_cterms
from pyk.kast import Atts, KAtt
from pyk.kast.inner import KApply, KLabel, KRewrite, KSequence, KSort, KVariable
from pyk.kast.outer import KClaim
from pyk.prelude.k import GENERATED_TOP_CELL
from pyk.prelude.kint import INT, intToken
from pyk.prelude.ml import mlAnd, mlEqualsTrue
from pyk.prelude.ml import mlAnd, mlEqualsTrue, mlImplies, mlEquals, mlTop

from .utils import a, b, c, f, g, h, k, x, y, z

Expand Down Expand Up @@ -186,3 +187,104 @@ def test_from_kast(test_id: str, kast: KInner, expected: CTerm) -> None:

# Then
assert cterm == expected


MERGE_TEST_DATA: Final = (
(CTerm.top(), CTerm.top(), CTerm.top()),
(CTerm.bottom(), CTerm.top(), None),
(CTerm(k(intToken(1))), CTerm(k(intToken(1))), CTerm(k(intToken(1)))),
(
CTerm(k(KVariable('X'))),
CTerm(k(KVariable('X'))),
CTerm(k(KVariable('X'))),
),
(
CTerm(k(KVariable('X'))),
CTerm(k(KVariable('Y'))),
CTerm(
k(KVariable('K_CELL')),
[
mlImplies(mlEquals(KVariable('K_CELL'), KVariable('X')), mlTop()),
mlImplies(mlEquals(KVariable('K_CELL'), KVariable('Y')), mlTop()),
],
),
),
# (
# CTerm(k(intToken(1))),
# CTerm(k(intToken(2))),
# CTerm(
# k(KVariable('K_CELL')),
# [
# mlImplies(mlEquals(KVariable('TOP_CELL'), intToken(1)), mlTop()),
# mlImplies(mlEquals(KVariable('TOP_CELL'), intToken(2)), mlTop()),
# ],
# ),
# ),
# (
# CTerm(
# k(KVariable('X')),
# [ge_ml('X', 0)],
# ),
# CTerm(
# k(KVariable('X')),
# [ge_ml('X', 0)],
# ),
# CTerm(
# k(KVariable('K_CELL')),
# [
# mlImplies(mlEquals(KVariable('TOP_CELL'), KVariable('X')), ge_ml('X', 0)),
# ],
# ),
# ),
# (
# CTerm(
# k(KVariable('X')),
# [ge_ml('X', 0), lt_ml('X', 3)],
# ),
# CTerm(
# k(KVariable('X')),
# [ge_ml('X', 0), lt_ml('X', 5)],
# ),
# CTerm(
# k(KVariable('K_CELL')),
# [
# mlImplies(mlEquals(KVariable('TOP_CELL'), KVariable('X')), mlAnd([lt_ml('X', 3), ge_ml('X', 0)])),
# mlImplies(
# mlEquals(KVariable('TOP_CELL'), KVariable('X')),
# mlAnd(
# [
# lt_ml('X', 5),
# ge_ml('X', 0),
# ]
# ),
# ),
# ],
# ),
# ),
# (
# CTerm(
# k(KVariable('X')),
# [ge_ml('X', 0), lt_ml('X', 3)],
# ),
# CTerm(
# k(KVariable('Y')),
# [ge_ml('Y', 0), lt_ml('Y', 5)],
# ),
# CTerm(
# k(KVariable('K_CELL')),
# [
# mlImplies(mlEquals(KVariable('TOP_CELL'), KVariable('X')), mlAnd([lt_ml('X', 3), ge_ml('X', 0)])),
# mlImplies(mlEquals(KVariable('TOP_CELL'), KVariable('Y')), mlAnd([lt_ml('Y', 5), ge_ml('Y', 0)])),
# ],
# ),
# ),
)


@pytest.mark.parametrize('t1,t2,expected', MERGE_TEST_DATA, ids=count())
def test_cterm_merge(t1: CTerm, t2: CTerm, expected: CTerm) -> None:
# When
merged = merge_cterms(t1, t2)

# Then
assert merged == expected
tothtamas28 marked this conversation as resolved.
Show resolved Hide resolved
10 changes: 10 additions & 0 deletions pyk/src/tests/unit/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from typing import TYPE_CHECKING

from pyk.kast.inner import KApply, KLabel, KVariable
from pyk.prelude.kint import geInt, intToken, ltInt
from pyk.prelude.ml import mlEqualsTrue

if TYPE_CHECKING:
from typing import Final
Expand All @@ -16,3 +18,11 @@
f, g, h = map(KLabel, ('f', 'g', 'h'))

k = KLabel('<k>')


def lt_ml(var: str, n: int) -> KApply:
return mlEqualsTrue(ltInt(KVariable(var), intToken(n)))


def ge_ml(var: str, n: int) -> KApply:
return mlEqualsTrue(geInt(KVariable(var), intToken(n)))
Loading