import json
import boto3
import logging
from os import environ
from indra.statements import stmts_to_json, stmts_from_json
from indra.statements.io import stmts_to_json_file, stmts_from_json_file
from . import file_defaults, InvalidCorpusError, CACHE, default_bucket, \
default_key_base, default_profile
from .util import _stmts_dict_to_json, _json_to_stmts_dict, _json_dumper, \
_json_loader
# Module-level logger named after this module.
logger = logging.getLogger(name=__name__)
class Corpus(object):
    """Represent a corpus of statements with curation.

    Parameters
    ----------
    corpus_id : str
        The ID of the corpus. It is used as the directory name of the
        corpus files on S3 and in the local cache.
    statements : list[indra.statement.Statement]
        A list of INDRA Statements to embed in the corpus.
    raw_statements : list[indra.statement.Statement]
        A List of raw statements forming the basis of the statements in
        'statements'.
    meta_data : dict
        A dict with meta data associated with the corpus.
    aws_name : str
        The name of the profile in the AWS credential file to use.
        Default: the package-level ``default_profile``.

    Attributes
    ----------
    statements : dict
        A dict of INDRA Statements keyed by UUID.
    raw_statements : list
        A list of the raw statements
    curations : dict
        A dict keeping track of the curations submitted so far for Statement
        UUIDs in the corpus.
    meta_data : dict
        A dict with meta data associated with the corpus
    """
    def __init__(self, corpus_id, statements=None, raw_statements=None,
                 meta_data=None, aws_name=default_profile):
        self.corpus_id = corpus_id
        # Assembled statements are stored keyed by their UUID
        self.statements = {st.uuid: st for st in statements} if statements \
            else {}
        self.raw_statements = raw_statements or []
        self.curations = {}
        self.meta_data = meta_data or {}
        self.aws_name = aws_name
        # The S3 client is created lazily on first use (see _get_s3_client)
        self._s3 = None

    def _get_s3_client(self):
        """Return the S3 client of this corpus, creating it on first call.

        If both AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set in the
        environment they are used, together with AWS_SESSION_TOKEN when it
        is set, so temporary credentials also work. Otherwise the stored
        AWS profile named by ``self.aws_name`` is used.
        """
        if self._s3 is None:
            access_key = environ.get('AWS_ACCESS_KEY_ID')
            secret_key = environ.get('AWS_SECRET_ACCESS_KEY')
            if access_key and secret_key:
                logger.info('Got credentials in environment for client')
                session = boto3.session.Session(
                    aws_access_key_id=access_key,
                    aws_secret_access_key=secret_key,
                    # None is accepted when no session token is set
                    aws_session_token=environ.get('AWS_SESSION_TOKEN'))
            else:
                logger.info('Using stored AWS profile for client')
                session = boto3.session.Session(profile_name=self.aws_name)
            self._s3 = session.client('s3')
        return self._s3

    def __str__(self):
        return 'Corpus(%s -> %s)' % (str(self.statements), str(self.curations))

    def __repr__(self):
        return str(self)

    def _s3_key(self, file_type):
        """Return the S3 key of one of the files making up this corpus.

        Keys have the form <default_key_base>/<corpus_id>/<file>.json, where
        <file> is ``file_defaults[file_type]`` and file_type is one of
        'raw', 'sts', 'cur' or 'meta'.
        """
        return '%s/%s/%s.json' % (default_key_base, self.corpus_id,
                                  file_defaults[file_type])

    @classmethod
    def load_from_s3(cls, corpus_id, aws_name=default_profile,
                     bucket=default_bucket, force_s3_reload=False,
                     raise_exc=False):
        """Create a Corpus and populate it with ``s3_get``.

        Parameters
        ----------
        corpus_id : str
            The ID of the corpus to load.
        aws_name : str
            The AWS profile name to use when no credentials are set in the
            environment.
        bucket : str
            The S3 bucket to load the corpus from.
        force_s3_reload : bool
            If True, skip the local cache and load everything from S3.
        raise_exc : bool
            If True, raise InvalidCorpusError when loading fails.

        Returns
        -------
        Corpus
            The loaded corpus. It may be empty if loading failed and
            raise_exc is False.
        """
        corpus = cls(corpus_id, statements=[], aws_name=aws_name)
        corpus.s3_get(bucket, cache=(not force_s3_reload),
                      raise_exc=raise_exc)
        return corpus

    def s3_put(self, bucket=default_bucket, cache=True):
        """Push a corpus object to S3 in the form of four json files.

        The json files representing the object have S3 keys of the format
        <default_key_base>/<corpus_id>/<file>.json: one file each for the
        raw statements, assembled statements, curations and meta data.

        Parameters
        ----------
        bucket : str
            The S3 bucket to upload the Corpus to.
            Default: the package-level ``default_bucket``.
        cache : bool
            If True, also create a local cache of the corpus, including its
            meta data. Default: True.

        Returns
        -------
        keys : list[str] or None
            The S3 keys of the pushed raw statement, assembled statement and
            curation files, in that order. None if the upload failed.
        """
        raw = self._s3_key('raw')
        sts = self._s3_key('sts')
        cur = self._s3_key('cur')
        meta = self._s3_key('meta')
        try:
            s3 = self._get_s3_client()
            # Raw statements are uploaded as one indented JSON array
            self._s3_put_file(s3,
                              raw,
                              json.dumps(stmts_to_json(self.raw_statements),
                                         indent=1),
                              bucket)
            # Assembled statements are uploaded one JSON object per line
            self._s3_put_file(s3,
                              sts,
                              '\n'.join(json.dumps(jo) for jo in
                                        _stmts_dict_to_json(self.statements)),
                              bucket)
            self._s3_put_file(s3, cur, json.dumps(self.curations), bucket)
            self._s3_put_file(s3, meta, json.dumps(self.meta_data), bucket)
            if cache:
                # Include meta: s3_get looks for it in the cache as well
                self._save_to_cache(raw, sts, cur, meta)
            return [raw, sts, cur]
        except Exception as e:
            # Deliberately best effort: log the failure and return None
            logger.exception('Failed to put on s3: %s' % e)
            return None

    @staticmethod
    def _s3_put_file(s3, key, json_str, bucket=default_bucket):
        """Upload an already serialized JSON string to S3.

        Parameters
        ----------
        s3 : boto3 S3 client
            The client used for the upload.
        key : str
            The S3 key to upload to.
        json_str : str
            The serialized content. No further serialization is done here.
        bucket : str
            The S3 bucket to upload to.
        """
        logger.info('Uploading %s to S3' % key)
        s3.put_object(Body=json_str, Bucket=bucket, Key=key)

    @staticmethod
    def _cache_file(file_key):
        """Map a full S3 key to its path in the local cache.

        Keys are expected to look like <default_key_base>/<dirname>/<file>.json.
        The leading base name is dropped and the remainder is resolved under
        CACHE. Both reading and writing use this helper, so they always
        agree on the location.
        """
        prefix = default_key_base + '/'
        if file_key.startswith(prefix):
            relative = file_key[len(prefix):]
        else:
            # Fall back to dropping the first path component
            relative = file_key.partition('/')[2]
        return CACHE.joinpath(relative)

    @classmethod
    def _prepare_cache_file(cls, file_key):
        """Ensure the cache file for file_key and its parent directories
        exist, and return its POSIX path."""
        path = cls._cache_file(file_key)
        if not path.is_file():
            path.parent.mkdir(exist_ok=True, parents=True)
            path.touch(exist_ok=True)
        return path.as_posix()

    def _save_to_cache(self, raw=None, sts=None, cur=None, meta=None):
        """Save the current state of the corpus to the local cache.

        Each argument is the full S3 key
        (<default_key_base>/<dirname>/<file>.json) of a file to write.
        Files whose key is not given are left untouched.
        """
        if raw:
            _json_dumper(jsonobj=stmts_to_json(self.raw_statements),
                         fpath=self._prepare_cache_file(raw))
        if sts:
            # Assembled statements are cached in JSON lines format
            stmts_to_json_file(stmts=list(self.statements.values()),
                               fname=self._prepare_cache_file(sts),
                               format='jsonl')
        if cur:
            _json_dumper(jsonobj=self.curations,
                         fpath=self._prepare_cache_file(cur))
        if meta:
            _json_dumper(jsonobj=self.meta_data,
                         fpath=self._prepare_cache_file(meta))

    @staticmethod
    def _parse_stmt_jsons(text):
        """Parse serialized statement JSON into a list of JSON objects.

        Accepts a single JSON array (on one or more lines) or JSON lines
        (one statement object per line, blank lines ignored). A lone object
        is a one-statement JSON lines file and is wrapped in a list. Empty
        input or JSON ``null`` gives an empty list.
        """
        if isinstance(text, bytes):
            text = text.decode('utf-8')
        content = text.strip()
        if not content:
            return []
        try:
            parsed = json.loads(content)
        except json.JSONDecodeError:
            # Several top-level values: JSON lines format
            return [json.loads(line) for line in content.splitlines()
                    if line.strip()]
        if parsed is None:
            return []
        if isinstance(parsed, dict):
            return [parsed]
        return parsed

    def _fetch_json(self, s3, bucket, key, cache, parser=json.loads):
        """Return the parsed content of a corpus file.

        When cache is True and the local cache holds a non-empty value for
        key, that value is returned. Otherwise the object is downloaded from
        S3 and its raw body (bytes) is passed to parser.
        """
        if cache:
            cached = self._load_from_cache(key)
            if cached:
                return cached
        body = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
        return parser(body)

    def s3_get(self, bucket=default_bucket, cache=True, raise_exc=False):
        """Fetch a corpus object from S3 in the form of four json files.

        The json files representing the object have S3 keys of the format
        <default_key_base>/<corpus_id>/<file>.json: one file each for the
        raw statements, assembled statements, curations and meta data.
        A missing meta data file is tolerated.

        Parameters
        ----------
        bucket : str
            The S3 bucket to fetch the Corpus from.
            Default: the package-level ``default_bucket``.
        cache : bool
            If True, look for each corpus file in the local cache before
            loading it from s3. Default: True.
        raise_exc : bool
            If True, raise InvalidCorpusError when corpus failed to load.
            Otherwise only log a warning.
        """
        s3key = '%s/%s/' % (default_key_base, self.corpus_id)
        raw = self._s3_key('raw')
        sts = self._s3_key('sts')
        cur = self._s3_key('cur')
        meta = self._s3_key('meta')
        try:
            logger.info('Loading corpus: %s', s3key)
            s3 = self._get_s3_client()
            raw_stmt_jsons = self._fetch_json(s3, bucket, raw, cache) or []
            self.raw_statements = stmts_from_json(raw_stmt_jsons)
            json_stmts = self._fetch_json(
                s3, bucket, sts, cache, parser=self._parse_stmt_jsons) or []
            # Assembled statements are turned from a list into a UUID dict
            self.statements = _json_to_stmts_dict(json_stmts)
            self.curations = self._fetch_json(s3, bucket, cur, cache) or {}
            try:
                meta_json = self._fetch_json(s3, bucket, meta, cache) or {}
            except Exception as e:
                # Meta data is optional: older corpora may not have any
                if isinstance(e, s3.exceptions.NoSuchKey):
                    logger.warning('No meta data found on s3')
                else:
                    logger.warning('No meta data found: %s' % e)
                meta_json = {}
            self.meta_data = meta_json
        except Exception as e:
            if raise_exc:
                raise InvalidCorpusError(
                    'Failed to get from s3: %s' % e) from e
            logger.warning('Failed to get from s3: %s' % e)

    def upload_curations(self, look_in_cache=False,
                         save_to_cache=False, bucket=default_bucket):
        """Upload the current state of curations for the corpus.

        Parameters
        ----------
        look_in_cache : bool
            If True, check for locally cached curations when none are
            available in memory. Default: False
        save_to_cache : bool
            If True, also save the current curation state to the cache.
            This has no effect if look_in_cache is True. Default: False.
        bucket : str
            The bucket to upload to.
            Default: the package-level ``default_bucket``.
        """
        file_key = self._s3_key('cur')
        # Prefer in-memory curations, then optionally fall back to the cache
        if self.curations:
            curations = self.curations
        elif look_in_cache:
            curations = self._load_from_cache(file_key)
        else:
            curations = None
        # Only upload if we actually have any curations to upload
        if curations:
            self._s3_put_file(s3=self._get_s3_client(),
                              key=file_key,
                              json_str=json.dumps(curations),
                              bucket=bucket)
        if self.curations and save_to_cache and not look_in_cache:
            self._save_to_cache(cur=file_key)

    def get_curations(self, look_in_cache=False):
        """Get curations for the corpus.

        Parameters
        ----------
        look_in_cache : bool
            If True, look in local cache if there are no curations loaded

        Returns
        -------
        dict
            The curations for this corpus, if any
        """
        if self.curations:
            return self.curations
        if look_in_cache:
            return self._load_from_cache(self._s3_key('cur')) or {}
        return {}

    @classmethod
    def _load_from_cache(cls, file_key):
        """Load a corpus file from the local cache.

        Parameters
        ----------
        file_key : str
            The full S3 key of the file (<base_name>/<dirname>/<file>.json).

        Returns
        -------
        The parsed JSON content, or None if the file is not cached. The
        assembled-statements file is always returned as a list, whether it
        was stored as a JSON array or as JSON lines.
        """
        local_file = cls._cache_file(file_key)
        if not local_file.is_file():
            return None
        if local_file.name == file_defaults['sts'] + '.json':
            return cls._parse_stmt_jsons(local_file.read_text())
        return _json_loader(local_file.as_posix())

    def to_json_file(self, fname, w_newlines=False):
        """Dump the statements to a file in json format.

        Parameters
        ----------
        fname : str
            A valid file path
        w_newlines : bool
            If True, the statements are written in JSON lines format, one
            statement per line. Default: False.
        """
        stmts_to_json_file(stmts=list(self.statements.values()),
                           fname=fname,
                           format='jsonl' if w_newlines else 'json')