AEP 008: Allow CalcJobs to be actively monitored and interrupted

AEP number: 008
Title: Allow CalcJobs to be actively monitored and interrupted
Authors: Sebastiaan P. Huber (sphuber)
Champions: Sebastiaan P. Huber (sphuber)
Type: S - Standard
Created: 16-Sep-2022
Status: submitted


Background

The CalcJob interface makes it possible to run a code, external to AiiDA, on a pre-configured compute resource. Currently, there is no way to control the termination of jobs, either manually or automatically, other than by killing the CalcJob. This causes the engine to terminate the job through the job scheduler associated with the compute resource. The downside of this method is that a) it is not automated, and b) the calculation is killed abruptly, without being given the opportunity to shut down cleanly.

There are a variety of use cases where one would like a CalcJob to be monitored and, under certain conditions, have it shut down, potentially in a clean manner, before it reaches the end. This AEP describes new functionality that allows users to define such monitors and attach them to a CalcJob.

Design goals

The new functionality should or may satisfy the following requirements, in order of importance:

  • Functionality: The CalcJob monitor interface should make it possible to retrieve files from the remote working directory of the job in order to inspect them and determine whether the job should be stopped.

  • Functionality: The CalcJob monitor interface should make it possible to write files to the remote working directory of the job. This is useful for codes that actively look for files in the running directory to determine whether to shut down cleanly.

  • Functionality: The CalcJob monitor interface should make it possible to execute commands in the remote working directory of the job. This could be important for running specific code that pre-processes output generated by the job. For example, it could be used to analyze large output files and produce smaller report files, which avoids retrieving large output files repeatedly. The goal of this feature is not to encourage moving post-processing entirely into the monitor framework, but merely to provide a way to reduce the transfer of data from the remote computer to the machine where AiiDA is running.

  • Interface: Defining new monitors and adding monitors to CalcJobs should be as simple as possible and should require as little custom setup and configuration as possible.

  • Performance: The implementation should minimize the impact on the performance of AiiDA. It should limit network usage, opening transport connections to remote compute resources, the load on daemon workers, and the load on the database.

Besides the main design goals described above, there was an additional goal that was discussed but is not considered required:

  • Functionality: The monitoring of jobs remains active even when AiiDA itself is not running.

The reason is that it is currently not clear from known use cases whether this requirement is necessary, and it would significantly complicate the implementation. It is therefore mentioned here for completeness, but the implementation of this AEP will not satisfy it. See the “Disadvantages” subsection of the “Design discussion” for a more detailed discussion of this topic.

Implementation

The current engine design has the capability of processing multiple tasks for a calculation job asynchronously. Through the use of an event loop, the engine can handle multiple jobs and multiple tasks (uploading input files, submitting the job to the scheduler, etc.) per process at the same time. One of these tasks is polling the status of the job through the scheduler in order to know whether it has terminated and the outputs can be retrieved. The implementation of this AEP extends this task to also execute all monitors that were attached to the CalcJob, if any. On each iteration of the job update task, the engine loops over the monitors, calls them one by one, and acts based on their response, for example, with the instruction to kill the job. The advantages and disadvantages of this design are discussed in the section Design discussion.
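
To make this dispatch concrete, the following is a simplified, self-contained sketch of the loop over attached monitors; the function run_monitors and its return convention are purely illustrative and do not correspond to actual aiida-core internals.

from typing import Callable, Optional

from aiida.orm import CalcJobNode
from aiida.transports import Transport

def run_monitors(node: CalcJobNode, transport: Transport, monitors: dict[str, Callable]) -> Optional[str]:
    """Call each attached monitor in turn and return the message of the first one that requests a kill."""
    for name, monitor in monitors.items():
        message = monitor(node, transport)

        if message is not None:
            # The engine would now act on the result, for example by killing the job through the scheduler.
            return f'monitor `{name}` requested the job to be stopped: {message}'

    return None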

User interface

Two major considerations for the design of the user interface are:

  • The interface should be as simple as possible and require as little setup as possible.

  • The interface should limit the user as little as possible with respect to what behavior a monitor can implement.

Based on these considerations, the design opts to integrate the monitors tightly with the CalcJob itself. A monitor is implemented as a Python function that is registered with an entry point in the aiida.calculations.monitors entry point group. Monitors can be attached to a CalcJob by passing their entry point strings in the monitors input namespace.

Base functionality

A CalcJob monitor is a function that implements the following signature:

from aiida.engine import CalcJobMonitorResult
from aiida.orm import CalcJobNode
from aiida.transports import Transport

def monitor(node: CalcJobNode, transport: Transport) -> str | CalcJobMonitorResult | None:
    """Retrieve and inspect files in working directory of job to determine whether the job should be killed.

    :param node: The node representing the calculation job.
    :param transport: The transport that can be used to retrieve files from remote working directory.
    :returns: A string if the job should be killed, `None` otherwise.
    """

The node and transport arguments are required. The node is a reference to the calculation job node, which can be used, for example, to retrieve its input nodes. The transport can be used to retrieve files from the working directory of the calculation running on the remote computer, allowing the monitor to inspect their content and determine whether the job should be killed prematurely. It can also be used to write files to the working directory or to execute commands on the remote computer.

A monitor can define additional keyword arguments that a user can use to modify its behavior. The arguments can take any value, as long as they are JSON-serializable. This is necessary because the arguments passed to a monitor are stored in the database in order to preserve provenance.

from aiida.engine import CalcJobMonitorResult
from aiida.orm import CalcJobNode
from aiida.transports import Transport

def monitor(node: CalcJobNode, transport: Transport, custom_keyword: bool = False) -> str | CalcJobMonitorResult | None:
    """Retrieve and inspect files in working directory of job to determine whether the job should be killed.

    :param node: The node representing the calculation job.
    :param transport: The transport that can be used to retrieve files from remote working directory.
    :param custom_keyword: Optional keyword, when set to ``True`` will do something different.
    :returns: A string if the job should be killed, `None` otherwise.
    """

If the custom keyword arguments are written out explicitly, instead of using a **kwargs catch-all, the user-specified values will be validated against the monitor’s signature. If unsupported arguments are provided to a monitor, the calculation job will not start and the user will be notified of the mistake.
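
How this validation is performed is an implementation detail, but a minimal sketch of checking user-provided keyword arguments against a monitor’s signature could look as follows; the helper validate_monitor_kwargs is hypothetical and not part of aiida-core.

import inspect
from typing import Callable

def validate_monitor_kwargs(monitor: Callable, kwargs: dict) -> None:
    """Raise a ``ValueError`` if ``kwargs`` contains arguments not declared by ``monitor``."""
    try:
        # ``bind_partial`` raises a ``TypeError`` for unexpected keyword arguments.
        inspect.signature(monitor).bind_partial(**kwargs)
    except TypeError as exception:
        raise ValueError(f'invalid monitor keyword arguments: {exception}') from exception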

In order to attach a monitor to a CalcJob it first has to be registered with an entry point in the aiida.calculations.monitors group, for example:

[project.entry-points.'aiida.calculations.monitors']
'core.always_kill' = 'aiida.calculations.monitors.base:always_kill'
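
Once the package providing the monitor is installed, one way to verify that the entry point is discoverable is through importlib.metadata (Python 3.10 and later):

from importlib.metadata import entry_points

# Print the names of all monitors registered in the ``aiida.calculations.monitors`` group.
print([entry_point.name for entry_point in entry_points(group='aiida.calculations.monitors')])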

Once registered, the entry point can be used to attach the corresponding monitor to a CalcJob. The CalcJob input namespace contains the monitors namespace that accepts a dictionary of Dict nodes.

from aiida.engine import run
from aiida.orm import Dict, Int, load_code

builder = load_code('bash@localhost').get_builder()
builder.x = Int(1)
builder.y = Int(2)
builder.monitors = {'always_kill': Dict({'entry_point': 'core.always_kill'})}
run.get_node(builder)

Each Dict node corresponds to a monitor that will be attached and has a single required keyword entry_point, which should correspond to the entry point name of the monitor. If the monitor supports custom keyword arguments, they can be specified in the kwargs attribute of the Dict node.

builder.monitors = {'always_kill': Dict({'entry_point': 'core.always_kill', 'kwargs': {'custom_keyword': True}})}

If, during the runtime of the CalcJob, any of the monitors returns a string, the engine will send the kill command through the scheduler, the current output files will be retrieved, and the parser, if defined in the inputs, will be called. The CalcJob will eventually terminate and the CalcJob.exit_codes.STOPPED_BY_MONITOR exit code will be set.

A user can check whether a job was stopped by a monitor as follows:

# ``exit_status`` is an integer, so compare it with the ``status`` attribute of the exit code.
assert node.exit_status == CalcJob.exit_codes.STOPPED_BY_MONITOR.status

Advanced functionality

Monitor result

The default operation for a monitor is to return a string, in which case the engine will kill the job through the scheduler and the CalcJob will proceed by retrieving and parsing the results. This behavior can be further customized through the CalcJobMonitorResult object. This simple class has various attributes that can be used to control the response of the engine (a sketch combining them is shown after this list):

  • action: Instance of the MonitorResultAction enum, where the default is MonitorResultAction.kill. The available options are:

    • kill: Kill the job through the scheduler and proceed to the retrieve step of the CalcJob.

    • disable-self: Disable the current monitor for this CalcJob and do not call it anymore.

    • disable-all: Disable all monitors configured for this CalcJob.

  • retrieve: Boolean, True by default. If False, skip the retrieval of the output files.

  • parse: Boolean, True by default. If False, skip the parsing of the retrieved output files, if a parser was defined in the inputs of the CalcJob. This option is ignored if CalcJobMonitorResult.retrieve == False, since output files that were not retrieved cannot be parsed.

  • override_exit_code: Boolean, True by default. If False, the exit code returned by the parser, if available, will not be overridden by the CalcJob.exit_codes.STOPPED_BY_MONITOR exit code.

Note that if override_exit_code is set to False and a parser is defined in the inputs, the exit code returned by the parser will be set on the node. This means that the exit code STOPPED_BY_MONITOR, which is set by default, will no longer be set, and so it will not be obvious from the exit status of the node that the job was stopped by a monitor.
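
As an illustration of these attributes, a monitor could for instance return the following result, telling the engine to kill the job and retrieve its output files but skip parsing them; the attribute names follow the list above and the string form of action matches the example later in this AEP.

from aiida.engine import CalcJobMonitorResult

result = CalcJobMonitorResult(
    action='kill',            # default: kill the job through the scheduler
    retrieve=True,            # default: retrieve the output files
    parse=False,              # skip calling the parser on the retrieved files
    override_exit_code=True,  # default: set the ``STOPPED_BY_MONITOR`` exit code
)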

Monitor execution order

By default, the monitors are executed in alphabetical order based on their keys in the monitors input namespace. The order can be controlled using the priority key in the monitors input.

builder.monitors = {
    'monitor_one': Dict({'entry_point': 'entry_point_one', 'priority': 100}),
    'monitor_two': Dict({'entry_point': 'entry_point_two'}),
}

Higher priorities will be executed first. It is not necessary to define a priority for all monitors; in the absence of a priority, a priority of 0 is assumed. For monitors with identical priority, the order remains alphabetical based on their key in the monitors input namespace.

Monitor execution frequency

By default, all monitors are executed during each scheduler update cycle. This interval is controlled by the minimum job poll interval of the Computer, which can be retrieved and set through the get_minimum_job_poll_interval and set_minimum_job_poll_interval methods, respectively. The frequency of monitor execution can be reduced by setting a larger interval for the minimum_poll_interval key in the monitor input definition:

builder.monitors = {
    'monitor_one': Dict({'entry_point': 'entry_point_one', 'minimum_poll_interval': 600})
}

The engine will guarantee that the interval between calls of the monitor is at least the value specified by minimum_poll_interval. Due to a number of other intervals that are part of the CalcJob pipeline, it is possible, however, that the effective interval between monitor calls will be larger than that.
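
For completeness, the scheduler update cycle itself is configured on the Computer through the methods mentioned above; assuming a computer labeled localhost:

from aiida.orm import load_computer

computer = load_computer('localhost')
computer.set_minimum_job_poll_interval(30)  # poll the scheduler (and thus run monitors) at most every 30 seconds
print(computer.get_minimum_job_poll_interval())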

Example monitor implementations

Stopping based on content in output file

To stop a calculation based on the content of a particular output file, first the file should be retrieved locally using the transport:

import tempfile

from aiida.engine import CalcJobMonitorResult
from aiida.orm import CalcJobNode
from aiida.transports import Transport

def monitor(node: CalcJobNode, transport: Transport) -> str | CalcJobMonitorResult | None:
    """Retrieve and inspect files in working directory of job to determine whether the job should be killed.

    :param node: The node representing the calculation job.
    :param transport: The transport that can be used to retrieve files from remote working directory.
    :returns: A string if the job should be killed, `None` otherwise.
    """
    with tempfile.NamedTemporaryFile('w+') as handle:
        transport.getfile('some-file.txt', handle.name)
        handle.seek(0)
        output = handle.read()

    if 'problem' in output:
        return 'The calculation has encountered a problem so we are aborting.'

The file content can then be analyzed for any particular problems. If a problem is detected and the calculation should be stopped, it suffices to return a string with a relevant message.

Performing a clean stop

When a monitor instructs the engine to stop the calculation, the engine will signal the kill command to the scheduler. This will cause the calculation to be forcefully interrupted. In certain cases, this is undesirable as the calculation does not get the chance to shut down cleanly, and the output files cannot be reused for a restart.

Certain codes can be shut down cleanly before completion through a sentinel file. A sentinel file is a file with a particular filename or format that the code periodically looks for in the working directory and, when found, causes it to shut down gracefully. This functionality can be used by a monitor to shut down a job cleanly:

import tempfile

from aiida.engine import CalcJobMonitorResult
from aiida.orm import CalcJobNode
from aiida.transports import Transport

def monitor(node: CalcJobNode, transport: Transport) -> str | CalcJobMonitorResult | None:
    """Retrieve and inspect files in working directory of job to determine whether the job should be stopped cleanly.

    :param node: The node representing the calculation job.
    :param transport: The transport that can be used to retrieve files from remote working directory.
    :returns: A ``CalcJobMonitorResult`` if the job should be stopped, `None` otherwise.
    """
    with tempfile.NamedTemporaryFile('w+') as handle:
        transport.getfile('some-file.txt', handle.name)
        handle.seek(0)
        output = handle.read()

    if 'problem' in output:
        # Write the sentinel file ``EXIT`` to the remote working directory so the code can stop itself.
        with tempfile.NamedTemporaryFile('w+') as handle:
            handle.write('stop')
            handle.flush()  # make sure the content is on disk before transferring the file
            transport.put(handle.name, 'EXIT')

        return CalcJobMonitorResult(
            action='disable-all',
            override_exit_code=False
        )

The sentinel file is written to the working directory using the Transport.put method; in this example, a file named EXIT is written to the remote working directory.

After the sentinel file is written, the job needs to be given time to shut down gracefully and so should not be interrupted. This is why the monitor returns a CalcJobMonitorResult instance instead of a simple string, which would have caused the job to be killed immediately. The CalcJobMonitorResult allows the monitor to instruct the engine to disable all monitors (using action='disable-all'). In addition, override_exit_code=False is set such that the exit code of the parser is kept, as if the code had terminated nominally.

Execute command and retrieve file

The monitor receives an instance of the Transport class, which, besides implementing an interface to read and write files on the remote computer, can also be used to execute commands. An example use case is a code that writes its output to a file that typically becomes very large. Retrieving this file in the monitor, as done in the first example, can be very costly and should be avoided. Instead, one can run the tail bash command on the remote computer to write just the last lines of the file to a small report file, which is then cheap to retrieve and inspect:

import tempfile

from aiida.engine import CalcJobMonitorResult
from aiida.orm import CalcJobNode
from aiida.transports import Transport

def monitor(node: CalcJobNode, transport: Transport) -> str | CalcJobMonitorResult | None:
    """Run ``tail`` on big output file and retrieve the partial output.

    :param node: The node representing the calculation job.
    :param transport: The transport that can be used to retrieve files from remote working directory.
    :returns: A string if the job should be killed, `None` otherwise.
    """
    filename_input = 'aiida.out'
    filename_output = 'tail.out'
    number_of_lines = 20

    # Write the last lines of the potentially very large output file to a small report file on the remote.
    command = f'tail -n {number_of_lines} {filename_input} > {filename_output}'
    exit_status, stdout, stderr = transport.exec_command_wait(command)

    if exit_status != 0:
        node.logger.warning(f'the command `{command}` executed by monitor failed: {stderr}')
        return None

    # Retrieve only the small report file and inspect its content.
    with tempfile.NamedTemporaryFile('w+') as handle:
        transport.getfile(filename_output, handle.name)
        handle.seek(0)
        output = handle.read()

    if 'problem' in output:
        return 'The calculation has encountered a problem so we are aborting.'

Note that while the monitor is being executed, it is blocking the daemon worker from doing anything else. Good monitor design will therefore try to limit the execution time of the monitor. This includes avoiding retrieving large amounts of data from the computer as well as executing time-consuming commands.

Design discussion

Advantages

The main advantage of the proposed design is the simplicity of the user interface as well as of the implementation. By integrating it closely with the existing concept of the CalcJob, the feature feels like a natural part of AiiDA’s functionality of running external codes on various computing resources. Combined with the fact that the only setup required is registering a monitor through an entry point, the barrier for users to start using this new functionality is very low. There are investigations underway to see whether it is technically possible to define and attach monitors on the fly, for example in an interactive shell or a Jupyter notebook.

By implementing monitors as Python functions and passing in the transport to interact with the remote computer, the user is restricted as little as possible. The user has full access to AiiDA’s Python API, and the transport interface can be used to optimize the monitor’s implementation, minimizing the required data transfer.

Disadvantages

The main disadvantage of the proposed implementation is the fact that the monitors are executed by the daemon workers. Since monitors are synchronous blocking operations, a daemon worker cannot perform any other operations while one is being executed. This means that if monitors perform time-consuming operations, such as transferring lots of data or executing long-running commands, they will limit the throughput of the daemon. This design disadvantage is discussed in the light of an alternative design that was considered in the Alternatives section.

The second major disadvantage is that the design does not satisfy the last design goal of having monitors run independently of AiiDA’s activity status. Since the monitors are executed by the daemon workers, the daemon has to be running for calculation jobs to be actively monitored. Solutions that could provide this functionality would essentially have to run the monitors as a separate background process on the remote computing resource where the calculation job is running.

Given AiiDA’s premise of not having to be installed on the remote resource, this implies that the monitor implementation cannot use AiiDA’s API. Most likely it would have to be a bash script, as that is the fundamental assumption that AiiDA makes about the remote compute environment anyway. This would mean that neither the end user nor the engine would be able to make use of AiiDA’s API and plugins to interact with the remote computer and scheduler, and all of this knowledge would have to be reimplemented specifically for each monitor.

Since this would significantly complicate the implementation as well as the user interface, and it is currently not clear whether this design goal addresses an actual use case, it was decided to shelve this direction.

Alternatives

One alternative design was proposed and implemented (see this repository for the implementation), which addresses the disadvantage of the design presented in this AEP that monitors are executed by daemon workers. The alternative design has monitors executed by a secondary CalcJob that is launched in parallel to the original CalcJob being monitored. The monitoring CalcJob executes a fixed Python script on the localhost (the machine where AiiDA is running), which actually executes the attached monitors. In this way, the load of executing the monitors is borne by a background process on the localhost and does not burden the daemon workers.

Although it resolves the problem of monitors directly increasing the load on daemon workers, the design suffers from other problems. The load on daemon workers is actually still increased, since for each monitored job a secondary job needs to be run. Although the computational overhead on the daemon workers may be minimal, it still occupies slots and forces the daemon workers to cycle between multiple processes. In addition, the design results in a significant increase in the number of nodes that have to be stored in the database, since each monitored job requires the storage of a second monitoring CalcJobNode along with the associated input and output nodes, such as the remote_folder and retrieved nodes.

Moreover, the solution appears to be less user-friendly. It requires a dedicated Code to be configured, which is always run by the monitoring CalcJob. This code needs to be set up, and if the implementation changes, this Code has to be recreated by the user. Additionally, since the monitoring job is an independent CalcJob but requires the remote_folder of the monitored job as an input, one has to wait for the monitored CalcJob to have been submitted and picked up by the daemon before the monitoring job can be launched. This forces the user to always adopt a pattern in scripts where the original job is submitted first and a while-loop then waits for that job to have started, before the monitoring job is submitted.

After careful evaluation of both designs, it was decided that the increased complexity in usability and implementation did not offset the advantage of a reduced load on daemon workers.

It should be noted that there already exists a plugin that provides functionality similar to the monitoring feature described in this AEP. The aiida-cusp package provides AiiDA plugins that allow the use of the Custodian wrapper around the ab-initio simulation package VASP. If this approach is a better solution for users, they can already implement it with the current functionality of aiida-core and an AEP is not required.