__all__ = ['IndraDBRestSearchProcessor', 'IndraDBRestHashProcessor']
import logging
from copy import deepcopy
from threading import Thread
from datetime import datetime
from collections import OrderedDict, defaultdict
from indra.statements import stmts_from_json, get_statement_by_name, \
get_all_descendants
from indra.sources.indra_db_rest.util import submit_query_request, \
submit_statement_request
from indra.sources.indra_db_rest.exceptions import IndraDBRestResponseError
logger = logging.getLogger(__name__)
class RemoveParam(object):
    """Marker type for default-parameter overrides.

    When a value in a processor's default-parameter overrides is an instance
    of this class, the corresponding key is dropped from the defaults rather
    than being given a value.
    """
class IndraDBRestProcessor(object):
    """The generalized packaging for query responses.

    Subclasses implement `_run`, which is invoked by the constructor with the
    positional arguments given and with the keyword arguments completed by
    the default API parameters. A subclass may adjust those defaults through
    the `_override_default_api_params` class attribute; an override whose
    value is a `RemoveParam` instance removes that key from the defaults.

    General Parameters
    ------------------
    timeout : positive int or None
        If an int, block until the work is done and statements are retrieved,
        or until the timeout has expired, in which case the results so far
        will be returned in the response object, and further results will be
        added in a separate thread as they become available. If
        simple_response is True, all statements available will be returned.
        Otherwise (if None), block indefinitely until all statements are
        retrieved. Default is None.
    ev_limit : int or None
        Limit the amount of evidence returned per Statement. Default is 10.
    best_first : bool
        If True, the preassembled statements will be sorted by the amount of
        evidence they have, and those with the most evidence will be
        prioritized. When using `max_stmts`, this means you will get the
        "best" statements. If False, statements will be queried in arbitrary
        order.
    tries : int > 0
        Set the number of times to try the query. The database often caches
        results, so if a query times out the first time, trying again after a
        timeout will often succeed fast enough to avoid a timeout. This can
        also help gracefully handle an unreliable connection, if you're
        willing to wait. Default is 2.
    max_stmts : int or None
        Select the maximum number of statements to return. When set less than
        1000 the effect is much the same as setting persist to false, and
        will guarantee a faster response. Default is None.

    Attributes
    ----------
    statements : list[:py:class:`indra.statements.Statement`]
        A list of INDRA Statements that will be filled once all queries have
        been completed.
    """
    _override_default_api_params = {}

    def __init__(self, *args, **kwargs):
        self.statements = []
        self.__statement_jsons = {}
        self.__evidence_counts = {}
        self.__source_counts = {}

        # Define the basic generic defaults, then apply subclass overrides.
        default_api_params = dict(timeout=None, ev_limit=10, best_first=True,
                                  tries=2, max_stmts=None)
        default_api_params.update(self._override_default_api_params)

        # Fill in every default the user did not specify. An override that
        # is a RemoveParam instance means the key has no default at all.
        for key, value in default_api_params.items():
            if isinstance(value, RemoveParam):
                continue
            kwargs.setdefault(key, value)

        self._run(*args, **kwargs)

    def get_ev_count(self, stmt):
        """Get the total evidence count for a statement."""
        return self.get_ev_count_by_hash(stmt.get_hash(shallow=True))

    def get_ev_count_by_hash(self, stmt_hash):
        """Get the total evidence count for a statement hash (0 if unknown).
        """
        return self.__evidence_counts.get(stmt_hash, 0)

    def get_source_counts(self):
        """Get a deep copy of the source counts, keyed by statement hash."""
        return deepcopy(self.__source_counts)

    def get_source_count(self, stmt):
        """Get the source counts for a given statement."""
        return self.get_source_count_by_hash(stmt.get_hash(shallow=True))

    def get_source_count_by_hash(self, stmt_hash):
        """Get the source counts for a given statement hash.

        A copy is returned (an empty dict if the hash is unknown), in line
        with `get_source_counts` and `get_ev_counts`, so that callers cannot
        alter the processor's internal state by mutating the result.
        """
        return deepcopy(self.__source_counts.get(stmt_hash, {}))

    def get_ev_counts(self):
        """Get a (shallow) copy of the evidence counts dictionary."""
        return self.__evidence_counts.copy()

    def get_hash_statements_dict(self):
        """Return a dict of Statements keyed by hashes."""
        return {stmt_hash: stmts_from_json([stmt_json])[0]
                for stmt_hash, stmt_json in self.__statement_jsons.items()}

    def merge_results(self, other_processor):
        """Merge the results of this processor with those of another.

        Raises
        ------
        ValueError
            If `other_processor` is not an instance of this processor's
            class.
        """
        if not isinstance(other_processor, self.__class__):
            raise ValueError("Can only extend with another %s instance."
                             % self.__class__.__name__)
        self.statements.extend(other_processor.statements)
        self._merge_json(other_processor.__statement_jsons,
                         other_processor.__evidence_counts,
                         other_processor.__source_counts)

    def _merge_json(self, stmt_json, ev_counts, source_counts):
        """Merge these statement jsons with new jsons.

        Parameters
        ----------
        stmt_json : dict
            Statement JSON dicts keyed by statement hash. Each value is
            expected to carry an 'evidence' list.
        ev_counts : dict
            Evidence counts keyed by statement hash.
        source_counts : dict
            Source counts keyed by statement hash.
        """
        # Where there is overlap, there _should_ be agreement.
        self.__evidence_counts.update(standardize_counts(ev_counts))

        # We turn source counts into an int-keyed dict and update it that way
        self.__source_counts.update(standardize_counts(source_counts))

        for stmt_hash, new_json in stmt_json.items():
            if stmt_hash not in self.__statement_jsons:
                # This should be most of them.
                self.__statement_jsons[stmt_hash] = new_json
                continue

            # This should only happen rarely.
            existing = self.__statement_jsons[stmt_hash]
            if existing is new_json:
                # The very same object was stored by an earlier merge;
                # there is nothing new to add. Appending here while
                # iterating the same list would also never terminate.
                continue
            # Snapshot the incoming evidence in case the two JSON dicts
            # share one underlying list.
            existing['evidence'].extend(list(new_json['evidence']))

    def _compile_statements(self):
        """Generate statements from the jsons."""
        self.statements = stmts_from_json(self.__statement_jsons.values())

    def _unload_and_merge_resp(self, resp):
        """Merge the content of a response into the accumulated results.

        Returns
        -------
        tuple
            The response's 'end_of_statements', 'statements_returned' and
            'statement_limit' values, in that order.
        """
        resp_dict = resp.json(object_pairs_hook=OrderedDict)
        eos = resp_dict['end_of_statements']
        limit = resp_dict['statement_limit']
        num_returned = resp_dict['statements_returned']

        # Update the result
        self._merge_json(resp_dict['statements'],
                         resp_dict['evidence_totals'],
                         resp_dict['source_counts'])
        return eos, num_returned, limit

    def _run(self, *args, **kwargs):
        """Perform the query; must be implemented by each subclass."""
        raise NotImplementedError("_run must be defined in subclass.")
class IndraDBRestHashProcessor(IndraDBRestProcessor):
    """The packaging and processor for hash lookup of statements.

    Parameters
    ----------
    hash_list : list[int or str]
        A list of the matches-key hashes for the statements you want to get.

    Keyword Parameters
    ------------------
    timeout : positive int or None
        If an int, block until the work is done and statements are retrieved,
        or until the timeout has expired, in which case the results so far
        will be returned in the response object, and further results will be
        added in a separate thread as they become available. If
        simple_response is True, all statements available will be returned.
        Otherwise (if None), block indefinitely until all statements are
        retrieved. Default is None.
    ev_limit : int or None
        Limit the amount of evidence returned per Statement. Default is 100.
    best_first : bool
        If True, the preassembled statements will be sorted by the amount of
        evidence they have, and those with the most evidence will be
        prioritized. When using `max_stmts`, this means you will get the
        "best" statements. If False, statements will be queried in arbitrary
        order.
    tries : int > 0
        Set the number of times to try the query. The database often caches
        results, so if a query times out the first time, trying again after a
        timeout will often succeed fast enough to avoid a timeout. This can
        also help gracefully handle an unreliable connection, if you're
        willing to wait. Default is 2.

    Attributes
    ----------
    statements : list[:py:class:`indra.statements.Statement`]
        A list of INDRA Statements that will be filled once all queries have
        been completed.
    """
    # The base class only consults `_override_default_api_params`; under any
    # other name these overrides would silently be ignored.
    _override_default_api_params = {'ev_limit': 100,
                                    'max_stmts': RemoveParam()}

    def _run(self, hash_list, **api_params):
        """Look up the statements matching the given hashes.

        Raises
        ------
        ValueError
            If `hash_list` is not a list, or if any element is neither an int
            nor a string that can be converted into an int.
        """
        # Make sure the input is a list (not just a single hash).
        if not isinstance(hash_list, list):
            raise ValueError("The `hash_list` input is a list, not %s."
                             % type(hash_list))

        # If there is nothing in the list, don't waste time with a query.
        if not hash_list:
            return

        # Regularize and check the types of elements in the hash list. Each
        # element is handled on its own so that mixed int/str lists work
        # regardless of which kind happens to come first.
        bad_hash_msg = ("Hashes must be ints or strings that can be "
                        "converted into ints.")
        try:
            hash_list = [int(h) if isinstance(h, str) else h
                         for h in hash_list]
        except ValueError:
            raise ValueError(bad_hash_msg)
        if not all(isinstance(h, int) for h in hash_list):
            raise ValueError(bad_hash_msg)

        # Execute the query and load the results.
        resp = submit_statement_request('post', 'from_hashes',
                                        data={'hashes': hash_list},
                                        **api_params)
        self._unload_and_merge_resp(resp)
        self._compile_statements()
class IndraDBRestPaperProcessor(IndraDBRestProcessor):
    """The packaging and processor for lookup of statements by paper.

    Parameters
    ----------
    ids : list[(<id type>, <id value>)]
        A list of tuples with ids and their type. The type can be any one of
        'pmid', 'pmcid', 'doi', 'pii', 'manuscript id', or 'trid', which is
        the primary key id of the text references in the database.

    Keyword Parameters
    ------------------
    timeout : positive int or None
        If an int, block until the work is done and statements are retrieved,
        or until the timeout has expired, in which case the results so far
        will be returned in the response object, and further results will be
        added in a separate thread as they become available. If
        simple_response is True, all statements available will be returned.
        Otherwise (if None), block indefinitely until all statements are
        retrieved. Default is None.
    ev_limit : int or None
        Limit the amount of evidence returned per Statement. This class
        defines no override, so the general default of 10 applies.
    best_first : bool
        If True, the preassembled statements will be sorted by the amount of
        evidence they have, and those with the most evidence will be
        prioritized. When using `max_stmts`, this means you will get the
        "best" statements. If False, statements will be queried in arbitrary
        order.
    tries : int > 0
        Set the number of times to try the query. The database often caches
        results, so if a query times out the first time, trying again after a
        timeout will often succeed fast enough to avoid a timeout. This can
        also help gracefully handle an unreliable connection, if you're
        willing to wait. Default is 2.
    max_stmts : int or None
        Select the maximum number of statements to return. When set less than
        1000 the effect is much the same as setting persist to false, and
        will guarantee a faster response. Default is None.

    Attributes
    ----------
    statements : list[:py:class:`indra.statements.Statement`]
        A list of INDRA Statements that will be filled once all queries have
        been completed.
    """

    def _run(self, ids, **api_params):
        """Post the paper ids to 'from_papers' and load the statements."""
        # Repackage each (type, value) pair as the dict sent in the request.
        id_dicts = []
        for id_type, id_val in ids:
            id_dicts.append({'id': id_val, 'type': id_type})

        payload = {'ids': id_dicts}
        response = submit_statement_request('post', 'from_papers',
                                            data=payload, **api_params)
        self._unload_and_merge_resp(response)
        self._compile_statements()
        return
class IndraDBRestSearchProcessor(IndraDBRestProcessor):
    """The packaging for agent and statement type search query responses.

    Parameters
    ----------
    subject/object : str
        Optionally specify the subject and/or object of the statements in
        you wish to get from the database. By default, the namespace is
        assumed to be HGNC gene names, however you may specify another
        namespace by including `@<namespace>` at the end of the name string.
        For example, if you want to specify an agent by chebi, you could use
        `CHEBI:6801@CHEBI`, or if you wanted to use the HGNC id, you could
        use `6871@HGNC`.
    agents : list[str]
        A list of agents, specified in the same manner as subject and object,
        but without specifying their grammatical position.
    stmt_type : str
        Specify the types of interactions you are interested in, as indicated
        by the sub-classes of INDRA's Statements. This argument is *not* case
        sensitive. If the statement class given has sub-classes
        (e.g. RegulateAmount has IncreaseAmount and DecreaseAmount), then
        both the class itself, and its subclasses, will be queried, by
        default. If you do not want this behavior, set use_exact_type=True.
        Note that if max_stmts is set, it is possible only the exact
        statement type will be returned, as this is the first searched. The
        processor then cycles through the types, getting a page of results
        for each type and adding it to the quota, until the max number of
        statements is reached.
    use_exact_type : bool
        If stmt_type is given, and you only want to search for that specific
        statement type, set this to True. Default is False.
    persist : bool
        Default is True. When False, if a query comes back limited (not all
        results returned), just give up and pass along what was returned.
        Otherwise, make further queries to get the rest of the data (which
        may take some time).
    strict_stop : bool
        Default is False, in which case `timeout` only bounds how long the
        constructor waits for the background thread and is not included in
        the parameters sent with each query. When True, `timeout` is also
        left in the parameters passed along with every query request.

    Keyword Parameters
    ------------------
    timeout : positive int or None
        If an int, block until the work is done and statements are retrieved,
        or until the timeout has expired, in which case the results so far
        will be returned in the response object, and further results will be
        added in a separate thread as they become available. If
        simple_response is True, all statements available will be returned.
        Otherwise (if None), block indefinitely until all statements are
        retrieved. Default is None.
    ev_limit : int or None
        Limit the amount of evidence returned per Statement. Default is 10.
    best_first : bool
        If True, the preassembled statements will be sorted by the amount of
        evidence they have, and those with the most evidence will be
        prioritized. When using `max_stmts`, this means you will get the
        "best" statements. If False, statements will be queried in arbitrary
        order.
    tries : int > 0
        Set the number of times to try the query. The database often caches
        results, so if a query times out the first time, trying again after a
        timeout will often succeed fast enough to avoid a timeout. This can
        also help gracefully handle an unreliable connection, if you're
        willing to wait. Default is 2.
    max_stmts : int or None
        Select the maximum number of statements to return. When set less than
        1000 the effect is much the same as setting persist to false, and
        will guarantee a faster response. Default is None.

    Attributes
    ----------
    statements : list[:py:class:`indra.statements.Statement`]
        A list of INDRA Statements that will be filled once all queries have
        been completed.
    statements_sample : list[:py:class:`indra.statements.Statement`]
        A list of the INDRA Statements received from the first query. In
        general these will be the "best" (currently this means they have the
        most evidence) Statements available.
    """

    def __init__(self, *args, **kwargs):
        # This must exist before the base constructor runs, because the base
        # constructor calls `_run`, which ends up filling it in.
        self.statements_sample = None
        # Name the class explicitly: `super(self.__class__, self)` recurses
        # forever as soon as this class is subclassed.
        super(IndraDBRestSearchProcessor, self).__init__(*args, **kwargs)

    def is_working(self):
        """Check if the thread is running."""
        return self.__th is not None and self.__th.is_alive()

    def wait_until_done(self, timeout=None):
        """Wait for the background load to complete.

        Parameters
        ----------
        timeout : number or None
            Passed to the thread's `join`; None waits indefinitely.

        Returns
        -------
        bool
            True if the thread finished, False if it is still alive after
            the wait.

        Raises
        ------
        IndraDBRestResponseError
            If there is no thread to wait for.
        """
        start = datetime.now()
        if not self.__th:
            raise IndraDBRestResponseError("There is no thread waiting to "
                                           "complete.")
        self.__th.join(timeout)
        elapsed = (datetime.now() - start).total_seconds()
        if self.__th.is_alive():
            logger.warning("Timed out after %0.3f seconds waiting for "
                           "statement load to complete." % elapsed)
            return False
        logger.info("Waited %0.3f seconds for statements to finish "
                    "loading." % elapsed)
        return True

    def _all_done(self):
        """Tell whether every type is exhausted or the quota is used up."""
        every_type_done = (len(self.__done_dict) > 0
                           and all(self.__done_dict.values()))
        quota_done = (self.__quota is not None and self.__quota <= 0)
        return every_type_done or quota_done

    def _query_and_extract(self, agent_strs, params, stmt_type=None):
        """Run one paged query for one statement type and merge the result.
        """
        assert not self._all_done(), "Tried to run query but I'm done!"
        params['offset'] = self.__page_dict[stmt_type]
        params['max_stmts'] = self.__quota
        if stmt_type is not None:
            params['type'] = stmt_type
        resp = submit_query_request('from_agents', *agent_strs, **params)
        eos, num_returned, page_step = self._unload_and_merge_resp(resp)

        # NOTE: this is technically not a direct conclusion, and could be
        # wrong, resulting in a single unnecessary extra query, but that
        # should almost never happen, and if it does, it isn't the end of
        # the world.
        self.__done_dict[stmt_type] = eos

        # Update the quota
        if self.__quota is not None:
            self.__quota -= num_returned

        # Increment the page
        self.__page_dict[stmt_type] += page_step

    def _query_over_statement_types(self, agent_strs, stmt_types, params):
        """Run one query per unfinished statement type, until quota is met.
        """
        if not stmt_types:
            self._query_and_extract(agent_strs, params.copy())
            return

        for stmt_type in stmt_types:
            if self.__done_dict[stmt_type]:
                continue
            self._query_and_extract(agent_strs, params.copy(), stmt_type)

            # Check the quota
            if self.__quota is not None and self.__quota <= 0:
                break

    def _run_queries(self, agent_strs, stmt_types, params, persist):
        """Use paging to get all statements requested."""
        self._query_over_statement_types(agent_strs, stmt_types, params)

        # If the quota ran out part way through the first pass, the
        # remaining types were legitimately never visited, so that case must
        # not trip the sanity check (which would abort the thread before the
        # statements are compiled).
        quota_spent = self.__quota is not None and self.__quota <= 0
        assert quota_spent \
            or len(self.__done_dict) == len(stmt_types) \
            or None in self.__done_dict, \
            "Done dict was not initiated for all stmt_type's."

        # Get the rest of the content, if we want to keep going.
        if persist:
            while not self._all_done():
                self._query_over_statement_types(agent_strs, stmt_types,
                                                 params)

        # Create the actual statements.
        self._compile_statements()

    def merge_results(self, other_processor):
        """Merge another processor's results, including its sample."""
        super(IndraDBRestSearchProcessor, self).merge_results(other_processor)
        if other_processor.statements_sample is None:
            return
        if self.statements_sample is None:
            # Copy, so that later extensions of this sample do not alter the
            # other processor's list.
            self.statements_sample = list(other_processor.statements_sample)
        else:
            self.statements_sample.extend(other_processor.statements_sample)

    def _merge_json(self, stmt_json, ev_counts, source_counts):
        """Merge new jsons, keeping the first batch as the sample."""
        super(IndraDBRestSearchProcessor, self)._merge_json(stmt_json,
                                                            ev_counts,
                                                            source_counts)
        if not self.__started:
            self.statements_sample = stmts_from_json(stmt_json.values())
            self.__started = True

    def _run(self, subject=None, object=None, agents=None, stmt_type=None,
             use_exact_type=False, persist=True, strict_stop=False,
             **api_params):
        """Start the search in a thread and wait for it as `timeout` allows.

        Raises
        ------
        ValueError
            If none of `subject`, `object` and `agents` is given.
        """
        self.__started = False
        self.__done_dict = defaultdict(bool)
        self.__page_dict = defaultdict(int)
        self.__th = None
        self.__quota = api_params.get('max_stmts')

        # Make sure we got at least SOME agents (the remote API will error if
        # we proceed with no arguments).
        if subject is None and object is None and not agents:
            raise ValueError("At least one agent must be specified, or else "
                             "the scope will be too large.")

        # Make timeouts apply differently in this case: unless a strict stop
        # is requested, the timeout is withheld from the query parameters.
        if not strict_stop:
            timeout = api_params.pop('timeout', None)
        else:
            timeout = api_params.get('timeout', None)

        # Formulate inputs for the agents..
        key_val_list = [('subject', subject), ('object', object)]
        params = {param_key: param_val
                  for param_key, param_val in key_val_list
                  if param_val is not None}
        params.update(api_params)
        agent_strs = [] if agents is None else ['agent%d=%s' % (i, ag)
                                                for i, ag in enumerate(agents)]

        # Handle the type(s). The same truthiness test guards both steps so
        # an empty type string is uniformly treated as "no type given".
        stmt_types = [stmt_type] if stmt_type else []
        if stmt_type and not use_exact_type:
            stmt_class = get_statement_by_name(stmt_type)
            descendant_classes = get_all_descendants(stmt_class)
            stmt_types += [cls.__name__ for cls in descendant_classes]

        # Handle the content if we were limited.
        args = [agent_strs, stmt_types, params, persist]
        logger.debug("The remainder of the query will be performed in a "
                     "thread...")
        self.__th = Thread(target=self._run_queries, args=args)
        self.__th.start()
        if timeout is None:
            logger.debug("Waiting for thread to complete...")
            self.__th.join()
        elif timeout:  # is not 0
            logger.debug("Waiting at most %d seconds for thread to "
                         "complete..." % timeout)
            self.__th.join(timeout)
def standardize_counts(counts):
    """Return a new counts dict in which every hash key is cast to int.

    Entries whose key makes ``int(key)`` raise a ValueError are left out, and
    a warning is logged for each of them.
    """
    int_keyed = {}
    for raw_hash, count in counts.items():
        try:
            hash_int = int(raw_hash)
        except ValueError:
            logger.warning('Could not convert statement hash %s to int'
                           % raw_hash)
            continue
        int_keyed[hash_int] = count
    return int_keyed