import io import os import csv import platform import tabulate import operator import itertools import subprocess import ifcopenshell from collections import defaultdict import mvdxml_expression def camel(s): """ Camel case conversion function :param s: str :return: camel case formatted string """ s = s.title().replace(" ", "") if s.endswith("s"): s = s[:-1] return s[0].lower() + s[1:] STANDARD_PREFIXES = { 'rdf': '', 'owl': '', 'xsd': '', 'list': '', 'ifcowl': '', 'express': '', } def derive_prefix(ttlfn): with open(ttlfn, "r") as f: for ln in f: ln.strip() if ln.startswith("@prefix ifcowl"): uri = ln.split(':', 1)[1].strip()[:-1].strip() print("Detected ifcowl prefix", uri) STANDARD_PREFIXES['ifcowl'] = uri break def withschema(fn): """ Decorator that takes a function and adds an IFC latebound schema definition in the first parameter. The schema identifier is looked up based on the global ifcOwl prefix. :param fn: input function :return: decorated function """ def _(*args, **kwargs): schema_name = STANDARD_PREFIXES['ifcowl'].split('/')[-1][:-2] if "_" in schema_name: schema_name = schema_name.split('_')[0] S = ifcopenshell.ifcopenshell_wrapper.schema_by_name(schema_name) return fn(S, *args, **kwargs) return _ noop = lambda *args: None class rule_binding(object): """ Object for mapping rules to generated SPARQL variables """ def __init__(self): pass class builder(object): """ A helper class for dealing with SPARQL query statements """ def __init__(self): self.prefixes = {"express": "", "ifcowl": ""} self.statements = [] def append(self, *stmt): if len(stmt) == 3: for i, pos in enumerate(stmt[1:]): for po in pos.split("/"): if ":" in pos: a, b = po.split(':') self.prefixes[a] = '' self.statements.append(stmt) def bind(self, di): for k in set(self.prefixes.keys()) & set(di.keys()): self.prefixes[k] = di[k] def x(self): return len(self.statements) def __repr__(self): def f(s): S = " ".join(s) if len(s) == 3: S += ' .' return S def g(s): return "PREFIX %s: %s" % s return "\n".join(itertools.chain( (g(s) for s in self.prefixes.items()), (f(s) for s in self.statements) )) class ifcOwl(object): """ Helper class with static function for dealing with ifcOwl attribute names """ @staticmethod @withschema def supertypes(S, entity): """ Yields ifcOwl subtypes for the supplied entity name :param S: schema definition (from decorator) :param entity: entity name string :return: """ a, b = entity.split('#') try: en = S.declaration_by_name(b) if en.__class__.__name__ == "entity": while en.supertype(): yield "%s#%s" % (a, en.supertype().name()) en = en.supertype() except: pass @staticmethod def get_names(e, c): """ Returns inverse or forward attribute names for latebound entity definition :param e: latebound entity definition :param c: either 'all_attributes' or 'all_inverse_attributes' :return: set of attribute names """ return set(map(lambda a: a.name(), getattr(e, c)())) @staticmethod @withschema def is_boxed(S, entity, attribute, predCount=0): """ Returns whether the entity attribute should be boxed in ifcOwl. Which means that there is an additional indirection. inst:IfcRelDefinesByType_21937 ifcowl:globalId_IfcRoot inst:IfcGloballyUniqueId_117684 ; inst:IfcGloballyUniqueId_117684 rdf:type ifcowl:IfcGloballyUniqueId ; express:hasString "2P9FPkykn0r8rCpmBxZH0w" . :param S: schema definition (from decorator) :param entity: entity name string :param attribute: attribute name string :param predCount: numeric identifier to postfix predicate identifier in case of SELECT types :return: either a predicate from the express namespace or a variable postfixed with predCount """ en = S.declaration_by_name(entity) attr = [a for a in en.all_attributes() if a.name() == attribute][0] ty = attr.type_of_attribute() is_boxed = False while isinstance(ty, ifcopenshell.ifcopenshell_wrapper.named_type): ty = ty.declared_type() if isinstance(ty, ifcopenshell.ifcopenshell_wrapper.select_type): # Just assume there is going to be some boxed type in here. # It could be all instance references, but fact is we don't know at this moment. # It's likely that mvdXML will only bind to literals? return "?pred%d" % predCount else: while isinstance(ty, ifcopenshell.ifcopenshell_wrapper.type_declaration): is_boxed = True ty = ty.declared_type() if is_boxed and isinstance(ty, ifcopenshell.ifcopenshell_wrapper.simple_type): ty = ty.declared_type() return "express:has%s%s" % (ty[0].upper(), ty[1:]) return False @staticmethod @withschema def name(S, entity, attribute): """ Names the entity attribute according to ifcOwl :param S: :param entity: :param attribute: :return: """ en = S.declaration_by_name(entity) while True: st = en.supertype() attribute_names = ifcOwl.get_names(en, "all_attributes") | \ ifcOwl.get_names(en, "all_inverse_attributes") if st: attribute_names -= ifcOwl.get_names(st, "all_attributes") | \ ifcOwl.get_names(st, "all_inverse_attributes") if attribute in attribute_names: return "ifcowl:" + attribute[0].lower() + attribute[1:] + "_" + en.name() en = st if en is None: raise AttributeError("%s not found on %s" % (attribute, entity)) @staticmethod @withschema def is_select(S, decl_name): """ Returns True when the declaration is a select type :param S: :param decl_name: :return: """ decl = S.declaration_by_name(decl_name) return isinstance(decl, ifcopenshell.ifcopenshell_wrapper.select_type) @staticmethod @withschema def is_inverse(S, entity, attribute): """ When entity attribute is an INVERSE attribute, returns the opposite forward entity and attribute name. Otherwise returns (False, False) :param S: :param entity: :param attribute: :return: """ en = S.declaration_by_name(entity) attrs = [a for a in en.all_inverse_attributes() if a.name() == attribute] if not attrs: return False, False a = attrs[0] assert a.type_of_aggregation_string() == "set" entity = a.entity_reference().name() attr = a.attribute_reference().name() return "ifcowl:" + entity, ifcOwl.name(entity, attr) class convertor(object): @staticmethod def convert(item, *args, **kwargs): return getattr(convertor, item.__class__.__name__)(item, *args, **kwargs) @staticmethod def concept_or_applicability(concept): """ Convert the Template (SELECT ... WHERE {}) structure and TemplateRule (FILTER) :param qtype: 0 or 1, 0 for a general query matching the applicableRootEntity :return: """ bld = builder() t = concept.template() bld.append("# %s" % camel(concept.root.name)) convertor.template(t, bld, concept.root.entity) bld.bind(STANDARD_PREFIXES) return bld @staticmethod def root(rootEntity): args = ["URI", "GlobalId"] b = builder() b.args = args b.append("SELECT " + " ".join("?" + a for a in args) + " WHERE {") b.append("?URI", "rdf:type", "ifcowl:%s" % rootEntity) b.append("?URI", "ifcowl:globalId_IfcRoot/express:hasString", "?GlobalId") b.append("}") b.bind(STANDARD_PREFIXES) return b @staticmethod def template(template, bld = None, rootEntity = None): if bld is None: bld = builder() if rootEntity is None: rootEntity = template.entity args = ["URI", "GlobalId"] def enumerate(rule, **kwargs): if rule.bind: args.append(rule.bind) template.traverse(enumerate) bld.args = args bld.append("SELECT " + " ".join("?" + a for a in args) + " WHERE {") args = set(args) bld.append("?URI", "rdf:type", "ifcowl:%s" % rootEntity) bld.append("?URI", "ifcowl:globalId_IfcRoot/express:hasString", "?GlobalId") nm = "?URI" ROOT = type('_', (), {'attribute': rootEntity})() # rule_stack = [ROOT] # name_stack = [nm] # callback_stack = [noop] # G = type('_', (object,), dict(indent = 0, nm = nm, next_nm=None, first=True)) rule_mapping = defaultdict(rule_binding) rule_mapping[ROOT].name = nm def build(rule, parents): # print "AAA", rule.tag, parent.tag if parent else parent # print map(id, rule_stack) # print name_stack INDENT = " " * (len(parents) * 2) return_value = None if rule.optional: bld.append(INDENT + "OPTIONAL {") return_value = lambda: bld.append(INDENT + "}") if rule.tag == "EntityRule": # G.nm = G.next_nm if not ifcOwl.is_select(rule.attribute): # SELECT types should never be qualified as they cannot be inferred bld.append(INDENT + rule_mapping[parents[-1]].name, "rdf:type", "ifcowl:" + rule.attribute) # propagate binding name rule_mapping[rule].name = rule_mapping[parents[-1]].name else: # if rule_stack[-1] is parent: # # print "sl", id(parent), id(rule) # # same level # pass # elif parent in rule_stack: # # print "up", id(parent), id(rule) # while rule_stack[-1] is not parent: # # rule_stack.pop() # name_stack.pop() # callback_stack.pop()() # else: # # print "dn", id(parent), id(rule) # pass indirect = False if rule.bind: if len(rule.nodes) == 1: indirect = ifcOwl.is_boxed(parents[-1].attribute, rule.attribute, predCount=bld.x()) if rule.bind and not indirect: next_nm = "?" + rule.bind else: next_nm = "?var%d" % bld.x() rule_mapping[rule].name = next_nm inventy, invattr = ifcOwl.is_inverse(parents[-1].attribute, rule.attribute) if invattr: # This seems not to be necessary, because the entity name is also stated in mvdXML # q.append( # next_nm, # "rdf:type", # inventy # ) bld.append( INDENT + next_nm, invattr, rule_mapping[parents[-1]].name ) else: bld.append( INDENT + rule_mapping[parents[-1]].name, ifcOwl.name(parents[-1].attribute, rule.attribute), next_nm ) if rule.bind and indirect: # For boxed literals, only strings atm bld.append(INDENT + next_nm, indirect, "?" + rule.bind) # rule_stack.append(rule) # name_stack.append(next_nm) # if rule.optional: # callback_stack.append(lambda: q.append(INDENT+"}")) # else: # callback_stack.append(noop) # print(q.statements[-1]) return return_value template.traverse(build, root=ROOT, with_parents=True) # while callback_stack: # callback_stack.pop()() if template.params: bld.append(convertor.build_filter(template)) bld.append("}") return bld @staticmethod def build_filter(self): def v(p): if isinstance(p, mvdxml_expression.node): if p.b == "Value": if p.c.lower() in {'true', 'false'}: yield "(%s?%s)" % ("!" if p.c.lower() == "false" else "", p.a) else: yield "(?%s = %s)" % (p.a, p.c) elif p.b == "Exists": yield "(!isBLANK(?%s))" % p.a else: raise Exception("Unsupported " + p.b) elif isinstance(p, str): yield { "and": "&&", "or": "||", "not": "&& !" }[p.lower()] else: yield "(" for q in p: yield from v(q) yield ")" return "FILTER(%s)" % " ".join(v(self.params)) def infer_subtypes(ttlfn): # Disabled currently return ttlfn if not os.path.exists(ttlfn + ".subclass.nt"): print("Inferring supertype relationships") import hashlib import rdflib a = rdflib.namespace.RDF.type # Hardly possible on Windows # graph = rdflib.Graph("Sleepycat") # graph.open("store", create=True) # graph.parse(ttlfn) from sqlalchemy import create_engine from rdflib_sqlalchemy.store import SQLAlchemy if os.path.exists("db.sqlite"): os.unlink("db.sqlite") uri = rdflib.Literal("sqlite:///%(here)s/db.sqlite" % {"here": os.getcwd()}) ident = rdflib.URIRef(hashlib.sha1(ttlfn.encode()).hexdigest()) engine = create_engine(uri) store = SQLAlchemy( identifier=ident, engine=engine, ) graph = rdflib.Graph( store, identifier=ident, ) graph.open(uri, create=True) graph.parse(ttlfn, format="ttl") # print out all the triples in the graph def _(): for subject, predicate, object in graph: if predicate == a: for sup in ifcOwl.supertypes(object): yield subject, a, rdflib.URIRef(sup) for stmt in list(_()): graph.add(stmt) graph.serialize(destination=ttlfn + ".subclass.nt", format="nt") ttlfn += ".subclass.nt" if platform.system() == "Windows": JENA_SPARQL = os.path.join(os.environ.get("JENA_HOME"), "bat", "sparql.bat") else: JENA_SPARQL = "sparql" class executor(object): @staticmethod def run(CR, fn, ttlfn): """ Generates SPARQL queries for the parsed MVD and executes on the building model :param CR: A parsed concept root :param fn: A filename used as the prefix to store generate SPARQL queries to disk :param ttlfn: A filename for the LD representation of an IFC model :return: """ def dict_to_list(headers): return lambda di: [di[h] for h in headers] def execute(query, *args): sparqlfn = ".".join(itertools.chain([fn], map(str, args))) + ".sparql" with open(sparqlfn, "w") as f: print(query, file=f) proc = subprocess.Popen( [JENA_SPARQL, "--data=" + ttlfn, "--query=" + sparqlfn, "--results=CSV"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = proc.communicate() csvf = io.StringIO(stdout.decode('utf-8')) return list(csv.DictReader(csvf)) root_query = convertor.root(CR.entity) roots = execute(root_query, 0) print("\nFile contains %d elements of type %s" % (len(roots), CR.entity)) passing_all = {} # for summary below num_columns = 0 try: # Full MVD with multiple concepts is_template = False concept_enumerator = list(itertools.chain([CR.applicability()], CR.concepts())) except: is_template = True concept_enumerator = [CR] for ci, C in enumerate(concept_enumerator): num_columns += 1 if is_template or ci > 1: print("\n%s" % C.name) else: print("\nApplicability") query = convertor.convert(C) print("\nSPARQL query") print("============") print(query) passing = execute(query, ci, 1) passing_guids = set(r['GlobalId'] for r in passing) print("\nElements passing") print(tabulate.tabulate(list(map(dict_to_list(query.args), passing)), query.args, tablefmt="grid")) print("\nElements failing concept") hd = ["URI", "GlobalId"] print(tabulate.tabulate( list(map(dict_to_list(hd), [r for r in roots if r["GlobalId"] not in passing_guids])), hd, tablefmt="grid")) passing_all[ci] = passing_guids print("\nSummary") for ci, C in enumerate(concept_enumerator): print("(%d) %s" % (ci+(0 if is_template else 0), C.name)) def get_stats(guid): v = lambda i: guid in passing_all[i] st = [guid] + ["x" if v(i) else "" for i in range(num_columns)] if not is_template: st += ["x" if not v(0) or all(v(i) for i in range(1, num_columns)) else " "] return st hd = ["GlobalId"] + list(map(str, range(num_columns))) if not is_template: hd += ["Valid"] print(tabulate.tabulate(list(map(get_stats, map(operator.itemgetter("GlobalId"), roots))), hd, tablefmt="grid"))