#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import bz2
import collections.abc
import datetime
import multiprocessing as mp
import os
import random
import shutil
import subprocess
import tempfile
import time
from copy import deepcopy
from inspect import signature
from typing import Callable, Dict, Iterable, List, Union
from warnings import warn
import dill as pickle
import numpy as np
import pandas as pd
import tqdm
from crem.crem import grow_mol, mutate_mol
from meeko import (MoleculePreparation, PDBQTMolecule, PDBQTWriterLegacy,
RDKitMolCreate)
from rdkit import Chem, RDLogger
from rdkit.Chem import AllChem, DataStructs, Descriptors, Lipinski, rdFMCS
from moldrug import __version__
RDLogger.DisableLog('rdApp.*')
# # in order to pickle the isotope properties of the molecule
# Chem.SetDefaultPickleProperties(Chem.PropertyPickleOptions.AllProps)
################################################
# Here are some important functions to work with
################################################
[docs]
def run(command: str, shell: bool = True, executable: str = '/bin/bash'):
"""This function is just a useful wrapper around subprocess.run
Parameters
----------
command : str
Any command to execute.
shell : bool, optional
keyword of ``subprocess.Popen`` and ``subprocess.Popen``, by default True
executable : str, optional
keyword of ``subprocess.Popen`` and ``subprocess.Popen``, by default '/bin/bash'
Returns
-------
object
The processes returned by Run.
Raises
------
RuntimeError
In case of non-zero exit status on the provided command.
"""
process = subprocess.run(command, shell=shell, executable=executable,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
returncode = process.returncode
if returncode != 0:
# print(f'Command {command} returned non-zero exit status {returncode}')
raise RuntimeError(process.stderr)
return process
[docs]
def confgen(mol: Chem.rdchem.Mol, return_mol: bool = False, randomseed: Union[int, None] = None):
"""Create a 3D model from a smiles and return a pdbqt string and, a mol if ``return_mol = True``.
Parameters
----------
mol : Chem.rdchem.Mol
A valid RDKit molecule.
return_mol : bool, optional
If true the function will also return the ``rdkit.Chem.rdchem.Mol``, by default False
randomseed : Union[None, int], optional
Provide a seed for the random number generator so that the same coordinates
can be obtained for a molecule on multiple runs. If None, the RNG will not be seeded, by default None
Returns
-------
tuple or str
If ``return_mol = True`` it will return a tuple ``(str[pdbqt], Chem.rdchem.Mol)``,
if not only a ``str`` that represents the pdbqt.
"""
mol = Chem.AddHs(mol)
if randomseed is None:
randomSeed = -1
else:
randomSeed = randomseed
AllChem.EmbedMolecule(mol, randomSeed=randomSeed)
# The optimization introduce some sort of non-reproducible results.
# For that reason is not used when randomseed is set
if not randomseed:
AllChem.MMFFOptimizeMolecule(mol, maxIters=500)
preparator = MoleculePreparation()
mol_setups = preparator.prepare(mol)
pdbqt_string = PDBQTWriterLegacy.write_string(mol_setups[0])[0]
if return_mol:
return (pdbqt_string, mol)
else:
return pdbqt_string
[docs]
def update_reactant_zone(parent: Chem.rdchem.Mol, offspring: Chem.rdchem.Mol,
parent_replace_ids: List[int] = None, parent_protected_ids: List[int] = None):
"""This function will find the difference between offspring and parent
based on the Maximum Common Substructure (MCS).
This difference will be consider offspring_replace_ids.
Because after a reaction the indexes of the product could change respect to the reactant,
the parent_replace_ids could change.
The function will map the index of the parent to the offspring based on MCS. If on those indexes some of the
parent_replace_ids are still present, they will be updated based on the offspring and also added
to offspring_replace_ids. Similarly will be done for the parent_protected_ids.
Parameters
----------
parent : Chem.rdchem.Mol
The original molecule from where offspring was generated
offspring : Chem.rdchem.Mol
A derivative of parent
parent_replace_ids : List[int], optional
A list of replaceable indexes in the parent, by default None
parent_protected_ids : List[int], optional
A list of protected indexes in the parent, by default None
Returns
-------
tuple[list[int]]
The function returns a tuple composed by two list of integers.
The first list is offspring_replace_ids and the second one offspring_protected_ids.
"""
# Finding Maximum Common Substructure (MCS) and getting the SMARTS
mcs = rdFMCS.FindMCS([parent, offspring], matchValences=True, ringMatchesRingOnly=True)
mcs_mol = Chem.MolFromSmarts(mcs.smartsString)
# Get the index corresponding to the MCS for both parent and offspring
# The ordering of the indices corresponds to the atom ordering in GetSubstructMatch.
# Therefore we have a mapping between the two molecules
match_parent = parent.GetSubstructMatch(mcs_mol)
match_offspring = offspring.GetSubstructMatch(mcs_mol)
mapping = dict(zip(match_parent, match_offspring))
offspring_replace_ids = []
for atom in offspring.GetAtoms():
if atom.GetIdx() not in match_offspring:
offspring_replace_ids.append(atom.GetIdx())
# Because after a reaction the index could change we must update the original index.
# If the try raise exception is because the parent_replace_ids
# are not present in the MCS what means that they were already submitted to a reaction
# Therefore we dont need to update them neither add to the offspring_replace_ids
if parent_replace_ids:
for idx in parent_replace_ids:
try:
offspring_replace_ids.append(mapping[idx])
except KeyError:
pass
offspring_protected_ids = []
if parent_protected_ids:
for idx in parent_protected_ids:
try:
offspring_protected_ids.append(mapping[idx])
except KeyError:
pass
return offspring_replace_ids, offspring_protected_ids
[docs]
def get_sim(ms: List[Chem.rdchem.Mol], ref_fps: List):
"""Get the molecules with higher similarity to each member of ref_fps.
Parameters
----------
ms : list[Chem.rdchem.Mol]
List of molecules
ref_fps : list[AllChem.GetMorganFingerprintAsBitVect(mol, 2)]
A list of reference fingerprints
Returns
-------
list[Chem.rdchem.Mol]
A list of molecules with the higher similarity with their corresponded ref_fps value.
"""
output = []
fps1 = [AllChem.GetMorganFingerprintAsBitVect(m, 2) for m in ms]
for fp in fps1:
v = DataStructs.BulkTanimotoSimilarity(fp, ref_fps)
i = np.argmax(v)
output.append([v[i], i])
return output
[docs]
def get_similar_mols(mols: List, ref_mol: Chem.rdchem.Mol, pick: int, beta: float = 0.01):
"""Pick the similar molecules from mols respect to ref_mol using a roulette wheel selection strategy.
Parameters
----------
mols : list
The list of molecules from where to pick molecules.
ref_mol : Chem.rdchem.Mol
The reference molecule
pick : int
Number of molecules to pick from mols
beta : float, optional
Selection threshold, by default 0.01
Returns
-------
list
A list of picked molecules.
"""
if pick >= len(mols):
return mols
else:
ref_fp = AllChem.GetMorganFingerprintAsBitVect(ref_mol, 2)
fps = [AllChem.GetMorganFingerprintAsBitVect(mol, 2) for mol in mols]
similarities = np.array(DataStructs.BulkTanimotoSimilarity(ref_fp, fps))
probs = np.exp(beta * similarities) / np.sum(np.exp(beta * similarities))
cumsum = np.cumsum(probs)
indexes = set()
while len(indexes) != pick:
r = sum(probs) * random.random() # noqa
indexes.add(np.argwhere(r <= cumsum)[0][0])
return [mols[index] for index in indexes]
[docs]
def lipinski_filter(mol: Chem.rdchem.Mol, maxviolation: int = 2):
"""Implementation of Lipinski filter.
Parameters
----------
mol : Chem.rdchem.Mol
An RDKit molecule.
maxviolation : int, optional
Maximum number of violations. Above this value the function return False, by default 2
Returns
-------
bool
True if the molecule present less than maxviolation violations; otherwise False.
"""
filter_prop = {
'NumHAcceptors': {'method': Lipinski.NumHAcceptors, 'cutoff': 10},
'NumHDonors': {'method': Lipinski.NumHDonors, 'cutoff': 5},
'wt': {'method': Descriptors.MolWt, 'cutoff': 500},
'MLogP': {'method': Descriptors.MolLogP, 'cutoff': 5},
'NumRotatableBonds': {'method': Lipinski.NumRotatableBonds, 'cutoff': 10},
'TPSA': {'method': Chem.MolSurf.TPSA, 'cutoff': 140},
}
cont = 0
for prop in filter_prop:
if filter_prop[prop]['method'](mol) > filter_prop[prop]['cutoff']:
cont += 1
if cont >= maxviolation:
return False
return True
[docs]
def lipinski_profile(mol: Chem.rdchem.Mol):
"""See: https://www.rdkit.org/docs/source/rdkit.Chem.Lipinski.html?highlight=lipinski#module-rdkit.Chem.Lipinski
Parameters
----------
mol : Chem.rdchem.Mol
An RDKit molecule.
Returns
-------
dict
A dictionary with molecular properties.
"""
properties = {
'NumHAcceptors': {'method': Lipinski.NumHAcceptors, 'cutoff': 10},
'NumHDonors': {'method': Lipinski.NumHDonors, 'cutoff': 5},
'wt': {'method': Descriptors.MolWt, 'cutoff': 500},
'MLogP': {'method': Descriptors.MolLogP, 'cutoff': 5},
'NumRotatableBonds': {'method': Lipinski.NumRotatableBonds, 'cutoff': 10},
'TPSA': {'method': Chem.MolSurf.TPSA, 'cutoff': range(0, 140)},
'FractionCSP3': {'method': Lipinski.FractionCSP3, 'cutoff': None},
'HeavyAtomCount': {'method': Lipinski.HeavyAtomCount, 'cutoff': None},
'NHOHCount': {'method': Lipinski.NHOHCount, 'cutoff': None},
'NOCount': {'method': Lipinski.NOCount, 'cutoff': None},
'NumAliphaticCarbocycles': {'method': Lipinski.NumAliphaticCarbocycles, 'cutoff': None},
'NumAliphaticHeterocycles': {'method': Lipinski.NumAliphaticHeterocycles, 'cutoff': None},
'NumAliphaticRings': {'method': Lipinski.NumAliphaticRings, 'cutoff': None},
'NumAromaticCarbocycles': {'method': Lipinski.NumAromaticCarbocycles, 'cutoff': None},
'NumAromaticHeterocycles': {'method': Lipinski.NumAromaticHeterocycles, 'cutoff': None},
'NumAromaticRings': {'method': Lipinski.NumAromaticRings, 'cutoff': None},
'NumHeteroatoms': {'method': Lipinski.NumHeteroatoms, 'cutoff': None},
'NumSaturatedCarbocycles': {'method': Lipinski.NumSaturatedCarbocycles, 'cutoff': None},
'NumSaturatedHeterocycles': {'method': Lipinski.NumSaturatedHeterocycles, 'cutoff': None},
'NumSaturatedRings': {'method': Lipinski.NumSaturatedRings, 'cutoff': None},
'RingCount': {'method': Lipinski.RingCount, 'cutoff': None},
}
profile = {}
for prop in properties:
profile[prop] = properties[prop]['method'](mol)
# print(f"{prop}: {properties[prop]['method'](mol)}. cutoff: {properties[prop]['cutoff']}")
return profile
[docs]
def LargerTheBest(Value: float, LowerLimit: float, Target: float, r: float = 1) -> float:
"""Desirability function used when larger values are the targets. If Value is higher
or equal than the target it will return 1; if it is lower than LowerLimit it will return 0;
else a number between 0 and 1.
You can also check:
doi:10.1016/j.chemolab.2011.04.004
https://www.youtube.com/watch?v=quz4NW0uIYw&list=PL6ebkIZFT4xXiVdpOeKR4o_sKLSY0aQf_&index=3
Parameters
----------
Value : float
Value to test.
LowerLimit : float
Lower value accepted. Lower than this one will return 0.
Target : float
The target value. On this value (or higher) the function takes 1 as value.
r : float, optional
This is the exponent of the interpolation. Could be used to control the interpolation, by default 1
Returns
-------
float
A number between 0 and 1. Been 1 the desireable value to get.
"""
if Value < LowerLimit:
return 0.0
elif Value <= Target:
return ((Value - LowerLimit) / (Target - LowerLimit))**r
else:
return 1.0
[docs]
def SmallerTheBest(Value: float, Target: float, UpperLimit: float, r: float = 1) -> float:
"""Desirability function used when lower values are the targets. If Value is lower or
equal than the target it will return 1; if it is higher than UpperLimit it will return 0;
else a number between 0 and 1.
Parameters
----------
Value : float
Value to test.
Target : float
The target value. On this value (or lower) the function takes 1 as value.
UpperLimit : float
Upper value accepted. Higher than this one will return 0.
r : float, optional
This is the exponent of the interpolation. Could be used to control the interpolation, by default 1
Returns
-------
float
A number between 0 and 1. Been 1 the desireable value to get.
"""
if Value < Target:
return 1.0
elif Value <= UpperLimit:
return ((UpperLimit - Value) / (UpperLimit - Target))**r
else:
return 0.0
[docs]
def NominalTheBest(Value: float, LowerLimit: float, Target: float,
UpperLimit: float, r1: float = 1, r2: float = 1) -> float:
"""Desirability function used when a target value is desired. If Value is
lower or equal than the LowerLimit it will return 0; as well values higher or equal than
UpperLimit; else a number between 0 and 1.
Parameters
----------
Value : float
Value to test.
LowerLimit : float
Lower value accepted. Lower than this one will return 0.
Target : float
The target value. On this value the function takes 1 as value.
UpperLimit : float
Upper value accepted. Higher than this one will return 0.
r1 : float, optional
This is the exponent of the interpolation from LowerLimit to Target.
Could be used to control the interpolation, by default 1
r2 : float, optional
This is the exponent of the interpolation from Target to UpperLimit.
Could be used to control the interpolation, by default 1
Returns
-------
float
A number between 0 and 1. Been 1 the desireable value to get.
"""
if Value < LowerLimit:
return 0.0
elif Value <= Target:
return ((Value - LowerLimit) / (Target - LowerLimit))**r1
elif Value <= UpperLimit:
return ((UpperLimit - Value) / (UpperLimit - Target))**r2
else:
return 0.0
[docs]
def DerringerSuichDesirability():
"""A warper around the implemented desirability functions
Returns
-------
dict
A dict with key name of the desirability and value the corresponded function
"""
my_dict = {
'LargerTheBest': LargerTheBest,
'SmallerTheBest': SmallerTheBest,
'NominalTheBest': NominalTheBest
}
return my_dict
# Saving data
[docs]
def full_pickle(title: str, data: object):
"""Normal pickle.
Parameters
----------
title : str
Name of the file without extension, .pkl will be added by default.
data : object
Any serializable python object
"""
with open(f'{title}.pkl', 'wb') as pkl:
pickle.dump(data, pkl)
[docs]
def loosen(file: str):
"""Unpickle a pickled object.
Parameters
----------
file : str
The path to the file who store the pickle object.
Returns
-------
object
The python object.
"""
with open(file, 'rb') as pkl:
data = pickle.load(pkl)
return data
[docs]
def compressed_pickle(title: str, data: object):
"""Compress Python object. First cPickle it and then bz2.BZ2File compressed it.
Parameters
----------
title : str
Name of the file without extensions, .pbz2 will be added by default
data : object
Any serializable python object
"""
with bz2.BZ2File(f'{title}.pbz2', 'w') as f:
pickle.dump(data, f)
[docs]
def decompress_pickle(file: str):
"""Decompress CPickle objects compressed first with bz2 formats
Parameters
----------
file : str
This is the cPickle files compressed with bz2.BZ2File. (as a convention with extension .pbz2, but not needed)
Returns
-------
object
The python object.
"""
data = bz2.BZ2File(file, 'rb')
data = pickle.load(data)
return data
[docs]
def is_iter(obj):
"""Check if obj is iterable
Parameters
----------
obj : Any
Any python object
Returns
-------
bool
Tru if obj iterable, False if not
"""
try:
for _ in obj:
return True
except TypeError:
return False
[docs]
def import_sascorer():
"""Function to import sascorer from RDConfig.RDContribDir of RDKit
Returns
-------
module
The sascorer module ready to use.
"""
# In order to import sascorer from RDConfig.RDContribDir
import importlib.util as importlib_util
from rdkit.Chem import RDConfig
spec = importlib_util.spec_from_file_location(
'sascorer', os.path.join(RDConfig.RDContribDir, 'SA_Score', 'sascorer.py'))
sascorer = importlib_util.module_from_spec(spec)
spec.loader.exec_module(sascorer)
return sascorer
[docs]
def deep_update(target_dict: dict, update_dict: dict) -> dict:
"""Recursively update a dictionary with the key-value pairs from another dictionary.
Inpired on https://stackoverflow.com/questions/3232943/update-value-of-a-nested-dictionary-of-varying-depth
Parameters
----------
target_dict : dict
The dictionary to be updated
update_dict : dict
The dictionary providing the updates
Example
-------
.. ipython:: python
from moldrug.utils import deep_update
target = {'a': 1, 'b': {'c': 2, 'd': 3}}
updates = {'b': {'c': 4, 'e': 5}, 'f': 6}
result = deep_update(target, updates)
print(result)
# Output: {'a': 1, 'b': {'c': 4, 'd': 3, 'e': 5}, 'f': 6}
Returns
-------
dict
The updated dictionary
"""
for key, value in update_dict.items():
if isinstance(value, collections.abc.Mapping):
# Recursive update for nested dictionaries
target_dict[key] = deep_update(target_dict.get(key, {}), value)
else:
# Update the value if it's not a dictionary
target_dict[key] = value
return target_dict
def softmax(x, axis=None):
x_max = np.amax(x, axis=axis, keepdims=True)
exp_x_shifted = np.exp(x - x_max)
return exp_x_shifted / np.sum(exp_x_shifted, axis=axis, keepdims=True)
###########################################################################
# Classes to work with Vina
###########################################################################
[docs]
class Atom:
"""This is a simple class to wrap a pdbqt Atom.
It is based on https://userguide.mdanalysis.org/stable/formats/reference/pdbqt.html#writing-out.
"""
[docs]
def __init__(self, line):
self.lineType = "ATOM"
self.serial = int(line[6:11])
self.name = line[12:16].strip()
self.altLoc = line[16]
self.resName = line[17:21].strip()
self.chainID = line[21]
self.resSeq = int(line[22:26])
self.iCode = line[26]
self.x = float(line[30:38])
self.y = float(line[38:46])
self.z = float(line[46:54])
self.occupancy = line[54:60].strip()
self.tempFactor = line[60:66].strip()
self.partialChrg = line[66:76].strip()
self.atomType = line[78:80].strip()
def __getitem__(self, key):
return self.__dict__[key]
[docs]
class CHUNK_VINA_OUT:
"""This class will be used by VINA_OUT in order to read the pdbqt ouput of a vina docking results.
"""
[docs]
def __init__(self, chunk):
self.chunk = chunk
self.atoms = []
self.run = None
self.freeEnergy = None
self.RMSD1 = None
self.RMSD2 = None
self.parse()
def parse(self):
for line in self.chunk:
if line.startswith("MODEL"):
self.run = int(line[5:])
elif line.startswith("REMARK VINA RESULT:"):
(self.freeEnergy, self.RMSD1, self.RMSD2) = [float(number) for number in line.split(":")[-1].split()]
elif line.startswith("ATOM"):
self.atoms.append(Atom(line))
[docs]
def get_atoms(self):
"""Return a list of all atoms.
If to_dict is True, each atom is represented as a dictionary.
Otherwise, a list of Atom objects is returned."""
return [x.__dict__ for x in self.atoms]
def write(self, name=None):
if name:
with open(name, "w") as f:
f.writelines(self.chunk)
else:
with open(f"Run_{self.run}.pdbqt", "w") as f:
f.writelines(self.chunk)
[docs]
class VINA_OUT:
"""
Vina class to handle vina output. Think about use meeko in the future!
"""
[docs]
def __init__(self, file):
self.file = file
self.chunks = []
self.parse()
def parse(self):
with open(self.file, "r") as input_file:
lines = input_file.readlines()
i = 0
while i < len(lines):
if lines[i].startswith("MODEL"):
j = i
tmp_chunk = []
while (not lines[j].startswith("ENDMDL")) and (j < len(lines)):
tmp_chunk.append(lines[j])
j += 1
i += 1
tmp_chunk.append("ENDMDL\n")
self.chunks.append(CHUNK_VINA_OUT(tmp_chunk))
i += 1
def BestEnergy(self, write=False):
min_chunk = min(self.chunks, key=lambda x: x.freeEnergy)
if write:
min_chunk.write("best_energy.pdbqt")
return min_chunk
#################################
# Classes to work with moldrug
#################################
[docs]
class Individual:
"""
Base class to work with GA, Local and all the fitness functions.
Individual is a mutable object. Only the attribute smiles it is not mutable and is used
for hash. Therefore this class is hashable based on the smiles attribute.
This one is also used for '==' comparison
If two Individuals has the same smiles not matter if the rest of the elements are different,
they will be considered the same.
The cost attribute is used for arithmetic operations.
It also admit copy and deepcopy operations.
Known issue, in case that we would like to use a numpy array of individuals.
It is needed to change the dtype of the generated arrays
Attributes
----------
mol: Chem.rdchem.Mol
The molecule object
idx: Union[int, str]
The identifier
pdbqt: str
A pdbqt string representation of the molecule, used for docking with Vina. It is generated
during the initialization of the class
smiles: str (property)
The SMILES representation of the mol attribute without explicit hydrogens,
this attribute (property) is immutable.
cost: float
This attribute is used to interact with the fitness functions of :mod:`moldrug.fitness`
Example
-------
.. ipython:: python
from moldrug import utils, fitness
import numpy as np
from copy import copy, deepcopy
from rdkit import Chem
i1 = utils.Individual(mol = Chem.MolFromSmiles('CC'), idx = 1, cost = 5)
i2 = utils.Individual(mol = Chem.MolFromSmiles('CC'), idx = 2, cost = 4)
i3 = utils.Individual(mol = Chem.MolFromSmiles('CCC'), idx = 3, cost = 4)
# Show the '==' operation
print(i1 == i2, i1 == i3)
# Show that Individual is a hashable object based on the smiles
print(set([i1,i2,i3]))
# Show arithmetic operations
print(i1+i2)
# How to work with numpy
array = np.array([i1,i2, i3])
array_2 = (array*2).astype('float64')
print(array_2)
# Show copy
print(copy(i3), deepcopy(i3))
"""
[docs]
def __init__(self, mol: Chem.rdchem.Mol, idx: Union[int, str] = 0, pdbqt: str = None,
cost: float = np.inf, randomseed: Union[int, None] = None) -> None:
"""This is the constructor of the class.
Parameters
----------
mol : Chem.rdchem.Mol, optional
A valid RDKit molecule.
idx :Union[int str], optional
An identification, by default 0
pdbqt : str, optional
A valid pdbqt string. If it is not provided it will be generated from mol through utils.confgen
and the mol attribute will be update with the 3D model, by default None
cost : float, optional
This attribute is used to perform operations between Individuals and
should be used for the cost functions, by default np.inf
randomseed : Union[None, int], optional
Provide a seed for the random number generator so that the "same" coordinates can be obtained
for the attribute pdbqt on multiple runs. If None, the RNG will not be seeded, by default None
"""
self.mol = mol
if not pdbqt:
try:
self.pdbqt = confgen(self.mol, randomseed=randomseed)
except Exception:
self.pdbqt = None
else:
self.pdbqt = pdbqt
self.cost = cost
self.idx = idx
@property
def smiles(self):
return Chem.MolToSmiles(Chem.RemoveHs(self.mol))
def __repr__(self):
return f"{self.__class__.__name__}(idx = {self.idx}, smiles = {self.smiles}, cost = {self.cost})"
def __hash__(self):
return hash(self.smiles)
def __eq__(self, other: object) -> bool:
if self.__class__ is other.__class__:
return self.smiles == other.smiles # self.cost == other.cost and
else:
return False # self.smiles == other
def __gt__(self, other: object) -> bool:
return self.cost > other.cost
def __ge__(self, other: object) -> bool:
return self.cost >= other.cost
def __lt__(self, other: object) -> bool:
return self.cost < other.cost
def __le__(self, other: object) -> bool:
return self.cost <= other.cost
def __neg__(self) -> float:
return -self.cost
def __abs__(self) -> float:
return abs(self.cost)
def __add__(self, other: object):
return self.cost + other
def __radd__(self, other: object):
return other + self.cost
def __sub__(self, other: object):
return self.cost - other
def __rsub__(self, other: object):
return other - self.cost
def __mul__(self, other: object):
return self.cost * other
def __rmul__(self, other: object):
return other * self.cost
def __truediv__(self, other: object):
return self.cost / other
def __rtruediv__(self, other: object):
return other / self.cost
def __floordiv__(self, other: object):
return self.cost // other
def __rfloordiv__(self, other: object):
return other // self.cost
def __mod__(self, other: object):
return self.cost % other
def __rmod__(self, other: object):
return other % self.cost
def __divmod__(self, other: object):
return (self.cost // other, self.cost % other)
def __rdivmod__(self, other: object):
return (other // self.cost, other % self.cost)
def __pow__(self, other: object):
return self.cost ** other
def __rpow__(self, other: object):
return other ** self.cost
def exp(self):
return np.exp(self.cost)
def __copy__(self):
cls = self.__class__
result = cls.__new__(cls)
result.__dict__.update(self.__dict__)
return result
def __deepcopy__(self, memo):
cls = self.__class__
result = cls.__new__(cls)
memo[id(self)] = result
for k, v in self.__dict__.items():
setattr(result, k, deepcopy(v, memo))
return result
[docs]
def make_sdf(individuals: List[Individual], sdf_name: str = 'out'):
"""This function create a sdf file from a list of Individuals based on their pdbqt attribute
This assume that the cost function update the pdbqt attribute after the docking with the conformations obtained
In the case of multiple receptor the attribute should be a list of valid pdbqt strings.
Here will export several sdf depending how many pdbqt string are in the pdbqt attribute.
Parameters
----------
individuals : list[Individual]
A list of individuals
sdf_name : str, optional
The name for the output file. Could be a ``path + sdf_name``.
The sdf extension will be added by the function, by default 'out'
Example
-------
.. ipython:: python
import tempfile, os
from moldrug import utils
from rdkit import Chem
# Create some temporal dir
tmp_path = tempfile.TemporaryDirectory()
# Creating two individuals
I1 = utils.Individual(Chem.MolFromSmiles('CCCCl'))
I2 = utils.Individual(Chem.MolFromSmiles('CCOCCCF'))
# Creating the pdbqt attribute as a list with the pdbqt attribute (this is just a silly example)
I1.pdbqt = [I1.pdbqt, I1.pdbqt]
I2.pdbqt = [I2.pdbqt, I2.pdbqt]
utils.make_sdf([I1, I2], sdf_name = os.path.join(tmp_path.name, 'out'))
# Two files were created
# In the other hand, if the attribute pdbqt is not a list, only one file is going to be created
# Set pdbqt to the original value
I1.pdbqt = I1.pdbqt[0]
I2.pdbqt = I2.pdbqt[0]
utils.make_sdf([I1, I2], sdf_name = os.path.join(tmp_path.name, 'out'))
# Only one file will be created if the pdbqt has not len in some of
# the individuals or they presents different lens as well.
# In this case the pdbqts will be completely ignored and pdbqt attribute
# will be used for the construction of the sdf file
I1.pdbqt = [I1.pdbqt, I1.pdbqt, I1.pdbqt]
I2.pdbqt = [I2.pdbqt, I2.pdbqt]
utils.make_sdf([I1, I2], sdf_name = os.path.join(tmp_path.name, 'out'))
"""
pdbqt_tmp = tempfile.NamedTemporaryFile(suffix='.pdbqt')
# Check for the attribute pdbqt in all passed individuals and that all of them have the same number of pdbqt
check = True
NumbOfpdbqt = set()
for individual in individuals:
if isinstance(individual.pdbqt, List):
NumbOfpdbqt.add(len(individual.pdbqt))
else:
check = False
break
if len(NumbOfpdbqt) > 1:
check = False
if check is True:
for i in range(list(NumbOfpdbqt)[0]):
with Chem.SDWriter(f"{sdf_name}_{i+1}.sdf") as w:
for individual in individuals:
with open(pdbqt_tmp.name, 'w') as f:
f.write(individual.pdbqt[i])
try:
pdbqt_mol = PDBQTMolecule.from_file(pdbqt_tmp.name, skip_typing=True)
mol = RDKitMolCreate.from_pdbqt_mol(pdbqt_mol)[0]
mol.SetProp("_Name",
f"idx :: {individual.idx}, smiles :: {individual.smiles}, "
f"cost :: {individual.cost}")
w.write(mol)
except Exception:
# Should be that the pdbqt is not valid
print(f"{individual} does not have a valid pdbqt: {individual.pdbqt}.")
print(f" File {sdf_name}_{i+1}.sdf was created!")
else:
with Chem.SDWriter(f"{sdf_name}.sdf") as w:
for individual in individuals:
with open(pdbqt_tmp.name, 'w') as f:
if len(NumbOfpdbqt) == 0:
f.write(individual.pdbqt)
else:
f.write(individual.pdbqt[0])
try:
pdbqt_mol = PDBQTMolecule.from_file(pdbqt_tmp.name, skip_typing=True)
mol = RDKitMolCreate.from_pdbqt_mol(pdbqt_mol)[0]
mol.SetProp("_Name",
f"idx :: {individual.idx}, smiles :: {individual.smiles}, "
f"cost :: {individual.cost}")
w.write(mol)
except Exception:
# Should be that the pdbqt is not valid
print(f"{individual} does not have a valid pdbqt: {individual.pdbqt}.")
print(f"File {sdf_name}.sdf was createad!")
def _make_kwargs_copy(costfunc, costfunc_kwargs,):
"""Make a copy of the self.costfunc_kwargs.
It creates a temporal directory.
Returns
-------
dict
A copy of self.costfunc_kwargs with wd changed if needed
"""
kwargs_copy = costfunc_kwargs.copy()
costfunc_jobs_tmp_dir = tempfile.TemporaryDirectory(prefix='.costfunc_moldrug_', dir='.')
if 'wd' in signature(costfunc).parameters:
kwargs_copy['wd'] = costfunc_jobs_tmp_dir.name
return kwargs_copy, costfunc_jobs_tmp_dir
[docs]
def tar_errors(error_path: str = 'error'):
"""Clean errors in the working directory.
Convert to error.tar.gz the error_path
and delete the directory.
Parameters
----------
error_path : str
Where the errors are storged.
"""
if os.path.isdir(error_path):
if os.listdir(error_path):
shutil.make_archive('error', 'gztar', error_path)
print(f"\n{50*'=+'}")
print("Note: Check the running warnings and erorrs in error.tar.gz file!")
print(f"{50*'=+'}\n")
shutil.rmtree(error_path)
######################
# Selection functions
######################
[docs]
def roulette_wheel_selection(p: List[float]):
"""Function to select the offsprings based on their fitness.
Parameters
----------
p : list[float]
Probabilities
Returns
-------
int
The selected index
"""
c = np.cumsum(p)
r = sum(p) * random.random() # noqa
ind = np.argwhere(r <= c)
return ind[0][0]
[docs]
def to_dataframe(individuals: List[Individual], return_mol: bool = False) -> pd.DataFrame:
"""Convert a list of individuals to a DataFrame
Parameters
----------
individuals : List[Individual]
The list of individuals
return_mol : bool, optional
If True the attribute mol will bot be return, by default False
Returns
-------
pd.DataFrame
The DataFrame
"""
list_of_dictionaries = []
for individual in individuals:
dictionary = individual.__dict__.copy()
if not return_mol:
del dictionary['mol']
list_of_dictionaries.append(dictionary)
return pd.DataFrame(list_of_dictionaries)
[docs]
class Local:
"""This class is used to genereate close solutions to the seed molecule.
It use :meth:`crem.crem.grow_mol`.
Attributes
----------
randomseed : Union[None, int]
The random seed to use with random module.
__moldrug_version__ : str
The molDrug version.
costfunc : object
The cost function set by the user.
crem_db_path : str
Path to the CReM data base.
costfunc_kwargs : dict
The keyword arguments of the costfunc.
grow_crem_kwargs : dict
The keyword arguments to pass to :meth:`crem.crem.grow_mol`.
AddHs : bool
In case explicit hydrogens should be added.
pop : list[:meth:`moldrug.utils.Individuals`]
The final population sorted by cost.
"""
[docs]
def __init__(self, seed_mol: Chem.rdchem.Mol, crem_db_path: str, costfunc: object, grow_crem_kwargs: Dict = None,
costfunc_kwargs: Dict = None, AddHs: bool = False, randomseed: Union[None, int] = None,
deffnm: str = 'local') -> None:
"""Creator
Parameters
----------
seed_mol : Chem.rdchem.Mol
The seed molecule from which the population will be generated.
crem_db_path : str
The pathway to the CReM data base.
costfunc : object
The cost function to work with (any from :mod:`moldrug.fitness` or a valid user defined).
grow_crem_kwargs : Dict, optional
The keywords of the grow_mol function of CReM, by default None
costfunc_kwargs : Dict, optional
The keyword arguments of the selected cost function, by default None
AddHs : bool, optional
If True the explicit hyrgones will be added, by default False
randomseed : Union[None, int], optional
Set a random seed for reproducibility, by default None
deffnm : str
Just a place holder for compatibility with the CLI.
Raises
------
Exception
In case that some problem occured during the creation of the Individula from the seed_mol
ValueError
In case of incorrect definition of grow_crem_kwargs and/or costfunc_kwargs.
They must be None or a dict instance.
"""
self.randomseed = randomseed
if self.randomseed is not None:
random.seed(randomseed)
self.__moldrug_version = __version__
if grow_crem_kwargs is None:
grow_crem_kwargs = dict()
elif not isinstance(grow_crem_kwargs, dict):
raise ValueError(f'grow_crem_kwargs must be None or a dict instance. {grow_crem_kwargs} was provided')
if costfunc_kwargs is None:
costfunc_kwargs = dict()
elif not isinstance(costfunc_kwargs, dict):
raise ValueError(f'grow_crem_kwargs must be None or a dict instance. {costfunc_kwargs} was provided')
if AddHs:
self._seed_mol = Chem.AddHs(seed_mol)
else:
self._seed_mol = seed_mol
self.InitIndividual = Individual(self._seed_mol, idx=0, randomseed=self.randomseed)
if not self.InitIndividual.pdbqt:
raise Exception("For some reason, it was not possible to create for the class Individula "
"a pdbqt from the seed_smiles. Consider to check the validity of the SMILES string!")
if os.path.exists(crem_db_path):
self.crem_db_path = os.path.abspath(crem_db_path)
else:
raise FileNotFoundError(f"{crem_db_path = } does not exists or is not accesible")
self.grow_crem_kwargs = grow_crem_kwargs
self.costfunc = costfunc
self.costfunc_kwargs = costfunc_kwargs
self.pop = [self.InitIndividual]
[docs]
def __call__(self, njobs: int = 1, pick: int = None):
"""Call deffinition
Parameters
----------
njobs : int, optional
The number of jobs for parallelization, the module multiprocessing will be used, by default 1
pick : int, optional
How many molecules take from the generated throgh the grow_mol CReM operation,
by default None which means all generated.
"""
# Check version of moldrug
if self.__moldrug_version != __version__:
warn(f"{self.__class__.__name__} was initilized with moldrug-{self.__moldrug_version} "
f"but was called with moldrug-{__version__}")
self.grow_crem_kwargs.update({'return_mol': True})
new_mols = list(grow_mol(self._seed_mol, self.crem_db_path, **self.grow_crem_kwargs))
if pick:
random.shuffle(new_mols)
new_mols = new_mols[:pick]
new_mols = [item[1] for item in new_mols]
idx0 = len(self.pop)
for i, mol in enumerate(new_mols):
individual = Individual(mol, idx=idx0 + i, randomseed=self.randomseed)
if individual.pdbqt:
self.pop.append(individual)
# Calculating cost of each individual
# Creating the arguments
args_list = []
# Make a copy of the self.costfunc_kwargs
kwargs_copy, costfunc_jobs_tmp_dir = _make_kwargs_copy(self.costfunc, self.costfunc_kwargs)
for individual in self.pop:
args_list.append((individual, kwargs_copy))
print('Calculating cost function...')
pool = mp.Pool(njobs)
self.pop = [individual for individual in tqdm.tqdm(pool.imap(self.__costfunc__, args_list), total=len(args_list))]
pool.close()
# Clean directory
costfunc_jobs_tmp_dir.cleanup()
# Tar errors
tar_errors('error')
# Printing how long was the simulation
print(f"Finished at {datetime.datetime.now().strftime('%c')}.\n")
def __costfunc__(self, args_list):
Individual, kwargs = args_list
# This is just to use the progress bar on pool.imap
return self.costfunc(Individual, **kwargs)
[docs]
def pickle(self, title: str, compress: bool = False):
"""Method to pickle the whole Local class
Parameters
----------
title : str
Name of the object which will be compleated with the correposnding
extension depending if compress is set to True or False.
compress : bool, optional
Use compression, by default False. If True :meth:`moldrug.utils.compressed_pickle` will be used;
if not :meth:`moldrug.utils.full_pickle` will be used instead.
"""
cls = self.__class__
result = cls.__new__(cls)
result.__dict__.update(self.__dict__)
if compress:
compressed_pickle(title, result)
else:
full_pickle(title, result)
[docs]
def to_dataframe(self, return_mol: bool = False):
"""Create a DataFrame from self.pop.
Returns
-------
pandas.DataFrame
The DataFrame
"""
return to_dataframe(self.pop, return_mol=return_mol)
#######################
# Optimazer functions
#######################
[docs]
class GA:
"""An implementation of a genetic algorithm to search in the chemical space.
Attributes
----------
randomseed : Union[None, int]
The random seed to use with random module.
__moldrug_version__ : str
The molDrug version.
costfunc : object
The cost function set by the user.
crem_db_path : str
Path to the CReM data base.
maxiter : int
Maximum number of iteratinos to perform.
popsize : int
Population size.
beta : float
Selection pressure.
costfunc_kwargs : dict
The keyword arguments of the costfunc.
costfunc_ncores : int
The number of cores to use for costfunc.
nc : int
Number of childs of offsprints ``= round(pc * popsize)``
get_similar : bool
Bias the search upon similar molecules. If True :meth:`modrug.utils.get_similar_mols` is used after the
mutation with CReM instead random choice.
mutate_crem_kwargs : dict
The keyword arguments to pass to :meth:`crem.crem.mutate_mol`.
save_pop_every_gen : int
Frequency to save the pickle file o fthe population during the optimazation.
checkpoint : bool
Safe chekpoint file, this help to restart a simualation.
deffnm : str
Prefix for the genereated files.
NumCalls : int
How many times the ``__call__`` method has been called.
NumGens : int
he number of generations performed by the class. Subsequent ``__call__``
executions update this number acordennly.
SawIndividuals : set[:meth:`moldrug.utils.Individuals`]
All the Individulas saw during the optimizations.
acceptance : dict
A dictionary with key the Generation id and as value another dictionary
with keys ``accepeted`` and ``generated`` with the number of accepted and genereated
individuals on the generation respectively.
AddHs : bool
In case explicit hydrogens should be added for all genreated molecules.
_seed_mol : list[Chem.rdchem.Mol]
The list of seed molecules.
InitIndividual : :meth:`moldrug.utils.Individuals`
The initial individual based on _seed_mol.
pop : list[:meth:`moldrug.utils.Individuals`]
The final population sorted by cost.
best_cost : list[float]
The list of best cost for each generations.
avg_cost : list[float]
The list of average cost for each generations.
TODO:
* Timing the simulation, add tracking variable for the timing of the evaluation and genereation of moleucles. Print at the end of each call
* Extend to other genereators:
- mutate_crem_kwargs = None and some other keyword that get the generator function, in this case the mutate method will be overwrite
with the user provided, this fucntion will take an Individual and return a new offspring, to be more copatible and not create issues,
I good idea will be that this fucntion accept a self as first arguemnt, and internally, it will use the self of the GA class
"""
[docs]
def __init__(self, seed_mol: Union[Chem.rdchem.Mol, Iterable[Chem.rdchem.Mol]],
costfunc: Callable, costfunc_kwargs: Dict, crem_db_path: str, maxiter: int = 10, popsize: int = 20,
beta: float = 0.001, pc: float = 1, get_similar: bool = False, mutate_crem_kwargs: Union[None, Dict] = None,
save_pop_every_gen: int = 0, checkpoint: bool = False, deffnm: str = 'ga',
AddHs: bool = False, randomseed: Union[None, int] = None) -> None:
"""Constructor
Parameters
----------
seed_mol : Union[Chem.rdchem.Mol, Iterable[Chem.rdchem.Mol]]
The seed molecule submitted to genetic algorithm optimization on the chemical space. Could be only one RDKit
molecule or more than one specified in an Iterable object.
costfunc : Callable
The cost function to work with (any from :mod:`moldrug.fitness` or a valid user defined).
costfunc_kwargs : Dict
The keyword arguments of the selected cost function
crem_db_path : str
Path to the CReM data base.
maxiter : int, optional
Maximum number of iteration (or generation), by default 10.
popsize : int, optional
Population size, by default 20.
beta : float, optional
Selection pressure. Higher values means that the best individual
are going to be sumitted for mutations more frquently, by default 0.001.
pc : float, optional
Proportion of children, by default 1
get_similar : bool, optional
If True the searching will be bias to similar molecules, by default False
mutate_crem_kwargs : Union[None, Dict], optional
Parameters for mutate_mol of CReM, by default {}
save_pop_every_gen : int, optional
Frequency to save the population, by default 0
checkpoint : bool, optional
If True the whole class will be saved as cpt with the frequency of save_pop_every_gen.
This means that if save_pop_every_gen = 0 and checkpoint = True, no checkpoint will be
output, by default False
deffnm : str, optional
Default prefix name for all generated files, by default 'ga'
AddHs : bool, optional
If True the explicit hydrogens will be added, by default False
randomseed : Union[None, int], optional
Set a random seed for reproducibility, by default None
Raises
------
TypeError
In case that seed_mol is a wrong input.
ValueError
In case of incorrect definition of mutate_crem_kwargs. It must be None or a dict instance.
ValueError
In case of crem_db_path deos not exist.
"""
self.randomseed = randomseed
if self.randomseed is not None:
random.seed(randomseed)
self.__moldrug_version__ = __version__
if mutate_crem_kwargs is None:
mutate_crem_kwargs = dict()
elif not isinstance(mutate_crem_kwargs, dict):
raise ValueError(f'mutate_crem_kwargs must be None or a dict instance. {mutate_crem_kwargs} was provided')
self.costfunc = costfunc
if os.path.exists(crem_db_path):
self.crem_db_path = os.path.abspath(crem_db_path)
else:
raise FileNotFoundError(f"{crem_db_path = } does not exists or is not accesible")
self.maxiter = maxiter
self.popsize = popsize
self.beta = beta
self.costfunc_kwargs = costfunc_kwargs
if 'ncores' in costfunc_kwargs:
self.costfunc_ncores = self.costfunc_kwargs['ncores']
else:
self.costfunc_ncores = 1
self.nc = round(pc * popsize)
self.get_similar = get_similar
# Internally update with default values, TODO, maybe I should remove this
# and the users should handled CReM parameters by themself.
# I think that this will avoid confusion.
self.mutate_crem_kwargs = {
'radius': 3,
'min_size': 1,
'max_size': 8,
'min_inc': -5,
'max_inc': 3,
'ncores': 1,
}
self.mutate_crem_kwargs.update(mutate_crem_kwargs)
# Saving parameters
self.save_pop_every_gen = save_pop_every_gen
self.checkpoint = checkpoint
self.deffnm = deffnm
# Tracking parameters
self.NumCalls = 0
self.NumGens = 0
self.SawIndividuals = set()
self.acceptance = dict()
# work with the seed molecule or population
self.AddHs = AddHs
# Convert to list the seed_mol in case that it is not
if not is_iter(seed_mol) and isinstance(seed_mol, Chem.rdchem.Mol):
self._seed_mol = [seed_mol]
elif all([isinstance(mol, Chem.rdchem.Mol) for mol in seed_mol]):
self._seed_mol = seed_mol
else:
raise TypeError("seed_mol is not Chem.rdchem.Mol neither a Iterable[Chem.rdchem.Mol]")
if self.AddHs:
self._seed_mol = [Chem.AddHs(mol) for mol in self._seed_mol]
# if 'protected_ids' in self.mutate_crem_kwargs or 'replace_ids' in self.mutate_crem_kwargs:
# _ = [atom.SetIntProp('label_moldrug', atom.GetIdx()) for atom in seed_mol.GetAtoms()]
# Create the first Individual
self.InitIndividual = Individual(self._seed_mol[0], idx=0, randomseed=self.randomseed)
self.pop = []
[docs]
def __call__(self, njobs: int = 1):
"""Call definition
Parameters
----------
njobs : int, optional
The number of jobs for parallelization, the module multiprocessing will be used, by default 1,
Raises
------
RuntimeError
Error during the initialization of the population.
"""
ts = time.time()
# Counting the calls
self.NumCalls += 1
# Check version of moldrug
if self.__moldrug_version__ != __version__:
warn(f"{self.__class__.__name__} was initialized with moldrug-{self.__moldrug_version__} "
f"but was called with moldrug-{__version__}")
# Here we will update if needed some parameters for
# the crem operations that could change between different calls.
# We need to return the molecule, so we override the possible user definition respect to this keyword
self.mutate_crem_kwargs['return_mol'] = True
# Initialize Population
# In case that the populating exist there is not need to initialize.
if len(self.pop) == 0:
GenInitStructs = []
# in case that the input has the popsize memebers there is not need to generate new structures
if len(self._seed_mol) < self.popsize:
for mol in self._seed_mol:
tmp_GenInitStructs = list(mutate_mol(mol, self.crem_db_path, **self.mutate_crem_kwargs))
tmp_GenInitStructs = [mol for (_, mol) in tmp_GenInitStructs]
GenInitStructs += tmp_GenInitStructs
# Checking for possible scenarios
if len(GenInitStructs) == 0:
raise RuntimeError("Something really strange happened. The seed_mol did not "
"generate any new molecule during the initialization of the population. "
"Check the provided crem parameters!")
if len(GenInitStructs) < (self.popsize - len(self._seed_mol)):
print('The initial population has repeated elements')
# temporal solution
GenInitStructs += random.choices(GenInitStructs,
k=self.popsize - len(GenInitStructs) - len(self._seed_mol))
elif len(GenInitStructs) > (self.popsize - 1):
# Selected random sample from the generation
GenInitStructs = random.sample(GenInitStructs, k=self.popsize - len(self._seed_mol))
else:
# Everything is ok!
pass
# Adding the inputs to the initial population
for i, mol in enumerate(self._seed_mol):
individual = Individual(mol, idx=i, randomseed=self.randomseed)
if individual.pdbqt:
self.pop.append(individual)
# Completing the population with the generated structures
for i, mol in enumerate(GenInitStructs):
if self.AddHs:
individual = Individual(Chem.AddHs(mol), idx=i + len(self._seed_mol), randomseed=self.randomseed)
else:
individual = Individual(mol, idx=i + len(self._seed_mol), randomseed=self.randomseed)
if individual.pdbqt:
self.pop.append(individual)
# Make sure that the population do not have more than popsize members and it is without repeated elements.
# That could happens if seed_mol has more molecules than popsize
self.pop = sorted(set(self.pop), key=lambda x: x.idx)[:self.popsize]
# Calculating cost of each individual
# Creating the arguments
args_list = []
# Make a copy of the self.costfunc_kwargs
# Make a copy of the self.costfunc_kwargs
kwargs_copy, costfunc_jobs_tmp_dir = _make_kwargs_copy(self.costfunc, self.costfunc_kwargs)
for individual in self.pop:
args_list.append((individual, kwargs_copy))
print(f'\n\nCreating the first population with {len(self.pop)} members:')
try:
pool = mp.Pool(njobs)
self.pop = [individual for individual in tqdm.tqdm(pool.imap(self.__costfunc__, args_list), total=len(args_list))]
pool.close()
except Exception as e1:
warn("Parallelization did not work. Trying with serial...")
try:
self.pop = [self.__costfunc__(args) for args in tqdm.tqdm(args_list, total=len(args_list))]
except Exception as e2:
raise RuntimeError("Serial did not work either. Here are the ucurred exceptions:\n"
f"=========Parellel=========:\n {e1}\n"
f"==========Serial==========:\n {e2}")
# Clean directory
costfunc_jobs_tmp_dir.cleanup()
# Adding generation information
for individual in self.pop:
individual.genID = self.NumGens
individual.kept_gens = set([self.NumGens])
self.acceptance[self.NumGens] = {
'accepted': len(self.pop[:]),
'generated': len(self.pop[:])
}
# Get the same order population in case cost is the same. Sorted by idx and then by cost
if self.randomseed:
self.pop = sorted(self.pop, key=lambda x: x.idx)
self.pop = sorted(self.pop)
# Print some information of the initial population
print(f"Initial Population: Best Individual: {self.pop[0]}")
print(f"Accepted rate: {self.acceptance[self.NumGens]['accepted']} / {self.acceptance[self.NumGens]['generated']}\n")
# Updating the info of the first individual (parent)
# to print at the end how well performed the method (cost function)
# Because How the population was initialized and because we are using pool.imap (ordered).
# The parent is the first Individual of self.pop.
# We have to use deepcopy because Individual is a mutable object
# Because above set were used, we have to sorter based on idx
self.InitIndividual = deepcopy(
min(
sorted(self.pop, key=lambda x: x.idx)[:len(self._seed_mol)]
)
)
# Best Cost of Iterations
self.best_cost = []
self.avg_cost = []
# Saving tracking variables, the first population, outside the if to take into account second calls
# with different population provided by the user.
self.SawIndividuals.update(self.pop)
# Saving population in disk if it was required
if self.save_pop_every_gen:
compressed_pickle(f"{self.deffnm}_pop", (self.NumGens, sorted(self.pop)))
make_sdf(sorted(self.pop), sdf_name=f"{self.deffnm}_pop")
if self.checkpoint:
compressed_pickle('cpt', self)
# Main Loop
# Another control variable. In case that the __call__ method is used more than ones.
number_of_previous_generations = len(self.best_cost)
for it in range(self.maxiter):
# Saving Number of Generations
self.NumGens += 1
# Probabilities Selections
probs = softmax((-self.beta * np.array(self.pop)).astype('float64'))
if any(np.isnan(probs)):
probs = np.nan_to_num(probs)
# TODO: This cycle should run in this way only if no user generetor was provided
# with and if, else statment I could correct, and then the genereator functions is completlly up to the user,
# then I do not need to worry in how the selection is made,
# In this case self.nc will not have any validity unless the user use it with its evaluator
# the checking of SawIndivduals must be done after the user funcrion return the popc
# The other that I need to change is that if the if it is a new genereator the genereation of the initil population is different
# the other is that checking for redundancy may be complicated in the case, that molecules are, for example peptides,
# in this case other identifier like the aa sequnce should be ued intead. For that the user may need a different Individual instance
# a one more efficient, there are a lot of if here :`-)
popc = []
for _ in range(self.nc):
# Perform Roulette Wheel Selection
parent = self.pop[roulette_wheel_selection(probs)]
# Perform Mutation (this mutation is some kind of crossover but with CReM library)
children = self.mutate(parent)
# Save offspring population
# I will save only those offsprings that were not seen and that have a correct pdbqt file
if children not in self.SawIndividuals and children not in popc and children.pdbqt:
children.genID = self.NumGens
children.kept_gens = set()
popc.append(children)
if popc: # Only if there are new members
# Calculating cost of each offspring individual (Doing Docking)
# Creating the arguments
args_list = []
# Make a copy of the self.costfunc_kwargs
kwargs_copy, costfunc_jobs_tmp_dir = _make_kwargs_copy(self.costfunc, self.costfunc_kwargs)
NumbOfSawIndividuals = len(self.SawIndividuals)
for (i, individual) in enumerate(popc):
# Add idx label to each individual
individual.idx = i + NumbOfSawIndividuals
# The problem here is that we are not being general for other possible Cost functions.
args_list.append((individual, kwargs_copy))
print(f'Evaluating generation {self.NumGens} / {self.maxiter + number_of_previous_generations}:')
# Calculating cost fucntion in parallel
try:
pool = mp.Pool(njobs)
popc = [individual for individual in tqdm.tqdm(pool.imap(self.__costfunc__, args_list), total=len(args_list))]
pool.close()
except Exception as e1:
warn("Parallelization did not work. Trying with serial...")
try:
popc = [self.__costfunc__(args) for args in tqdm.tqdm(args_list, total=len(args_list))]
except Exception as e2:
raise RuntimeError("Serial did not work either. Here are the ucurred exceptions:\n"
f"=========Parellel=========:\n {e1}\n"
f"==========Serial==========:\n {e2}")
# Clean directory
costfunc_jobs_tmp_dir.cleanup()
# Merge, Sort and Select
self.pop += popc
if self.randomseed:
self.pop = sorted(self.pop, key=lambda x: x.idx)
self.pop = sorted(self.pop)
self.pop = self.pop[:self.popsize]
# Update the kept_gens attribute
self.acceptance[self.NumGens] = {
'accepted': 0,
'generated': len(popc)
}
for individual in self.pop:
if not individual.kept_gens:
self.acceptance[self.NumGens]['accepted'] += 1
individual.kept_gens.add(self.NumGens)
# Store Best Cost
self.best_cost.append(self.pop[0].cost)
# Store Average cost
self.avg_cost.append(np.mean(self.pop))
# Saving tracking variables
self.SawIndividuals.update(popc)
# Saving population in disk if it was required
if self.save_pop_every_gen:
# Save every save_pop_every_gen and always the last population
if self.NumGens % self.save_pop_every_gen == 0 or it + 1 == self.maxiter:
compressed_pickle(f"{self.deffnm}_pop", (self.NumGens, self.pop))
make_sdf(self.pop, sdf_name=f"{self.deffnm}_pop")
if self.checkpoint:
compressed_pickle('cpt', self)
# Show Iteration Information
print(f"Generation {self.NumGens}: Best Individual: {self.pop[0]}.")
print(f"Accepted rate: {self.acceptance[self.NumGens]['accepted']} / {self.acceptance[self.NumGens]['generated']}\n")
# Printing summary information
print(f"\n{50*'=+'}\n")
print(f"The simulation finished successfully after {self.NumGens} generations with"
f"a population of {self.popsize} individuals. "
f"A total number of {len(self.SawIndividuals)} Individuals were seen during the simulation.")
print(f"Initial Individual: {self.InitIndividual}")
print(f"Final Individual: {self.pop[0]}")
print(f"The cost function dropped in {self.InitIndividual - self.pop[0]} units.")
print(f"\n{50*'=+'}\n")
# Tar errors
tar_errors('error')
# Printing how long was the simulation
print(f"Total time ({self.maxiter} generations): {time.time() - ts:>5.2f} (s).\n"
f"Finished at {datetime.datetime.now().strftime('%c')}.\n")
def __costfunc__(self, args_list):
individual, kwargs = args_list
# This is just to use the progress bar on pool.imap
return self.costfunc(individual, **kwargs)
[docs]
def mutate(self, individual: Individual):
"""Genetic operators
Parameters
----------
individual : Individual
The individual to mutate.
Returns
-------
Individual
A new Individual.
"""
# Here is were I have to check if replace_ids or protected_ids where provided.
mutate_crem_kwargs_to_work_with = self.mutate_crem_kwargs.copy()
if 'replace_ids' in self.mutate_crem_kwargs and 'protected_ids' in self.mutate_crem_kwargs:
mutate_crem_kwargs_to_work_with['replace_ids'], mutate_crem_kwargs_to_work_with['protected_ids'] = update_reactant_zone(
self.InitIndividual.mol, individual.mol, parent_replace_ids=self.mutate_crem_kwargs['replace_ids'],
parent_protected_ids=self.mutate_crem_kwargs['protected_ids'])
elif 'replace_ids' in self.mutate_crem_kwargs:
mutate_crem_kwargs_to_work_with['replace_ids'], _ = update_reactant_zone(
self.InitIndividual.mol, individual.mol, parent_replace_ids=self.mutate_crem_kwargs['replace_ids'])
elif 'protected_ids' in self.mutate_crem_kwargs:
_, mutate_crem_kwargs_to_work_with['protected_ids'] = update_reactant_zone(
self.InitIndividual.mol, individual.mol,
parent_protected_ids=self.mutate_crem_kwargs['protected_ids'])
try:
mutants = list(mutate_mol(individual.mol, self.crem_db_path, **mutate_crem_kwargs_to_work_with))
# Bias the searching to similar molecules
if self.get_similar:
mol = get_similar_mols(mols=[mol for _, mol in mutants],
ref_mol=self.InitIndividual.mol, pick=1, beta=0.01)[0]
else:
_, mol = random.choice(mutants) # nosec
except Exception:
print(f'Note: The mutation on {individual} did not work, it will be returned the same individual')
mol = individual.mol
if self.AddHs:
mol = Chem.AddHs(mol)
return Individual(mol, randomseed=self.randomseed)
[docs]
def pickle(self, title: str, compress: bool = False):
"""Method to pickle the whole GA class
Parameters
----------
title : str
Name of the object which will be completed with the corresponding
extension depending if compress is set to True or False.
compress : bool, optional
Use compression, by default False. If True :meth:`moldrug.utils.compressed_pickle` will be used;
if not :meth:`moldrug.utils.full_pickle` will be used instead.
"""
cls = self.__class__
result = cls.__new__(cls)
result.__dict__.update(self.__dict__)
if compress:
compressed_pickle(title, result)
else:
full_pickle(title, result)
[docs]
def to_dataframe(self, return_mol: bool = False):
"""Create a DataFrame from self.SawIndividuals.
Returns
-------
pandas.DataFrame
The DataFrame
"""
return to_dataframe(self.SawIndividuals, return_mol=return_mol)
if __name__ == '__main__':
pass