BrainTasks or how to customize your TPS simulation#

As already mentioned in the example notebooks that show how to run a TPS simulation with aimmd.distributed, the central object of the TPS simulation (the Brain) runs all its BrainTasks after every Monte Carlo step. This is very similar to the concept of a hook in openpathsampling with the difference that openpathsampling defines pre- and post-step hooks, while in aimmd.distributed the BrainTasks are only called after the Monte Carlo step, i.e. only post-step hooks are currently implemented. Importantly, in aimmd.distributed the BrainTaskss are run after completing but before saving the Monte Carlo step, which enables the tasks to add additional information to the Monte Carlo steps.

In addition the the predefined BrainTasks to e.g. train the model, save the trainset, and perform the density collection (for the density correction in \(\phi_B\)-space), users can easily define their own BrainTasks to modify the behavior of their TPS simulation. To this end one just needs to subclass the BrainTask abstract base class and attach the resulting user-defined BrainTask to the Brain as usual.

Required knowledge/recommended reading: This notebook assumes that you are familiar with setting up and running a TPS simulation using aimmd.distributed. If you are not familliar with running an aimmd.distributed TPS simulation, please have a look at the notebooks TPS_1_setup_and_run_simulation.ipynb to TPS_4_rerun_with_changed_parameters_or_recover_crashed_simulations.ipynb or TPS_with_EQ_SPs_1_generate_SPs_from_UmbrellaSampling.ipynb to TPS_with_EQ_SPs_4_rerun_with_changed_parameters_or_recover_crashed_simulations.ipynb first.

Imports and set working directory#

import os
import torch
import asyncmd
import asyncmd.gromacs as asyncgmx
from asyncmd import Trajectory
import aimmd
import aimmd.distributed as aimmdd
/home/think/.conda/envs/aimmd_dev/lib/python3.10/site-packages/tqdm/auto.py:22: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
  from .autonotebook import tqdm as notebook_tqdm
Could not initialize SLURM cluster handling. If you are sure SLURM (sinfo/sacct/etc) is available try calling `asyncmd.config.set_slurm_settings()` with the appropriate arguments.
Tensorflow/Keras not available
# setup working directory
#scratch_dir = "/homeloc/scratch/aimmd_distributed/"
scratch_dir = "."

workdir = os.path.join(scratch_dir, "TransitionPathSampling_ala_customizing_TPS_with_BrainTasks")
if not os.path.isdir(workdir):
    os.mkdir(workdir)

User-defined BrainTasks#

As mentioned above, we will subclass the aimmd.distributed.pathsampling.BrainTask abstract base class to define our own BrainTask.

The only two methods we need to define are __init__ (to initialize our class) and run (which will be called after every Monte Carlo step by the Brain). The run method will be called with three arguments: the Brain which performs the simulation, the Monte Carlo step that just finished and the index of the sampler that produced the Monte Carlo step. It will be called after the Monte Carlo step finished but before it is saved, i.e. the BrainTasks can add or modify attributes of the Monte Carlo step and these changes will be permanent.

Below are two (somewhat useless to dangerous) examples for user-defined BrainTasks:

  • The VerbosePrintTask just prints some info every time it gets called with a Monte Carlo step. Note that its job can be done by the Brain itself if you call the Brains run_for_n_steps or run_for_n_accepts methods with print_progress=1 when running the simulation. I.e. this BrainTask is just somewhat useless.

  • The StupidTask does something arguably very stupid, namely that it breaks the Markov Chain Monte Carlo by modifying the Monte Carlo step to a non-accepted step if its acceptance probability is smaller than p_cut. This is done here mostly to showcase that BrainTasks are a powerfull tool which enables you to do (almost) arbitrary things to modify the behavior of your TPS simulation, if they make sense is another story. As always: With great power comes great responsibility and great potential for mistakes ;) You could however use the capability to modify Monte Carlo steps in a useful way to e.g. perform swap moves between two different samplers using a BrainTask that waits for both of them to finish and then attempts a swap.

Note that each BrainTask will be run if the stepnumber is divisible by interval, e.g. a BrainTask with interval=3 will only be run after the 3rd, 6th, 9th, etc. Monte Carlo step. Furthermore, BrainTasks are called in the order in which they are passed to the Brain, e.g. if you pass tasks= [VerbosePrintTask(), StupidTask()] then the VerbosePrintTask will always run before the StupidTask (at least if they both are supposed to run at a given stepnumber).

import aimmd.distributed
from aimmd.distributed.pathmovers import MCstep  # only needed for the type hints


class VerbosePrintTask(aimmd.distributed.pathsampling.BrainTask):
    def __init__(self, interval: int = 1, name: str = "VerbosePrintTask"):
        super().__init__(interval)
        self._call_count = 0
        self.name = name

    async def run(self, brain, mcstep: MCstep, sampler_idx: int):
        # This will be called every `interval` finished Monte Carlo steps
        self._call_count += 1  # increment call count to see how many times we call run
        # We just print some basic info
        print(f"A sampler ({brain.samplers[sampler_idx]}) (index={sampler_idx}) "
              f"produced a Monte Carlo step ({mcstep}).\n"
              f"This BrainTask with name {self.name} ({self}) got called for the {self._call_count}th time."
              )


class StupidTask(aimmd.distributed.pathsampling.BrainTask):
    def __init__(self, interval: int = 1, p_cut: float = 0.5):
        super().__init__(interval)
        self.p_cut = p_cut

    async def run(self, brain, mcstep: MCstep, sampler_idx: int):
        if mcstep.p_acc <= self.p_cut:
            mcstep.accepted = False


class MakeEveryStepAcceptedTask(aimmd.distributed.pathsampling.BrainTask):
    def __init__(self, interval: int = 1):
        super().__init__(interval)

    async def run(self, brain, mcstep: MCstep, sampler_idx: int):
        if not mcstep.accepted:
            # modify non-accepted steps to be accepted
            # NOTE: this will (most likely) crash a sequential TPS simulation
            #       because steps that are not accepted do not need to contain
            #       a valid transition (i.e. their `path` attribute is not set)
            #       so we can not start a new Monte Carlo step by shooting from
            #       the last transition path
            #       In a TPS simulation with equilibrium shooting points this
            #       BrainTask will not have any effect because there every step
            #       is formally accepted (and has an associated weight which
            #       can be zero)
            mcstep.accepted = True

Setup the TPS simulation#

This is the same setup as used in TPS_1_setup_and_run_simulation.ipynb, just with less comments and explanations.

# number of samplers
n_samplers = 2  # results in 2*n_samplers gmx engines

# storage file
storage = aimmd.Storage(os.path.join(workdir, "storage.h5"))

# state functions and descriptor transform function
os.chdir("..")  # chdir to one level above to import state_funcs_mda.py
from state_funcs_mda import alpha_R, C7_eq, descriptor_func_ic
os.chdir("Advanced_topics")  # and back to the folder in which we run the notebook
# state functions
wrapped_alphaR = asyncmd.trajectory.PyTrajectoryFunctionWrapper(alpha_R)
wrapped_C7_eq = asyncmd.trajectory.PyTrajectoryFunctionWrapper(C7_eq)
# descriptor transform
# descriptor_func_ic gives us an internal coordinate representation (i.e. bond lengths, angles and dihedrals)
wrapped_transform = asyncmd.trajectory.PyTrajectoryFunctionWrapper(descriptor_func_ic,
                                                                   call_kwargs={"molecule_selection": "protein"},
                                                                   )

# Underlying dynamics/ Define the engine(s) for the PathMovers (they will all be the same)
gro = "../gmx_infiles/conf.gro"
top = "../gmx_infiles/topol_amber99sbildn.top"
ndx = "../gmx_infiles/index.ndx"
mdp = asyncgmx.MDP("../gmx_infiles/md.mdp")
gmx_engine_kwargs = {"mdconfig": mdp,
                     "gro_file": gro,
                     "top_file": top,
                     "ndx_file": ndx,
                     "output_traj_type": "XTC",
                     #"mdrun_extra_args": "-nt 2",
                     # use this for gmx sans (thread) MPI
                     "mdrun_extra_args": "-ntomp 2",
                     }
gmx_engine_cls = asyncgmx.GmxEngine

# initial transition
tp_initial = Trajectory(structure_file="../gmx_infiles/ala_300K_amber99sb-ildn.tpr",
                        trajectory_files="../gmx_infiles/TP_low_barrier_300K_amber99sbildn.trr",
                        )

# Model definition
# first get the descriptors for them to infer the number of inputs for our model
descriptors_for_initial_tp = await wrapped_transform(tp_initial)
# architecture specification
n_lay_pyramid = 5  # number of resunits
n_unit_top = 10  # number of units in the last layer before the log_predictor
dropout_base = 0.3  # dropot fraction in the first layer (will be reduced going to the top)
n_unit_base = cv_ndim = descriptors_for_initial_tp.shape[1]  # input dimension
# the factor by which we reduce the number of units per layer (the width) and the dropout fraction
fact = (n_unit_top / n_unit_base)**(1./(n_lay_pyramid))

modules = []
for i in range(1, n_lay_pyramid + 1):
    modules += [aimmd.pytorch.networks.FFNet(n_in=max(n_unit_top, int(n_unit_base * fact**(i-1))),
                                             n_hidden=[max(n_unit_top, int(n_unit_base * fact**i))],  # 1 hidden layer network
                                             activation=torch.nn.Identity(),
                                             dropout={"0": dropout_base * fact**i}
                                             )
                ]
    print(f"ResUnit {i} is {max(n_unit_top, int(n_unit_base * fact**(i)))} units wide.")
    print(f"Dropout before it is {dropout_base * fact**i}.")
    modules += [aimmd.pytorch.networks.ResNet(n_units=max(n_unit_top, int(n_unit_base * fact**i)),
                                              n_blocks=1)
                ]
torch_model = aimmd.pytorch.networks.ModuleStack(n_out=1, modules=modules)
# move model to GPU if CUDA is available
if torch.cuda.is_available():
    torch_model = torch_model.to('cuda')
# optimizer to train the model
optimizer = torch.optim.Adam(torch_model.parameters(), lr=1e-3)
# wrapp the pytorch neural network model in a RCModel class,
model = aimmd.pytorch.EEScalePytorchRCModelAsync(nnet=torch_model,
                                                 optimizer=optimizer,
                                                 states=[wrapped_C7_eq, wrapped_alphaR],
                                                 ee_params={'lr_0': 1e-3,
                                                            'lr_min': 5e-5,
                                                            'epochs_per_train': 3,
                                                            'window': 100,
                                                            'batch_size': 8192,
                                                           },
                                                 descriptor_transform=wrapped_transform,
                                                 cache_file=storage,
                                                 )

# Define the TPS sampling scheme
# shooting point selection
spselector = aimmdd.spselectors.RCModelSPSelectorFromTraj()
# and setup the movers lists (i.e. mover_cls and mover_kwargs for each sampler)
# here we just use one move-type per sampler and therefore only have one entry
# in each list
movers_cls = [aimmdd.pathmovers.TwoWayShootingPathMover]
movers_kwargs = [{'states': [wrapped_alphaR, wrapped_C7_eq],
                  'engine_cls': gmx_engine_cls,
                  'engine_kwargs': gmx_engine_kwargs,
                  'walltime_per_part': 0.00003125,  # 0.1125 s per part
                  #'walltime_per_part': 0.0000625,  # 0.225 s per part
                  'T': mdp["ref-t"][0],
                  "sp_selector": spselector,
                  "max_steps": 500 * 10**5,  # 500 steps * dt (2 fs) = 1 ps
                  }
                 ]

# Trainset
trainset = aimmd.TrainSet(n_states=2)
ResUnit 1 is 66 units wide.
Dropout before it is 0.18674307214231128.
ResUnit 2 is 41 units wide.
Dropout before it is 0.1162432499771616.
ResUnit 3 is 25 units wide.
Dropout before it is 0.07235873872180604.
ResUnit 4 is 16 units wide.
Dropout before it is 0.045041643884176266.
ResUnit 5 is 10 units wide.
Dropout before it is 0.02803738317757008.

Initialize BrainTasks#

Here we will initialize our user-defined BrainTasks and also the basic BrainTasks needed for a TPS simulation.

tasks = [
    # TrainingTask
    aimmdd.pathsampling.TrainingTask(model=model, trainset=trainset),
    # SaveTask
    aimmdd.pathsampling.SaveTask(storage=storage, model=model, trainset=trainset),
    # DensityCollectionTask
    aimmdd.pathsampling.DensityCollectionTask(model=model,
                                              first_collection=100,
                                              recreate_interval=250,
                                              ),
    # this task will print after every finished Monte Carlo step
    VerbosePrintTask(interval=1, name="VerbosePrintTask_interval=1"),
    # this task will print only every 3rd finished Monte Carlo step
    VerbosePrintTask(interval=3, name="VerbosePrintTask_interval=3"),
    # p_cut=100 should result in no accepted Monte Carlo steps as we will only
    # accept steps that have p_acc >= 100
    # (p_cut=0. would result in no modified Monte Carlo steps at all)
    StupidTask(interval=1, p_cut=100.),
    # uncomment the next line if you want to crash your TPS simulation :)
    #MakeEveryStepAcceptedTask(interval=1),
         ]

Initialize the Brain (and attach the BrainTasks)#

Also seed the initial transition.

brain = aimmdd.Brain.samplers_from_moverlist(model=model, workdir=workdir, storage=storage,
                                             n_sampler=n_samplers,
                                             movers_cls=movers_cls, movers_kwargs=movers_kwargs,
                                             samplers_use_same_stepcollection=False,
                                             tasks=tasks)
# seed initial transition for each Markov chain sampler
brain.seed_initial_paths(trajectories=[tp_initial])

Run the simulation#

# we should call the second VerbosePrintTask 3 times and the first one 7 times
await brain.run_for_n_steps(7)
A sampler (<aimmd.distributed.pathsampling.PathChainSampler object at 0x7f3c4f0b0640>) (index=0) produced a Monte Carlo step (MCStep(mover=TwoWayShootingPathMover, stepnum=1, states_reached=[1. 1.], accepted=True, p_acc=8.33322158584474,
       predicted_committors_sp=[0.536691], weight=1.0,
       directory=TransitionPathSampling_ala_customizing_TPS_with_BrainTasks/sampler_0/mcstep_1)).
This BrainTask with name VerbosePrintTask_interval=1 (<__main__.VerbosePrintTask object at 0x7f3c4f0b0f10>) got called for the 1th time.
A sampler (<aimmd.distributed.pathsampling.PathChainSampler object at 0x7f3c4f0b0640>) (index=0) produced a Monte Carlo step (MCStep(mover=TwoWayShootingPathMover, stepnum=1, states_reached=[1. 1.], accepted=True, p_acc=8.33322158584474,
       predicted_committors_sp=[0.536691], weight=1.0,
       directory=TransitionPathSampling_ala_customizing_TPS_with_BrainTasks/sampler_0/mcstep_1)).
This BrainTask with name VerbosePrintTask_interval=3 (<__main__.VerbosePrintTask object at 0x7f3c4f0b1ed0>) got called for the 1th time.
A sampler (<aimmd.distributed.pathsampling.PathChainSampler object at 0x7f3d357b7790>) (index=1) produced a Monte Carlo step (MCStep(mover=TwoWayShootingPathMover, stepnum=1, states_reached=[2. 0.], accepted=False, p_acc=0, predicted_committors_sp=[0.53626174],
       weight=1, directory=TransitionPathSampling_ala_customizing_TPS_with_BrainTasks/sampler_1/mcstep_1)).
This BrainTask with name VerbosePrintTask_interval=1 (<__main__.VerbosePrintTask object at 0x7f3c4f0b0f10>) got called for the 2th time.
A sampler (<aimmd.distributed.pathsampling.PathChainSampler object at 0x7f3c4f0b0640>) (index=0) produced a Monte Carlo step (MCStep(mover=TwoWayShootingPathMover, stepnum=2, states_reached=[2. 0.], accepted=False, p_acc=0, predicted_committors_sp=[0.53909445],
       weight=1, directory=TransitionPathSampling_ala_customizing_TPS_with_BrainTasks/sampler_0/mcstep_2)).
This BrainTask with name VerbosePrintTask_interval=1 (<__main__.VerbosePrintTask object at 0x7f3c4f0b0f10>) got called for the 3th time.
A sampler (<aimmd.distributed.pathsampling.PathChainSampler object at 0x7f3d357b7790>) (index=1) produced a Monte Carlo step (MCStep(mover=TwoWayShootingPathMover, stepnum=2, states_reached=[2. 0.], accepted=False, p_acc=0, predicted_committors_sp=[0.5362103],
       weight=1, directory=TransitionPathSampling_ala_customizing_TPS_with_BrainTasks/sampler_1/mcstep_2)).
This BrainTask with name VerbosePrintTask_interval=1 (<__main__.VerbosePrintTask object at 0x7f3c4f0b0f10>) got called for the 4th time.
A sampler (<aimmd.distributed.pathsampling.PathChainSampler object at 0x7f3d357b7790>) (index=1) produced a Monte Carlo step (MCStep(mover=TwoWayShootingPathMover, stepnum=2, states_reached=[2. 0.], accepted=False, p_acc=0, predicted_committors_sp=[0.5362103],
       weight=1, directory=TransitionPathSampling_ala_customizing_TPS_with_BrainTasks/sampler_1/mcstep_2)).
This BrainTask with name VerbosePrintTask_interval=3 (<__main__.VerbosePrintTask object at 0x7f3c4f0b1ed0>) got called for the 2th time.
A sampler (<aimmd.distributed.pathsampling.PathChainSampler object at 0x7f3c4f0b0640>) (index=0) produced a Monte Carlo step (MCStep(mover=TwoWayShootingPathMover, stepnum=3, states_reached=[2. 0.], accepted=False, p_acc=0, predicted_committors_sp=[0.50561476],
       weight=1, directory=TransitionPathSampling_ala_customizing_TPS_with_BrainTasks/sampler_0/mcstep_3)).
This BrainTask with name VerbosePrintTask_interval=1 (<__main__.VerbosePrintTask object at 0x7f3c4f0b0f10>) got called for the 5th time.
A sampler (<aimmd.distributed.pathsampling.PathChainSampler object at 0x7f3d357b7790>) (index=1) produced a Monte Carlo step (MCStep(mover=TwoWayShootingPathMover, stepnum=3, states_reached=[2. 0.], accepted=False, p_acc=0, predicted_committors_sp=[0.5066172],
       weight=1, directory=TransitionPathSampling_ala_customizing_TPS_with_BrainTasks/sampler_1/mcstep_3)).
This BrainTask with name VerbosePrintTask_interval=1 (<__main__.VerbosePrintTask object at 0x7f3c4f0b0f10>) got called for the 6th time.
A sampler (<aimmd.distributed.pathsampling.PathChainSampler object at 0x7f3c4f0b0640>) (index=0) produced a Monte Carlo step (MCStep(mover=TwoWayShootingPathMover, stepnum=4, states_reached=[2. 0.], accepted=False, p_acc=0, predicted_committors_sp=[0.50562847],
       weight=1, directory=TransitionPathSampling_ala_customizing_TPS_with_BrainTasks/sampler_0/mcstep_4)).
This BrainTask with name VerbosePrintTask_interval=1 (<__main__.VerbosePrintTask object at 0x7f3c4f0b0f10>) got called for the 7th time.
A sampler (<aimmd.distributed.pathsampling.PathChainSampler object at 0x7f3c4f0b0640>) (index=0) produced a Monte Carlo step (MCStep(mover=TwoWayShootingPathMover, stepnum=4, states_reached=[2. 0.], accepted=False, p_acc=0, predicted_committors_sp=[0.50562847],
       weight=1, directory=TransitionPathSampling_ala_customizing_TPS_with_BrainTasks/sampler_0/mcstep_4)).
This BrainTask with name VerbosePrintTask_interval=3 (<__main__.VerbosePrintTask object at 0x7f3c4f0b1ed0>) got called for the 3th time.
# check that no MCStep is accepted (if its acceptance probability is smaller than 100)
all_steps = []
for c_idx, collection in enumerate(storage.mcstep_collections):
    print(f"Working on mcstepcollection with index {c_idx}.")
    for s_idx, step in enumerate(collection):
        print(f"    Step with index {s_idx}: {step}")
        if s_idx != 0:
            all_steps += [step]
    print("-" * 120)

print(f"All steps (except for the initial seed steps) have been rejected: {not any(s.accepted for s in all_steps)}")
Working on mcstepcollection with index 0.
    Step with index 0: MCStep(mover=None, stepnum=0, states_reached=None, accepted=True, p_acc=1, predicted_committors_sp=None, weight=1,
       directory=TransitionPathSampling_ala_customizing_TPS_with_BrainTasks/sampler_0/mcstep_0)
    Step with index 1: MCStep(mover=TwoWayShootingPathMover, stepnum=1, states_reached=[1. 1.], accepted=False, p_acc=8.33322158584474,
       predicted_committors_sp=[0.536691], weight=1.0,
       directory=TransitionPathSampling_ala_customizing_TPS_with_BrainTasks/sampler_0/mcstep_1)
    Step with index 2: MCStep(mover=TwoWayShootingPathMover, stepnum=2, states_reached=[2. 0.], accepted=False, p_acc=0, predicted_committors_sp=[0.53909445],
       weight=1, directory=TransitionPathSampling_ala_customizing_TPS_with_BrainTasks/sampler_0/mcstep_2)
    Step with index 3: MCStep(mover=TwoWayShootingPathMover, stepnum=3, states_reached=[2. 0.], accepted=False, p_acc=0, predicted_committors_sp=[0.50561476],
       weight=1, directory=TransitionPathSampling_ala_customizing_TPS_with_BrainTasks/sampler_0/mcstep_3)
    Step with index 4: MCStep(mover=TwoWayShootingPathMover, stepnum=4, states_reached=[2. 0.], accepted=False, p_acc=0, predicted_committors_sp=[0.50562847],
       weight=1, directory=TransitionPathSampling_ala_customizing_TPS_with_BrainTasks/sampler_0/mcstep_4)
------------------------------------------------------------------------------------------------------------------------
Working on mcstepcollection with index 1.
    Step with index 0: MCStep(mover=None, stepnum=0, states_reached=None, accepted=True, p_acc=1, predicted_committors_sp=None, weight=1,
       directory=TransitionPathSampling_ala_customizing_TPS_with_BrainTasks/sampler_1/mcstep_0)
    Step with index 1: MCStep(mover=TwoWayShootingPathMover, stepnum=1, states_reached=[2. 0.], accepted=False, p_acc=0, predicted_committors_sp=[0.53626174],
       weight=1, directory=TransitionPathSampling_ala_customizing_TPS_with_BrainTasks/sampler_1/mcstep_1)
    Step with index 2: MCStep(mover=TwoWayShootingPathMover, stepnum=2, states_reached=[2. 0.], accepted=False, p_acc=0, predicted_committors_sp=[0.5362103],
       weight=1, directory=TransitionPathSampling_ala_customizing_TPS_with_BrainTasks/sampler_1/mcstep_2)
    Step with index 3: MCStep(mover=TwoWayShootingPathMover, stepnum=3, states_reached=[2. 0.], accepted=False, p_acc=0, predicted_committors_sp=[0.5066172],
       weight=1, directory=TransitionPathSampling_ala_customizing_TPS_with_BrainTasks/sampler_1/mcstep_3)
------------------------------------------------------------------------------------------------------------------------
All steps (except for the initial seed steps) have been rejected: True
# close the storage
storage.close()