Files
Addon-Odoo19/.venv/Lib/site-packages/ifcopenshell/express/rule_executor.py
T
2026-05-31 10:17:09 +07:00

300 lines
10 KiB
Python

import os
import re
import ast
import collections
import ifcopenshell
from logging import Logger
from dataclasses import dataclass
from codegen import indent
def reverse_compile(s):
return re.sub(
r"\bself\b",
"SELF",
re.sub(
r"\s*\-\s*EXPRESS_ONE_BASED_INDEXING",
"",
re.sub(
", )?+.(, INDETERMINATE)\\"[::-1],
"]\\1[",
re.sub(
r", '(\w+)', INDETERMINATE\)",
".\\1",
s.strip()
.replace("len(", "SIZEOF(")
.replace("assert ", "")
.replace(" is not False", "")
.replace("express_getattr(", "")
.replace("express_getitem(", ""),
)[::-1],
)[::-1],
),
)
@dataclass
class error(Exception):
rule_name: str
rule_definition: str
violation: str
instance: ifcopenshell.entity_instance = None
def __str__(self):
inst = ""
if self.instance:
inst = f"On instance:\n{indent(4, str(self.instance))}\n"
if self.rule_name:
return f"{inst}Rule {self.rule_name}:\n{indent(4, self.rule_definition)}\nViolated by:\n{indent(4, self.violation)}"
else:
if inst:
inst = f"\n\n{inst}"
return f"{self.rule_definition}\n\nViolated by:\n{indent(4, self.violation)}{inst}"
def fix_type(v):
if isinstance(v, (list, tuple)):
# 1-based indexing:
#
# @todo this is not the best way, because it still allows to index the 0-th element,
# but given the existing body of rules this should be sufficient.
# return type(v)([None]) + type(v)(map(fix_type, v))
# We don't do this anymore, because it doesn't fix instance attribute lookups
# We now instead perform a -1 on the index qualifier in the code generation
pass
# @todo enrich entity instances with code to evaluate derived attributes
return v
def run(f: ifcopenshell.file, logger: Logger) -> None:
from _pytest import assertion
if hasattr(logger, "set_instance"):
# when using the json logger, we notify it of the relevant instance
pre_annotate_instance = lambda instance: (
logger.set_state("instance", instance) if hasattr(logger, "set_state") else None
)
post_annotate_instance = lambda instance: instance
pre_annotate_attribute = lambda attribute: (
logger.set_state("attribute", attribute) if hasattr(logger, "set_state") else None
)
post_annotate_attribute = lambda attribute: None
else:
# when using the normal text logger the instance is appended to the method
pre_annotate_instance = lambda instance: None
post_annotate_instance = lambda instance: instance
pre_annotate_attribute = lambda attribute: None
post_annotate_attribute = lambda attribute: attribute
orig = ifcopenshell.settings.unpack_non_aggregate_inverses
ifcopenshell.settings.unpack_non_aggregate_inverses = True
fn = os.path.join(os.path.dirname(__file__), "rules", f"{f.schema_identifier}.py")
try:
source = open(fn, "r").read()
except FileNotFoundError as e:
import sys
import time
import subprocess
current_dir_files = {fn.lower(): fn for fn in os.listdir(".")}
schema_name = str(f.schema_identifier).split(" ")[-1].lower()
schema_path = current_dir_files.get(schema_name + ".exp")
fn = schema_path[:-4] + ".py"
if not os.path.exists(fn):
subprocess.run([sys.executable, "-m", "ifcopenshell.express.rule_compiler", schema_path, fn], check=True)
time.sleep(1.0)
source = open(fn, "r").read()
a = ast.parse(source)
assertion.rewrite.rewrite_asserts(mod=a, source=source)
cd = compile(a, f"{f.schema_identifier}.py", "exec")
scope = {}
exec(cd, scope)
S = ifcopenshell.ifcopenshell_wrapper.schema_by_name(f.schema_identifier)
rules = list(filter(lambda x: hasattr(x, "SCOPE"), scope.values()))
if hasattr(logger, "set_state"):
logger.set_state("type", "global_rule")
for R in [r for r in rules if r.SCOPE == "file"]:
try:
R()(f)
except RecursionError as e:
logger.info(str(e))
except Exception as e:
ln = e.__traceback__.tb_next.tb_lineno
pre_annotate_attribute(R.__name__)
logger.error(
str(
error(
post_annotate_attribute(R.__name__),
reverse_compile(source.split("\n")[ln - 1]),
reverse_compile(e.args[0]),
)
)
)
if hasattr(logger, "set_state"):
logger.set_state("type", "simpletype_rule")
types = {}
subtypes = collections.defaultdict(list)
for d in S.declarations():
if isinstance(d, ifcopenshell.ifcopenshell_wrapper.type_declaration):
types[d.name()] = d
if isinstance(d.declared_type(), ifcopenshell.ifcopenshell_wrapper.named_type):
subtypes[d.declared_type().declared_type().name()].append(d.name())
D = collections.defaultdict(list)
for r in rules:
if r.SCOPE == "type":
def visit(nm):
D[nm].append(r)
for nm2 in subtypes[nm]:
visit(nm2)
visit(r.TYPE_NAME)
def type_name(ty):
if isinstance(ty, ifcopenshell.ifcopenshell_wrapper.named_type):
return type_name(ty.declared_type())
elif isinstance(ty, ifcopenshell.ifcopenshell_wrapper.aggregation_type):
# breakpoint()
pass
elif isinstance(ty, ifcopenshell.ifcopenshell_wrapper.simple_type):
pass
else:
return ty.name()
def check(value, type, instance):
if value is None:
return
if type_name(type) in D:
for R in D[type_name(type)]:
try:
R()(fix_type(value))
except RecursionError as e:
logger.info(str(e))
except Exception as e:
ln = e.__traceback__.tb_next.tb_lineno
pre_annotate_instance(instance)
pre_annotate_attribute(f"{R.TYPE_NAME}.{R.RULE_NAME}")
logger.error(
str(
error(
post_annotate_attribute(f"{R.TYPE_NAME}.{R.RULE_NAME}"),
reverse_compile(source.split("\n")[ln - 1]),
reverse_compile(e.args[0]),
post_annotate_instance(instance),
)
)
)
# @nb something can be a named type with rules and still be an aggregation.
# case in point IfcCompoundPlaneAngleMeasure. Therefore only unpack named
# type references from this point onwards.
while isinstance(
type,
(
ifcopenshell.ifcopenshell_wrapper.named_type,
ifcopenshell.ifcopenshell_wrapper.type_declaration,
),
):
type = type.declared_type()
if isinstance(value, (list, tuple)):
if isinstance(type, ifcopenshell.ifcopenshell_wrapper.aggregation_type):
ty = type.type_of_element()
for v in value:
check(v, ty, instance=inst)
else:
# Let's hope a schema validation error was reported for this case
pass
elif isinstance(value, ifcopenshell.entity_instance):
if isinstance(
S.declaration_by_name(value.is_a()),
ifcopenshell.ifcopenshell_wrapper.entity,
):
# top level entity instances will be checked on their own
pass
else:
# unpack the type instance
check(value[0], S.declaration_by_name(value.is_a()), instance=inst)
for inst in f:
try:
values = list(inst)
except RecursionError as e:
logger.info(str(e))
except Exception as e:
if hasattr(logger, "set_state"):
logger.error(str(e))
else:
logger.error("For instance:\n %s\n%s", inst, e)
continue
entity = S.declaration_by_name(inst.is_a())
attrs = entity.all_attributes()
for i, (attr, val, is_derived) in enumerate(zip(attrs, values, entity.derived())):
if is_derived:
# @todo
pass
else:
check(val, attr.type_of_attribute(), instance=inst)
if hasattr(logger, "set_state"):
logger.set_state("type", "entity_rule")
for R in [r for r in rules if r.SCOPE == "entity"]:
for inst in f.by_type(R.TYPE_NAME):
try:
R()(inst)
except RecursionError as e:
logger.info(str(e))
except Exception as e:
ln = e.__traceback__.tb_next.tb_lineno
pre_annotate_instance(inst)
pre_annotate_attribute(f"{R.TYPE_NAME}.{R.RULE_NAME}")
logger.error(
str(
error(
post_annotate_attribute(f"{R.TYPE_NAME}.{R.RULE_NAME}"),
reverse_compile(source.split("\n")[ln - 1]),
reverse_compile(e.args[0]),
post_annotate_instance(inst),
)
)
)
ifcopenshell.settings.unpack_non_aggregate_inverses = orig
if __name__ == "__main__":
import sys
import json
import logging
import ifcopenshell
from ifcopenshell.validate import json_logger
filenames = [x for x in sys.argv[1:] if not x.startswith("--")]
flags = set(x for x in sys.argv[1:] if x.startswith("--"))
for fn in filenames:
if "--json" in flags:
logger = json_logger()
else:
logger = logging.getLogger("validate")
logger.setLevel(logging.DEBUG)
f = ifcopenshell.open(fn)
run(f, logger)
if "--json" in flags:
print("\n".join(json.dumps(x, default=str) for x in logger.statements))