Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add python CSA DM XML parsing support for derived clusters #30036

Merged
merged 22 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
68a44ac
Start preparing to store derived clusters
andreilitvin Oct 26, 2023
299850e
reformat, make sure parsing works
andreilitvin Oct 26, 2023
485a3a6
More leniency to allow ModeBase parsing
andreilitvin Oct 26, 2023
74a901b
More leniency and documentation, be ready to attach base clusters
andreilitvin Oct 26, 2023
5a1334c
Refactor to add some separate derivation logic
andreilitvin Oct 26, 2023
c60c24c
Restyle
andreilitvin Oct 26, 2023
fcee09c
More work on actually handling inheritance
andreilitvin Oct 26, 2023
7f77ff6
Restyle
andreilitvin Oct 26, 2023
3b9496b
Merge branch 'master' into support_derived_clusters
andreilitvin Oct 26, 2023
5d8e556
Implement actual base class derivation
andreilitvin Oct 26, 2023
84a1dea
Add unit test for derived, make diffs a LOT better
andreilitvin Oct 26, 2023
5771a43
Restyle
andreilitvin Oct 26, 2023
50c9514
Switch to unified diff for a nicer diff view
andreilitvin Oct 26, 2023
adfaee9
Return after the first assert
andreilitvin Oct 26, 2023
f7507e4
Make type checker happy at places
andreilitvin Oct 26, 2023
fef1f28
Make mypy happy even on an edge case
andreilitvin Oct 26, 2023
7b9a6fc
Fix linter errors
andreilitvin Oct 26, 2023
f50e4fb
Restyle
andreilitvin Oct 26, 2023
3ae16f9
Merge branch 'master' into support_derived_clusters
andy31415 Oct 27, 2023
16b5658
more typing for attrs
andy31415 Oct 27, 2023
4ab4ee4
Some name changes for base clusters: use abstract to make it clear th…
andy31415 Oct 27, 2023
776a0eb
Also change variable name
andy31415 Oct 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scripts/py_matter_idl/files.gni
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ matter_idl_generator_sources = [
"${chip_root}/scripts/py_matter_idl/matter_idl/data_model_xml/handlers/__init__.py",
"${chip_root}/scripts/py_matter_idl/matter_idl/data_model_xml/handlers/base.py",
"${chip_root}/scripts/py_matter_idl/matter_idl/data_model_xml/handlers/context.py",
"${chip_root}/scripts/py_matter_idl/matter_idl/data_model_xml/handlers/derivation.py",
"${chip_root}/scripts/py_matter_idl/matter_idl/data_model_xml/handlers/handlers.py",
"${chip_root}/scripts/py_matter_idl/matter_idl/data_model_xml/handlers/parsing.py",
"${chip_root}/scripts/py_matter_idl/matter_idl/generators/__init__.py",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from xml.sax.xmlreader import AttributesImpl

from matter_idl.matter_idl_types import Idl

from .base import BaseHandler
from .context import Context
from .handlers import ClusterHandler
from .parsing import NormalizeName

LOGGER = logging.getLogger('data-model-xml-data-parsing')


def contains_valid_cluster_id(attrs: AttributesImpl) -> bool:
# Does not check numeric format ... assuming scraper is smart enough for that
return 'id' in attrs and len(attrs['id']) > 0


class DataModelXmlHandler(BaseHandler):
Expand All @@ -27,8 +38,14 @@ def __init__(self, context: Context, idl: Idl):
super().__init__(context)
self._idl = idl

def GetNextProcessor(self, name, attrs):
def GetNextProcessor(self, name, attrs: AttributesImpl):
if name.lower() == 'cluster':
return ClusterHandler(self.context, self._idl, attrs)
if contains_valid_cluster_id(attrs):
return ClusterHandler.ForAttributes(self.context, self._idl, attrs)

LOGGER.info(
"Found an abstract base cluster (no id): '%s'", attrs['name'])

return ClusterHandler.IntoCluster(self.context, self._idl, self.context.AddAbstractBaseCluster(NormalizeName(attrs['name']), self.context.GetCurrentLocationMeta()))
else:
return BaseHandler(self.context)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import xml.sax.xmlreader
from typing import List, Optional

from matter_idl.matter_idl_types import Idl, ParseMetaData
from matter_idl.matter_idl_types import Cluster, ClusterSide, Idl, ParseMetaData


class IdlPostProcessor:
Expand Down Expand Up @@ -82,6 +82,19 @@ def __init__(self, locator: Optional[xml.sax.xmlreader.Locator] = None):
self.file_name = None
self._not_handled: set[str] = set()
self._idl_post_processors: list[IdlPostProcessor] = []
self.abstract_base_clusters: dict[str, Cluster] = {}

def AddAbstractBaseCluster(self, name: str, parse_meta: Optional[ParseMetaData] = None) -> Cluster:
"""Creates a new cluster entry for the given name in the list of known
base clusters.
"""
assert name not in self.abstract_base_clusters # be unique

cluster = Cluster(side=ClusterSide.CLIENT, name=name,
andy31415 marked this conversation as resolved.
Show resolved Hide resolved
code=-1, parse_meta=parse_meta)
self.abstract_base_clusters[name] = cluster

return cluster

def GetCurrentLocationMeta(self) -> Optional[ParseMetaData]:
if not self.locator:
Expand Down
173 changes: 173 additions & 0 deletions scripts/py_matter_idl/matter_idl/data_model_xml/handlers/derivation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
#
# Copyright (c) 2023 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
from typing import Iterable, Optional, Protocol, TypeVar

from matter_idl.matter_idl_types import Attribute, Bitmap, Cluster, Command, Enum, Event, Idl, Struct

from .context import Context, IdlPostProcessor
from .parsing import NormalizeName

LOGGER = logging.getLogger('data-model-xml-data-parsing')

T = TypeVar("T")


class HasName(Protocol):
name: str


NAMED = TypeVar('NAMED', bound=HasName)


def get_item_with_name(items: Iterable[NAMED], name: str) -> Optional[NAMED]:
"""Find an item with the given name.

Returns none if that item does not exist
"""
for item in items:
if item.name == name:
return item
return None


def merge_enum_into(e: Enum, cluster: Cluster):
existing = get_item_with_name(cluster.enums, e.name)

if existing:
# Remove existing but merge constants into e
cluster.enums.remove(existing)
for value in existing.entries:
if not get_item_with_name(e.entries, value.name):
e.entries.append(value)

cluster.enums.append(e)


def merge_bitmap_into(b: Bitmap, cluster: Cluster):
existing = get_item_with_name(cluster.bitmaps, b.name)

if existing:
# Remove existing but merge constants into e
cluster.bitmaps.remove(existing)
for value in existing.entries:
if not get_item_with_name(b.entries, value.name):
b.entries.append(value)

cluster.bitmaps.append(b)


def merge_event_into(e: Event, cluster: Cluster):
existing = get_item_with_name(cluster.events, e.name)
if existing:
LOGGER.error("TODO: Do not know how to merge event for %s::%s",
cluster.name, existing.name)
cluster.events.remove(existing)

cluster.events.append(e)


def merge_attribute_into(a: Attribute, cluster: Cluster):
existing: Optional[Attribute] = None
for existing_a in cluster.attributes:
if existing_a.definition.name == a.definition.name:
existing = existing_a
break

if existing:
# Do not provide merging as it seems only conformance is changed from
# the base cluster
#
# This should fix the correct types
#
# LOGGER.error("TODO: Do not know how to merge attribute for %s::%s", cluster.name, existing.definition.name)
cluster.attributes.remove(existing)

cluster.attributes.append(a)


def merge_struct_into(s: Struct, cluster: Cluster):
existing = get_item_with_name(cluster.structs, s.name)
if existing:
# Do not provide merging as it seems XML only adds
# constraints and conformance to struct elements
#
# TODO: at some point we may be able to merge some things,
# if we find that derived clusters actually add useful things here
#
# LOGGER.error("TODO: Do not know how to merge structs for %s::%s", cluster.name, existing.name)
cluster.structs.remove(existing)

cluster.structs.append(s)


def merge_command_into(c: Command, cluster: Cluster):
existing = get_item_with_name(cluster.commands, c.name)

if existing:
LOGGER.error("TODO: Do not know how to merge command for %s::%s",
cluster.name, existing.name)
cluster.commands.remove(existing)

cluster.commands.append(c)


def inherit_cluster_data(from_cluster: Cluster, into_cluster: Cluster):
for e in from_cluster.enums:
merge_enum_into(e, into_cluster)

for b in from_cluster.bitmaps:
merge_bitmap_into(b, into_cluster)

for ev in from_cluster.events:
merge_event_into(ev, into_cluster)

for a in from_cluster.attributes:
merge_attribute_into(a, into_cluster)

for s in from_cluster.structs:
merge_struct_into(s, into_cluster)

for c in from_cluster.commands:
merge_command_into(c, into_cluster)


class AddBaseInfoPostProcessor(IdlPostProcessor):
def __init__(self, destination_cluster: Cluster, source_cluster_name: str, context: Context):
self.destination = destination_cluster
self.source_name = NormalizeName(source_cluster_name)
self.context = context

def FinalizeProcessing(self, idl: Idl):
# attempt to find the base. It may be in the "names without ID" however it may also be inside
# existing clusters (e.g. Basic Information)
base: Optional[Cluster] = None
if self.source_name in self.context.abstract_base_clusters:
base = self.context.abstract_base_clusters[self.source_name]
else:
for c in idl.clusters:
if c.name == self.source_name:
base = c
break

if not base:
LOGGER.error(
"Could not find the base cluster named '%s'", self.source_name)
return

LOGGER.info("Copying base data from '%s' into '%s'",
base.name, self.destination.name)
inherit_cluster_data(from_cluster=base, into_cluster=self.destination)
Loading