Source code for blues.simulation

"""
Provides classes for setting up and running the BLUES simulation.

- `SystemFactory` : setup and modifying the OpenMM System prior to the simulation.
- `SimulationFactory` : generates the OpenMM Simulations from the System.
- `BLUESSimulation` : runs the NCMC+MD hybrid simulation.
- `MonteCarloSimulation` : runs a pure Monte Carlo simulation.

Authors: Samuel C. Gill
Contributors: Nathan M. Lim, Meghan Osato, David L. Mobley
"""

import logging
import math
import sys

import numpy as np
import parmed
from openmmtools import alchemy
from simtk import openmm, unit
from simtk.openmm import app

from blues import utils
from blues.integrators import AlchemicalExternalLangevinIntegrator

finfo = np.finfo(np.float32)
rtol = finfo.precision
logger = logging.getLogger(__name__)


[docs]class SystemFactory(object): """ SystemFactory contains methods to generate/modify the OpenMM System object required for generating the openmm.Simulation using a given parmed.Structure() Examples -------- Load Parmed Structure, select move type, initialize `MoveEngine`, and generate the openmm.Systems >>> structure = parmed.load_file('eqToluene.prmtop', xyz='eqToluene.inpcrd') >>> ligand = RandomLigandRotationMove(structure, 'LIG') >>> ligand_mover = MoveEngine(ligand) >>> systems = SystemFactory(structure, ligand.atom_indices, config['system']) The MD and alchemical Systems are generated and stored as an attribute >>> systems.md >>> systems.alch Freeze atoms in the alchemical system >>> systems.alch = SystemFactory.freeze_atoms(systems.alch, freeze_distance=5.0, freeze_center='LIG' freeze_solvent='HOH,NA,CL') Parameters ---------- structure : parmed.Structure A chemical structure composed of atoms, bonds, angles, torsions, and other topological features. atom_indices : list of int Atom indicies of the move or designated for which the nonbonded forces (both sterics and electrostatics components) have to be alchemically modified. config : dict, parameters for generating the `openmm.System` for the MD and NCMC simulation. For complete parameters, see docs for `generateSystem` and `generateAlchSystem` """ def __init__(self, structure, atom_indices, config=None): self.structure = structure self.atom_indices = atom_indices self._config = config #If parameters for generating the openmm.System is given, make them. if self._config: if 'alchemical' in self._config.keys(): self.alch_config = self._config.pop('alchemical') else: #Use function defaults if none is provided self.alch_config = {} self.md = SystemFactory.generateSystem(self.structure, **self._config) self.alch = SystemFactory.generateAlchSystem(self.md, self.atom_indices, **self.alch_config)
[docs] @staticmethod def amber_selection_to_atomidx(structure, selection): """ Converts AmberMask selection [amber-syntax]_ to list of atom indices. Parameters ---------- structure : parmed.Structure() Structure of the system, used for atom selection. selection : str AmberMask selection that gets converted to a list of atom indices. Returns ------- mask_idx : list of int List of atom indices. References ---------- .. [amber-syntax] J. Swails, ParmEd Documentation (2015). http://parmed.github.io/ParmEd/html/amber.html#amber-mask-syntax """ mask = parmed.amber.AmberMask(structure, str(selection)) mask_idx = [i for i in mask.Selected()] return mask_idx
[docs] @staticmethod def atomidx_to_atomlist(structure, mask_idx): """ Goes through the structure and matches the previously selected atom indices to the atom type. Parameters ---------- structure : parmed.Structure() Structure of the system, used for atom selection. mask_idx : list of int List of atom indices. Returns ------- atom_list : list of atoms The atoms that were previously selected in mask_idx. """ atom_list = [] for i, at in enumerate(structure.atoms): if i in mask_idx: atom_list.append(structure.atoms[i]) logger.debug('\nFreezing {}'.format(atom_list)) return atom_list
[docs] @classmethod def generateSystem(cls, structure, **kwargs): """ Construct an OpenMM System representing the topology described by the prmtop file. This function is just a wrapper for parmed Structure.createSystem(). Parameters ---------- structure : parmed.Structure() The parmed.Structure of the molecular system to be simulated nonbondedMethod : cutoff method This is the cutoff method. It can be either the NoCutoff, CutoffNonPeriodic, CutoffPeriodic, PME, or Ewald objects from the simtk.openmm.app namespace nonbondedCutoff : float or distance Quantity The nonbonded cutoff must be either a floating point number (interpreted as nanometers) or a Quantity with attached units. This is ignored if nonbondedMethod is NoCutoff. switchDistance : float or distance Quantity The distance at which the switching function is turned on for van der Waals interactions. This is ignored when no cutoff is used, and no switch is used if switchDistance is 0, negative, or greater than the cutoff constraints : None, app.HBonds, app.HAngles, or app.AllBonds Which type of constraints to add to the system (e.g., SHAKE). None means no bonds are constrained. HBonds means bonds with hydrogen are constrained rigidWater : bool=True If True, water is kept rigid regardless of the value of constraints. A value of False is ignored if constraints is not None. implicitSolvent : None, app.HCT, app.OBC1, app.OBC2, app.GBn, app.GBn2 The Generalized Born implicit solvent model to use. implicitSolventKappa : float or 1/distance Quantity = None This is the Debye kappa property related to modeling saltwater conditions in GB. It should have units of 1/distance (1/nanometers is assumed if no units present). A value of None means that kappa will be calculated from implicitSolventSaltConc (below) implicitSolventSaltConc : float or amount/volume Quantity=0 moles/liter If implicitSolventKappa is None, the kappa will be computed from the salt concentration. It should have units compatible with mol/L temperature : float or temperature Quantity = 298.15 kelvin This is only used to compute kappa from implicitSolventSaltConc soluteDielectric : float=1.0 The dielectric constant of the protein interior used in GB solventDielectric : float=78.5 The dielectric constant of the water used in GB useSASA : bool=False If True, use the ACE non-polar solvation model. Otherwise, use no SASA-based nonpolar solvation model. removeCMMotion : bool=True If True, the center-of-mass motion will be removed periodically during the simulation. If False, it will not. hydrogenMass : float or mass quantity = None If not None, hydrogen masses will be changed to this mass and the difference subtracted from the attached heavy atom (hydrogen mass repartitioning) ewaldErrorTolerance : float=0.0005 When using PME or Ewald, the Ewald parameters will be calculated from this value flexibleConstraints : bool=True If False, the energies and forces from the constrained degrees of freedom will NOT be computed. If True, they will (but those degrees of freedom will *still* be constrained). verbose : bool=False If True, the progress of this subroutine will be printed to stdout splitDihedrals : bool=False If True, the dihedrals will be split into two forces -- proper and impropers. This is primarily useful for debugging torsion parameter assignments. Returns ------- openmm.System System formatted according to the prmtop file. Notes ----- This function calls prune_empty_terms if any Topology lists have changed. """ return structure.createSystem(**kwargs)
[docs] @classmethod def generateAlchSystem(cls, system, atom_indices, softcore_alpha=0.5, softcore_a=1, softcore_b=1, softcore_c=6, softcore_beta=0.0, softcore_d=1, softcore_e=1, softcore_f=2, annihilate_electrostatics=True, annihilate_sterics=False, disable_alchemical_dispersion_correction=True, alchemical_pme_treatment='direct-space', suppress_warnings=True, **kwargs): """Returns the OpenMM System for alchemical perturbations. This function calls `openmmtools.alchemy.AbsoluteAlchemicalFactory` and `openmmtools.alchemy.AlchemicalRegion` to generate the System for the NCMC simulation. Parameters ---------- system : openmm.System The OpenMM System object corresponding to the reference system. atom_indices : list of int Atom indicies of the move or designated for which the nonbonded forces (both sterics and electrostatics components) have to be alchemically modified. annihilate_electrostatics : bool, optional If True, electrostatics should be annihilated, rather than decoupled (default is True). annihilate_sterics : bool, optional If True, sterics (Lennard-Jones or Halgren potential) will be annihilated, rather than decoupled (default is False). softcore_alpha : float, optional Alchemical softcore parameter for Lennard-Jones (default is 0.5). softcore_a, softcore_b, softcore_c : float, optional Parameters modifying softcore Lennard-Jones form. Introduced in Eq. 13 of Ref. [TTPham-JChemPhys135-2011]_ (default is 1). softcore_beta : float, optional Alchemical softcore parameter for electrostatics. Set this to zero to recover standard electrostatic scaling (default is 0.0). softcore_d, softcore_e, softcore_f : float, optional Parameters modifying softcore electrostatics form (default is 1). disable_alchemical_dispersion_correction : bool, optional, default=True If True, the long-range dispersion correction will not be included for the alchemical region to avoid the need to recompute the correction (a CPU operation that takes ~ 0.5 s) every time 'lambda_sterics' is changed. If using nonequilibrium protocols, it is recommended that this be set to True since this can lead to enormous (100x) slowdowns if the correction must be recomputed every time step. alchemical_pme_treatment : str, optional, default = 'direct-space' Controls how alchemical region electrostatics are treated when PME is used. Options are 'direct-space', 'coulomb', 'exact'. - 'direct-space' only models the direct space contribution - 'coulomb' includes switched Coulomb interaction - 'exact' includes also the reciprocal space contribution, but it's only possible to annihilate the charges and the softcore parameters controlling the electrostatics are deactivated. Also, with this method, modifying the global variable `lambda_electrostatics` is not sufficient to control the charges. The recommended way to change them is through the `AlchemicalState` class. Returns ------- alch_system : alchemical_system System to be used for the NCMC simulation. References ---------- .. [TTPham-JChemPhys135-2011] T. T. Pham and M. R. Shirts, J. Chem. Phys 135, 034114 (2011). http://dx.doi.org/10.1063/1.3607597 """ if suppress_warnings: #Lower logger level to suppress excess warnings logging.getLogger("openmmtools.alchemy").setLevel(logging.ERROR) #Disabled correction term due to increased computational cost factory = alchemy.AbsoluteAlchemicalFactory( disable_alchemical_dispersion_correction=disable_alchemical_dispersion_correction, alchemical_pme_treatment=alchemical_pme_treatment) alch_region = alchemy.AlchemicalRegion( alchemical_atoms=atom_indices, softcore_alpha=softcore_alpha, softcore_a=softcore_a, softcore_b=softcore_b, softcore_c=softcore_c, softcore_beta=softcore_beta, softcore_d=softcore_d, softcore_e=softcore_e, softcore_f=softcore_f, annihilate_electrostatics=annihilate_electrostatics, annihilate_sterics=annihilate_sterics) alch_system = factory.create_alchemical_system(system, alch_region) return alch_system
[docs] @classmethod def restrain_positions(cls, structure, system, selection="(@CA,C,N)", weight=5.0, **kwargs): """ Applies positional restraints to atoms in the openmm.System by the given parmed selection [amber-syntax]_. Parameters ---------- system : openmm.System The OpenMM System object to be modified. structure : parmed.Structure() Structure of the system, used for atom selection. selection : str, Default = "(@CA,C,N)" AmberMask selection to apply positional restraints to weight : float, Default = 5.0 Restraint weight for xyz atom restraints in kcal/(mol A^2) Returns ------- system : openmm.System Modified with positional restraints applied. """ mask_idx = cls.amber_selection_to_atomidx(structure, selection) logger.info("{} positional restraints applied to selection: '{}' ({} atoms) on {}".format( weight, selection, len(mask_idx), system)) # define the custom force to restrain atoms to their starting positions force = openmm.CustomExternalForce('k_restr*periodicdistance(x, y, z, x0, y0, z0)^2') # Add the restraint weight as a global parameter in kcal/mol/A^2 force.addGlobalParameter("k_restr", weight) #force.addGlobalParameter("k_restr", weight*unit.kilocalories_per_mole/unit.angstroms**2) # Define the target xyz coords for the restraint as per-atom (per-particle) parameters force.addPerParticleParameter("x0") force.addPerParticleParameter("y0") force.addPerParticleParameter("z0") for i, atom_crd in enumerate(structure.positions): if i in mask_idx: logger.debug(i, structure.atoms[i]) force.addParticle(i, atom_crd.value_in_unit(unit.nanometers)) system.addForce(force) return system
[docs] @classmethod def freeze_atoms(cls, structure, system, freeze_selection=":LIG", **kwargs): """ Zeroes the masses of atoms from the given parmed selection [amber-syntax]_. Massless atoms will be ignored by the integrator and will not change positions. Parameters ---------- system : openmm.System The OpenMM System object to be modified. structure : parmed.Structure() Structure of the system, used for atom selection. freeze_selection : str, Default = ":LIG" AmberMask selection for the center in which to select atoms for zeroing their masses. Defaults to freezing protein backbone atoms. Returns ------- system : openmm.System The modified system with the selected atoms """ mask_idx = cls.amber_selection_to_atomidx(structure, freeze_selection) logger.info("Freezing selection '{}' ({} atoms) on {}".format(freeze_selection, len(mask_idx), system)) cls.atomidx_to_atomlist(structure, mask_idx) system = utils.zero_masses(system, mask_idx) return system
[docs] @classmethod def freeze_radius(cls, structure, system, freeze_distance=5.0 * unit.angstrom, freeze_center=':LIG', freeze_solvent=':HOH,NA,CL', **kwargs): """ Zero the masses of atoms outside the given raidus of the `freeze_center` parmed selection [amber-syntax]_. Massless atoms will be ignored by the integrator and will not change positions.This is intended to freeze the solvent and protein atoms around the ligand binding site. Parameters ---------- system : openmm.System The OpenMM System object to be modified. structure : parmed.Structure() Structure of the system, used for atom selection. freeze_distance : float, Default = 5.0 Distance (angstroms) to select atoms for retaining their masses. Atoms outside the set distance will have their masses set to 0.0. freeze_center : str, Default = ":LIG" AmberMask selection for the center in which to select atoms for zeroing their masses. Default: LIG freeze_solvent : str, Default = ":HOH,NA,CL" AmberMask selection in which to select solvent atoms for zeroing their masses. Returns ------- system : openmm.System Modified system with masses outside the `freeze center` zeroed. """ N_atoms = system.getNumParticles() #Select the LIG and atoms within 5 angstroms, except for WAT or IONS (i.e. selects the binding site) if hasattr(freeze_distance, '_value'): freeze_distance = freeze_distance._value selection = "(%s<:%f)&!(%s)" % (freeze_center, freeze_distance, freeze_solvent) logger.info('Inverting parmed selection for freezing: %s' % selection) site_idx = cls.amber_selection_to_atomidx(structure, selection) #Invert that selection to freeze everything but the binding site. freeze_idx = set(range(N_atoms)) - set(site_idx) #Check if freeze selection has selected all atoms if len(freeze_idx) == N_atoms: err = 'All %i atoms appear to be selected for freezing. Check your atom selection.' % len(freeze_idx) logger.error(err) sys.exit(1) freeze_threshold = 0.98 if len(freeze_idx) / N_atoms == freeze_threshold: err = '%.0f%% of your system appears to be selected for freezing. Check your atom selection' % ( 100 * freeze_threshold) logger.error(err) sys.exit(1) #Ensure that the freeze selection is larger than the center selection of atoms center_idx = cls.amber_selection_to_atomidx(structure, freeze_center) if len(site_idx) <= len(center_idx): err = "%i unfrozen atoms is less than (or equal to) the number of atoms used as the selection center '%s' (%i atoms). Check your atom selection." % ( len(site_idx), freeze_center, len(center_idx)) logger.error(err) sys.exit(1) freeze_warning = 0.80 if len(freeze_idx) / N_atoms == freeze_warning: warn = '%.0f%% of your system appears to be selected for freezing. This may cause unexpected behaviors.' % ( 100 * freeze_warning) logger.warm(warn) sys.exit(1) #Ensure that the freeze selection is larger than the center selection point center_idx = cls.amber_selection_to_atomidx(structure, freeze_center) if len(site_idx) <= len(center_idx): err = "%i unfrozen atoms is less than (or equal to) the number of atoms from the selection center '%s' (%i atoms). Check your atom selection." % ( len(site_idx), freeze_center, len(center_idx)) logger.error(err) sys.exit(1) logger.info("Freezing {} atoms {} Angstroms from '{}' on {}".format( len(freeze_idx), freeze_distance, freeze_center, system)) cls.atomidx_to_atomlist(structure, freeze_idx) system = utils.zero_masses(system, freeze_idx) return system
[docs]class SimulationFactory(object): """SimulationFactory is used to generate the 3 required OpenMM Simulation objects (MD, NCMC, ALCH) required for the BLUES run. This class can take a list of reporters for the MD or NCMC simulation in the arguments `md_reporters` or `ncmc_reporters`. Parameters ---------- systems : blues.simulation.SystemFactory object The object containing the MD and alchemical openmm.Systems move_engine : blues.moves.MoveEngine object MoveEngine object which contains the list of moves performed in the NCMC simulation. config : dict Simulation parameters which include: nIter, nstepsNC, nstepsMD, nprop, propLambda, temperature, dt, propSteps, write_move md_reporters : (optional) list of Reporter objects for the MD openmm.Simulation ncmc_reporters : (optional) list of Reporter objects for the NCMC openmm.Simulation Examples -------- Load Parmed Structure from our input files, select the move type, initialize the MoveEngine, and generate the openmm systems. >>> structure = parmed.load_file('eqToluene.prmtop', xyz='eqToluene.inpcrd') >>> ligand = RandomLigandRotationMove(structure, 'LIG') >>> ligand_mover = MoveEngine(ligand) >>> systems = SystemFactory(structure, ligand.atom_indices, config['system']) Now, we can generate the Simulations from our openmm Systems using the SimulationFactory class. If a configuration is provided at on initialization, it will call `generateSimulationSet()` for convenience. Otherwise, the class can be instantiated like a normal python class. Below is an example of initializing the class like a normal python object. >>> simulations = SimulationFactory(systems, ligand_mover) >>> hasattr(simulations, 'md') False >>> hasattr(simulations, 'ncmc') False Below, we provide a dict for configuring the Simulations and then generate them by calling `simulations.generateSimulationSet()`. The MD/NCMC simulation objects can be accessed separately as class attributes. >>> sim_cfg = { 'platform': 'OpenCL', 'properties' : { 'OpenCLPrecision': 'single', 'OpenCLDeviceIndex' : 2}, 'nprop' : 1, 'propLambda' : 0.3, 'dt' : 0.001 * unit.picoseconds, 'friction' : 1 * 1/unit.picoseconds, 'temperature' : 100 * unit.kelvin, 'nIter': 1, 'nstepsMD': 10, 'nstepsNC': 10,} >>> simulations.generateSimulationSet(sim_cfg) >>> hasattr(simulations, 'md') True >>> hasattr(simulations, 'ncmc') True After generating the Simulations, attach your own reporters by providing the reporters in a list. Be sure to attach to either the MD or NCMC simulation. >>> from simtk.openmm.app import StateDataReporter >>> md_reporters = [ StateDataReporter('test.log', 5) ] >>> ncmc_reporters = [ StateDataReporter('test-ncmc.log', 5) ] >>> simulations.md = simulations.attachReporters( simulations.md, md_reporters) >>> simulations.ncmc = simulations.attachReporters( simulations.ncmc, ncmc_reporters) Alternatively, you can pass the configuration dict and list of reporters upon class initialization. This will do all of the above for convenience. >>> simulations = SimulationFactory(systems, ligand_mover, sim_cfg, md_reporters, ncmc_reporters) >>> print(simulations) >>> print(simulations.md) >>> print(simulations.ncmc) <blues.simulation.SimulationFactory object at 0x7f461b7a8b00> <simtk.openmm.app.simulation.Simulation object at 0x7f461b7a8780> <simtk.openmm.app.simulation.Simulation object at 0x7f461b7a87b8> >>> print(simulations.md.reporters) >>> print(simulations.ncmc.reporters) [<simtk.openmm.app.statedatareporter.StateDataReporter object at 0x7f1b4d24cac8>] [<simtk.openmm.app.statedatareporter.StateDataReporter object at 0x7f1b4d24cb70>] """ def __init__(self, systems, move_engine, config=None, md_reporters=None, ncmc_reporters=None): #Hide these properties since they exist on the SystemsFactory object self._structure = systems.structure self._system = systems.md self._alch_system = systems.alch #Atom indicies from move_engine #TODO: change atom_indices selection for multiple regions self._atom_indices = move_engine.moves[0].atom_indices self._move_engine = move_engine self.config = config #If parameters for generating the openmm.Simulation are given, make them. if self.config: try: self.generateSimulationSet() except Exception as e: logger.exception(e) raise e if md_reporters: self._md_reporters = md_reporters self.md = SimulationFactory.attachReporters(self.md, self._md_reporters) if ncmc_reporters: self._ncmc_reporters = ncmc_reporters self.ncmc = SimulationFactory.attachReporters(self.ncmc, self._ncmc_reporters)
[docs] @classmethod def addBarostat(cls, system, temperature=300 * unit.kelvin, pressure=1 * unit.atmospheres, frequency=25, **kwargs): """ Adds a MonteCarloBarostat to the MD system. Parameters ---------- system : openmm.System The OpenMM System object corresponding to the reference system. temperature : float, default=300 temperature (Kelvin) to be simulated at. pressure : int, configional, default=None Pressure (atm) for Barostat for NPT simulations. frequency : int, default=25 Frequency at which Monte Carlo pressure changes should be attempted (in time steps) Returns ------- system : openmm.System The OpenMM System with the MonteCarloBarostat attached. """ logger.info('Adding MonteCarloBarostat with {}. MD simulation will be {} NPT.'.format(pressure, temperature)) # Add Force Barostat to the system system.addForce(openmm.MonteCarloBarostat(pressure, temperature, frequency)) return system
[docs] @classmethod def generateIntegrator(cls, temperature=300 * unit.kelvin, dt=0.002 * unit.picoseconds, friction=1, **kwargs): """ Generates a LangevinIntegrator for the Simulations. Parameters ---------- temperature : float, default=300 temperature (Kelvin) to be simulated at. friction: float, default=1 friction coefficient which couples to the heat bath, measured in 1/ps dt: int, configional, default=0.002 The timestep of the integrator to use (in ps). Returns ------- integrator : openmm.LangevinIntegrator The LangevinIntegrator object intended for the System. """ integrator = openmm.LangevinIntegrator(temperature, friction, dt) return integrator
[docs] @classmethod def generateNCMCIntegrator( cls, nstepsNC=None, alchemical_functions={ 'lambda_sterics': 'min(1, (1/0.3)*abs(lambda-0.5))', 'lambda_electrostatics': 'step(0.2-lambda) - 1/0.2*lambda*step(0.2-lambda) + 1/0.2*(lambda-0.8)*step(lambda-0.8)' }, splitting="H V R O R V H", temperature=300 * unit.kelvin, dt=0.002 * unit.picoseconds, nprop=1, propLambda=0.3, **kwargs): """ Generates the AlchemicalExternalLangevinIntegrator using openmmtools. Parameters ----------- nstepsNC : int The number of NCMC relaxation steps to use. alchemical_functions : dict default = `{'lambda_sterics' : 'min(1, (1/0.3)*abs(lambda-0.5))', lambda_electrostatics' : 'step(0.2-lambda) - 1/0.2*lambda*step(0.2-lambda) + 1/0.2*(lambda-0.8)*step(lambda-0.8)'}` key : value pairs such as "global_parameter" : function_of_lambda where function_of_lambda is a Lepton-compatible string that depends on the variable "lambda". splitting : string, default: "H V R O R V H" Sequence of R, V, O (and optionally V{i}), and { }substeps to be executed each timestep. There is also an H option, which increments the global parameter `lambda` by 1/nsteps_neq for each step. Forces are only used in V-step. Handle multiple force groups by appending the force group index to V-steps, e.g. "V0" will only use forces from force group 0. "V" will perform a step using all forces. ( will cause metropolization, and must be followed later by a ). temperature : float, default=300 temperature (Kelvin) to be simulated at. dt: int, optional, default=0.002 The timestep of the integrator to use (in ps). nprop : int (Default: 1) Controls the number of propagation steps to add in the lambda region defined by `propLambda` propLambda: float, optional, default=0.3 The range which additional propogation steps are added, defined by [0.5-propLambda, 0.5+propLambda]. Returns ------- ncmc_integrator : blues.integrator.AlchemicalExternalLangevinIntegrator The NCMC integrator for the alchemical process in the NCMC simulation. """ #During NCMC simulation, lambda parameters are controlled by function dict below # Keys correspond to parameter type (i.e 'lambda_sterics', 'lambda_electrostatics') # 'lambda' = step/totalsteps where step corresponds to current NCMC step, ncmc_integrator = AlchemicalExternalLangevinIntegrator( alchemical_functions=alchemical_functions, splitting=splitting, temperature=temperature, nsteps_neq=nstepsNC, timestep=dt, nprop=nprop, prop_lambda=propLambda) return ncmc_integrator
[docs] @classmethod def generateSimFromStruct(cls, structure, system, integrator, platform=None, properties={}, **kwargs): """Generate the OpenMM Simulation objects from a given parmed.Structure() Parameters ---------- structure : parmed.Structure ParmEd Structure object of the entire system to be simulated. system : openmm.System The OpenMM System object corresponding to the reference system. integrator : openmm.Integrator The OpenMM Integrator object for the simulation. platform : str, default = None Valid choices: 'Auto', 'OpenCL', 'CUDA' If None is specified, the fastest available platform will be used. Returns ------- simulation : openmm.Simulation The generated OpenMM Simulation from the parmed.Structure, openmm.System, amd the integrator. """ #Specifying platform properties here used for local development. if platform is None: #Use the fastest available platform simulation = app.Simulation(structure.topology, system, integrator) else: platform = openmm.Platform.getPlatformByName(platform) #Make sure key/values are strings properties = {str(k): str(v) for k, v in properties.items()} simulation = app.Simulation(structure.topology, system, integrator, platform, properties) # Set initial positions/velocities if structure.box_vectors: simulation.context.setPeriodicBoxVectors(*structure.box_vectors) simulation.context.setPositions(structure.positions) simulation.context.setVelocitiesToTemperature(integrator.getTemperature()) return simulation
[docs] @staticmethod def attachReporters(simulation, reporter_list): """Attach the list of reporters to the Simulation object Parameters ---------- simulation : openmm.Simulation The Simulation object to attach reporters to. reporter_list : list of openmm Reporeters The list of reporters to attach to the OpenMM Simulation. Returns ------- simulation : openmm.Simulation The Simulation object with the reporters attached. """ for rep in reporter_list: simulation.reporters.append(rep) return simulation
[docs] def generateSimulationSet(self, config=None): """Generates the 3 OpenMM Simulation objects. Parameters ---------- config : dict Dictionary of parameters for configuring the OpenMM Simulations """ if not config: config = self.config #Construct MD Integrator and Simulation self.integrator = self.generateIntegrator(**config) #Check for pressure parameter to set simulation to NPT if 'pressure' in config.keys(): self._system = self.addBarostat(self._system, **config) logger.warning( 'NCMC simulation will NOT have pressure control. NCMC will use pressure from last MD state.') else: logger.info('MD simulation will be {} NVT.'.format(config['temperature'])) self.md = self.generateSimFromStruct(self._structure, self._system, self.integrator, **config) #Alchemical Simulation is used for computing correction term from MD simulation. alch_integrator = self.generateIntegrator(**config) self.alch = self.generateSimFromStruct(self._structure, self._system, alch_integrator, **config) #If the moveStep hasn't been calculated, recheck the NCMC steps. if 'moveStep' not in config.keys(): logger.warning('Did not find `moveStep` in configuration. Checking NCMC paramters') ncmc_parameters = utils.calculateNCMCSteps(**config) for k, v in ncmc_parameters.items(): config[k] = v self.config = config #Construct NCMC Integrator and Simulation self.ncmc_integrator = self.generateNCMCIntegrator(**config) #Initialize the Move Engine with the Alchemical System and NCMC Integrator for move in self._move_engine.moves: self._alch_system, self.ncmc_integrator = move.initializeSystem(self._alch_system, self.ncmc_integrator) self.ncmc = self.generateSimFromStruct(self._structure, self._alch_system, self.ncmc_integrator, **config) utils.print_host_info(self.ncmc)
[docs]class BLUESSimulation(object): """BLUESSimulation class provides methods to execute the NCMC+MD simulation. Parameters ---------- simulations : blues.simulation.SimulationFactory object SimulationFactory Object which carries the 3 required OpenMM Simulation objects (MD, NCMC, ALCH) required to run BLUES. config : dict Dictionary of parameters for configuring the OpenMM Simulations If None, will search for configuration parameters on the `simulations` object. Examples -------- Create our SimulationFactory object and run `BLUESSimulation` >>> sim_cfg = { 'platform': 'OpenCL', 'properties' : { 'OpenCLPrecision': 'single', 'OpenCLDeviceIndex' : 2}, 'nprop' : 1, 'propLambda' : 0.3, 'dt' : 0.001 * unit.picoseconds, 'friction' : 1 * 1/unit.picoseconds, 'temperature' : 100 * unit.kelvin, 'nIter': 1, 'nstepsMD': 10, 'nstepsNC': 10,} >>> simulations = SimulationFactory(systems, ligand_mover, sim_cfg) >>> blues = BLUESSimulation(simulations) >>> blues.run() """ def __init__(self, simulations, config=None): self._move_engine = simulations._move_engine self._md_sim = simulations.md self._alch_sim = simulations.alch self._ncmc_sim = simulations.ncmc # Check if configuration has been specified in `SimulationFactory` object if not config: if hasattr(simulations, 'config'): self._config = simulations.config else: #Otherwise take specified config self._config = config if self._config: self._printSimulationTiming() self.accept = 0 self.reject = 0 self.acceptRatio = 0 self.currentIter = 0 #Dict to keep track of each simulation state before/after each iteration self.stateTable = {'md': {'state0': {}, 'state1': {}}, 'ncmc': {'state0': {}, 'state1': {}}} #specify nc integrator variables to report in verbose output self._integrator_keys_ = ['lambda', 'shadow_work', 'protocol_work', 'Eold', 'Enew'] self._state_keys = { 'getPositions': True, 'getVelocities': True, 'getForces': False, 'getEnergy': True, 'getParameters': True, 'enforcePeriodicBox': True }
[docs] @classmethod def getStateFromContext(cls, context, state_keys): """Gets the State information from the given context and list of state_keys to query it with. Returns the state data as a dict. Parameters ---------- context : openmm.Context Context of the OpenMM Simulation to query. state_keys : list Default: [ positions, velocities, potential_energy, kinetic_energy ] A list that defines what information to get from the context State. Returns ------- stateinfo : dict Current positions, velocities, energies and box vectors of the context. """ stateinfo = {} state = context.getState(**state_keys) stateinfo['positions'] = state.getPositions(asNumpy=True) stateinfo['velocities'] = state.getVelocities(asNumpy=True) stateinfo['potential_energy'] = state.getPotentialEnergy() stateinfo['kinetic_energy'] = state.getKineticEnergy() stateinfo['box_vectors'] = state.getPeriodicBoxVectors() return stateinfo
[docs] @classmethod def getIntegratorInfo(cls, ncmc_integrator, integrator_keys=['lambda', 'shadow_work', 'protocol_work', 'Eold', 'Enew']): """Returns a dict of alchemical/ncmc-swtiching data from querying the the NCMC integrator. Parameters ---------- ncmc_integrator : openmm.Context.Integrator The integrator from the NCMC Context integrator_keys : list list containing strings of the values to get from the integrator. Default = ['lambda', 'shadow_work', 'protocol_work', 'Eold', 'Enew','Epert'] Returns ------- integrator_info : dict Work values and energies from the NCMC integrator. """ integrator_info = {} for key in integrator_keys: integrator_info[key] = ncmc_integrator.getGlobalVariableByName(key) return integrator_info
[docs] @classmethod def setContextFromState(cls, context, state, box=True, positions=True, velocities=True): """Update a given Context from the given State. Parameters ---------- context : openmm.Context The Context to be updated from the given State. state : openmm.State The current state (box_vectors, positions, velocities) of the Simulation to update the given context. Returns ------- context : openmm.Context The updated Context whose box_vectors, positions, and velocities have been updated. """ # Replace ncmc data from the md context if box: context.setPeriodicBoxVectors(*state['box_vectors']) if positions: context.setPositions(state['positions']) if velocities: context.setVelocities(state['velocities']) return context
[docs] def _printSimulationTiming(self): """Prints the simulation timing and related information.""" dt = self._config['dt'].value_in_unit(unit.picoseconds) nIter = self._config['nIter'] nprop = self._config['nprop'] propLambda = self._config['propLambda'] propSteps = self._config['propSteps'] nstepsNC = self._config['nstepsNC'] nstepsMD = self._config['nstepsMD'] force_eval = nIter * (propSteps + nstepsMD) time_ncmc_iter = propSteps * dt time_ncmc_total = time_ncmc_iter * nIter time_md_iter = nstepsMD * dt time_md_total = time_md_iter * nIter time_iter = time_ncmc_iter + time_md_iter time_total = time_iter * nIter msg = 'Total BLUES Simulation Time = %s ps (%s ps/Iter)\n' % (time_total, time_iter) msg += 'Total Force Evaluations = %s \n' % force_eval msg += 'Total NCMC time = %s ps (%s ps/iter)\n' % (time_ncmc_total, time_ncmc_iter) # Calculate number of lambda steps inside/outside region with extra propgation steps steps_in_prop = int(nprop * (2 * math.floor(propLambda * nstepsNC))) steps_out_prop = int((2 * math.ceil((0.5 - propLambda) * nstepsNC))) prop_lambda_window = self._ncmc_sim.context._integrator._prop_lambda # prop_range = round(prop_lambda_window[1] - prop_lambda_window[0], 4) if propSteps != nstepsNC: msg += '\t%s lambda switching steps within %s total propagation steps.\n' % (nstepsNC, propSteps) msg += '\tExtra propgation steps between lambda [%s, %s]\n' % (prop_lambda_window[0], prop_lambda_window[1]) msg += '\tLambda: 0.0 -> %s = %s propagation steps\n' % (prop_lambda_window[0], int(steps_out_prop / 2)) msg += '\tLambda: %s -> %s = %s propagation steps\n' % (prop_lambda_window[0], prop_lambda_window[1], steps_in_prop) msg += '\tLambda: %s -> 1.0 = %s propagation steps\n' % (prop_lambda_window[1], int(steps_out_prop / 2)) msg += 'Total MD time = %s ps (%s ps/iter)\n' % (time_md_total, time_md_iter) #Get trajectory frame interval timing for BLUES simulation if 'md_trajectory_interval' in self._config.keys(): frame_iter = nstepsMD / self._config['md_trajectory_interval'] timetraj_frame = (time_ncmc_iter + time_md_iter) / frame_iter msg += 'Trajectory Interval = %s ps/frame (%s frames/iter)' % (timetraj_frame, frame_iter) logger.info(msg)
[docs] def _setStateTable(self, simkey, stateidx, stateinfo): """Updates `stateTable` (dict) containing: Positions, Velocities, Potential/Kinetic energies of the state before and after a NCMC step or iteration. Parameters ---------- simkey : str (key: 'md', 'ncmc', 'alch') Key corresponding to the simulation. stateidx : str (key: 'state0' or 'state1') Key corresponding to the state information being stored. stateinfo : dict Dictionary containing the State information. """ self.stateTable[simkey][stateidx] = stateinfo
[docs] def _syncStatesMDtoNCMC(self): """Retrieves data on the current State of the MD context to replace the box vectors, positions, and velocties in the NCMC context. """ # Retrieve MD state from previous iteration md_state0 = self.getStateFromContext(self._md_sim.context, self._state_keys) self._setStateTable('md', 'state0', md_state0) # Sync MD state to the NCMC context self._ncmc_sim.context = self.setContextFromState(self._ncmc_sim.context, md_state0)
[docs] def _stepNCMC(self, nstepsNC, moveStep, move_engine=None): """Advance the NCMC simulation. Parameters ---------- nstepsNC : int The number of NCMC switching steps to advance by. moveStep : int The step number to perform the chosen move, which should be half the number of nstepsNC. move_engine : blues.moves.MoveEngine The object that executes the chosen move. """ logger.info('Advancing %i NCMC switching steps...' % (nstepsNC)) # Retrieve NCMC state before proposed move ncmc_state0 = self.getStateFromContext(self._ncmc_sim.context, self._state_keys) self._setStateTable('ncmc', 'state0', ncmc_state0) #choose a move to be performed according to move probabilities #TODO: will have to change to work with multiple alch region if not move_engine: move_engine = self._move_engine self._ncmc_sim.currentIter = self.currentIter move_engine.selectMove() lastStep = nstepsNC - 1 for step in range(int(nstepsNC)): try: #Attempt anything related to the move before protocol is performed if not step: self._ncmc_sim.context = move_engine.selected_move.beforeMove(self._ncmc_sim.context) # Attempt selected MoveEngine Move at the halfway point #to ensure protocol is symmetric if step == moveStep: if hasattr(logger, 'report'): logger.info = logger.report #Do move logger.info('Performing %s...' % move_engine.move_name) self._ncmc_sim.context = move_engine.runEngine(self._ncmc_sim.context) # Do 1 NCMC step with the integrator self._ncmc_sim.step(1) #Attempt anything related to the move after protocol is performed if step == lastStep: self._ncmc_sim.context = move_engine.selected_move.afterMove(self._ncmc_sim.context) except Exception as e: import traceback traceback.print_tb(e.__traceback__) logger.error(e) move_engine.selected_move._error(self._ncmc_sim.context) break # ncmc_state1 stores the state AFTER a proposed move. ncmc_state1 = self.getStateFromContext(self._ncmc_sim.context, self._state_keys) self._setStateTable('ncmc', 'state1', ncmc_state1)
[docs] def _computeAlchemicalCorrection(self): """Computes the alchemical correction term from switching between the NCMC and MD potentials.""" # Retrieve the MD/NCMC state before the proposed move. md_state0_PE = self.stateTable['md']['state0']['potential_energy'] ncmc_state0_PE = self.stateTable['ncmc']['state0']['potential_energy'] # Retreive the NCMC state after the proposed move. ncmc_state1 = self.stateTable['ncmc']['state1'] ncmc_state1_PE = ncmc_state1['potential_energy'] # Set the box_vectors and positions in the alchemical simulation to after the proposed move. self._alch_sim.context = self.setContextFromState(self._alch_sim.context, ncmc_state1, velocities=False) # Retrieve potential_energy for alch correction alch_PE = self._alch_sim.context.getState(getEnergy=True).getPotentialEnergy() correction_factor = (ncmc_state0_PE - md_state0_PE + alch_PE - ncmc_state1_PE) * ( -1.0 / self._ncmc_sim.context._integrator.kT) return correction_factor
[docs] def _acceptRejectMove(self, write_move=False): """Choose to accept or reject the proposed move based on the acceptance criterion. Parameters ---------- write_move : bool, default=False If True, writes the proposed NCMC move to a PDB file. """ work_ncmc = self._ncmc_sim.context._integrator.getLogAcceptanceProbability(self._ncmc_sim.context) randnum = math.log(np.random.random()) # Compute correction if work_ncmc is not NaN if not np.isnan(work_ncmc): correction_factor = self._computeAlchemicalCorrection() logger.debug( 'NCMCLogAcceptanceProbability = %.6f + Alchemical Correction = %.6f' % (work_ncmc, correction_factor)) work_ncmc = work_ncmc + correction_factor if work_ncmc > randnum: self.accept += 1 logger.info('NCMC MOVE ACCEPTED: work_ncmc {} > randnum {}'.format(work_ncmc, randnum)) # If accept move, sync NCMC state to MD context ncmc_state1 = self.stateTable['ncmc']['state1'] self._md_sim.context = self.setContextFromState(self._md_sim.context, ncmc_state1, velocities=False) if write_move: utils.saveSimulationFrame(self._md_sim, '{}acc-it{}.pdb'.format(self._config['outfname'], self.currentIter)) else: self.reject += 1 logger.info('NCMC MOVE REJECTED: work_ncmc {} < {}'.format(work_ncmc, randnum)) # If reject move, do nothing, # NCMC simulation be updated from MD Simulation next iteration. # Potential energy should be from last MD step in the previous iteration md_state0 = self.stateTable['md']['state0'] md_PE = self._md_sim.context.getState(getEnergy=True).getPotentialEnergy() if not math.isclose(md_state0['potential_energy']._value, md_PE._value, rel_tol=float('1e-%s' % rtol)): logger.error( 'Last MD potential energy %s != Current MD potential energy %s. Potential energy should match the prior state.' % (md_state0['potential_energy'], md_PE)) sys.exit(1)
[docs] def _resetSimulations(self, temperature=None): """At the end of each iteration: 1. Reset the step number in the NCMC context/integrator 2. Set the velocities to random values chosen from a Boltzmann distribution at a given `temperature`. Parameters ---------- temperature : float The target temperature for the simulation. """ if not temperature: temperature = self._md_sim.context._integrator.getTemperature() self._ncmc_sim.currentStep = 0 self._ncmc_sim.context._integrator.reset() #Reinitialize velocities, preserving detailed balance? self._md_sim.context.setVelocitiesToTemperature(temperature)
[docs] def _stepMD(self, nstepsMD): """Advance the MD simulation. Parameters ---------- nstepsMD : int The number of steps to advance the MD simulation. """ logger.info('Advancing %i MD steps...' % (nstepsMD)) self._md_sim.currentIter = self.currentIter # Retrieve MD state before proposed move # Helps determine if previous iteration placed ligand poorly md_state0 = self.stateTable['md']['state0'] for md_step in range(int(nstepsMD)): try: self._md_sim.step(1) except Exception as e: logger.error(e, exc_info=True) logger.error('potential energy before NCMC: %s' % md_state0['potential_energy']) logger.error('kinetic energy before NCMC: %s' % md_state0['kinetic_energy']) #Write out broken frame utils.saveSimulationFrame(self._md_sim, 'MD-fail-it%s-md%i.pdb' % (self.currentIter, self._md_sim.currentStep)) sys.exit(1)
[docs] def run(self, nIter=0, nstepsNC=0, moveStep=0, nstepsMD=0, temperature=300, write_move=False, **config): """Executes the BLUES engine to iterate over the actions: Perform NCMC simulation, perform proposed move, accepts/rejects move, then performs the MD simulation from the NCMC state, niter number of times. **Note:** If the parameters are not given explicitly, will look for the parameters in the provided configuration on the `SimulationFactory` object. Parameters ---------- nIter : int, default = None Number of iterations of NCMC+MD to perform. nstepsNC : int The number of NCMC switching steps to advance by. moveStep : int The step number to perform the chosen move, which should be half the number of nstepsNC. nstepsMD : int The number of steps to advance the MD simulation. temperature : float The target temperature for the simulation. write_move : bool, default=False If True, writes the proposed NCMC move to a PDB file. """ if not nIter: nIter = self._config['nIter'] if not nstepsNC: nstepsNC = self._config['nstepsNC'] if not nstepsMD: nstepsMD = self._config['nstepsMD'] if not moveStep: moveStep = self._config['moveStep'] logger.info('Running %i BLUES iterations...' % (nIter)) for N in range(int(nIter)): self.currentIter = N logger.info('BLUES Iteration: %s' % N) self._syncStatesMDtoNCMC() self._stepNCMC(nstepsNC, moveStep) self._acceptRejectMove(write_move) self._resetSimulations(temperature) self._stepMD(nstepsMD) # END OF NITER self.acceptRatio = self.accept / float(nIter) logger.info('Acceptance Ratio: %s' % self.acceptRatio) logger.info('nIter: %s ' % nIter)
class MonteCarloSimulation(BLUESSimulation): """Simulation class provides the functions that perform the MonteCarlo run. Parameters ---------- simulations : SimulationFactory Contains 3 required OpenMM Simulationobjects config : dict, default = None Dict with configuration info. """ def __init__(self, simulations, config=None): super(MonteCarloSimulation, self).__init__(simulations, config) def _stepMC_(self): """Function that performs the MC simulation. """ #choose a move to be performed according to move probabilities self._move_engine.selectMove() #change coordinates according to Moves in MoveEngine new_context = self._move_engine.runEngine(self._md_sim.context) md_state1 = self.getStateFromContext(new_context, self._state_keys) self._setStateTable('md', 'state1', md_state1) def _acceptRejectMove(self, temperature=None): """Function that chooses to accept or reject the proposed move. """ md_state0 = self.stateTable['md']['state0'] md_state1 = self.stateTable['md']['state1'] work_mc = (md_state1['potential_energy'] - md_state0['potential_energy']) * ( -1.0 / self._ncmc_sim.context._integrator.kT) randnum = math.log(np.random.random()) if work_mc > randnum: self.accept += 1 logger.info('MC MOVE ACCEPTED: work_mc {} > randnum {}'.format(work_mc, randnum)) self._md_sim.context.setPositions(md_state1['positions']) else: self.reject += 1 logger.info('MC MOVE REJECTED: work_mc {} < {}'.format(work_mc, randnum)) self._md_sim.context.setPositions(md_state0['positions']) self._md_sim.context.setVelocitiesToTemperature(temperature) def run(self, nIter=0, mc_per_iter=0, nstepsMD=0, temperature=300, write_move=False): """Function that runs the BLUES engine to iterate over the actions: perform proposed move, accepts/rejects move, then performs the MD simulation from the accepted or rejected state. Parameters ---------- nIter : None or int, optional default = None The number of iterations to perform. If None, then uses the nIter specified in the opt dictionary when the Simulation class was created. mc_per_iter : int, default = 1 Number of Monte Carlo iterations. nstepsMD : int, default = None Number of steps the MD simulation will advance write_move : bool, default = False Writes the move if True """ if not nIter: nIter = self._config['nIter'] if not nstepsMD: nstepsMD = self._config['nstepsMD'] #controls how many mc moves are performed during each iteration if not mc_per_iter: mc_per_iter = self._config['mc_per_iter'] self._syncStatesMDtoNCMC() for N in range(nIter): self.currentIter = N logger.info('MonteCarlo Iteration: %s' % N) for i in range(mc_per_iter): self._syncStatesMDtoNCMC() self._stepMC_() self._acceptRejectMove(temperature) self._stepMD(nstepsMD)