import os
import pickle
import logging
from indra.config import get_config
from ..ontology_graph import IndraOntology
from indra.util import read_unicode_csv
from indra.statements import modtype_conditions
logger = logging.getLogger(__name__)
[docs]class BioOntology(IndraOntology):
"""Represents the ontology used for biology applications."""
# The version is used to determine if the cached pickle is still valid
# or not. When updating relevant resource files in INDRA, this version
# should be incremented to "force" rebuilding the ontology to be consistent
# with the underlying resource files.
# Both `name` and `version` are also read at module level (bottom of this
# file) when the default cache directory path is assembled.
name = 'bio'
version = '1.1'
# The constructor adds no state of its own; it only delegates to the parent
# class. Building or loading the graph content is done by initialize().
def __init__(self):
super().__init__()
def initialize(self, rebuild=False):
    """Populate the ontology, from the pickle cache if possible.

    If a cache file exists at CACHE_FILE and `rebuild` is False, the
    cached object's attributes are copied into this instance. Otherwise
    (or if the cache cannot be read) the ontology is built from the
    underlying resources via `_build` and then cached on a best-effort
    basis: failures to create the folder or write the file are logged
    and never raised.

    Parameters
    ----------
    rebuild : Optional[bool]
        If True, the ontology is rebuilt and re-cached even if a cache
        file already exists. Default: False
    """
    if not rebuild and os.path.exists(CACHE_FILE):
        logger.info('Loading INDRA bio ontology from cache at %s',
                    CACHE_FILE)
        try:
            # NOTE: unpickling can execute arbitrary code; CACHE_FILE must
            # only ever be a trusted, locally generated file.
            with open(CACHE_FILE, 'rb') as fh:
                cached = pickle.load(fh)
        except Exception:
            # The cache is only an optimization: a truncated, corrupt or
            # incompatible pickle must not make the ontology unusable, so
            # we fall through and rebuild (which also rewrites the cache).
            logger.warning('Failed to load ontology cache at %s, '
                           'rebuilding.', CACHE_FILE, exc_info=True)
        else:
            self.__dict__.update(cached.__dict__)
            return

    logger.info('Initializing INDRA bio ontology for the first time, '
                'this may take a few minutes...')
    self._build()

    # Try to create the folder first, if it fails, we don't cache.
    # exist_ok avoids the race between checking for and creating the folder.
    try:
        os.makedirs(CACHE_DIR, exist_ok=True)
    except OSError:
        logger.warning('%s could not be created.', CACHE_DIR)
        return

    # Try to dump the file next, if it fails, we don't cache. The pickle is
    # written to a process-specific temporary file and then moved into
    # place, so an interrupted dump never leaves a partial CACHE_FILE.
    tmp_file = '%s.%d.tmp' % (CACHE_FILE, os.getpid())
    try:
        logger.info('Caching INDRA bio ontology at %s', CACHE_FILE)
        with open(tmp_file, 'wb') as fh:
            pickle.dump(self, fh, pickle.HIGHEST_PROTOCOL)
        os.replace(tmp_file, CACHE_FILE)
    except Exception:
        # Deliberately broad: caching is best-effort and pickling can fail
        # in many ways (I/O errors, unpicklable attributes, ...).
        logger.warning('Failed to cache ontology at %s.', CACHE_FILE)
        try:
            os.remove(tmp_file)
        except OSError:
            # The temporary file may never have been created.
            pass
def _build(self):
    """Build the full ontology graph from the underlying resources.

    Runs three stages in order (nodes, cross-references, hierarchies),
    each made up of the listed `add_*` methods, then marks the ontology
    as initialized, builds the name lookup and labels components.
    """
    stages = (
        # Add all nodes with annotations
        ('Adding nodes...', (
            'add_hgnc_nodes',
            'add_uniprot_nodes',
            'add_famplex_nodes',
            'add_obo_nodes',
            'add_mesh_nodes',
            'add_ncit_nodes',
            'add_uppro_nodes',
            'add_mirbase_nodes',
        )),
        # Add xrefs
        ('Adding xrefs...', (
            'add_hgnc_uniprot_xrefs',
            'add_famplex_xrefs',
            'add_chemical_xrefs',
            'add_ncit_xrefs',
            'add_mesh_xrefs',
            'add_mirbase_xrefs',
        )),
        # Add hierarchies
        ('Adding hierarchy...', (
            'add_famplex_hierarchy',
            'add_obo_hierarchies',
            'add_mesh_hierarchy',
            'add_activity_hierarchy',
            'add_modification_hierarchy',
            'add_uppro_hierarchy',
        )),
    )
    for message, method_names in stages:
        logger.info(message)
        for method_name in method_names:
            # Looked up lazily so each step is resolved right before it runs
            getattr(self, method_name)()

    # The flag is set before the post-processing steps below (presumably
    # they check it -- confirm in IndraOntology).
    self._initialized = True

    # Build name to ID lookup
    logger.info('Building name lookup...')
    self._build_name_lookup()

    # Label components
    logger.info('Labeling components...')
    self._label_components()

    logger.info('Finished initializing bio ontology...')
def add_hgnc_nodes(self):
    """Add one HGNC node per entry in `hgnc_client.hgnc_names`.

    Each node carries the mapped value as its 'name' attribute.
    """
    from indra.databases import hgnc_client
    hgnc_nodes = []
    for hgnc_id, hgnc_name in hgnc_client.hgnc_names.items():
        node_label = self.label('HGNC', hgnc_id)
        hgnc_nodes.append((node_label, {'name': hgnc_name}))
    self.add_nodes_from(hgnc_nodes)
def add_uniprot_nodes(self):
    """Add one UP node per entry in `uniprot_client.um.uniprot_gene_name`.

    Each node carries the mapped value as its 'name' attribute.
    """
    from indra.databases import uniprot_client
    gene_names = uniprot_client.um.uniprot_gene_name
    up_nodes = []
    for up_id, gene_name in gene_names.items():
        node_label = self.label('UP', up_id)
        up_nodes.append((node_label, {'name': gene_name}))
    self.add_nodes_from(up_nodes)
def add_uppro_nodes(self):
    """Add a UPPRO node for every UniProt feature that has an ID.

    Features whose `id` is None are skipped; the feature's `name` is
    stored as the node's 'name' attribute.
    """
    from indra.databases import uniprot_client
    feature_nodes = [
        (self.label('UPPRO', feat.id), {'name': feat.name})
        for _prot_id, feats in uniprot_client.um.features.items()
        for feat in feats
        if feat.id is not None
    ]
    self.add_nodes_from(feature_nodes)
def add_hgnc_uniprot_xrefs(self):
    """Add xref edges between HGNC and UP nodes in both directions.

    HGNC -> UP edges come from `hgnc_client.uniprot_ids`, whose values
    are split on ', ' to yield one edge per UniProt ID. UP -> HGNC edges
    come from `uniprot_client.um.uniprot_hgnc`. All edges have 'hgnc'
    as their source.
    """
    from indra.databases import hgnc_client
    from indra.databases import uniprot_client

    # HGNC -> UniProt: one HGNC ID may map to several UniProt IDs
    hgnc_to_up = [
        (self.label('HGNC', hgnc_id), self.label('UP', up_id),
         {'type': 'xref', 'source': 'hgnc'})
        for hgnc_id, up_id_str in hgnc_client.uniprot_ids.items()
        for up_id in up_id_str.split(', ')
    ]
    self.add_edges_from(hgnc_to_up)

    # UniProt -> HGNC
    up_to_hgnc = []
    for up_id, hgnc_id in uniprot_client.um.uniprot_hgnc.items():
        up_to_hgnc.append((self.label('UP', up_id),
                           self.label('HGNC', hgnc_id),
                           {'type': 'xref', 'source': 'hgnc'}))
    self.add_edges_from(up_to_hgnc)
def add_famplex_nodes(self):
    """Add an FPLX node for each row of the FamPlex entities.csv file.

    The first column of each row is used both as the node's ID and as
    its 'name' attribute.
    """
    entities_path = os.path.join(resources, 'famplex', 'entities.csv')
    fplx_nodes = [
        (self.label('FPLX', row[0]), {'name': row[0]})
        for row in read_unicode_csv(entities_path, delimiter=',')
    ]
    self.add_nodes_from(fplx_nodes)
def add_famplex_hierarchy(self):
    """Add hierarchy edges from the FamPlex relations.csv file.

    Each row is expected to have five columns: source namespace, source
    ID, relation, target namespace and target ID. The relation is used
    as the edge's 'type'.
    """
    from indra.databases import hgnc_client
    relations_path = os.path.join(resources, 'famplex', 'relations.csv')
    hierarchy_edges = []
    for source_ns, source_id, relation, target_ns, target_id in \
            read_unicode_csv(relations_path, delimiter=','):
        # HGNC sources are passed through get_hgnc_id before labeling
        # (presumably a gene symbol to HGNC ID lookup -- confirm).
        if source_ns == 'HGNC':
            source_id = hgnc_client.get_hgnc_id(source_id)
        source_node = self.label(source_ns, source_id)
        target_node = self.label(target_ns, target_id)
        hierarchy_edges.append((source_node, target_node,
                                {'type': relation}))
    self.add_edges_from(hierarchy_edges)
def add_famplex_xrefs(self):
    """Add symmetric xref edges from the FamPlex equivalences.csv file.

    Each row is expected to have three columns: reference namespace,
    reference ID and FamPlex ID. Only rows whose namespace is in the
    allowed set are used; each yields an edge in both directions with
    'fplx' as the source.
    """
    allowed_namespaces = {'PF', 'IP', 'GO', 'NCIT', 'ECCODE', 'HGNC_GROUP'}
    equivalences_path = os.path.join(resources, 'famplex',
                                     'equivalences.csv')
    xref_edges = []
    for ref_ns, ref_id, fplx_id in read_unicode_csv(equivalences_path,
                                                    delimiter=','):
        if ref_ns in allowed_namespaces:
            xref_edges.extend([
                # Reference -> FamPlex
                (self.label(ref_ns, ref_id),
                 self.label('FPLX', fplx_id),
                 {'type': 'xref', 'source': 'fplx'}),
                # FamPlex -> reference
                (self.label('FPLX', fplx_id),
                 self.label(ref_ns, ref_id),
                 {'type': 'xref', 'source': 'fplx'}),
            ])
    self.add_edges_from(xref_edges)
def add_obo_nodes(self):
    """Add nodes for all entries of the supported OBO-based ontologies.

    For each prefix an OboClient is created and every entry becomes a
    node in the upper-cased namespace, with the entry's 'name' as the
    node's 'name' attribute.
    """
    from indra.databases import obo_client
    obo_prefixes = ['go', 'efo', 'hp', 'doid', 'chebi']
    obo_nodes = [
        (self.label(prefix.upper(), db_id), {'name': entry['name']})
        for prefix in obo_prefixes
        for db_id, entry in
        obo_client.OboClient(prefix=prefix).entries.items()
    ]
    self.add_nodes_from(obo_nodes)
def add_obo_hierarchies(self):
    """Add relation edges for the supported OBO-based ontologies.

    Every relation of every entry is mapped to an edge type through a
    fixed table; relations not in the table are skipped. Source and
    target are both labeled in the entry's own upper-cased namespace.
    """
    from indra.databases import obo_client
    obo_prefixes = ['go', 'efo', 'hp', 'doid', 'chebi']
    # Maps relation names found in the OBO entries to INDRA edge types
    edge_type_for = {
        'xref': 'xref',
        'isa': 'isa',
        'is_a': 'isa',
        'partof': 'partof',
        'part_of': 'partof',
        # These are for ChEBI: identical to the old behavior but it might
        # make sense to add other relations here too
        'is_conjugate_acid_of': 'isa',
        'has_functional_parent': 'isa',
        'has_parent_hydride': 'isa',
        'has_role': 'isa',
    }
    obo_edges = []
    for prefix in obo_prefixes:
        client = obo_client.OboClient(prefix=prefix)
        for db_id, entry in client.entries.items():
            relations = entry.get('relations', {})
            for relation, targets in relations.items():
                edge_type = edge_type_for.get(relation)
                # Only relation types known to the table produce edges
                if edge_type:
                    obo_edges.extend(
                        (self.label(prefix.upper(), db_id),
                         self.label(prefix.upper(), target),
                         {'type': edge_type})
                        for target in targets
                    )
    self.add_edges_from(obo_edges)
def add_chemical_xrefs(self):
    """Add xref edges between chemical namespaces.

    Mappings are taken from `chebi_client` and `drugbank_client`. Each
    table entry gives the mapping dict, the source namespace, the target
    namespace and whether the reverse edge is added as well. A namespace
    of None means the namespace is carried inside the mapping's keys.
    """
    from indra.databases import chebi_client, drugbank_client
    mapping_table = [
        (chebi_client.chebi_chembl, 'CHEBI', 'CHEMBL', True),
        (chebi_client.chebi_pubchem, 'CHEBI', 'PUBCHEM', False),
        (chebi_client.pubchem_chebi, 'PUBCHEM', 'CHEBI', False),
        (chebi_client.hmdb_chebi, 'HMDB', 'CHEBI', True),
        (chebi_client.cas_chebi, 'CAS', 'CHEBI', True),
        (drugbank_client.drugbank_to_db, 'DRUGBANK', None, False),
        (drugbank_client.db_to_drugbank, None, 'DRUGBANK', False),
    ]
    xref_edges = []
    # A single attribute dict object is shared by every edge added here,
    # including the DrugBank-derived ones.
    edge_data = {'type': 'xref', 'source': 'chebi'}

    def make_label(namespace, identifier):
        # ChEBI IDs are always labeled with their 'CHEBI:' prefix
        needs_prefix = (namespace == 'CHEBI'
                        and not identifier.startswith('CHEBI'))
        if needs_prefix:
            identifier = 'CHEBI:%s' % identifier
        return self.label(namespace, identifier)

    for mapping, fixed_from_ns, fixed_to_ns, symmetric in mapping_table:
        for key, value in mapping.items():
            # Here we assume if no namespace is given, then
            # we're dealing with a (namespace, id) tuple
            if fixed_from_ns is None:
                source_ns, source_id = key
                target_ns = fixed_to_ns
            elif fixed_to_ns is None:
                source_id, target_ns = key
                source_ns = fixed_from_ns
            else:
                source_ns, source_id = fixed_from_ns, key
                target_ns = fixed_to_ns
            source_node = make_label(source_ns, source_id)
            target_node = make_label(target_ns, value)
            xref_edges.append((source_node, target_node, edge_data))
            if symmetric:
                xref_edges.append((target_node, source_node, edge_data))
    self.add_edges_from(xref_edges)
def add_mesh_nodes(self):
    """Add one MESH node per entry in `mesh_client.mesh_id_to_name`.

    Each node carries the mapped name as its 'name' attribute.
    """
    from indra.databases import mesh_client
    mesh_nodes = []
    for mesh_id, mesh_name in mesh_client.mesh_id_to_name.items():
        node_label = self.label('MESH', mesh_id)
        mesh_nodes.append((node_label, {'name': mesh_name}))
    self.add_nodes_from(mesh_nodes)
def add_mesh_xrefs(self):
    """Add xref edges between MESH nodes and other namespaces.

    MESH -> other edges come from `mesh_client.mesh_to_db` and
    other -> MESH edges from `mesh_client.db_to_mesh`. All edges share
    one attribute dict with 'gilda' as the source.
    """
    from indra.databases import mesh_client
    edge_data = {'type': 'xref', 'source': 'gilda'}
    # MeSH -> other namespace
    xref_edges = [
        (self.label('MESH', mesh_id), self.label(db_ns, db_id), edge_data)
        for mesh_id, (db_ns, db_id) in mesh_client.mesh_to_db.items()
    ]
    # Other namespace -> MeSH
    xref_edges += [
        (self.label(db_ns, db_id), self.label('MESH', mesh_id), edge_data)
        for (db_ns, db_id), mesh_id in mesh_client.db_to_mesh.items()
    ]
    self.add_edges_from(xref_edges)
def add_mesh_hierarchy(self):
    """Add 'isa' edges from MeSH terms to their tree-number parents.

    The parent of a term is found by dropping the last dot-separated
    component of each of its tree numbers and looking up the MeSH ID
    that owns the resulting tree number. Tree numbers without a dot
    have no parent. Each (term, parent) pair yields at most one edge,
    even if several of the term's tree numbers lead to the same parent.

    Raises
    ------
    KeyError
        If a parent tree number is not associated with any MeSH ID in
        `mesh_client.mesh_id_to_tree_numbers`.
    """
    from indra.databases import mesh_client
    id_to_tree_numbers = mesh_client.mesh_id_to_tree_numbers
    # Invert the mapping; if a tree number appears under several IDs,
    # the last one seen wins.
    tree_number_to_id = {
        tn: mesh_id
        for mesh_id, tns in id_to_tree_numbers.items()
        for tn in tns
    }
    edges = []
    for mesh_id, tns in id_to_tree_numbers.items():
        parents_added = set()
        for tn in tns:
            if '.' not in tn:
                continue
            parent_tn = tn.rsplit('.', maxsplit=1)[0]
            parent_id = tree_number_to_id[parent_tn]
            if parent_id in parents_added:
                continue
            # Record the parent so the duplicate check above takes effect
            parents_added.add(parent_id)
            edges.append((self.label('MESH', mesh_id),
                          self.label('MESH', parent_id),
                          {'type': 'isa'}))
    self.add_edges_from(edges)
def add_ncit_nodes(self):
    """Add an NCIT node for each key of the TRIPS processor's `ncit_map`.

    The nodes are added as bare labels, without a 'name' attribute.
    """
    from indra.sources.trips.processor import ncit_map
    ncit_nodes = []
    for ncit_id in ncit_map:
        ncit_nodes.append(self.label('NCIT', ncit_id))
    self.add_nodes_from(ncit_nodes)
def add_ncit_xrefs(self):
    """Add xref edges from NCIT nodes to their mapped targets.

    Each `ncit_map` entry maps an NCIT ID to a (namespace, ID) pair;
    the edges have 'ncit' as their source.
    """
    from indra.sources.trips.processor import ncit_map
    xref_edges = [
        (self.label('NCIT', ncit_id),
         self.label(target_ns, target_id),
         {'type': 'xref', 'source': 'ncit'})
        for ncit_id, (target_ns, target_id) in ncit_map.items()
    ]
    self.add_edges_from(xref_edges)
def add_uppro_hierarchy(self):
    """Add 'partof' edges from UPPRO feature nodes to their UP protein.

    Features whose `id` is None are skipped.
    """
    from indra.databases import uniprot_client
    partof_edges = []
    for prot_id, features in uniprot_client.um.features.items():
        protein_node = self.label('UP', prot_id)
        partof_edges.extend(
            (self.label('UPPRO', feature.id), protein_node,
             {'type': 'partof'})
            for feature in features
            if feature.id is not None
        )
    self.add_edges_from(partof_edges)
def add_mirbase_nodes(self):
    """Add one MIRBASE node per entry in the miRBase ID-to-name mapping.

    Each node carries the mapped name as its 'name' attribute.
    """
    from indra.databases import mirbase_client
    id_to_name = mirbase_client._mirbase_id_to_name
    mirbase_nodes = [
        (self.label('MIRBASE', mirbase_id), {'name': mirbase_name})
        for mirbase_id, mirbase_name in id_to_name.items()
    ]
    self.add_nodes_from(mirbase_nodes)
def add_mirbase_xrefs(self):
    """Add xref edges between MIRBASE and HGNC nodes in both directions.

    MIRBASE -> HGNC edges come from `_mirbase_id_to_hgnc_id` and
    HGNC -> MIRBASE edges from `_hgnc_id_to_mirbase_id`; all edges have
    'mirbase' as their source.
    """
    from indra.databases import mirbase_client
    # miRBase -> HGNC
    xref_edges = [
        (self.label('MIRBASE', mirbase_id),
         self.label('HGNC', hgnc_id),
         {'type': 'xref', 'source': 'mirbase'})
        for mirbase_id, hgnc_id in
        mirbase_client._mirbase_id_to_hgnc_id.items()
    ]
    # HGNC -> miRBase
    xref_edges.extend(
        (self.label('HGNC', hgnc_id),
         self.label('MIRBASE', mirbase_id),
         {'type': 'xref', 'source': 'mirbase'})
        for hgnc_id, mirbase_id in
        mirbase_client._hgnc_id_to_mirbase_id.items()
    )
    self.add_edges_from(xref_edges)
def add_activity_hierarchy(self):
    """Add the fixed 'isa' hierarchy among INDRA activity types.

    All nodes are labeled in the INDRA_ACTIVITIES namespace; each
    (child, parent) pair below becomes one child -> parent edge.
    """
    namespace = 'INDRA_ACTIVITIES'
    child_parent_pairs = (
        ('transcription', 'activity'),
        ('catalytic', 'activity'),
        ('gtpbound', 'activity'),
        ('kinase', 'catalytic'),
        ('phosphatase', 'catalytic'),
        ('gef', 'catalytic'),
        ('gap', 'catalytic'),
    )
    isa_edges = []
    for child, parent in child_parent_pairs:
        child_node = self.label(namespace, child)
        parent_node = self.label(namespace, parent)
        isa_edges.append((child_node, parent_node, {'type': 'isa'}))
    self.add_edges_from(isa_edges)
def add_modification_hierarchy(self):
    """Add 'isa' edges from each modification type to 'modification'.

    All nodes are labeled in the INDRA_MODS namespace. Every entry of
    `modtype_conditions` other than 'modification' itself gets an edge
    pointing to the generic 'modification' node.
    """
    namespace = 'INDRA_MODS'
    isa_edges = []
    for mod_type in modtype_conditions:
        # The root type does not point to itself
        if mod_type == 'modification':
            continue
        isa_edges.append((self.label(namespace, mod_type),
                          self.label(namespace, 'modification'),
                          {'type': 'isa'}))
    self.add_edges_from(isa_edges)
# Folder containing this module; resource files are located relative to it.
HERE = os.path.dirname(os.path.abspath(__file__))
resources = os.path.join(HERE, os.pardir, os.pardir, 'resources')
# The cache base folder is the INDRA_RESOURCES config value if set, and
# ~/.indra otherwise. In both cases the cache lives in a sub-folder keyed by
# the ontology name and version, so that incrementing BioOntology.version
# always points to a fresh location and forces a rebuild, also when
# INDRA_RESOURCES is configured.
CACHE_DIR = os.path.join(
    get_config('INDRA_RESOURCES') or
    os.path.join(os.path.expanduser('~'), '.indra'),
    '%s_ontology' % BioOntology.name,
    BioOntology.version)
CACHE_FILE = os.path.join(CACHE_DIR, 'bio_ontology.pkl')