import os
import re
import logging
import objectpath
from indra.statements import *
from indra.util import read_unicode_csv
from indra.databases import go_client
from indra.ontology.standardize import \
standardize_db_refs, standardize_agent_name
from collections import namedtuple
logger = logging.getLogger(__name__)
Site = namedtuple('Site', ['residue', 'position'])
[docs]class ReachProcessor(object):
"""The ReachProcessor extracts INDRA Statements from REACH parser output.
Parameters
----------
json_dict : dict
A JSON dictionary containing the REACH extractions.
pmid : Optional[str]
The PubMed ID associated with the extractions. This can be passed
in case the PMID cannot be determined from the extractions alone.`
Attributes
----------
tree : objectpath.Tree
The objectpath Tree object representing the extractions.
statements : list[indra.statements.Statement]
A list of INDRA Statements that were extracted by the processor.
citation : str
The PubMed ID associated with the extractions.
all_events : dict[str, str]
The frame IDs of all events by type in the REACH extraction.
"""
def __init__(self, json_dict, pmid=None):
self.tree = objectpath.Tree(json_dict)
self.statements = []
self.citation = pmid
if pmid is None:
if self.tree is not None:
self.citation =\
self.tree.execute("$.events.object_meta.doc_id")
self.get_all_events()
[docs] def print_event_statistics(self):
"""Print the number of events in the REACH output by type."""
logger.info('All events by type')
logger.info('-------------------')
for k, v in self.all_events.items():
logger.info('%s, %s' % (k, len(v)))
logger.info('-------------------')
[docs] def get_all_events(self):
"""Gather all event IDs in the REACH output by type.
These IDs are stored in the self.all_events dict.
"""
self.all_events = {}
events = self.tree.execute("$.events.frames")
if events is None:
return
for e in events:
event_type = e.get('type')
frame_id = e.get('frame_id')
try:
self.all_events[event_type].append(frame_id)
except KeyError:
self.all_events[event_type] = [frame_id]
def print_regulations(self):
qstr = "$.events.frames[(@.type is 'regulation')]"
res = self.tree.execute(qstr)
if res is None:
return
for r in res:
print(r['subtype'])
for a in r['arguments']:
print(a['type'], '/', a['argument-type'], ':', a['text'])
[docs] def get_modifications(self):
"""Extract Modification INDRA Statements."""
# Find all event frames that are a type of protein modification
qstr = "$.events.frames[(@.type is 'protein-modification')]"
res = self.tree.execute(qstr)
if res is None:
return
# Extract each of the results when possible
for r in res:
# The subtype of the modification
modification_type = r.get('subtype')
# Skip negated events (i.e. something doesn't happen)
epistemics = self._get_epistemics(r)
if epistemics.get('negated'):
continue
annotations, context = self._get_annot_context(r)
frame_id = r['frame_id']
args = r['arguments']
site = None
theme = None
# Find the substrate (the "theme" agent here) and the
# site and position it is modified on
for a in args:
if self._get_arg_type(a) == 'theme':
theme = a['arg']
elif self._get_arg_type(a) == 'site':
site = a['text']
theme_agent, theme_coords = self._get_agent_from_entity(theme)
if site is not None:
mods = self._parse_site_text(site)
else:
mods = [(None, None)]
for mod in mods:
# Add up to one statement for each site
residue, pos = mod
# Now we need to look for all regulation event to get to the
# enzymes (the "controller" here)
qstr = "$.events.frames[(@.type is 'regulation') and " + \
"(@.arguments[0].arg is '%s')]" % frame_id
reg_res = self.tree.execute(qstr)
reg_res = list(reg_res)
for reg in reg_res:
controller_agent, controller_coords = None, None
for a in reg['arguments']:
if self._get_arg_type(a) == 'controller':
controller = a.get('arg')
if controller is not None:
controller_agent, controller_coords = \
self._get_agent_from_entity(controller)
break
# Check the polarity of the regulation and if negative,
# flip the modification type.
# For instance, negative-regulation of a phosphorylation
# will become an (indirect) dephosphorylation
reg_subtype = reg.get('subtype')
if reg_subtype == 'negative-regulation':
modification_type = \
modtype_to_inverse.get(modification_type)
if not modification_type:
logger.warning('Unhandled modification type: %s' %
modification_type)
continue
sentence = reg['verbose-text']
annotations['agents']['coords'] = [controller_coords,
theme_coords]
ev = Evidence(source_api='reach', text=sentence,
annotations=annotations, pmid=self.citation,
context=context, epistemics=epistemics)
args = [controller_agent, theme_agent, residue, pos, ev]
# Here ModStmt is a sub-class of Modification
ModStmt = modtype_to_modclass.get(modification_type)
if ModStmt is None:
logger.warning('Unhandled modification type: %s' %
modification_type)
else:
# Handle this special case here because only
# enzyme argument is needed
if modification_type == 'autophosphorylation':
args = [theme_agent, residue, pos, ev]
self.statements.append(ModStmt(*args))
[docs] def get_regulate_amounts(self):
"""Extract RegulateAmount INDRA Statements."""
qstr = "$.events.frames[(@.type is 'transcription')]"
res = self.tree.execute(qstr)
all_res = []
if res is not None:
all_res += list(res)
qstr = "$.events.frames[(@.type is 'amount')]"
res = self.tree.execute(qstr)
if res is not None:
all_res += list(res)
for r in all_res:
subtype = r.get('subtype')
epistemics = self._get_epistemics(r)
if epistemics.get('negated'):
continue
annotations, context = self._get_annot_context(r)
frame_id = r['frame_id']
args = r['arguments']
theme = None
for a in args:
if self._get_arg_type(a) == 'theme':
theme = a['arg']
break
if theme is None:
continue
theme_agent, theme_coords = self._get_agent_from_entity(theme)
qstr = "$.events.frames[((@.type is 'regulation') or "\
"(@.type is 'activation')) and " + \
"(@.arguments[0].arg is '%s')]" % frame_id
reg_res = self.tree.execute(qstr)
for reg in reg_res:
controller_agent, controller_coords = None, None
for a in reg['arguments']:
if self._get_arg_type(a) == 'controller':
controller_agent, controller_coords = \
self._get_controller_agent(a)
sentence = reg['verbose-text']
annotations['agents']['coords'] = [controller_coords,
theme_coords]
ev = Evidence(source_api='reach', text=sentence,
annotations=annotations, pmid=self.citation,
context=context, epistemics=epistemics)
args = [controller_agent, theme_agent, ev]
subtype = reg.get('subtype')
if subtype.startswith('positive'):
st = IncreaseAmount(*args)
else:
st = DecreaseAmount(*args)
self.statements.append(st)
[docs] def get_complexes(self):
"""Extract INDRA Complex Statements."""
qstr = "$.events.frames[@.type is 'complex-assembly']"
res = self.tree.execute(qstr)
if res is None:
return
for r in res:
epistemics = self._get_epistemics(r)
if epistemics.get('negated'):
continue
# Due to an issue with the REACH output serialization
# (though seemingly not with the raw mentions), sometimes
# a redundant complex-assembly event is reported which can
# be recognized by the missing direct flag, which we can filter
# for here
if epistemics.get('direct') is None:
continue
annotations, context = self._get_annot_context(r)
args = r['arguments']
sentence = r['verbose-text']
members = []
agent_coordinates = []
for a in args:
agent, coords = self._get_agent_from_entity(a['arg'])
members.append(agent)
agent_coordinates.append(coords)
annotations['agents']['coords'] = agent_coordinates
ev = Evidence(source_api='reach', text=sentence,
annotations=annotations, pmid=self.citation,
context=context, epistemics=epistemics)
stmt = Complex(members, ev)
self.statements.append(stmt)
[docs] def get_activation(self):
"""Extract INDRA Activation Statements."""
qstr = "$.events.frames[@.type is 'activation']"
res = self.tree.execute(qstr)
if res is None:
return
for r in res:
epistemics = self._get_epistemics(r)
if epistemics.get('negated'):
continue
sentence = r['verbose-text']
annotations, context = self._get_annot_context(r)
ev = Evidence(source_api='reach', text=sentence,
pmid=self.citation, annotations=annotations,
context=context, epistemics=epistemics)
args = r['arguments']
controller_agent = None
for a in args:
if self._get_arg_type(a) == 'controller':
controller_agent, controller_coords = \
self._get_controller_agent(a)
if self._get_arg_type(a) == 'controlled':
controlled = a['arg']
controlled_agent, controlled_coords = \
self._get_agent_from_entity(controlled)
if controller_agent is None or controlled_agent is None:
continue
annotations['agents']['coords'] = [controller_coords,
controlled_coords]
if r['subtype'] == 'positive-activation':
st = Activation(controller_agent, controlled_agent,
evidence=ev)
else:
st = Inhibition(controller_agent, controlled_agent,
evidence=ev)
self.statements.append(st)
[docs] def get_translocation(self):
"""Extract INDRA Translocation Statements."""
qstr = "$.events.frames[@.type is 'translocation']"
res = self.tree.execute(qstr)
if res is None:
return
for r in res:
epistemics = self._get_epistemics(r)
if epistemics.get('negated'):
continue
sentence = r['verbose-text']
annotations, context = self._get_annot_context(r)
args = r['arguments']
from_location = None
to_location = None
for a in args:
if self._get_arg_type(a) == 'theme':
agent, theme_coords = self._get_agent_from_entity(a['arg'])
if agent is None:
continue
elif self._get_arg_type(a) == 'source':
from_location = self._get_location_by_id(a['arg'])
elif self._get_arg_type(a) == 'destination':
to_location = self._get_location_by_id(a['arg'])
# We skip statements that have no locations associated with them
# at all
if not from_location and not to_location:
continue
annotations['agents']['coords'] = [theme_coords]
ev = Evidence(source_api='reach', text=sentence,
pmid=self.citation, annotations=annotations,
context=context, epistemics=epistemics)
st = Translocation(agent, from_location, to_location,
evidence=ev)
self.statements.append(st)
def get_conversion(self):
qstr = "$.events.frames[@.type is 'conversion']"
res = self.tree.execute(qstr)
if res is None:
return
for r in res:
epistemics = self._get_epistemics(r)
if epistemics.get('negated'):
continue
sentence = r['verbose-text']
annotations, context = self._get_annot_context(r)
ev = Evidence(source_api='reach', text=sentence,
pmid=self.citation, annotations=annotations,
context=context, epistemics=epistemics)
args = r['arguments']
controller_agent = substrate_agent = product_agent = None
for a in args:
if self._get_arg_type(a) == 'controller':
controller_agent, controller_coords = \
self._get_controller_agent(a)
if self._get_arg_type(a) == 'substrate':
substrate_agent, substrate_coords = \
self._get_agent_from_entity(a['arg'])
if self._get_arg_type(a) == 'product':
product_agent, product_coords = \
self._get_agent_from_entity(a['arg'])
if not all({controller_agent, substrate_agent, product_agent}):
continue
annotations['agents']['coords'] = [controller_coords,
substrate_coords,
product_coords]
st = Conversion(controller_agent, [substrate_agent],
[product_agent], evidence=[ev])
self.statements.append(st)
def _get_location_by_id(self, loc_id):
qstr = "$.entities.frames[(@.frame_id is \'%s\')]" % loc_id
res = self.tree.execute(qstr)
if res is None:
return None
try:
entity_term = next(res)
except StopIteration:
logger.debug(' %s is not an entity' % loc_id)
return None
name = entity_term.get('text')
go_id = None
for xr in entity_term['xrefs']:
ns = xr['namespace']
if ns == 'go':
go_id = xr['id']
# If there is no GO ID, we try to "ground" the name to an ID
if go_id is None:
go_id = go_client.get_go_id_from_label_or_synonym(name.lower())
# Try to get valid location based on GO id
if go_id is not None:
loc = go_client.get_go_label(go_id)
if loc:
return loc
return None
def _get_agent_from_entity(self, entity_id):
qstr = "$.entities.frames[(@.frame_id is \'%s\')]" % entity_id
res = self.tree.execute(qstr)
if res is None:
return None, None
try:
entity_term = next(res)
except StopIteration:
logger.debug(' %s is not an entity' % entity_id)
return None, None
# This is the default name, which can be overwritten
# below for specific database entries
agent_name = entity_term['text']
db_refs = self._get_db_refs(entity_term)
mod_terms = entity_term.get('modifications')
mods, muts = self._get_mods_and_muts_from_mod_terms(mod_terms)
# get sentence coordinates of the entity
coords = self._get_entity_coordinates(entity_term)
agent = Agent(agent_name, db_refs=db_refs, mods=mods, mutations=muts)
standardize_agent_name(agent, standardize_refs=True)
return agent, coords
@staticmethod
def _get_db_refs(entity_term):
db_refs = {}
for xr in entity_term['xrefs']:
ns = xr['namespace']
if ns == 'uniprot':
db_refs['UP'] = xr['id']
elif ns == 'hgnc':
db_refs['HGNC'] = xr['id']
elif ns == 'pfam':
fplx_id = famplex_map.get(('PF', xr['id']))
if fplx_id:
db_refs['FPLX'] = fplx_id
db_refs['PF'] = xr['id']
elif ns == 'interpro':
fplx_id = famplex_map.get(('IP', xr['id']))
if fplx_id:
db_refs['FPLX'] = fplx_id
db_refs['IP'] = xr['id']
elif ns == 'chebi':
db_refs['CHEBI'] = xr['id']
elif ns == 'pubchem':
db_refs['PUBCHEM'] = xr['id']
elif ns == 'go':
go_id = xr['id']
# Handle secondary to primary mapping if necessary
pri = go_client.get_primary_id(go_id)
if pri:
go_id = pri
db_refs['GO'] = go_id
elif ns == 'mesh':
db_refs['MESH'] = xr['id']
elif ns == 'hmdb':
db_refs['HMDB'] = xr['id']
elif ns == 'simple_chemical':
if xr['id'].startswith('HMDB'):
db_refs['HMDB'] = xr['id']
# We handle "be" here for compatibility with older versions
elif ns in ('fplx', 'be'):
db_refs['FPLX'] = xr['id']
# These name spaces are ignored
elif ns in ['uaz']:
pass
else:
logger.warning('Unhandled xref namespace: %s' % ns)
db_refs['TEXT'] = entity_term['text']
db_refs = standardize_db_refs(db_refs)
return db_refs
def _get_mods_and_muts_from_mod_terms(self, mod_terms):
mods = []
muts = []
if mod_terms is not None:
for m in mod_terms:
if m['type'].lower() == 'mutation':
# Evidence is usualy something like "V600E"
# We could parse this to get the amino acid
# change that happened.
mutation_str = m.get('evidence')
# TODO: sometimes mutation_str is "mutant", "Mutant",
# "mutants" - this indicates that there is a mutation
# but not the specific type. We should encode this
# somehow as a "blank" mutation condition
mut = self._parse_mutation(mutation_str)
if mut is not None:
muts.append(mut)
else:
mcs = self._get_mod_conditions(m)
mods.extend(mcs)
return mods, muts
def _get_mod_conditions(self, mod_term):
"""Return a list of ModConditions given a mod term dict."""
site = mod_term.get('site')
if site is not None:
mods = self._parse_site_text(site)
else:
mods = [Site(None, None)]
mcs = []
for mod in mods:
mod_res, mod_pos = mod
mod_type_str = mod_term['type'].lower()
mod_state = agent_mod_map.get(mod_type_str)
if mod_state is not None:
mc = ModCondition(mod_state[0], residue=mod_res,
position=mod_pos, is_modified=mod_state[1])
mcs.append(mc)
else:
logger.warning('Unhandled entity modification type: %s'
% mod_type_str)
return mcs
def _get_entity_coordinates(self, entity_term):
"""Return sentence coordinates for a given entity.
Given an entity term return the associated sentence coordinates as
a tuple of the form (int, int). Returns None if for any reason the
sentence coordinates cannot be found.
"""
# The following lines get the starting coordinate of the sentence
# containing the entity.
sent_id = entity_term.get('sentence')
if sent_id is None:
return None
qstr = "$.sentences.frames[(@.frame_id is \'%s')]" % sent_id
res = self.tree.execute(qstr)
if res is None:
return None
try:
sentence = next(res)
except StopIteration:
return None
sent_start = sentence.get('start-pos')
if sent_start is None:
return None
sent_start = sent_start.get('offset')
if sent_start is None:
return None
# Get the entity coordinate in the entire text and subtract the
# coordinate of the first character in the associated sentence to
# get the sentence coordinate of the entity. Return None if entity
# coordinates are missing
entity_start = entity_term.get('start-pos')
entity_stop = entity_term.get('end-pos')
if entity_start is None or entity_stop is None:
return None
entity_start = entity_start.get('offset')
entity_stop = entity_stop.get('offset')
if entity_start is None or entity_stop is None:
return None
return (entity_start - sent_start, entity_stop - sent_start)
def _get_annot_context(self, frame_term):
annotations = {'found_by': frame_term['found_by'],
'agents': {}}
try:
context_id = frame_term['context']
except KeyError:
return annotations, None
# For backwards compatibility with older versions
# of REACH
if isinstance(context_id, dict):
context_term = context_id
species = context_term.get('Species')
cell_type = context_term.get('CellType')
cell_line = None
location = None
tissue = None
organ = None
else:
qstr = "$.entities.frames[(@.frame_id is \'%s\')]" % context_id[0]
res = self.tree.execute(qstr)
if res is None:
return annotations, None
context_frame = next(res)
facets = context_frame['facets']
cell_line = facets.get('cell-line')
cell_type = facets.get('cell-type')
species = facets.get('organism')
location = facets.get('location')
tissue = facets.get('tissue_type')
organ = facets.get('organ')
def get_ref_context(lst):
if not lst:
return None
db_name, db_id = lst[0].split(':', 1)
return RefContext(db_refs={db_name.upper(): db_id})
context = BioContext()
# Example: ['taxonomy:9606']
context.species = get_ref_context(species)
# Example: ['cl:CL:0000148']
context.cell_type = get_ref_context(cell_type)
# Example: ['cellosaurus:CVCL_0504']
context.cell_line = get_ref_context(cell_line)
# Example: ['go:GO:0005886']
context.location = get_ref_context(location)
# Example: ['uberon:UBERON:0000105']
context.organ = get_ref_context(organ)
# NOTE: we can't handle tissue currently
# context['tissue'] = tissue
# This is so we don't add a blank BioContext as context and rather
# just add None
if not context:
context = None
return annotations, context
def _get_epistemics(self, event):
epistemics = {}
# Check whether information is negative
neg = event.get('is_negated')
if neg is True:
epistemics['negated'] = True
# Check if it is a hypothesis
hyp = event.get('is_hypothesis')
if hyp is True:
epistemics['hypothesis'] = True
# Check if it is direct
if 'is_direct' in event:
direct = event['is_direct']
epistemics['direct'] = direct
# Get the section of the paper it comes from
section = self._get_section(event)
epistemics['section_type'] = section
return epistemics
_section_list = ['title', 'abstract', 'introduction', 'background',
'results', 'methods', 'discussion', 'conclusion',
'supplementary', 'figure']
def _get_section(self, event):
"""Get the section of the paper that the event is from."""
sentence_id = event.get('sentence')
section = None
if sentence_id:
qstr = "$.sentences.frames[(@.frame_id is \'%s\')]" % sentence_id
res = self.tree.execute(qstr)
if res:
sentence_frame = list(res)[0]
passage_id = sentence_frame.get('passage')
if passage_id:
qstr = "$.sentences.frames[(@.frame_id is \'%s\')]" % \
passage_id
res = self.tree.execute(qstr)
if res:
passage_frame = list(res)[0]
section = passage_frame.get('section-id')
# If the section is in the standard list, return as is
if section in self._section_list:
return section
# Next, handle a few special cases that come up in practice
elif section.startswith('fig'):
return 'figure'
elif section.startswith('supm'):
return 'supplementary'
elif section == 'article-title':
return 'title'
elif section in ['subjects|methods', 'methods|subjects']:
return 'methods'
elif section == 'conclusions':
return 'conclusion'
elif section == 'intro':
return 'introduction'
else:
return None
def _get_controller_agent(self, arg):
"""Return a single or a complex controller agent."""
controller_agent = None
controller = arg.get('arg')
# There is either a single controller here
if controller is not None:
controller_agent, coords = self._get_agent_from_entity(controller)
# Or the controller is a complex
elif arg['argument-type'] == 'complex':
controllers = list(arg.get('args').values())
controller_agent, coords = \
self._get_agent_from_entity(controllers[0])
bound_agents = [self._get_agent_from_entity(c)[0]
for c in controllers[1:]]
bound_conditions = [BoundCondition(ba, True) for
ba in bound_agents]
controller_agent.bound_conditions = bound_conditions
return controller_agent, coords
@staticmethod
def _get_arg_type(arg):
"""Return the type of the argument with backwards compatibility."""
if arg.get('argument_label') is not None:
return arg.get('argument_label')
else:
return arg.get('type')
@staticmethod
def _parse_mutation(s):
m = re.match(r'([A-Z]+)([0-9]+)([A-Z]+)', s.upper())
if m is not None:
parts = [str(g) for g in m.groups()]
try:
residue_from = get_valid_residue(parts[0])
except Exception as e:
return None
try:
residue_to = get_valid_residue(parts[2])
except Exception as e:
return None
position = parts[1]
mut = MutCondition(position, residue_from, residue_to)
return mut
elif s.lower() in ('mutation', 'mutations', 'mutant', 'mutants',
'mutational'):
mut = MutCondition(None, None, None)
return mut
else:
logger.warning('Unhandled mutation string: %s' % s)
return None
@staticmethod
def _parse_site_text(s):
has_comma = ',' in s
has_slash = '/' in s
has_both = has_comma and has_slash
if has_both:
logger.error(s + ' is not a valid site text string')
return []
if has_comma:
texts = s.split(',')
else:
texts = s.split('/')
sites = [ReachProcessor._parse_site_text_single(t) for t in texts]
# If the first site has a residue, and the remaining sites do not
# explicitly give a residue (example: Tyr-577/576), then apply the
# first site's residue to all sites in the site text.
only_first_site_has_residue = sites[0].residue is not None
for i in range(1, len(sites)):
if sites[i].residue is not None:
only_first_site_has_residue = False
if only_first_site_has_residue:
for i in range(1, len(sites)):
sites[i] = Site(sites[0].residue, sites[i].position)
return sites
@staticmethod
def _parse_site_text_single(s):
s = s.strip()
for p in (_site_pattern1, _site_pattern2, _site_pattern3):
m = re.match(p, s.upper())
if m is not None:
residue = get_valid_residue(m.groups()[0])
site = m.groups()[1]
return Site(residue, site)
m = re.match(_site_pattern4, s.upper())
if m is not None:
site = m.groups()[0]
residue = m.groups()[1]
return Site(residue, site)
for p in (_site_pattern5, _site_pattern6, _site_pattern7):
m = re.match(p, s.upper())
if m is not None:
residue = get_valid_residue(m.groups()[0])
site = None
return Site(residue, site)
m = re.match(_site_pattern8, s.upper())
if m is not None:
site = m.groups()[0]
residue = None
return Site(residue, site)
logger.warning('Could not parse site text %s' % s)
return Site(None, None)
_site_pattern1 = '([' + ''.join(list(amino_acids.keys())) + '])[-]?([0-9]+)$'
_site_pattern2 = '(' + '|'.join([v['short_name'].upper() for
v in amino_acids.values()]) + \
')[- ]?([0-9]+)$'
_site_pattern3 = '(' + '|'.join([v['indra_name'].upper() for
v in amino_acids.values()]) + \
')[^0-9]*([0-9]+)$'
_site_pattern4 = '([0-9]+)[ ]?([' + ''.join(list(amino_acids.keys())) + '])$'
_site_pattern5 = '^([' + ''.join(list(amino_acids.keys())) + '])$'
_site_pattern6 = '^(' + '|'.join([v['short_name'].upper() for
v in amino_acids.values()]) + ')$'
_site_pattern7 = '.*(' + '|'.join([v['indra_name'].upper() for
v in amino_acids.values()]) + ').*'
_site_pattern8 = '([0-9]+)$'
# Subtypes that exist but we don't handle: hydrolysis
agent_mod_map = {
'phosphorylation': ('phosphorylation', True),
'phosphorylated': ('phosphorylation', True),
'dephosphorylation': ('phosphorylation', False),
'acetylation': ('acetylation', True),
'deacetylation': ('acetylation', False),
'ubiquitination': ('ubiquitination', True),
'deubiquitination': ('ubiquitination', False),
'hydroxylation': ('hydroxylation', True),
'dehydroxylation': ('hydroxylation', False),
'sumoylation': ('sumoylation', True),
'desumoylation': ('sumoylation', False),
'glycosylation': ('glycosylation', True),
'deglycosylation': ('glycosylation', False),
'farnesylation': ('farnesylation', True),
'defarnesylation': ('farnesylation', False),
'ribosylation': ('ribosylation', True),
'deribosylation': ('ribosylation', False),
'methylation': ('methylation', True),
'demethylation': ('methylation', False),
'unknown': ('modification', True),
}
def _read_famplex_map():
fname = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'../../resources/famplex_map.tsv')
famplex_map = {}
csv_rows = read_unicode_csv(fname, delimiter='\t')
for row in csv_rows:
source_ns = row[0]
source_id = row[1]
be_id = row[2]
famplex_map[(source_ns, source_id)] = be_id
return famplex_map
famplex_map = _read_famplex_map()
def _read_reach_rule_regexps():
"""Load in a file with the regular expressions corresponding to each
reach rule. Why regular expression matching?
The rule name in found_by has instances of some reach rules for each
possible event type
(activation, binding, etc). This makes for too many different types of
rules for practical curation of examples.
We use regular expressions to only match the rule used for extraction,
independently of what the event is.
"""
reach_rule_filename = \
os.path.join(os.path.dirname(os.path.abspath(__file__)),
'reach_rule_regexps.txt')
with open(reach_rule_filename, 'r') as f:
reach_rule_regexp = []
for line in f:
reach_rule_regexp.append(line.rstrip())
return reach_rule_regexp
reach_rule_regexps = _read_reach_rule_regexps()
[docs]def determine_reach_subtype(event_name):
"""Returns the category of reach rule from the reach rule instance.
Looks at a list of regular
expressions corresponding to reach rule types, and returns the longest
regexp that matches, or None if none of them match.
Parameters
----------
evidence : indra.statements.Evidence
A reach evidence object to subtype
Returns
-------
best_match : str
A regular expression corresponding to the reach rule that was used to
extract this evidence
"""
best_match_length = None
best_match = None
for ss in reach_rule_regexps:
if re.search(ss, event_name):
if best_match is None or len(ss) > best_match_length:
best_match = ss
best_match_length = len(ss)
return best_match