Source code for colour_datasets.records.zenodo
"""
Zenodo
======
Defines the objects implementing support for a *Zenodo* community and its
records:
- :class:`colour_datasets.Record`
- :class:`colour_datasets.Community`
"""
from __future__ import annotations
import json
import os
import shutil
import setuptools.archive_util
import stat
import tempfile
import textwrap
import urllib
import urllib.error
from collections import Mapping
from html.parser import HTMLParser
from pprint import pformat
from colour.hints import (
Any,
Boolean,
Callable,
Dict,
Generator,
Integer,
List,
Optional,
Union,
)
from colour.utilities import optional, warning
from colour_datasets.utilities import url_download, json_open
from colour_datasets.records import Configuration
__author__ = "Colour Developers"
__copyright__ = "Copyright 2019 Colour Developers"
__license__ = "New BSD License - https://opensource.org/licenses/BSD-3-Clause"
__maintainer__ = "Colour Developers"
__email__ = "colour-developers@colour-science.org"
__status__ = "Production"
__all__ = [
"Record",
"Community",
]
[docs]class Record:
"""
Define an object storing a *Zenodo* record data and providing methods to
sync it in a local repository.
Parameters
----------
data
*Zenodo* record data.
configuration
*Colour - Datasets* configuration.
Attributes
----------
- :attr:`colour_datasets.Record.data`
- :attr:`colour_datasets.Record.configuration`
- :attr:`colour_datasets.Record.repository`
- :attr:`colour_datasets.Record.id`
- :attr:`colour_datasets.Record.title`
Methods
-------
- :meth:`colour_datasets.Record.__init__`
- :meth:`colour_datasets.Record.__str__`
- :meth:`colour_datasets.Record.__repr__`
- :meth:`colour_datasets.Record.from_id`
- :meth:`colour_datasets.Record.synced`
- :meth:`colour_datasets.Record.pull`
- :meth:`colour_datasets.Record.remove`
Examples
--------
>>> record = Record(json_open('https://zenodo.org/api/records/3245883'))
# Doctests skip for Python 2.x compatibility.
>>> record.id # doctest: +SKIP
'3245883'
>>> record.title # doctest: +SKIP
'Camera Spectral Sensitivity Database - Jiang et al. (2013)'
"""
def __init__(
self, data: dict, configuration: Optional[Configuration] = None
):
self._data: dict = data
self._configuration: Configuration = optional(
configuration, Configuration()
)
@property
def data(self) -> dict:
"""
Getter property for the *Zenodo* record data.
Returns
-------
:class:`dict`
*Zenodo* record data.
"""
return self._data
@property
def configuration(self) -> Configuration:
"""
Getter property for the *Colour - Datasets* configuration.
Returns
-------
:class:`colour_datasets.Configuration`
*Colour - Datasets* configuration.
"""
return self._configuration
@property
def repository(self) -> str:
"""
Getter property for the *Zenodo* record local repository.
Returns
-------
:class:`str`
*Zenodo* record local repository.
"""
return os.path.join(self._configuration.repository, self.id)
@property
def id(self) -> str:
"""
Getter property for the *Zenodo* record id.
Returns
-------
:class:`str`
*Zenodo* record id.
"""
return str(self._data["id"])
@property
def title(self) -> str:
"""
Getter property for the *Zenodo* record title.
Returns
-------
:class:`str`
*Zenodo* record title.
"""
return self._data["metadata"]["title"]
def __str__(self) -> str:
"""
Return a formatted string representation of the *Zenodo* record.
Returns
-------
:class:`str`
Formatted string representation.
Examples
--------
>>> data = json_open('https://zenodo.org/api/records/3245883')
>>> print('\\n'.join(str(Record(data)).splitlines()[:4]))
Camera Spectral Sensitivity Database - Jiang et al. (2013) - 1.0.0
==================================================================
<BLANKLINE>
Record ID : 3245883
"""
def strip_html(text: str) -> str:
"""Strip *HTML* tags from given text."""
text = text.replace(" ", " ").replace("\n\n", " ")
parts: List[str] = []
parser = HTMLParser()
parser.handle_data = parts.append # type: ignore[assignment]
parser.feed(text)
return "".join(parts)
metadata = self._data["metadata"]
authors = "; ".join(
[creator["name"] for creator in metadata["creators"]]
)
files = self._data["files"]
description = "\n".join(
textwrap.wrap(strip_html(metadata["description"]), 79)
)
files = "\n".join(
[
f"- {file_data['key']} : {file_data['links']['self']}"
for file_data in sorted(files, key=lambda x: x["key"])
]
)
representation = (
f'{metadata["title"]} - {metadata["version"]}\n'
f'{"=" * (len(self.title) + 3 + len(metadata["version"]))}\n\n'
f"Record ID : {self.id}\n"
f"Authors : {authors}\n"
f'License : {metadata["license"]["id"]}\n'
f'DOI : {metadata["doi"]}\n'
f'Publication Date : {metadata["publication_date"]}\n'
f'URL : {self._data["links"]["html"]}\n\n'
f"Description\n-----------\n\n{description}\n\n"
f"Files\n-----\n\n{files}"
)
return representation
def __repr__(self) -> str:
"""
Return an evaluable string representation of the *Zenodo* record.
Returns
-------
:class:`str`
Evaluable string representation.
Examples
--------
>>> data = json_open('https://zenodo.org/api/records/3245883')
# Doctests skip for Python 2.x compatibility.
>>> print('\\n'.join(repr(Record(data)).splitlines()[:4]))
... # doctest: +SKIP
Record(
{'conceptdoi': '10.5281/zenodo.3245882',
'conceptrecid': '3245882',
'created': '2019-06-14T09:34:15.765924+00:00',
"""
data = "\n".join(
[f" {line}" for line in pformat(self._data).splitlines()]
)
configuration = "\n".join(
[
f" {line}"
for line in pformat(self._configuration).splitlines()
]
)
configuration = f" Configuration(\n{configuration}\n )"
return f"{self.__class__.__name__}(\n{data},\n{configuration}\n)"
[docs] @staticmethod
def from_id(
id_: str,
configuration: Optional[Configuration] = None,
retries: Integer = 3,
) -> Record:
"""
:class:`colour_datasets.Record` class factory that builds an instance
using given *Zenodo* record id.
Parameters
----------
id_
*Zenodo* record id.
configuration
configuration
*Colour - Datasets* configuration.
retries
Number of retries in case where a networking error occurs.
Returns
-------
:class:`colour_datasets.Record`
*Zenodo* record data.
Examples
--------
# Doctests skip for Python 2.x compatibility.
>>> Record.from_id('3245883').title
... # doctest: +SKIP
'Camera Spectral Sensitivity Database - Jiang et al. (2013)'
"""
configuration = (
Configuration() if configuration is None else configuration
)
if not os.path.exists(configuration.repository):
os.makedirs(configuration.repository)
record_url = f"{configuration.api_url}/records/{id_}"
return Record(json_open(record_url, retries), configuration)
[docs] def synced(self) -> Boolean:
"""
Return whether the *Zenodo* record data is synced to the local
repository.
Returns
-------
:class:`bool`
Whether the *Zenodo* record data is synced to the local repository.
Examples
--------
>>> from colour_datasets.utilities import suppress_stdout
>>> record = Record.from_id('3245883')
>>> with suppress_stdout():
... record.pull()
>>> record.synced()
True
>>> record.remove()
>>> record.synced()
False
"""
downloads_directory = os.path.join(
self.repository, self._configuration.downloads_directory
)
deflate_directory = os.path.join(
self.repository, self._configuration.deflate_directory
)
return all(
[
os.path.exists(downloads_directory),
os.path.exists(deflate_directory),
]
)
[docs] def pull(self, use_urls_txt_file: Boolean = True, retries: Integer = 3):
"""
Pull the *Zenodo* record data to the local repository.
Parameters
----------
use_urls_txt_file
Whether to use the *urls.txt* file: if such a file is present in
the *Zenodo* record data, the urls it defines take precedence over
the record data files. The later will be used in the eventuality
where the urls are not available.
retries
Number of retries in case where a networking error occurs or the
*MD5* hash is not matching.
Examples
--------
>>> from colour_datasets.utilities import suppress_stdout
>>> record = Record.from_id('3245883')
>>> record.remove()
>>> with suppress_stdout():
... record.pull()
>>> record.synced()
True
"""
print(f'Pulling "{self.title}" record content...')
if not os.path.exists(self._configuration.repository):
os.makedirs(self._configuration.repository)
downloads_directory = os.path.join(
self.repository, self._configuration.downloads_directory
)
if not os.path.exists(downloads_directory):
os.makedirs(downloads_directory)
# As much as possible, the original file urls are used, those are
# given by the content of :attr:`URLS_TXT_FILE` attribute file.
urls_txt = None
for file_data in self.data["files"]:
if file_data["key"] == self._configuration.urls_txt_file:
urls_txt = file_data
break
def urls_download(urls: Dict):
"""Download given urls."""
for url, md5 in urls.items():
filename = os.path.join(
downloads_directory,
urllib.parse.unquote( # type: ignore[attr-defined]
url.split("/")[-1]
),
)
url_download(url, filename, md5.split(":")[-1], retries)
try:
if use_urls_txt_file and urls_txt:
urls = {}
urls_txt_file = tempfile.mktemp()
url_download(
urls_txt["links"]["self"],
urls_txt_file,
urls_txt["checksum"].split(":")[-1],
retries,
)
with open(urls_txt_file) as json_file:
urls_txt_json = json.load(json_file)
for url, md5 in urls_txt_json["urls"].items():
urls[url] = md5.split(":")[-1]
shutil.copyfile(
urls_txt_file,
os.path.join(
downloads_directory, self._configuration.urls_txt_file
),
)
urls_download(urls)
else:
raise ValueError(
f'"{self._configuration.urls_txt_file}" file was not '
f"found in record data!"
)
except (urllib.error.URLError, ValueError) as error:
warning(
f"An error occurred using urls from "
f'"{self._configuration.urls_txt_file}" file: {error}\n'
f"Switching to record urls..."
)
urls = {}
for file_data in self.data["files"]:
if file_data["key"] == self._configuration.urls_txt_file:
continue
urls[file_data["links"]["self"]] = file_data["checksum"].split(
":"
)[-1]
urls_download(urls)
deflate_directory = os.path.join(
self.repository, self._configuration.deflate_directory
)
if os.path.exists(deflate_directory):
shutil.rmtree(deflate_directory, onerror=_remove_readonly)
shutil.copytree(downloads_directory, deflate_directory)
for filename in os.listdir(deflate_directory):
filename = os.path.join(deflate_directory, filename)
if not os.path.isfile(filename):
continue
basename, extension = os.path.splitext(filename)
basename = os.path.basename(basename)
if extension.lower() in (".zip", ".tar", ".gz", ".bz2"):
if basename.lower().endswith(".tar"):
basename = basename.rsplit(".", 1)[0]
basename = basename.replace(".", "_")
unpacking_directory = os.path.join(deflate_directory, basename)
print(f'Unpacking "{filename}" archive...')
setuptools.archive_util.unpack_archive(
filename, unpacking_directory
)
os.remove(filename)
with open(
os.path.join(self.repository, "record.json"), "w"
) as record_json:
json.dump(self.data, record_json, indent=4, sort_keys=True)
[docs] def remove(self):
"""
Remove the *Zenodo* record data local repository.
Examples
--------
>>> from colour_datasets.utilities import suppress_stdout
>>> record = Record.from_id('3245883')
>>> with suppress_stdout():
... record.pull()
>>> record.remove()
>>> record.synced()
False
"""
if os.path.exists(self.repository):
shutil.rmtree(self.repository, onerror=_remove_readonly)
[docs]class Community(Mapping):
"""
Define an object storing a *Zenodo* community data.
Parameters
----------
data
*Zenodo* community data.
configuration
*Colour - Datasets* configuration.
Attributes
----------
- :attr:`colour_datasets.Community.data`
- :attr:`colour_datasets.Community.configuration`
- :attr:`colour_datasets.Community.repository`
- :attr:`colour_datasets.Community.records`
Methods
-------
- :meth:`colour_datasets.Community.__init__`
- :meth:`colour_datasets.Community.__str__`
- :meth:`colour_datasets.Community.__repr__`
- :meth:`colour_datasets.Community.__getitem__`
- :meth:`colour_datasets.Community.__iter__`
- :meth:`colour_datasets.Community.__len__`
- :meth:`colour_datasets.Community.from_id`
- :meth:`colour_datasets.Community.synced`
- :meth:`colour_datasets.Community.pull`
- :meth:`colour_datasets.Community.remove`
Examples
--------
>>> community_data = json_open(
... 'https://zenodo.org/api/communities/colour-science-datasets')
>>> records_data = json_open(
... 'https://zenodo.org/api/records/?q=communities:'
... 'colour-science-datasets')
>>> community = Community({
... 'community': community_data,
... 'records': records_data,
... })
# Doctests skip for Python 2.x compatibility.
>>> community['3245883'].title # doctest: +SKIP
'Camera Spectral Sensitivity Database - Jiang et al. (2013)'
"""
def __init__(
self, data: Dict, configuration: Optional[Configuration] = None
):
self._data: Dict = data
self._configuration: Configuration = optional(
configuration, Configuration()
)
hits = self._data["records"]["hits"]["hits"]
self._records: Dict = {
str(hit["id"]): Record(hit, self._configuration) for hit in hits
}
@property
def data(self) -> Dict:
"""
Getter property for the *Zenodo* community data.
Returns
-------
:class:`dict`
*Zenodo* community data.
"""
return self._data
@property
def configuration(self) -> Configuration:
"""
Getter property for the *Colour - Datasets* configuration.
Returns
-------
:class:`colour_datasets.Configuration`
*Colour - Datasets* configuration.
"""
return self._configuration
@property
def repository(self) -> str:
"""
Getter property for the *Zenodo* community local repository.
Returns
-------
:class:`str`
*Zenodo* community local repository.
"""
return self._configuration.repository
@property
def records(self) -> Dict:
"""
Getter property for the *Zenodo* community records.
Returns
-------
:class:`dict`
*Zenodo* community records.
"""
return self._records
def __str__(self) -> str:
"""
Return a formatted string representation of the *Zenodo* community.
Returns
-------
:class:`str`
Formatted string representation.
Examples
--------
>>> community = Community.from_id('colour-science-datasets-tests')
>>> print('\\n'.join(str(community).splitlines()[:6]))
... # doctest: +ELLIPSIS
colour-science-datasets-tests
=============================
<BLANKLINE>
Datasets : ...
Synced : ...
URL : https://zenodo.org/communities/\
colour-science-datasets-tests/
"""
datasets = "\n".join(
[
(
f"[{'x' if dataset.synced() else ' '}] "
f"{dataset.id} : {dataset.title}"
)
for dataset in sorted(self.values(), key=lambda x: x.title)
]
)
synced = len(
[dataset for dataset in self.values() if dataset.synced()]
)
representation = (
f"{self._configuration.community}\n"
f'{"=" * len(self._configuration.community)}\n\n'
f"Datasets : {len(self)}\n"
f"Synced : {synced}\n"
f'URL : {self._data["community"]["links"]["html"]}\n\n'
f"Datasets\n--------\n\n"
f"{datasets}"
)
return representation
def __repr__(self) -> str:
"""
Return an evaluable string representation of the *Zenodo* community.
Returns
-------
:class:`str`
Evaluable string representation.
Examples
--------
>>> community = Community.from_id('colour-science-datasets-tests')
# Doctests skip for Python 2.x compatibility.
>>> print('\\n'.join(repr(community).splitlines()[:4]))
... # doctest: +SKIP
Community(
{'community': {'created': '2019-06-09T10:45:47.999975+00:00',
'curation_policy': '',
'description': '',
"""
data = "\n".join(
[f" {line}" for line in pformat(self._data).splitlines()]
)
configuration = "\n".join(
[
f" {line}"
for line in pformat(self._configuration).splitlines()
]
)
configuration = f" Configuration(\n{configuration}\n )"
return f"{self.__class__.__name__}(\n{data},\n{configuration}\n)"
def __getitem__(self, item: Union[str, Any]) -> Any:
"""
Return the *Zenodo* record at given id.
Parameters
----------
item
*Zenodo* recordid.
Returns
-------
:class:`colour_datasets.Record`
*Zenodo* record at given id.
Examples
--------
>>> community = Community.from_id('colour-science-datasets-tests')
# Doctests skip for Python 2.x compatibility.
>>> community['3245883'].title # doctest: +SKIP
'Camera Spectral Sensitivity Database - Jiang et al. (2013)'
"""
return self._records[item]
def __iter__(self) -> Generator:
"""
Iterate through the *Zenodo* community records.
Yields
------
Generator
*Zenodo* community records iterator.
Examples
--------
# Doctests skip for Python 2.x compatibility.
>>> for record in Community.from_id('colour-science-datasets-tests'):
... print(record) # doctest: +SKIP
"""
yield from self._records
def __len__(self) -> Integer:
"""
Return *Zenodo* community records count.
Returns
-------
:class:`int`
*Zenodo* community records count.
Examples
--------
# Doctests skip for Python 2.x compatibility.
>>> len(Community.from_id('colour-science-datasets-tests'))
... # doctest: +SKIP
3
"""
return len(self._records)
[docs] @staticmethod
def from_id(
id_: str,
configuration: Optional[Configuration] = None,
retries: Integer = 3,
) -> Community:
"""
:class:`colour_datasets.Community` class factory that builds an
instance using given *Zenodo* community id.
Parameters
----------
id_ :
*Zenodo* community id.
configuration :
configuration :
*Colour - Datasets* configuration.
retries :
Number of retries in case where a networking error occurs.
Returns
-------
:class:`colour_datasets.Community`
*Zenodo* community data.
Examples
--------
>>> community = Community.from_id('colour-science-datasets-tests')
# Doctests skip for Python 2.x compatibility.
>>> community['3245883'].title # doctest: +SKIP
'Camera Spectral Sensitivity Database - Jiang et al. (2013)'
"""
configuration = (
Configuration() if configuration is None else configuration
)
configuration.community = id_
if not os.path.exists(configuration.repository):
os.makedirs(configuration.repository)
community_url = (
f"{configuration.api_url}/communities/{configuration.community}"
)
# NOTE: Retrieving 512 datasets at most. This should cover needs for
# the foreseeable future. There is likely an undocumented hard limit on
# "Zenodo" server side.
records_url = (
f"{configuration.api_url}/records/"
f"?q=communities:{configuration.community}&size=512"
)
community_json_filename = os.path.join(
configuration.repository,
f"{configuration.community}-community.json",
)
records_json_filename = os.path.join(
configuration.repository, f"{configuration.community}-records.json"
)
try:
community_data = json_open(community_url, retries)
records_data = json_open(records_url, retries)
for key, value in {
community_json_filename: community_data,
records_json_filename: records_data,
}.items():
with open(key, "w") as json_file:
json.dump(value, json_file, indent=4, sort_keys=True)
except (urllib.error.URLError, ValueError):
warning(
'Retrieving the "{0}" community data failed '
"after {1} attempts, "
"attempting to use cached local data!"
)
if not all(
[
os.path.exists(community_json_filename),
os.path.exists(records_json_filename),
]
):
raise RuntimeError("Local files were not found, aborting!")
with open(community_json_filename) as json_file:
community_data = json.loads(json_file.read())
with open(records_json_filename) as json_file:
records_data = json.loads(json_file.read())
data = {
"community": community_data,
"records": records_data,
}
return Community(data, configuration)
[docs] def synced(self) -> Boolean:
"""
Return whether the *Zenodo* community data is synced to the local
repository.
Returns
-------
:class:`bool`
Whether the *Zenodo* community data is synced to the local
repository.
Examples
--------
>>> from colour_datasets.utilities import suppress_stdout
>>> community = Community.from_id('colour-science-datasets-tests')
>>> with suppress_stdout():
... community.pull() # doctest: +SKIP
>>> community.synced() # doctest: +SKIP
True
>>> community.remove()
>>> community.synced()
False
"""
return all([record.synced() for record in self._records.values()])
[docs] def pull(self, use_urls_txt_file: Boolean = True, retries: Integer = 3):
"""
Pull the *Zenodo* community data to the local repository.
Parameters
----------
use_urls_txt_file
Whether to use the *urls.txt* file: if such a file is present in
a *Zenodo* record data, the urls it defines take precedence over
the record data files. The later will be used in the eventuality
where the urls are not available.
retries
Number of retries in case where a networking error occurs or the
*MD5* hash is not matching.
Examples
--------
>>> from colour_datasets.utilities import suppress_stdout
>>> community = Community.from_id('colour-science-datasets-tests')
>>> community.remove()
>>> with suppress_stdout():
... community.pull() # doctest: +SKIP
>>> community.synced() # doctest: +SKIP
True
"""
if not os.path.exists(self._configuration.repository):
os.makedirs(self._configuration.repository)
for record in self._records.values():
record.pull(use_urls_txt_file, retries)
[docs] def remove(self):
"""
Remove the *Zenodo* community data local repository.
Examples
--------
>>> from colour_datasets.utilities import suppress_stdout
>>> community = Community.from_id('colour-science-datasets-tests')
>>> with suppress_stdout():
... community.pull() # doctest: +SKIP
>>> community.remove()
>>> community.synced()
False
"""
if os.path.exists(self.repository):
shutil.rmtree(self.repository, onerror=_remove_readonly)
def _remove_readonly(function: Callable, path: str, excinfo: Any):
"""
Error handler for :func:`shutil.rmtree` definition that removes read-only
files.
"""
os.chmod(path, stat.S_IWRITE)
function(path)