# -*- coding: utf-8 -*-
"""This module defines a class and relative functions for mapping Uniprot
sequences to PDB and Pfam databases."""
import os
import re
import pickle
import datetime
import time
import numpy as np
import prody as pd
from prody import LOGGER, SETTINGS
from prody.utilities import openURL
from tqdm import tqdm
from Bio.pairwise2 import align as bioalign
from Bio.pairwise2 import format_alignment
from Bio.SubsMat import MatrixInfo as matlist
__author__ = "Luca Ponzoni"
__date__ = "December 2019"
__maintainer__ = "Luca Ponzoni"
__email__ = "lponzoni@pitt.edu"
__status__ = "Production"
__all__ = ['queryUniprot', 'UniprotMapping', 'mapSAVs2PDB',
'seqScanning', 'printSAVlist']
[docs]def queryUniprot(*args, n_attempts=3, dt=1, **kwargs):
"""
Redefine prody function to check for no internet connection
"""
attempt = 0
while attempt < n_attempts:
try:
_ = openURL('http://www.uniprot.org/')
break
except:
LOGGER.info(
f'Attempt {attempt} to contact www.uniprot.org failed')
attempt += 1
time.sleep((attempt+1)*dt)
else:
_ = openURL('http://www.uniprot.org/')
return pd.queryUniprot(*args, **kwargs)
[docs]class UniprotMapping:
[docs] def __init__(self, acc, recover_pickle=False, **kwargs):
self.acc = self._checkAccessionNumber(acc)
self.uniq_acc = None
self.fullRecord = None
self.sequence = None
self.PDBrecords = None
self.PDBmappings = None
self.customPDBmappings = None
self._align_algo_args = None
self._align_algo_kwargs = None
self.timestamp = None
self.Pfam = None
assert type(recover_pickle) is bool
if recover_pickle:
try:
self.recoverPickle(**kwargs)
except Exception as e:
LOGGER.warn(f'Unable to recover pickle: {e}')
self.refresh()
else:
self.refresh()
[docs] def refresh(self):
"""Refresh imported Uniprot records and mappings, and
delete precomputed alignments.
"""
# import Uniprot record and official accession number
self.fullRecord = queryUniprot(self.acc)
self.uniq_acc = self.fullRecord['accession 0']
# import main sequence and PDB records
rec = self.fullRecord
self.sequence = rec['sequence 0'].replace("\n", "")
self.PDBrecords = [rec[key] for key in rec.keys()
if key.startswith('dbRef') and 'PDB' in rec[key]]
# parse PDB records into PDB mappings, easier to access
self._initiatePDBmappings()
# set remaining attributes
self.customPDBmappings = []
self._align_algo_args = ['localxs', -0.5, -0.1]
self._align_algo_kwargs = {'one_alignment_only': True}
self.timestamp = str(datetime.datetime.utcnow())
self.Pfam = None
return
[docs] def getFullRecord(self):
"""Returns the output from :func:`.queryUniprot`"""
return self.fullRecord
[docs] def getPDBrecords(self):
"""Returns a dictionary containing only the 'dbReference' records
relative to PDB, extracted from the full Uniprot record.
"""
return self.PDBrecords
[docs] def getPDBmappings(self, PDBID=None):
"""Returns a list of dictionaries, with mappings of the Uniprot
sequence onto single PDB chains. For each PDB chain, the residue
intervals retrieved from the Uniprot database are parsed into a list
of tuples ('chain_sel') corresponding to endpoints of individual
segments. NB: '@' stands for 'all chains', following Uniprot naming
convention.
"""
if PDBID is None:
return self.PDBmappings
# retrieve record for given PDBID
PDBID = PDBID.upper()
recs = [d for d in self.PDBmappings if d['PDB'] == PDBID]
# there should be only one record for a given PDBID
if len(recs) == 0:
raise ValueError(f'PDBID {PDBID} not found in Uniprot record.')
if len(recs) > 1:
m = f"Multiple entries in Uniprot record for PDBID {PDBID}. "
m += "Only the first one will be considered."
LOGGER.warn(m)
return recs[0]
[docs] def alignSinglePDB(self, PDBID, chain='longest'):
"""Aligns the Uniprot sequence with the sequence from the given
PDB entry.
"""
PDBrecord = self.getPDBmappings(PDBID)
if PDBrecord['chain_seq'] is None:
raise RuntimeError("Unable to parse PDB.")
# retrieve chain mappings. Format: {'A': [(1, 10), (15, 100)]}
mappings = PDBrecord['chain_sel']
# retrieve list of chains from Uniprot record for given PDBID
all_chains = set(mappings.keys())
if '@' in all_chains:
all_chains = PDBrecord['chain_seq'].keys()
# select chains to be aligned
chains_to_align = []
if chain == 'longest':
# align only the longest chain in the PDB file
nCA_max = 0
for c in sorted(all_chains):
nCA = len(PDBrecord['chain_res'][c])
if nCA > nCA_max:
nCA_max = nCA
chains_to_align = [c]
elif chain == 'all' or chain == '@':
# align all chains
chains_to_align = list(all_chains)
elif chain in all_chains:
# align only the requested chain
chains_to_align = [chain]
else:
raise ValueError(f'chain {chain} not found in Uniprot record.')
# align selected chains with BioPython module pairwise2
self._calcAlignments(PDBID, chains_to_align)
# return alignments and maps of selected chains
rec = [d for d in self.PDBmappings if d['PDB'] == PDBID][0]
sel_alignms = {c: rec['alignments'][c] for c in chains_to_align}
sel_maps = {c: rec['maps'][c] for c in chains_to_align}
return sel_alignms, sel_maps
[docs] def alignCustomPDB(self, PDB, chain='all', title=None, recover=False):
"""Aligns the Uniprot sequence with the sequence from the given PDB.
"""
assert isinstance(PDB, (str, pd.Atomic)), \
'PDB must be a PDBID or an Atomic instance (e.g. AtomGroup).'
assert isinstance(chain, str) or all(
isinstance(s, str) for s in chain), \
"'chain' must be a string or a list of strings."
assert isinstance(title, str) or title is None
# parse/import pdb and assign title
if isinstance(PDB, str):
try:
pdb = pd.parsePDB(PDB, subset='calpha')
except Exception as e:
msg = (
'Unable to import PDB: PDB ID might be invalid or '
f'PDB file might be corrupted. Error message: {e}')
LOGGER.error(msg)
if title is None:
title = os.path.basename(PDB.strip())
title = title.replace(' ', '_')
else:
pdb = PDB.ca
if title is None:
title = PDB.getTitle()
# check if a record is already present
rec = [d for d in self.customPDBmappings if d['PDB'] == title]
if recover and len(rec) > 1:
raise RuntimeError('Multiple records found with same ID.')
elif recover and len(rec) == 1:
customPDBrecord = rec[0]
else:
# create record for custom PDB
customPDBrecord = {
'PDB': title,
'chain_res': {},
'chain_seq': {},
'warnings': []
}
self.customPDBmappings.append(customPDBrecord)
# check given chain list
all_chains = set(pdb.getChids())
if chain == 'all' or chain == '@':
chains_to_align = list(all_chains)
elif type(chain) is list:
chains_to_align = chain
else:
chains_to_align = [chain, ]
invalid_chIDs = [c for c in chains_to_align if c not in all_chains]
if invalid_chIDs != []:
raise ValueError('Invalid chain: {}.'.format(invalid_chIDs))
# store resids and sequence of selected chains
for c in chains_to_align:
if c in customPDBrecord['chain_res']:
continue
customPDBrecord['chain_res'][c] = pdb[c].getResnums()
customPDBrecord['chain_seq'][c] = pdb[c].getSequence()
# align selected chains with BioPython module pairwise2
self._calcCustomAlignments(title, chains_to_align)
return customPDBrecord
[docs] def alignAllPDBs(self, chain='longest'):
"""Aligns the Uniprot sequence with the sequences of all PDBs in the
Uniprot record.
"""
assert chain in ['longest', 'all']
PDBIDs_list = [d['PDB'] for d in self.PDBmappings]
for PDBID in PDBIDs_list:
try:
_ = self.alignSinglePDB(PDBID, chain=chain)
except:
continue
return self.PDBmappings
[docs] def mapSingleResidue(self, resid, check_aa=False, depth='best'):
"""Map a single amino acid in a Uniprot sequence to PDBs.
If 'check_aa' is True, it will return only PDB residues with the
wild-type amino acid.
If 'depth' is 'matching', it will use info from Uniprot record to
determine which PDBs contain the given residue, and if 'depth' is 'best'
only the longest chain will be considered and printed, to save time.
If 'depth' is all, it will perform a thorough search among all PDBs (slow).
The matching PDB residues will be sorted, in descending order, according
to the identity of the relative chain with the Uniprot sequence.
"""
assert 1 <= resid <= len(self.sequence), \
'Index out of range: sequence length is {}.'.format(len(self.sequence))
assert type(check_aa) is bool
if check_aa:
aa = self.sequence[resid-1]
else:
aa = None
assert depth in ['best', 'matching', 'all']
matches = []
if depth in ['best', 'matching']:
# trust Uniprot database and find PDBs containing the given resid
# according to Uniprot records
for PDBrecord in self.PDBmappings:
PDBID = PDBrecord['PDB']
chain_sel = PDBrecord['chain_sel']
# e.g. 'chain_sel': {'A': [(1, 9), (15, 20)]}
if chain_sel is None:
# add all chains anyway, if possible
if PDBrecord['chain_seq'] is not None:
chainIDs = PDBrecord['chain_seq'].keys()
else:
chainIDs = []
for chainID in chainIDs:
matches.append((PDBID, chainID, -999))
else:
for chainID, intervals in chain_sel.items():
if None in intervals:
# range is undefined, add it anyway
matches.append((PDBID, chainID, -999))
elif np.any([i[0] <= resid <= i[1] for i in intervals]):
length = sum([i[1]-i[0]+1 for i in intervals])
matches.append((PDBID, chainID, length))
# sort first by length, then by PDBID and chainID
matches.sort(key=lambda x: (-x[2], x[0], x[1]))
else:
# don't trust Uniprot record: select all PDBs for
# alignment to find those containing the given resid
for PDBrecord in self.PDBmappings:
PDBID = PDBrecord['PDB']
for chainID in PDBrecord['chain_sel']:
matches.append((PDBID, chainID, -999))
# now align selected chains to find actual hits
hits = []
for PDBID, chainID, _ in matches:
try:
als, maps = self.alignSinglePDB(PDBID, chain=chainID)
except:
continue
if chainID == '@':
c_list = sorted(maps.keys())
else:
c_list = [chainID]
for c in c_list:
hit = maps[c].get(resid)
if hit is None:
# resid is not found in the chain
continue
elif aa is not None and hit[1] != aa:
# resid is in the chain but has wrong aa type
continue
else:
identity = sum([1 for a1, a2 in zip(als[c][0], als[c][1])
if a1 == a2])
hits.append((PDBID, c, hit[0], hit[1], identity))
if depth == 'best' and len(hits) > 0:
# stop after finding first hit
break
# sort hits first by identity, then by PDBID and chainID
hits.sort(key=lambda x: (-x[4], x[0], x[1]))
if depth == 'best':
hits = hits[:1]
return hits
[docs] def mapSingleRes2CustomPDBs(self, resid, check_aa=False):
"""Map an amino acid in the Uniprot sequence to aligned custom PDBs.
If 'check_aa' is True, it will return only PDB residues with the
wild-type amino acid.
"""
assert 1 <= resid <= len(self.sequence), \
'Index out of range: sequence length is {}.'.format(len(self.sequence))
assert type(check_aa) is bool
if check_aa:
aa = self.sequence[resid-1]
else:
aa = None
hits = []
for rec in self.customPDBmappings:
title = rec['PDB']
als = rec['alignments']
maps = rec['maps']
for c in maps.keys():
hit = maps[c].get(resid)
if hit is None:
# resid is not found in the chain
continue
elif aa is not None and hit[1] != aa:
# resid is in the chain but has wrong aa type
msg = 'Residue was found in chain {} '.format(c)
msg += 'of PDB {} but has wrong aa ({})'.format(title, hit[1])
LOGGER.info(msg)
continue
else:
identity = sum([1 for a1, a2 in zip(als[c][0], als[c][1])
if a1 == a2])
hits.append((title, c, hit[0], hit[1], identity))
# sort hits first by identity, then by title and chainID
hits.sort(key=lambda x: (-x[4], x[0], x[1]))
return hits
[docs] def setAlignAlgorithm(self, align_algorithm=1,
gap_open_penalty=-0.5, gap_ext_penalty=-0.1,
refresh=True):
"""Set the Biopython alignment algorithm used for aligning
Uniprot sequence to PDB sequences. All precomputed alignments
will be deleted.
"""
assert align_algorithm in [0, 1, 2]
# delete old alignments
if refresh:
self.refresh()
# set new alignment parameters
if align_algorithm == 0:
# use fastest alignment algorithm (gaps are not penalized)
self._align_algo_args = ['localxx']
elif align_algorithm == 1:
# gaps are penalized when opened and extended
self._align_algo_args = ['localxs',
gap_open_penalty, gap_open_penalty]
else:
# slow, high quality alignment, with scoring of mismatching chars
# based on BLOSUM62 matrix and penalized opened/extended gaps
self._align_algo_args = ['localds', matlist.blosum62,
gap_open_penalty, gap_open_penalty]
return
[docs] def savePickle(self, filename=None, folder=None, store_custom_PDBs=False):
if folder is None:
folder = SETTINGS.get('rhapsody_local_folder')
if folder is None:
folder = '.'
else:
folder = os.path.join(folder, 'pickles')
if filename is None:
filename = 'UniprotMap-' + self.uniq_acc + '.pkl'
pickle_path = os.path.join(folder, filename)
cache = self.customPDBmappings
if store_custom_PDBs is not True:
# do not store alignments of custom PDBs
self.customPDBmappings = []
# save pickle
pickle.dump(self, open(pickle_path, "wb"))
self.customPDBmappings = cache
LOGGER.info("Pickle '{}' saved.".format(filename))
return pickle_path
[docs] def recoverPickle(self, filename=None, folder=None, days=30, **kwargs):
acc = self.uniq_acc
if acc is None:
# assume acc is equal to uniq_acc
acc = self.acc
if folder is None:
folder = SETTINGS.get('rhapsody_local_folder')
if folder is None:
folder = '.'
else:
folder = os.path.join(folder, 'pickles')
if filename is None:
# assume acc is equal to uniq_acc
acc = self.acc
filename = 'UniprotMap-' + acc + '.pkl'
pickle_path = os.path.join(folder, filename)
if not os.path.isfile(pickle_path):
# import unique accession number
acc = queryUniprot(self.acc)['accession 0']
filename = 'UniprotMap-' + acc + '.pkl'
pickle_path = os.path.join(folder, filename)
else:
pickle_path = os.path.join(folder, filename)
# check if pickle exists
if not os.path.isfile(pickle_path):
raise IOError("File '{}' not found".format(filename))
# load pickle
recovered_self = pickle.load(open(pickle_path, "rb"))
if acc not in [recovered_self.acc, recovered_self.uniq_acc]:
raise ValueError('Accession number in recovered pickle (%s) '
% recovered_self.uniq_acc + 'does not match.')
# check timestamp and ignore pickles that are too old
date_format = "%Y-%m-%d %H:%M:%S.%f"
t_old = datetime.datetime.strptime(recovered_self.timestamp,
date_format)
t_now = datetime.datetime.utcnow()
Delta_t = datetime.timedelta(days=days)
if t_old + Delta_t < t_now:
raise RuntimeError(
'Pickle {} was too old and was ignored.'.format(filename))
self.fullRecord = recovered_self.fullRecord
self.uniq_acc = recovered_self.uniq_acc
self.sequence = recovered_self.sequence
self.PDBrecords = recovered_self.PDBrecords
self.PDBmappings = recovered_self.PDBmappings
self.customPDBmappings = recovered_self.customPDBmappings
self._align_algo_args = recovered_self._align_algo_args
self._align_algo_kwargs = recovered_self._align_algo_kwargs
self.timestamp = recovered_self.timestamp
self.Pfam = recovered_self.Pfam
LOGGER.info("Pickle '{}' recovered.".format(filename))
return
[docs] def resetTimestamp(self):
self.timestamp = str(datetime.datetime.utcnow())
def _checkAccessionNumber(self, acc):
if '-' in acc:
acc = acc.split('-')[0]
message = 'Isoforms are not allowed, the main sequence for ' + \
acc + ' will be used instead.'
LOGGER.warn(message)
return acc
def _parseSelString(self, sel_str):
# example: "A/B/C=15-100, D=30-200"
# or: "@=10-200"
parsedSelStr = {}
for segment in sel_str.replace(' ', '').split(','):
fields = segment.split('=')
chains = fields[0].split('/')
resids = fields[1].split('-')
try:
resids = tuple([int(s) for s in resids])
except Exception:
# sometimes the interval is undefined,
# e.g. "A=-"
resids = None
for chain in chains:
parsedSelStr.setdefault(chain, []).append(resids)
return parsedSelStr
def _initiatePDBmappings(self):
illegal_chars = r"[^A-Za-z0-9-@=/,\s]"
PDBmappings = []
for singlePDBrecord in self.PDBrecords:
PDBID = singlePDBrecord.get('PDB').upper()
mapping = {'PDB': PDBID,
'chain_sel': None,
'chain_res': None,
'chain_seq': None,
'warnings': []}
# import selection string
sel_str = singlePDBrecord.get('chains')
if sel_str is None:
mapping['warnings'].append('Empty selection string.')
else:
# check for illegal characters in selection string
match = re.search(illegal_chars, sel_str)
if match:
chars = re.findall(illegal_chars, sel_str)
message = "Illegal characters found in 'chains' " \
+ 'selection string: ' + ' '.join(chars)
mapping['warnings'].append(message)
else:
parsed_sel_str = self._parseSelString(sel_str)
mapping['chain_sel'] = parsed_sel_str
# store resids and sequence of PDB chains
try:
pdb = pd.parsePDB(PDBID, subset='calpha')
mapping['chain_res'] = {}
mapping['chain_seq'] = {}
for c in set(pdb.getChids()):
mapping['chain_res'][c] = pdb[c].getResnums()
mapping['chain_seq'][c] = pdb[c].getSequence()
except Exception as e:
mapping['chain_res'] = None
mapping['chain_seq'] = None
msg = "Error while parsing PDB: {}".format(e)
mapping['warnings'].append(msg)
LOGGER.warn(msg)
PDBmappings.append(mapping)
self.PDBmappings = PDBmappings
if PDBmappings == []:
LOGGER.warn('No PDB entries have been found '
'that map to given sequence.')
return
def _align(self, seqU, seqC, PDBresids, print_info=False):
algo = self._align_algo_args[0]
args = self._align_algo_args[1:]
kwargs = self._align_algo_kwargs
# align Uniprot and PDB sequences
al = None
if algo == 'localxx':
al = bioalign.localxx(seqU, seqC, *args, **kwargs)
elif algo == 'localxs':
al = bioalign.localxs(seqU, seqC, *args, **kwargs)
else:
al = bioalign.localds(seqU, seqC, *args, **kwargs)
if print_info is True:
info = format_alignment(*al[0])
LOGGER.info(info[:-1])
idnt = sum([1 for a1, a2 in zip(al[0][0], al[0][1]) if a1 == a2])
frac = idnt/len(seqC)
m = "{} out of {} ({:.1%}) residues".format(idnt, len(seqC), frac)
m += " in the chain are identical to Uniprot amino acids."
LOGGER.info(m)
# compute mapping between Uniprot and PDB chain resids
aligned_seqU = al[0][0]
aligned_seqC = al[0][1]
mp = {}
resid_U = 0
resindx_PDB = 0
for i in range(len(aligned_seqU)):
aaU = aligned_seqU[i]
aaC = aligned_seqC[i]
if aaU != '-':
resid_U += 1
if aaC != '-':
mp[resid_U] = (PDBresids[resindx_PDB], aaC)
if aaC != '-':
resindx_PDB += 1
return al[0][:2], mp
def _quickAlign(self, seqU, seqC, PDBresids):
'''Works only if PDB sequence and resids perfectly match
those found in Uniprot.'''
s = ['-'] * len(seqU)
mp = {}
for resid, aaC in zip(PDBresids, seqC):
indx = resid-1
try:
aaU = seqU[indx]
except:
raise RuntimeError('Invalid resid in PDB.')
if resid in mp:
raise RuntimeError('Duplicate resid in PDB.')
elif aaC != aaU:
raise RuntimeError('Non-WT aa in PDB sequence.')
else:
mp[resid] = (resid, aaC)
s[indx] = aaC
aligned_seqC = "".join(s)
return (seqU, aligned_seqC), mp
def _calcAlignments(self, PDBID, chains_to_align):
seqUniprot = self.sequence
PDBrecord = self.getPDBmappings(PDBID)
alignments = PDBrecord.setdefault('alignments', {})
maps = PDBrecord.setdefault('maps', {})
for c in chains_to_align:
# check for precomputed alignments and maps
if c in alignments:
continue
# otherwise, align and map to PDB resids
PDBresids = PDBrecord['chain_res'][c]
seqChain = PDBrecord['chain_seq'][c]
LOGGER.timeit('_align')
try:
a, m = self._quickAlign(seqUniprot, seqChain, PDBresids)
msg = "Chain {} in {} was quick-aligned".format(c, PDBID)
except:
a, m = self._align(seqUniprot, seqChain, PDBresids)
msg = "Chain {} in {} was aligned".format(c, PDBID)
LOGGER.report(msg + ' in %.1fs.', '_align')
# store alignments and maps into PDBmappings
alignments[c] = a
maps[c] = m
return
def _calcCustomAlignments(self, title, chains_to_align):
seqUniprot = self.sequence
PDBrecord = [d for d in self.customPDBmappings
if d['PDB'] == title][0]
alignments = PDBrecord.setdefault('alignments', {})
maps = PDBrecord.setdefault('maps', {})
for c in chains_to_align:
# check for precomputed alignments and maps
if c in alignments:
continue
# otherwise, align and map to PDB resids
PDBresids = PDBrecord['chain_res'][c]
seqChain = PDBrecord['chain_seq'][c]
LOGGER.timeit('_align')
try:
a, m = self._quickAlign(seqUniprot, seqChain, PDBresids)
msg = f"Chain {c} was quick-aligned"
except:
LOGGER.info(f"Aligning chain {c} of custom PDB {title}...")
a, m = self._align(seqUniprot, seqChain, PDBresids,
print_info=True)
msg = f"Chain {c} was aligned"
LOGGER.report(msg + ' in %.1fs.', '_align')
# store alignments and maps into PDBmappings
alignments[c] = a
maps[c] = m
return
# PFAM methods
def _searchPfam(self, refresh=False, **kwargs):
assert type(refresh) is bool
if refresh is True or self.Pfam is None:
try:
self.Pfam = pd.searchPfam(self.uniq_acc, **kwargs)
except:
self.Pfam = {}
raise
return self.Pfam
def _sliceMSA(self, msa):
acc_name = self.fullRecord['name 0']
# find sequences in MSA related to the given Uniprot name
indexes = msa.getIndex(acc_name)
if indexes is None:
raise RuntimeError('No sequence found in MSA for {}'.format(acc_name))
elif type(indexes) is not list:
indexes = [indexes]
# slice MSA to include only columns from selected sequences
cols = np.array([], dtype=int)
arr = msa._getArray()
for i in indexes:
cols = np.append(cols, np.char.isalpha(arr[i]).nonzero()[0])
cols = np.unique(cols)
arr = arr.take(cols, 1)
sliced_msa = pd.MSA(arr, title='refined', labels=msa._labels)
LOGGER.info('Number of columns in MSA reduced to {}.'.format(
sliced_msa.numResidues()))
return sliced_msa, indexes
def _mapUniprot2Pfam(self, PF_ID, msa, indexes):
def compareSeqs(s1, s2, tol=0.01):
if len(s1) != len(s2):
return None
seqid = sum(np.array(list(s1)) == np.array(list(s2)))
seqid = seqid/len(s1)
if (1 - seqid) > tol:
return None
return seqid
# fetch sequences from Pfam (all locations)
m = [None]*len(self.sequence)
sP_list = []
for i in indexes:
arr = msa[i].getArray()
cols = np.char.isalpha(arr).nonzero()[0]
sP = str(arr[cols], 'utf-8').upper()
sP_list.append((sP, cols))
# NB: it's not known which msa index corresponds
# to each location
for l in self.Pfam[PF_ID]['locations']:
r_i = int(l['start']) - 1
r_f = int(l['end']) - 1
sU = self.sequence[r_i:r_f+1]
max_seqid = 0.
for sP, cols in sP_list:
seqid = compareSeqs(sU, sP)
if seqid is None:
continue
if seqid > max_seqid:
max_seqid = seqid
m[r_i:r_f+1] = cols
if np.allclose(seqid, 1):
break
return {k: v for k, v in enumerate(m) if v is not None}
[docs] def calcEvolProperties(self, resid='all', refresh=False, folder=None,
max_cols=None, max_seqs=25000, **kwargs):
''' Computes Evol properties, i.e. Shannon entropy, Mutual
Information and Direct Information, from Pfam Multiple
Sequence Alignments, for a given residue.
'''
assert type(refresh) is bool
# recover Pfam mapping (if not found already)
self._searchPfam(refresh=refresh)
if resid == 'all':
PF_list = self.Pfam.keys()
else:
# get list of Pfam domains containing resid
PF_list = [k for k in self.Pfam if any([
resid >= int(segment['start']) and
resid <= int(segment['end'])
for segment in self.Pfam[k]['locations']
])
]
if len(PF_list) == 0:
raise RuntimeError(f'No Pfam domain for resid {resid}.')
if len(PF_list) > 1:
LOGGER.warn(f'Residue {resid} is found in multiple '
'({}) Pfam domains.'.format(len(PF_list)))
if folder is None:
folder = SETTINGS.get('rhapsody_local_folder')
if folder is None:
folder = '.'
else:
folder = os.path.join(folder, 'pickles')
# iterate over Pfam families
for PF in PF_list:
d = self.Pfam[PF]
# skip if properties are pre-computed
if not refresh and d.get('mapping') is not None:
continue
d['mapping'] = None
d['ref_MSA'] = None
d['entropy'] = np.nan
d['MutInfo'] = np.nan
d['DirInfo'] = np.nan
try:
LOGGER.info('Processing {}...'.format(PF))
# fetch & parse MSA
# fname = PF + '_full.sth'
# fullname = os.path.join(folder, fname)
# if not os.path.isfile(fullname):
# f = fetchPfamMSA(PF)
# shutil.move(f, folder)
# msa = parseMSA(fullname, **kwargs)
# fetch & parse MSA without saving downloaded MSA
f = pd.fetchPfamMSA(PF)
msa = pd.parseMSA(f, **kwargs)
os.remove(f)
# slice MSA to match all segments of the Uniprot sequence
sliced_msa, indexes = self._sliceMSA(msa)
# if max_cols is not None and sliced_msa.numResidues() > max_cols:
# raise Exception('Unable to compute DI: MSA has ' +\
# 'too many columns (max: {}).'.format(max_cols))
# get mapping between Uniprot sequence and Pfam domain
d['mapping'] = self._mapUniprot2Pfam(PF, sliced_msa, indexes)
except Exception as e:
LOGGER.warn('{}: {}'.format(PF, e))
d['mapping'] = str(e)
continue
try:
# refine MSA ('seqid' param. is set as in PolyPhen-2)
rowocc = 0.6
while True:
sliced_msa = pd.refineMSA(sliced_msa, rowocc=rowocc)
rowocc += 0.02
if sliced_msa.numSequences() <= max_seqs or rowocc >= 1:
break
ref_msa = pd.refineMSA(sliced_msa, seqid=0.94, **kwargs)
d['ref_MSA'] = ref_msa
# compute evolutionary properties
d['entropy'] = pd.calcShannonEntropy(ref_msa)
d['MutInfo'] = pd.buildMutinfoMatrix(ref_msa)
# d['DirInfo'] = buildDirectInfoMatrix(ref_msa)
except Exception as e:
LOGGER.warn('{}: {}'.format(PF, e))
return {k: self.Pfam[k] for k in PF_list}
[docs]def mapSAVs2PDB(SAV_coords, custom_PDB=None, refresh=False,
status_file=None, status_prefix=None):
LOGGER.info('Mapping SAVs to PDB structures...')
LOGGER.timeit('_map2PDB')
# sort SAVs, so to group together those
# with identical accession number
accs = [s.split()[0] for s in SAV_coords]
sorting_map = np.argsort(accs)
# define a structured array
PDBmap_dtype = np.dtype([
('orig. SAV coords', 'U25'),
('unique SAV coords', 'U25'),
('PDB SAV coords', 'U100'),
('PDB size', 'i')])
nSAVs = len(SAV_coords)
mapped_SAVs = np.zeros(nSAVs, dtype=PDBmap_dtype)
# define how to report progress
if status_prefix is None:
status_prefix = ''
bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]'
if status_file is not None:
status_file = open(status_file, 'w')
progress_bar = tqdm(
[(i, SAV_coords[i]) for i in sorting_map], file=status_file,
bar_format=bar_format+'\n')
else:
progress_bar = tqdm(
[(i, SAV_coords[i]) for i in sorting_map], bar_format=bar_format)
# map to PDB using Uniprot class
cache = {'acc': None, 'obj': None}
count = 0
for indx, SAV in progress_bar:
count += 1
acc, pos, aa1, aa2 = SAV.split()
pos = int(pos)
# report progress
progress_msg = f"{status_prefix}Mapping SAV '{SAV}' to PDB"
# LOGGER.info(f"[{count}/{nSAVs}] {progress_msg}...")
progress_bar.set_description(progress_msg)
# map Uniprot to PDB chains
if acc == cache['acc']:
# use mapping from previous iteration
U2P_map = cache['obj']
else:
# save previous mapping
if isinstance(cache['obj'], UniprotMapping):
cache['obj'].savePickle()
cache['acc'] = acc
# compute the new mapping
try:
U2P_map = UniprotMapping(acc, recover_pickle=not(refresh))
if custom_PDB is not None:
LOGGER.info('Aligning Uniprot sequence to custom PDB...')
U2P_map.alignCustomPDB(custom_PDB, 'all')
except Exception as e:
U2P_map = str(e)
cache['obj'] = U2P_map
# map specific SAV
try:
if isinstance(U2P_map, str):
raise RuntimeError(U2P_map)
# check wt aa
if not 0 < pos <= len(U2P_map.sequence):
raise ValueError('Index out of range')
wt_aa = U2P_map.sequence[pos-1]
if aa1 != wt_aa:
raise ValueError(f'Incorrect wt aa: {aa1} instead of {wt_aa}')
# map to PDB. Format: [('2DZF', 'A', 150, 'N', 335)]
if custom_PDB is None:
r = U2P_map.mapSingleResidue(pos, check_aa=True)
else:
r = U2P_map.mapSingleRes2CustomPDBs(pos, check_aa=True)
if len(r) == 0:
raise RuntimeError('Unable to map SAV to PDB')
else:
PDBID, chID, resid, aa, PDB_size = r[0]
# NB: check for blank "chain" field
if chID.strip() == '':
chID = '?'
res_map = f'{PDBID} {chID} {resid} {aa}'
except Exception as e:
res_map = str(e)
PDB_size = 0
# store SAVs mapped on PDB chains and unique Uniprot coordinates
if isinstance(U2P_map, str):
uniq_coords = U2P_map
else:
uniq_coords = f'{U2P_map.uniq_acc} {pos} {aa1} {aa2}'
mapped_SAVs[indx] = (SAV, uniq_coords, res_map, PDB_size)
# save last pickle
if isinstance(cache['obj'], UniprotMapping):
cache['obj'].savePickle()
n = sum(mapped_SAVs['PDB size'] != 0)
LOGGER.report(f'{n} out of {nSAVs} SAVs have been mapped to PDB in %.1fs.',
'_map2PDB')
if status_file:
os.remove(status_file.name)
return mapped_SAVs
[docs]def seqScanning(Uniprot_coord, sequence=None):
'''Returns a list of SAVs. If the string 'Uniprot_coord' is just a
Uniprot ID, the list will contain all possible amino acid substitutions
at all positions in the sequence. If 'Uniprot_coord' also includes a
specific position, the list will only contain all possible amino acid
variants at that position. If 'sequence' is 'None' (default), the
sequence will be downloaded from Uniprot.
'''
assert isinstance(Uniprot_coord, str), "Must be a string."
coord = Uniprot_coord.upper().strip().split()
assert len(coord) < 3, "Invalid format. Examples: 'Q9BW27' or 'Q9BW27 10'."
aa_list = 'ACDEFGHIKLMNPQRSTVWY'
if sequence is None:
Uniprot_record = queryUniprot(coord[0])
sequence = Uniprot_record['sequence 0'].replace("\n", "")
else:
assert isinstance(sequence, str), "Must be a string."
sequence = sequence.upper()
assert set(sequence).issubset(aa_list), "Invalid list of amino acids."
if len(coord) == 1:
# user asks for full-sequence scanning
positions = range(len(sequence))
else:
# user asks for single-site scanning
site = int(coord[1])
positions = [site - 1]
# if user provides only one amino acid as 'sequence', interpret it
# as the amino acid at the specified position
if len(sequence) == 1:
sequence = sequence*site
else:
assert len(sequence) >= site, ("Requested position is not found "
"in input sequence.")
SAV_list = []
acc = coord[0]
for i in positions:
wt_aa = sequence[i]
for aa in aa_list:
if aa == wt_aa:
continue
s = ' '.join([acc, str(i+1), wt_aa, aa])
SAV_list.append(s)
return SAV_list
[docs]def printSAVlist(input_SAVs, filename):
if isinstance(input_SAVs, str):
input_SAVs = [input_SAVs]
with open(filename, 'w', 1) as f:
for i, line in enumerate(input_SAVs):
m = f'error in SAV {i}: '
assert isinstance(line, str), f'{m} not a string'
assert len(line) < 25, f'{m} too many characters'
print(line.upper(), file=f)
return filename