"""
XML representers.
This file is part of the everest project.
See LICENSE.txt for licensing, CONTRIBUTORS.txt for contributor information.
Created on May 19, 2011.
"""
from everest.mime import XmlMime
from everest.representers.base import RepresentationGenerator
from everest.representers.base import RepresentationParser
from everest.representers.base import ResourceRepresenter
from everest.representers.config import RepresenterConfiguration
from everest.representers.converters import BooleanConverter
from everest.representers.converters import ConverterRegistry
from everest.representers.converters import DateTimeConverter
from everest.representers.dataelements import CollectionDataElement
from everest.representers.dataelements import LinkedDataElement
from everest.representers.dataelements import MemberDataElement
from everest.representers.interfaces import ILinkedDataElement
from everest.representers.mapping import MappingRegistry
from everest.representers.utils import get_mapping_registry
from everest.resources.attributes import ResourceAttributeKinds
from everest.resources.kinds import ResourceKinds
from everest.resources.link import Link
from everest.resources.utils import get_collection_class
from everest.resources.utils import get_member_class
from everest.resources.utils import provides_member_resource
from everest.url import resource_to_url
from lxml import etree
from lxml import objectify
from pkg_resources import resource_filename # pylint: disable=E0611
from zope.interface import providedBy as provided_by # pylint: disable=E0611,F0401
import datetime
__docformat__ = 'reStructuredText en'
__all__ = ['XmlCollectionDataElement',
'XmlConverterRegistry',
'XmlLinkedDataElement',
'XmlMappingRegistry',
'XmlMemberDataElement',
'XmlParserFactory',
'XmlRepresentationGenerator',
'XmlRepresentationParser',
'XmlRepresenterConfiguration',
'XmlResourceRepresenter',
]
XML_NS_OPEN_SEARCH = 'http://a9.com/-/spec/opensearch/1.1/'
XML_NS_XSI = 'http://www.w3.org/2001/XMLSchema-instance'
XML_TAG_OPTION = 'xml_tag'
XML_SCHEMA_OPTION = 'xml_schema'
XML_NAMESPACE_OPTION = 'xml_ns'
XML_PREFIX_OPTION = 'xml_prefix'
NAMESPACE_MAPPING_OPTION = 'namespace'
class XmlConverterRegistry(ConverterRegistry):
pass
XmlConverterRegistry.register(datetime.datetime, DateTimeConverter)
XmlConverterRegistry.register(bool, BooleanConverter)
class XmlRepresentationParser(RepresentationParser):
def run(self):
# Create an XML schema.
schema_loc = self.get_option('schema_location')
parser = XmlParserFactory.create(schema_location=schema_loc)
try:
tree = objectify.parse(self._stream, parser)
except etree.XMLSyntaxError, err:
raise SyntaxError('Could not parse XML document for schema %s.'
'\n%s' % (schema_loc, err.msg))
return tree.getroot()[0]
class XmlRepresentationGenerator(RepresentationGenerator):
def run(self, data_element):
objectify.deannotate(data_element)
etree.cleanup_namespaces(data_element)
encoding = self.get_option('encoding')
self._stream.write(etree.tostring(data_element,
pretty_print=True,
encoding=encoding,
xml_declaration=True))
class XmlParserFactory(object):
__parser = None
@classmethod
def create(cls, schema_location=None):
if not schema_location is None:
schema = cls.__get_xml_schema(schema_location)
parser = objectify.makeparser(schema=schema)
else:
parser = objectify.makeparser()
# Get the class lookup from the mapping registry.
mp_reg = get_mapping_registry(XmlMime)
parser.set_element_class_lookup(mp_reg.parsing_lookup)
return parser
@classmethod
def __get_xml_schema(cls, xml_schema_path):
try:
doc = etree.parse(resource_filename(*xml_schema_path.split(':')))
except etree.XMLSyntaxError, err:
raise SyntaxError('Could not parse XML schema %s.\n%s' %
(xml_schema_path, err.msg))
try:
schema = etree.XMLSchema(doc)
except etree.XMLSchemaParseError, err:
raise SyntaxError('Invalid XML schema.\n Parser message: %s'
% err.message)
return schema
class XmlResourceRepresenter(ResourceRepresenter):
content_type = XmlMime
#: The encoding to use for reading and writing XML.
ENCODING = 'utf-8'
@classmethod
def make_mapping_registry(cls):
return XmlMappingRegistry()
def _make_representation_parser(self, stream, resource_class, mapping):
parser = XmlRepresentationParser(stream, resource_class, mapping)
mp = self._mapping
xml_schema = mp.configuration.get_option(XML_SCHEMA_OPTION)
parser.set_option('schema_location', xml_schema)
return parser
def _make_representation_generator(self, stream, resource_class, mapping):
generator = XmlRepresentationGenerator(stream, resource_class, mapping)
generator.set_option('encoding', self.ENCODING)
return generator
class _XmlDataElementMixin(object):
@classmethod
def create(cls, ns_map=None):
return cls._create(ns_map)
@classmethod
def create_from_resource(cls, resource, ns_map=None): # ignore resource pylint:disable=W0613,W0221
return cls._create(ns_map)
@classmethod
def _create(cls, ns_map):
if ns_map is None:
mp_reg = get_mapping_registry(XmlMime)
ns_map = mp_reg.namespace_map
cls_xml_tag = cls.mapping.configuration.get_option(XML_TAG_OPTION)
if cls_xml_tag is None:
raise ValueError('No XML tag registered for mapped class '
'%s.' % cls.mapping.mapped_class)
cls_xml_ns = \
cls.mapping.configuration.get_option(XML_NAMESPACE_OPTION)
if not cls_xml_ns is None:
tag = "{%s}%s" % (cls_xml_ns, cls_xml_tag)
# FIXME: is this really necessary?
ns_map[None] = cls_xml_ns
else:
tag = cls_xml_tag
el_fac = XmlParserFactory.create().makeelement
return el_fac(tag, nsmap=ns_map)
class XmlMemberDataElement(objectify.ObjectifiedElement,
_XmlDataElementMixin, MemberDataElement):
def get_mapped_nested(self, attr):
# We only allow *one* child with the given name.
q_tag = self.__get_q_tag(attr)
child_it = self.iterchildren(q_tag)
try:
child = child_it.next()
except StopIteration:
child = None
else:
try:
child_it.next()
except StopIteration:
pass
else:
# This should never happen.
raise ValueError('More than one child for member '
'attribute "%s" found.' % attr) # pragma: no cover
# Link handling: look for wrapper tag with *one* link child.
if child.countchildren() == 1:
grand_child = child.getchildren()[0]
if ILinkedDataElement in provided_by(grand_child):
# We inject the id attribute from the wrapper element.
str_xml = child.get('id')
if not str_xml is None:
grand_child.set('id', str_xml)
child = grand_child
return child
def set_mapped_nested(self, attr, data_element):
data_element.tag = self.__get_q_tag(attr)
self.append(data_element)
def get_mapped_terminal(self, attr):
if attr.repr_name == 'id':
# The "special" id attribute.
xml_val = self.get('id')
if not xml_val is None:
val = attr.value_type(xml_val)
else:
val = None
else:
q_tag = self.__get_q_tag(attr)
val_el = getattr(self, q_tag, None)
if not val_el is None:
val = XmlConverterRegistry.convert_from_representation(
val_el.text,
attr.value_type)
else:
val = None
return val
def set_mapped_terminal(self, attr, value):
if attr.repr_name == 'id':
# The "special" id attribute.
self.set('id', str(value))
else:
q_tag = self.__get_q_tag(attr)
xml_value = XmlConverterRegistry.convert_to_representation(
value,
attr.value_type)
setattr(self, q_tag, xml_value)
@property
def data(self):
data_map = {}
for child in self.iterchildren():
idx = child.tag.find('}')
if idx != -1:
tag = child.tag[idx + 1:]
else:
tag = child.tag
data_map[tag] = child.text
return data_map
def __get_q_tag(self, attr):
if not attr.namespace is None:
q_tag = '{%s}%s' % (attr.namespace, attr.repr_name)
else:
if attr.kind == ResourceAttributeKinds.TERMINAL:
xml_ns = \
self.mapping.configuration.get_option(XML_NAMESPACE_OPTION)
else:
if attr.kind == ResourceAttributeKinds.MEMBER:
attr_type = get_member_class(attr.value_type)
elif attr.kind == ResourceAttributeKinds.COLLECTION:
attr_type = get_collection_class(attr.value_type)
mp = self.mapping.mapping_registry.find_mapping(attr_type)
xml_ns = mp.configuration.get_option(XML_NAMESPACE_OPTION)
if not xml_ns is None:
q_tag = '{%s}%s' % (xml_ns, attr.repr_name)
else:
q_tag = attr.repr_name
return q_tag
class XmlCollectionDataElement(objectify.ObjectifiedElement,
_XmlDataElementMixin, CollectionDataElement):
def add_member(self, data_element):
self.append(data_element)
def get_members(self):
return self.iterchildren()
def __len__(self):
return self.countchildren()
class XmlLinkedDataElement(objectify.ObjectifiedElement, LinkedDataElement):
@classmethod
def create(cls, url, kind, relation=None, title=None, **options):
# mp_reg = get_mapping_registry(XmlMime)
# ns_map = mp_reg.namespace_map
xml_ns = options[XML_NAMESPACE_OPTION]
el_fac = XmlParserFactory.create().makeelement
tag = '{%s}link' % xml_ns
link_el = el_fac(tag)
link_el.set('href', url)
link_el.set('kind', kind)
if not relation is None:
link_el.set('rel', relation)
if not title is None:
link_el.set('title', title)
return link_el
@classmethod
def create_from_resource(cls, resource):
# Create the wrapping element.
mp_reg = get_mapping_registry(XmlMime)
mp = mp_reg.find_or_create_mapping(type(resource))
xml_ns = mp.configuration.get_option(XML_NAMESPACE_OPTION)
options = {XML_NAMESPACE_OPTION:xml_ns}
rc_data_el = mp.create_data_element_from_resource(resource)
if provides_member_resource(resource):
link_el = cls.create(resource_to_url(resource),
ResourceKinds.MEMBER,
relation=resource.relation,
title=resource.title,
**options)
rc_data_el.set('id', str(resource.id))
rc_data_el.append(link_el)
else: # collection resource.
# Collection links only get an actual link element if they
# contain any members.
link_el = cls.create(resource_to_url(resource),
ResourceKinds.COLLECTION,
relation=resource.relation,
title=resource.title,
**options)
rc_data_el.append(link_el)
return rc_data_el
def get_url(self):
return self.get('href')
def get_kind(self):
return self.get('kind')
def get_relation(self):
return self.get('rel')
def get_title(self):
return self.get('title')
def get_id(self):
return self.get('id')
[docs]class XmlRepresenterConfiguration(RepresenterConfiguration):
"""
Specialized configuration class for XML representers.
Allowed configuration attribute names:
xml_tag :
The XML tag to use for the represented data element class.
xml_schema :
The XML schema to use for the represented data element class.
xml_ns :
The XML namespace to use for the represented data element class.
xml_prefix :
The XML namespace prefix to use for the represented data element class.
"""
_default_config_options = \
dict(RepresenterConfiguration._default_config_options.items()
+ [(XML_TAG_OPTION, None), (XML_SCHEMA_OPTION, None),
(XML_NAMESPACE_OPTION, None), (XML_PREFIX_OPTION, None)])
_default_attributes_options = \
dict(RepresenterConfiguration._default_attributes_options.items()
+ [(NAMESPACE_MAPPING_OPTION, None)])
[docs]class XmlMappingRegistry(MappingRegistry):
"""
Registry for XML mappings.
"""
member_data_element_base_class = XmlMemberDataElement
collection_data_element_base_class = XmlCollectionDataElement
linked_data_element_base_class = XmlLinkedDataElement
configuration_class = XmlRepresenterConfiguration
#: Static namespace prefix: namespace map.
NS_MAP = dict(xsi=XML_NS_XSI)
def __init__(self):
MappingRegistry.__init__(self)
self.__ns_map = self.__class__.NS_MAP.copy()
self.__ns_lookup = None
def _initialize(self):
# Create and register the linked data element class.
configuration = self.configuration_class()
mapping = self.create_mapping(Link, configuration)
self.set_mapping(mapping)
def set_mapping(self, mapping):
# First, record the namespace and prefix, if necessary.
xml_ns = mapping.configuration.get_option(XML_NAMESPACE_OPTION)
if not xml_ns is None:
xml_prefix = mapping.configuration.get_option(XML_PREFIX_OPTION)
if not xml_prefix is None:
ns = self.__ns_map.get(xml_prefix)
if ns is None:
# New prefix - register.
self.__ns_map[xml_prefix] = xml_ns
elif xml_ns != ns:
raise ValueError('Prefix "%s" is already registered for '
'namespace %s.' % (xml_prefix, ns))
# Make sure we rebuild the lookup.
if not self.__ns_lookup is None:
self.__ns_lookup = None
MappingRegistry.set_mapping(self, mapping)
@property
def namespace_map(self):
return self.__ns_map.copy()
@property
def parsing_lookup(self):
if self.__ns_lookup is None:
self.__ns_lookup = self.__create_parsing_lookup()
return self.__ns_lookup
def __create_parsing_lookup(self):
lookup = etree.ElementNamespaceClassLookup(
objectify.ObjectifyElementClassLookup())
for mapping in self.get_mappings():
de_cls = mapping.data_element_class
if issubclass(de_cls, XmlLinkedDataElement):
continue
xml_ns = mapping.configuration.get_option(XML_NAMESPACE_OPTION)
xml_tag = mapping.configuration.get_option(XML_TAG_OPTION)
ns_cls_map = lookup.get_namespace(xml_ns)
if xml_tag in ns_cls_map:
raise ValueError('Duplicate tag "%s" in namespace "%s" '
'(trying to register class %s)'
% (xml_tag, xml_ns, de_cls))
ns_cls_map[xml_tag] = de_cls
ns_cls_map['link'] = XmlLinkedDataElement
return lookup