Commit bddba8c2 authored by Martino Bertoni's avatar Martino Bertoni 🌋
Browse files

improved parsing of molecules, added option to drop invalid molecules

parent 2b107646
......@@ -120,10 +120,45 @@ class Signaturizer(object):
else:
RDLogger.DisableLog('rdApp.*')
def predict(self, molecules, destination=None, keytype='SMILES',
save_mfp=False, chunk_size=32, batch_size=128,
compression=None, y_scramble=False, keys=None):
"""Predict signatures for given SMILES.
def _smiles_to_mol(self, molecules, keys, drop_invalid=True):
mol_objects = list()
valid_keys = list()
for smi, key in tqdm(zip(molecules, keys), desc='Parsing SMILES'):
if smi == '':
smi = 'INVALID SMILES'
mol = Chem.MolFromSmiles(smi)
if mol is None:
if self.verbose:
print("Cannot get molecule from SMILES: %s." % smi)
if drop_invalid:
continue
valid_keys.append(key)
mol_objects.append(mol)
return mol_objects, valid_keys
def _inchi_to_mol(self, molecules, keys, drop_invalid=True):
mol_objects = list()
valid_keys = list()
for inchi, key in tqdm(zip(molecules, keys), desc='Parsing InChI'):
inchi = inchi.encode('ascii', 'ignore')
if inchi == '':
inchi = 'INVALID InChI'
mol = Chem.MolFromInchi(inchi)
if mol is None:
if self.verbose:
print("Cannot get molecule from InChI: %s." % inchi)
if drop_invalid:
continue
valid_keys.append(key)
mol_objects.append(mol)
return mol_objects, valid_keys
def predict(self, molecules, destination=None, molecule_fmt='SMILES',
keys=None, save_mfp=False, drop_invalid=True,
batch_size=128, chunk_size=32,
compression=None, y_scramble=False,):
"""Predict signatures for given molecules.
Perform signature prediction for input SMILES. We recommend that the
list is sorted and non-redundant, but this is optional. Some input
......@@ -131,14 +166,21 @@ class Signaturizer(object):
is possible and the corresponding signature will be set to NaN.
Args:
molecules(list): List of strings representing molecules. Can be
SMILES (by default) or InChI.
destination(str): File path where to save predictions.
keytype(str): Whether to interpret molecules as InChI or SMILES.
save_mfp(bool): if True and additional matrix with the Morgan
Fingerprint is saved.
chunk_size(int): Perform prediction on chunks of this size.
molecules(list): List of strings representing molecules.
Can be SMILES (by default), InChI or RDKIT molecule objects.
destination(str): File path where to save predictions. If file
exists already it will throw an exception.
molecule_fmt(str): Molecule format, whether to interpret molecules
as InChI, SMILES or RDKIT.
keys(list): A list of keys that will be saved along with the
predictions.
save_mfp(bool): Set to True to save an additional matrix with
classical Morgan Fingerprint ECFP4.
drop_invalid(bool): Wether to drop invalid molecules i.e. molecules
that cannot be interpreted with RdKit.
batch_size(int): Batch size for prediction.
chunk_size(int): Chunk size for the reulting HDF5.
compression(str): Compression used for storing the HDF5.
y_scramble(bool): Validation test scrambling the MFP before
prediction.
Returns:
......@@ -148,38 +190,28 @@ class Signaturizer(object):
# input must be a list, otherwise we make it so
if isinstance(molecules, str):
molecules = [molecules]
# convert input molecules to InChI
inchies = list()
# if keys are not specified just use incremental numbers
if keys is None:
keys = molecules
valid_keys = list()
if keytype.upper() == 'SMILES':
for smi, key in tqdm(zip(molecules, keys), desc='Converting to InChI'):
if smi == '':
smi = 'INVALID SMILES'
mol = Chem.MolFromSmiles(smi)
if mol is None:
if self.verbose:
print("Cannot get molecule from SMILES: %s." % smi)
inchies.append('INVALID SMILES')
continue
inchi = Chem.rdinchi.MolToInchi(mol)[0]
if self.verbose:
#print('CONVERTED:', smi, inchi)
pass
inchies.append(inchi)
valid_keys.append(key)
keys = [str(x) for x in range(len(molecules))]
# convert input molecules to molecule object
if molecule_fmt.upper() == 'SMILES':
molecules, keys = self._smiles_to_mol(
molecules, keys=keys, drop_invalid=drop_invalid)
elif molecule_fmt.upper() == 'INCHI':
molecules, keys = self._inchi_to_mol(
molecules, keys=keys, drop_invalid=drop_invalid)
elif molecule_fmt.upper() == 'RDKIT':
molecules, keys = molecules, keys
else:
inchies = molecules
self.inchies = inchies
# if inchies are the input we still need a valid keys list
if len(valid_keys) == 0:
valid_keys = keys
# Prepare result object
raise Exception('Unsupported molecule format `%s`' % molecule_fmt)
# prepare result object
features = len(self.model_names) * 128
chunk_size = min(chunk_size, len(molecules))
results = SignaturizerResult(
len(inchies), destination, features, save_mfp=save_mfp,
chunk_size=chunk_size, compression=compression, keys=valid_keys)
len(molecules), destination, features, save_mfp=save_mfp,
chunk_size=chunk_size, compression=compression, keys=keys)
results.dataset[:] = self.model_names
if results.readonly:
raise Exception(
......@@ -187,22 +219,14 @@ class Signaturizer(object):
'delete or rename to proceed.')
# predict by chunk
all_chunks = range(0, len(inchies), batch_size)
all_chunks = range(0, len(molecules), batch_size)
for i in tqdm(all_chunks, desc='Generating signatures'):
chunk = slice(i, i + batch_size)
# prepare predictor input
sign0s = list()
failed = list()
for idx, inchi in enumerate(inchies[chunk]):
for idx, mol in enumerate(molecules[chunk]):
try:
# read molecule
inchi = inchi.encode('ascii', 'ignore')
if self.verbose:
# print('READING', inchi, type(inchi))
pass
mol = Chem.inchi.MolFromInchi(inchi)
if mol is None:
raise Exception(
"Cannot get molecule from InChI.")
info = {}
fp = AllChem.GetMorganFingerprintAsBitVect(
mol, 2, nBits=2048, bitInfo=info)
......@@ -212,7 +236,7 @@ class Signaturizer(object):
except Exception as err:
# in case of failure save idx to fill NaNs
if self.verbose:
print("SKIPPING %s: %s" % (inchi, str(err)))
print("FAILED %s: %s" % (idx, str(err)))
failed.append(idx)
calc_s0 = np.full((2048, ), np.nan)
finally:
......@@ -223,9 +247,11 @@ class Signaturizer(object):
y_shuffle = np.arange(sign0s.shape[1])
np.random.shuffle(y_shuffle)
sign0s = sign0s[:, y_shuffle]
preds = self.model.predict(tf.convert_to_tensor(sign0s, dtype=tf.float32),
batch_size=batch_size)
# add NaN where SMILES conversion failed
# run prediction
preds = self.model.predict(
tf.convert_to_tensor(sign0s, dtype=tf.float32),
batch_size=batch_size)
# add NaN where conversion failed
if failed:
preds[np.array(failed)] = np.full(features, np.nan)
results.signature[chunk] = preds
......@@ -241,7 +267,7 @@ class Signaturizer(object):
results.applicability[chunk] = apreds
failed = np.isnan(results.signature[:, 0])
results.failed[:] = np.isnan(results.signature[:, 0])
results.keys[:] = valid_keys
results.keys[:] = keys
results.close()
if self.verbose:
print('PREDICTION complete!')
......
......@@ -97,9 +97,9 @@ class TestExporter(unittest.TestCase):
# load CC instance and smiles prediction model
cc = ChemicalChecker()
s3 = cc.signature('B1.001', 'sign3')
s4 = cc.signature('B1.001', 'sign4')
tmp_pred_ref = os.path.join(self.tmp_dir, 'tmp.h5')
s3.predict_from_smiles(self.test_smiles, tmp_pred_ref)
s4.predict_from_smiles(self.test_smiles, tmp_pred_ref)
pred_ref = DataSignature(tmp_pred_ref)[:]
# export smilespred
......@@ -109,7 +109,7 @@ class TestExporter(unittest.TestCase):
self.tmp_dir, version, module_file)
tmp_path_smilespred = os.path.join(self.tmp_dir, 'export_smilespred')
export_smilespred(
os.path.join(s3.model_path, 'smiles_final'),
os.path.join(s4.model_path, 'smiles_final'),
module_destination, tmp_path=tmp_path_smilespred, clear_tmp=False)
# test intermediate step
module = Signaturizer(tmp_path_smilespred, local=True)
......
......@@ -25,6 +25,9 @@ class TestSignaturizer(unittest.TestCase):
]
self.invalid_smiles = ['C', 'C&', 'C']
self.tautomer_smiles = ['CC(O)=Nc1ccc(O)cc1', 'CC(=O)NC1=CC=C(C=C1)O']
self.enantiomer_smiles = [
'C1CC(=O)NC(=O)[C@H]1N2C(=O)C3=CC=CC=C3C2=O',
'C1CC(=O)NC(=O)[C@@H]1N2C(=O)C3=CC=CC=C3C2=O']
self.inchi = [
'InChI=1S/C8H9NO2/c1-6(10)9-7-2-4-8(11)5-3-7/h2-5,11H,1H3,(H,9,10)'
]
......@@ -40,7 +43,7 @@ class TestSignaturizer(unittest.TestCase):
pred_ref = np.load(open(ref_file, 'rb'))
# load module and predict
module_dir = os.path.join(self.data_dir, 'B1')
module = Signaturizer(module_dir, local=True)
module = Signaturizer(module_dir, local=True, verbose=True)
res = module.predict(self.test_smiles)
np.testing.assert_almost_equal(pred_ref, res.signature[:])
# test saving to file
......@@ -51,13 +54,20 @@ class TestSignaturizer(unittest.TestCase):
self.assertEqual(len(res.applicability[:]), 2)
self.assertFalse(np.isnan(res.applicability[0]))
# test prediction of invalid SMILES
res = module.predict(self.invalid_smiles)
res = module.predict(self.invalid_smiles, drop_invalid=False)
for comp in res.signature[0]:
self.assertFalse(math.isnan(comp))
for comp in res.signature[1]:
self.assertTrue(math.isnan(comp))
for comp in res.signature[2]:
self.assertFalse(math.isnan(comp))
self.assertTrue(all(res.failed == [False, True, False]))
res = module.predict(self.invalid_smiles, drop_invalid=True)
for comp in res.signature[0]:
self.assertFalse(math.isnan(comp))
for comp in res.signature[1]:
self.assertFalse(math.isnan(comp))
self.assertTrue(all(res.failed == [False, False]))
def test_predict_multi(self):
# load multiple model and check that results stacked correctly
......@@ -81,7 +91,7 @@ class TestSignaturizer(unittest.TestCase):
np.testing.assert_almost_equal(res_A1B1.signature[:, 128:],
res_B1.signature)
res = module_A1B1.predict(self.invalid_smiles)
res = module_A1B1.predict(self.invalid_smiles, drop_invalid=False)
for comp in res.signature[0]:
self.assertFalse(math.isnan(comp))
for comp in res.signature[1]:
......@@ -108,11 +118,23 @@ class TestSignaturizer(unittest.TestCase):
def test_tautomers(self):
module = Signaturizer('A1')
res = module.predict(self.tautomer_smiles)
self.assertTrue(all(res.signature[0] == res.signature[1]))
# they are molecules with different connectivity so must be different
np.testing.assert_raises(
AssertionError, np.testing.assert_array_equal,
res.signature[0], res.signature[1])
def test_enantiomers(self):
module = Signaturizer('A1')
res = module.predict(self.enantiomer_smiles, save_mfp=True)
# they are molecules with different chirality
# not yet handled as they are have identical MFP
print(np.argwhere(res.mfp[0]).ravel())
print(np.argwhere(res.mfp[1]).ravel())
self.assertTrue(all(res.signature[0] == res.signature[0]))
def test_inchi(self):
module = Signaturizer('A1')
res_inchi = module.predict(self.inchi, keytype='InChI')
res_inchi = module.predict(self.inchi, molecule_fmt='InChI')
res_smiles = module.predict([self.tautomer_smiles[0]])
self.assertTrue(all(res_inchi.signature[0] == res_smiles.signature[0]))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment