"""
Input/Output operations on resources.
This file is part of the everest project.
See LICENSE.txt for licensing, CONTRIBUTORS.txt for contributor information.
Created on Jan 27, 2012.
"""
from StringIO import StringIO
from collections import OrderedDict
from everest.mime import CsvMime
from everest.mime import MimeTypeRegistry
from everest.representers.utils import as_representer
from everest.resources.utils import get_member_class
from everest.resources.utils import new_stage_collection
from everest.resources.utils import provides_member_resource
from pygraph.algorithms.sorting import topological_sorting # pylint: disable=E0611,F0401
from pygraph.classes.digraph import digraph # pylint: disable=E0611,F0401
from urlparse import urlparse
from zipfile import ZIP_DEFLATED
from zipfile import ZipFile
import os
__docformat__ = 'reStructuredText en'

# Names exported by ``from <this module> import *``.
__all__ = [
    'ConnectedResourcesSerializer',
    'ResourceGraph',
    'build_resource_dependency_graph',
    'build_resource_graph',
    'dump_resource',
    'dump_resource_to_files',
    'dump_resource_to_zipfile',
    'find_connected_resources',
    'get_collection_filename',
    'get_collection_name',
    'get_read_collection_path',
    'get_write_collection_path',
    'load_collection_from_file',
    'load_collection_from_stream',
    'load_collection_from_url',
    'load_collections_from_zipfile',
    ]
def load_collection_from_url(collection, url,
                             content_type=None, resolve_urls=True):
    """
    Loads resources from the representation found at the given URL into the
    given collection resource.

    Only URLs with the ``file`` scheme are supported; the path component of
    such a URL is treated as a local file name and handed on to
    :func:`load_collection_from_file`.

    The members are added to the passed collection in place; nothing is
    returned.

    :param collection: collection resource to load the members into
    :param str url: URL of the representation to load
    :param content_type: MIME content type of the representation; if
      `None`, :func:`load_collection_from_file` infers it from the file
      extension.
    :param bool resolve_urls: flag passed through to
      :func:`load_collection_from_file`.
    :raises ValueError: if the URL scheme is anything other than ``file``.
    """
    parsed = urlparse(url)
    if parsed.scheme != 'file': # pylint: disable=E1101
        raise ValueError('Unsupported URL scheme "%s".' % parsed.scheme) # pylint: disable=E1101
    # Assume a local path.
    load_collection_from_file(collection, parsed.path, # pylint: disable=E1101
                              content_type=content_type,
                              resolve_urls=resolve_urls)
def load_collection_from_file(collection, filename,
                              content_type=None, resolve_urls=True):
    """
    Loads resources from the specified file into the given collection
    resource.

    If no content type is provided, an attempt is made to look up the
    extension of the given filename in the MIME content type registry.

    :param collection: collection resource to load the members into
    :param str filename: name of the file holding the representation
    :param content_type: MIME content type of the representation, or `None`
      to infer it from the file extension.
    :param bool resolve_urls: flag passed through to
      :func:`load_collection_from_stream`.
    :raises ValueError: if no content type was given and none is registered
      for the extension of the given file name.
    """
    if content_type is None:
        ext = os.path.splitext(filename)[1]
        try:
            content_type = MimeTypeRegistry.get_type_for_extension(ext)
        except KeyError:
            raise ValueError('Could not infer MIME type for file extension '
                             '"%s".' % ext)
    # Own the file handle here so that it is closed even if the stream
    # loader fails before it gets around to managing the stream itself.
    # Closing an already closed file is a no-op.
    with open(filename, 'rU') as stream:
        load_collection_from_stream(collection, stream,
                                    content_type, resolve_urls=resolve_urls)
def load_collection_from_stream(collection, stream, content_type,
                                resolve_urls=True):
    """
    Reads a representation of the given content type from the given stream
    and adds every member found in it to the given collection resource.

    The stream is used as a context manager and is therefore closed when
    this function is done with it.

    :param collection: collection resource to add the loaded members to
    :param stream: stream to read the representation from; has to support
      the context manager protocol.
    :param content_type: MIME content type of the representation
    :param bool resolve_urls: flag passed on to the representer when the
      parsed data are converted to resources.
    """
    representer = as_representer(collection, content_type)
    with stream:
        data = representer.data_from_stream(stream)
        loaded = representer.resource_from_data(data,
                                                resolve_urls=resolve_urls)
        for member in loaded:
            collection.add(member)
def load_collections_from_zipfile(collections, zipfile, resolve_urls=True):
    """
    Populates each of the given collections with the resources stored for
    it in the given ZIP archive.

    Archive entries are matched to collections by comparing the entry name
    without its extension to the name obtained with
    :func:`get_collection_name`; the MIME content type of an entry is
    looked up from its extension. Collections without a matching entry are
    skipped.

    :param collections: sequence of collection resources
    :param str zipfile: ZIP file name
    :param bool resolve_urls: Flag indicating if URLs should be resolved
      during loading.
    :raises ValueError: if no MIME content type is registered for the
      extension of a matching archive entry.
    """
    with ZipFile(zipfile) as archive:
        # Entry name without extension -> full entry name. For entries
        # sharing the same base name, the last one in the archive wins.
        entry_map = {}
        for entry_name in archive.namelist():
            entry_map[os.path.splitext(entry_name)[0]] = entry_name
        for coll in collections:
            entry_name = entry_map.get(get_collection_name(coll))
            if entry_name is None:
                # The archive holds no data for this collection.
                continue
            extension = os.path.splitext(entry_name)[1]
            try:
                mime = MimeTypeRegistry.get_type_for_extension(extension)
            except KeyError:
                raise ValueError('Could not infer MIME type for file '
                                 'extension "%s".' % extension)
            entry_stream = archive.open(entry_name, 'r')
            load_collection_from_stream(coll, entry_stream, mime,
                                        resolve_urls=resolve_urls)
def dump_resource(resource, stream, content_type=None):
    """
    Writes a representation of the given resource to the given stream.

    :param resource: resource to dump
    :param stream: stream to write the representation to
    :param content_type: MIME content type to use for the representation;
      CSV is used if this is `None`.
    """
    mime = CsvMime if content_type is None else content_type
    representer = as_representer(resource, mime)
    representer.to_stream(resource, stream)
def build_resource_dependency_graph(resource_classes,
                                    include_backrefs=False):
    """
    Builds the graph of dependencies among the given resource classes.

    The result is a directed graph that has member resource classes as
    nodes; an edge from one class to another stands for a non-terminal
    (member or collection) attribute of the first class referencing the
    second.

    :param resource_classes: resource classes to determine interdependencies
      of.
    :type resource_classes: sequence of registered resources.
    :param bool include_backrefs: flag indicating if dependencies
      introduced by back-references (e.g., a child resource referencing its
      parent) should be included in the dependency graph.
    :returns: the dependency graph (a `digraph` instance).
    """
    dep_graph = digraph()

    def collect(member_cls, ancestors):
        # Depth-first walk over the non-terminal attributes of member_cls;
        # `ancestors` holds the classes on the way down to member_cls.
        for name in member_cls.get_attribute_names():
            if member_cls.is_terminal(name):
                continue
            descr = getattr(member_cls, name)
            target_cls = get_member_class(descr.attr_type)
            # A reference straight back to the class we just came from is
            # a back-reference; those are only followed on request.
            is_backref = bool(ancestors) and target_cls is ancestors[-1]
            if is_backref and not include_backrefs:
                continue
            # Only descend into classes seen for the first time, which
            # also keeps the recursion from running in circles.
            if not dep_graph.has_node(target_cls):
                dep_graph.add_node(target_cls)
                ancestors.append(member_cls)
                collect(target_cls, ancestors)
                ancestors.pop()
            edge = (member_cls, target_cls)
            if not dep_graph.has_edge(edge):
                dep_graph.add_edge(edge)

    for resource_class in resource_classes:
        root_cls = get_member_class(resource_class)
        if not dep_graph.has_node(root_cls):
            dep_graph.add_node(root_cls)
        collect(root_cls, [])
    return dep_graph
def build_resource_graph(resource, dependency_graph=None):
    """
    Builds the graph of all resources that can be reached from the given
    resource.

    A link from one resource to another is only followed if the dependency
    graph has an edge connecting the two corresponding member resource
    classes. If no dependency graph is passed, one is built with
    :func:`build_resource_dependency_graph` (using its defaults) for the
    member class of the given resource.

    :param resource: a member resource, or an iterable of member resources,
      to start the traversal from.
    :param dependency_graph: graph of resource class dependencies
      restricting the traversal, or `None`.
    :returns: a :class:`ResourceGraph` instance representing the graph of
      resources reachable from the given resource.
    """
    if dependency_graph is None:
        dependency_graph = build_resource_dependency_graph(
                                            [get_member_class(resource)])
    graph = ResourceGraph()

    def follow(parent):
        parent_cls = type(parent)
        for name, attr in parent_cls.get_attributes().iteritems():
            if parent_cls.is_terminal(name):
                continue
            # Only follow the resource attribute if the dependency graph
            # has an edge here.
            target_cls = get_member_class(attr.value_type)
            if not dependency_graph.has_edge((parent_cls, target_cls)):
                continue
            value = getattr(parent, name)
            # Treat a single member like a collection of one so both kinds
            # of attributes share the same handling.
            if parent_cls.is_collection(name):
                children = value
            else:
                children = [value]
            for child in children:
                if graph.has_node(child):
                    # Seen before - ignore cyclic references.
                    continue
                graph.add_node(child)
                graph.add_edge((parent, child))
                follow(child)

    if provides_member_resource(resource):
        roots = [resource]
    else:
        roots = resource
    for root in roots:
        graph.add_node(root)
        follow(root)
    return graph
def find_connected_resources(resource, dependency_graph=None):
    """
    Gathers all resources connected to the given resource, grouped by
    member resource class.

    :param resource: resource to start collecting from
    :param dependency_graph: passed on to :func:`build_resource_graph`.
    :returns: ordered dictionary mapping member resource classes to newly
      created collections holding the members found. Classes appear in the
      order in which their first member turns up in the topological sorting
      of the resource graph.
    """
    graph = build_resource_graph(resource,
                                 dependency_graph=dependency_graph)
    grouped = OrderedDict()
    for member in topological_sorting(graph):
        member_cls = get_member_class(member)
        if grouped.get(member_cls) is None:
            # First member of this class - set up its collection.
            grouped[member_cls] = new_stage_collection(member)
        grouped[member_cls].add(member)
    return grouped
class ResourceGraph(digraph):
    """
    Directed graph with resource instances as nodes and relationships
    between resources as edges.

    Membership of a resource in the graph is not decided by the identity of
    the resource object but by a key derived from its underlying entity
    (the entity's class and its ID). Two resource objects wrapping the same
    entity therefore count as the same node for :meth:`has_node`.
    """
    def __init__(self):
        digraph.__init__(self)
        # Keys of all entities whose resources were added as nodes.
        self.__entities = set()

    def add_node(self, node, attrs=None):
        """
        Adds the given resource as a node and records its entity key.
        """
        digraph.add_node(self, node, attrs=attrs)
        self.__entities.add(self.__make_key(node))

    # NOTE(review): node removal is not overridden here, so a removed node
    # would presumably leave its entity key behind - confirm before relying
    # on `del_node` with this class.

    def has_node(self, node):
        """
        Checks if a resource wrapping the same entity as the given resource
        has been added to this graph.
        """
        entity_key = self.__make_key(node)
        return entity_key in self.__entities

    def __make_key(self, obj):
        # The key is the pair (entity class, entity ID).
        entity = obj.get_entity()
        return (type(entity), entity.id)
class ConnectedResourcesSerializer(object):
    """
    Serializer for a graph of connected resources.

    All resources reachable from a given resource are collected with
    :func:`find_connected_resources` and written out as one representation
    per member resource class.
    """
    def __init__(self, content_type, dependency_graph=None):
        """
        :param content_type: MIME content type to use for representations
        :type content_type: object implementing
            :class:`everest.interfaces.IMime`.
        :param dependency_graph: graph determining which resource connections
            to follow when the graph of connected resources for a given
            resource is built.
        """
        self.__content_type = content_type
        self.__dependency_graph = dependency_graph

    def to_strings(self, resource):
        """
        Dumps all resources reachable from the given resource to a map of
        string representations using the content type of this serializer.

        :returns: ordered dictionary mapping resource member classes to
            string representations
        """
        collections = \
            find_connected_resources(resource,
                                     dependency_graph=self.__dependency_graph)
        # Build a map of representations.
        rpr_map = OrderedDict()
        for (mb_cls, coll) in collections.iteritems():
            # Start with an empty buffer. A positional argument to StringIO
            # is the initial buffer *content*, not a file mode, and would
            # leak into representations shorter than that content.
            strm = StringIO()
            dump_resource(coll, strm, content_type=self.__content_type)
            rpr_map[mb_cls] = strm.getvalue()
        return rpr_map

    def to_files(self, resource, directory):
        """
        Dumps the given resource and all resources linked to it into a set of
        representation files in the given directory.

        :param resource: resource to start collecting from
        :param str directory: directory to write the representation files to
        """
        collections = \
            find_connected_resources(resource,
                                     dependency_graph=self.__dependency_graph)
        for (mb_cls, coll) in collections.iteritems():
            # The returned path already includes the directory, so it must
            # not be joined with the directory a second time (which would
            # duplicate a relative directory in the resulting path).
            fn = get_write_collection_path(mb_cls,
                                           self.__content_type,
                                           directory=directory)
            with open(fn, 'wb') as strm:
                dump_resource(coll, strm, content_type=self.__content_type)

    def to_zipfile(self, resource, zipfile):
        """
        Dumps the given resource and all resources linked to it into the given
        ZIP file.

        :param resource: resource to start collecting from
        :param zipfile: ZIP file (name or file object) to write to; it is
            opened in write mode.
        """
        rpr_map = self.to_strings(resource)
        with ZipFile(zipfile, 'w') as zipf:
            for (mb_cls, rpr_string) in rpr_map.iteritems():
                fn = get_collection_filename(mb_cls, self.__content_type)
                zipf.writestr(fn, rpr_string, compress_type=ZIP_DEFLATED)
def dump_resource_to_files(resource, content_type=None, directory=None):
    """
    Convenience function wrapping
    :meth:`ConnectedResourcesSerializer.to_files`.

    :param resource: resource to dump together with all resources linked
      to it.
    :param content_type: MIME content type to use; defaults to CSV.
    :param str directory: directory to write the files to; defaults to the
      current working directory.
    """
    target_dir = directory
    if target_dir is None:
        target_dir = os.getcwd() # pragma: no cover
    mime = CsvMime if content_type is None else content_type
    serializer = ConnectedResourcesSerializer(mime)
    serializer.to_files(resource, directory=target_dir)
def dump_resource_to_zipfile(resource, zipfile, content_type=None):
    """
    Convenience function wrapping
    :meth:`ConnectedResourcesSerializer.to_zipfile`.

    :param resource: resource to dump together with all resources linked
      to it.
    :param zipfile: ZIP file to write to
    :param content_type: MIME content type to use; defaults to CSV.
    """
    mime = CsvMime if content_type is None else content_type
    serializer = ConnectedResourcesSerializer(mime)
    serializer.to_zipfile(resource, zipfile)
def get_collection_name(rc_class):
    """
    Returns the collection name for the given resource (class).

    The name is the last "/"-separated segment of the `relation` attribute
    of the corresponding member class, followed by "-collection".
    """
    member_cls = get_member_class(rc_class)
    # Everything after the last slash (or the whole string if there is
    # none).
    relation_tail = member_cls.relation.rsplit('/', 1)[-1]
    return "%s-collection" % relation_tail
def get_collection_filename(rc_class, content_type=None):
    """
    Returns the file name for a representation of the collection of the
    given resource (class): the collection name followed by the file
    extension of the given MIME content type (CSV if `None`).
    """
    mime = CsvMime if content_type is None else content_type
    base_name = get_collection_name(rc_class)
    return "%s%s" % (base_name, mime.file_extension)
def get_write_collection_path(collection_class, content_type, directory=None):
    """
    Returns the path of the file to write a representation of the given
    collection class to.

    The path is made up of the given directory (the current working
    directory if `None`) and the collection file name for the given content
    type. The file does not need to exist.
    """
    base_dir = os.getcwd() if directory is None else directory # pragma: no cover
    file_name = get_collection_filename(collection_class, content_type)
    return os.path.join(base_dir, file_name)
def get_read_collection_path(collection_class, content_type, directory=None):
    """
    Returns the path of the file to read a representation of the given
    collection class from, or `None` if there is no such file.

    The path is made up of the given directory (the current working
    directory if `None`) and the collection file name for the given content
    type.
    """
    base_dir = os.getcwd() if directory is None else directory # pragma: no cover
    file_name = get_collection_filename(collection_class, content_type)
    candidate = os.path.join(base_dir, file_name)
    if not os.path.isfile(candidate):
        return None
    return candidate