Warning
NOMAD Actions are a preview feature available starting from NOMAD version nomad-lab>=1.3.18.dev89.
Please note that this is an early release: some functionality may change, and you might encounter bugs.
We’d love your feedback to help us improve NOMAD Actions. Please don’t hesitate to reach out and share your thoughts.
How to define actions¶
Actions allow you to define executable workflows in NOMAD. They are an alternative to normalizers and can be configured to run on specialized workers instead of the NOMAD internal worker. This enables better resource allocation, for example assigning GPUs to specific actions.
Important
Defining actions is more complicated than defining normalizers and parsers in schema packages and parser packages. If the processing workflow involves trivial data manipulation (for example, populating a quantity based on another quantity, or reading and importing data from a raw file), consider using normalizers and parsers. On the other hand, if your workflow requires robust interaction with an external API, long processing times (for example, days), or special resource allocation, use actions.
This documentation shows you how to write a plugin entry point for an action. You should read the introduction to plugins to have a basic understanding of how plugins and plugin entry points work in the NOMAD ecosystem.
Getting started¶
You can use our template repository to create an initial structure for a plugin containing an action. The relevant part of the repository layout will look something like this:
nomad-example
├── src
│ ├── nomad_example
│ │ ├── __init__.py
│ │ ├── actions
│ │ │ ├── __init__.py
│ │ │ ├── myaction
│ │ │ │ ├── __init__.py
│ │ │ │ ├── activities.py
│ │ │ │ ├── workflows.py
│ │ │ │ ├── models.py
├── LICENSE.txt
├── README.md
└── pyproject.toml
The boilerplate code makes it easier to define actions. Start by replacing the example code in activities.py with your code and adjusting the input data model in models.py with the required fields for your activity.
See the documentation on plugin development guidelines for more details on the best development practices for plugins, including linting, testing and documenting.
By default, we provide CPU and GPU task queues. When you define an action, it is assigned to one of these queues. If your action depends on packages that are not required for the NOMAD installation, you can add them as optional dependencies in the pyproject.toml for the given task queue:
[project]
name = "nomad-example"
...

[project.optional-dependencies]
gpu-action = ["torch"]
cpu-action = ["aiohttp"]
Action entry point¶
The entry point defines basic information about your action and is used to automatically load it into a NOMAD distribution. It is an instance of ActionEntryPoint (or a subclass of it) and contains a load method that returns an Action instance. The Action instance can be used to add workflows and activities, along with the task queue where they will be registered. You will learn more about the Action class in the next section. The entry point should be defined in */actions/__init__.py like this:
from nomad.actions import TaskQueue
from pydantic import Field
from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from nomad.config.models.plugins import ActionEntryPoint


class MyActionEntryPoint(ActionEntryPoint):
    task_queue: str = Field(
        default=TaskQueue.CPU, description='Determines the task queue for this action'
    )

    def load(self):
        from nomad.actions import Action

        from nomad_example.actions.myaction.activities import get_request
        from nomad_example.actions.myaction.workflows import ExampleWorkflow

        return Action(
            task_queue=self.task_queue,
            workflow=ExampleWorkflow,
            activities=[get_request],
        )


my_action = MyActionEntryPoint(
    name='MyAction',
    description='My custom action.',
)
Here you can see that MyActionEntryPoint was defined as a new subclass of ActionEntryPoint. In this new class you can override the load method to determine how the Action class is loaded, and you can also extend the ActionEntryPoint model to add new configurable parameters for this action as explained here.
We also instantiate an object my_action from the new subclass. This is the final entry point instance, in which you specify the default parameterization and other details about the action. In the reference you can see all of the available configuration options for ActionEntryPoint.
The entry point instance should then be added to the [project.entry-points.'nomad.plugin'] table in pyproject.toml, with a value of the form module.path:variable_name pointing to the instance, in order for it to be automatically detected.
Action class¶
The load method of an action entry point returns an instance of the nomad.actions.Action class, which describes the action through a collection of activities and the workflows that connect them. It also specifies the task queue on which the workflows and activities are registered. Once the workflows are made available through the Action, they can be triggered using the start_action function. This adds a workflow run to the specified task queue. You can learn more about it in the next section.
We use Temporal's workflow-activity abstraction here. Activities are the atomic units of execution. They should ideally be idempotent, allowing Temporal to retry them automatically, based on a retry policy, until they complete successfully. For example, an idempotent activity that fetches data from an external resource via an API can keep retrying until it receives status code 200 (a successful response). Once the activities are defined, a workflow arranges them in a sequence and defines the flow of data from one activity to another.
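To see why idempotency matters when an activity is retried, consider the plain-Python sketch below. It does not use Temporal: the hypothetical helpers retry_until_success and make_flaky simulate an operation whose side effect succeeds while its reply is lost, so the caller retries it. Writing the result under a fixed key (idempotent) leaves the same final state no matter how many attempts run, whereas appending (not idempotent) leaves one duplicate per attempt.

```python
class TransientError(Exception):
    """Stands in for a recoverable failure such as a timeout or HTTP 503."""


def retry_until_success(func, max_attempts: int):
    """Call func() until it succeeds or max_attempts is reached (no backoff)."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except TransientError:
            if attempt == max_attempts:
                raise


def make_flaky(side_effect, failures: int):
    """Run side_effect on every call, but raise for the first `failures` calls.

    This mimics an activity whose write succeeded while its reply was lost.
    """
    calls = {'n': 0}

    def run():
        calls['n'] += 1
        side_effect()
        if calls['n'] <= failures:
            raise TransientError('simulated lost response')
        return 'ok'

    return run


results_by_cid: dict[int, str] = {}
results_list: list[str] = []


def store_result():
    # Idempotent: repeating the write leaves the same state.
    results_by_cid[962] = 'water'


def append_result():
    # Not idempotent: every attempt adds another record.
    results_list.append('water')


retry_until_success(make_flaky(store_result, failures=2), max_attempts=5)
retry_until_success(make_flaky(append_result, failures=2), max_attempts=5)
print(results_by_cid)  # {962: 'water'}
print(results_list)  # ['water', 'water', 'water']
```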
You can add these definitions in */myaction/activities.py and */myaction/workflows.py. Temporal requires the input and output of the activities and workflows to be serializable. We recommend defining Pydantic models for them in */myaction/models.py. These files could look like this:
nomad_example/actions/myaction/models.py
from pydantic import BaseModel, Field


class BaseWorkflowInput(BaseModel):
    """Base input model for workflows"""

    upload_id: str = Field(
        ...,
        description='Unique identifier for the upload associated with the workflow.',
    )
    user_id: str = Field(
        ..., description='Unique identifier for the user who initiated the workflow.'
    )


class ExampleWorkflowInput(BaseWorkflowInput):
    """Input model for the workflow"""

    cid: int = Field(
        ..., description='PubChem compound identifier for a chemical compound.'
    )


class GetRequestInput(BaseModel):
    """Input model for the activity"""

    url: str = Field(..., description='URL for get request.')
    timeout: int = Field(..., description='Timeout for the request.')
Here we extend BaseWorkflowInput to define the input model of the workflow, and extend Pydantic's BaseModel directly to define the input model of the activity.
Important
We provide a Pydantic base model BaseWorkflowInput in the plugin template code and recommend inheriting from it when defining the workflow's input model. It provides fields like user_id and upload_id, which are required to execute a workflow in NOMAD and to enable database interaction via actions.
nomad_example/actions/myaction/activities.py
from temporalio import activity

from nomad_example.actions.myaction.models import GetRequestInput


@activity.defn
async def get_request(data: GetRequestInput):
    """
    Perform a GET request to the specified URL with the provided timeout.
    """
    # aiohttp is installed through the cpu-action extra, so import it here
    import aiohttp

    async with aiohttp.ClientSession() as session:
        async with session.get(
            data.url,
            # aiohttp expects a ClientTimeout object; total is in seconds
            timeout=aiohttp.ClientTimeout(total=data.timeout),
        ) as response:
            response.raise_for_status()
            return await response.json()
Here we define an activity by applying the Temporal decorator activity.defn to the get_request function. The activity interacts with an external API asynchronously. Non-blocking activities allow Temporal to manage the task queues efficiently and handle multiple workflows at once. We use the GetRequestInput model to define the argument of this activity.
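The benefit of non-blocking activities can be illustrated with plain asyncio, independent of Temporal. In the sketch below, fake_get_request is a hypothetical stand-in for get_request that waits with asyncio.sleep instead of making a real HTTP call. Because each coroutine yields control to the event loop while waiting, three simulated 0.2-second requests run concurrently and finish in roughly 0.2 seconds rather than 0.6 seconds. A blocking call such as time.sleep, or a synchronous HTTP client, would hold the event loop for the whole wait instead.

```python
import asyncio
import time


async def fake_get_request(cid: int, delay: float) -> dict:
    """Hypothetical stand-in for get_request that simulates network latency."""
    # asyncio.sleep yields to the event loop, like awaiting aiohttp I/O does.
    await asyncio.sleep(delay)
    return {'cid': cid}


async def run_concurrently(cids: list[int], delay: float) -> tuple[list[dict], float]:
    """Run one simulated request per CID concurrently and time the whole batch."""
    start = time.perf_counter()
    # gather returns results in the order of the awaitables passed in.
    results = await asyncio.gather(*(fake_get_request(c, delay) for c in cids))
    return list(results), time.perf_counter() - start


if __name__ == '__main__':
    results, elapsed = asyncio.run(run_concurrently([962, 702, 887], delay=0.2))
    print(results)  # [{'cid': 962}, {'cid': 702}, {'cid': 887}]
    print(f'elapsed: {elapsed:.2f} s')  # roughly 0.2 s, not 0.6 s
```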
nomad_example/actions/myaction/workflows.py
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

with workflow.unsafe.imports_passed_through():
    from nomad_example.actions.myaction.activities import get_request
    from nomad_example.actions.myaction.models import (
        ExampleWorkflowInput,
        GetRequestInput,
    )


@workflow.defn
class ExampleWorkflow:
    @workflow.run
    async def run(self, data: ExampleWorkflowInput) -> dict:
        retry_policy = RetryPolicy(
            maximum_attempts=3,
        )
        get_request_input = GetRequestInput(
            url='https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/'
            f'cid/{data.cid}/property/Title,SMILES/JSON',
            timeout=10,
        )
        result = await workflow.execute_activity(
            get_request,
            get_request_input,
            start_to_close_timeout=timedelta(seconds=60),
            retry_policy=retry_policy,
        )
        return result
Here we create a workflow definition as a Python class decorated with the Temporal decorator workflow.defn. The name of the workflow is automatically taken from the unique ID of the action entry point, which is simply the package path to its definition. For our example, it will be nomad_example.actions.myaction:my_action.
We define the workflow function in the run method of the workflow definition class and decorate it with the Temporal decorator workflow.run. It describes the sequence of activities and the flow of data from one to another. Using appropriate data models, we pass the data from the workflow input to the activity inputs. Each activity is executed by the workflow.execute_activity function, which also specifies the activity's retry policy and timeouts.
A retry policy tells Temporal how to retry an activity that failed in the current execution. Attributes like initial_interval, backoff_coefficient, and maximum_interval control the interval between retries. The attribute maximum_attempts specifies the maximum number of attempts, including the first one, that can be made in case of failures.
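The sketch below computes the wait before each retry, assuming Temporal's exponential backoff: the first retry waits initial_interval, each subsequent wait is multiplied by backoff_coefficient, and no wait exceeds maximum_interval. Since maximum_attempts counts every attempt including the first, a policy with maximum_attempts=5 allows at most four retries. The function retry_delays is a hypothetical planning helper, not a Temporal API.

```python
from datetime import timedelta


def retry_delays(
    initial_interval: timedelta,
    backoff_coefficient: float,
    maximum_interval: timedelta,
    maximum_attempts: int,
) -> list[timedelta]:
    """Return the wait before each retry under capped exponential backoff.

    Assumes maximum_attempts counts every attempt, including the first one,
    so the returned list has maximum_attempts - 1 entries.
    """
    delays = []
    interval = initial_interval
    for _ in range(maximum_attempts - 1):
        delays.append(min(interval, maximum_interval))
        interval = interval * backoff_coefficient
    return delays


# Under the assumption above, these values mirror
# temporalio.common.RetryPolicy(initial_interval=timedelta(seconds=1),
#     backoff_coefficient=2.0, maximum_interval=timedelta(seconds=5),
#     maximum_attempts=5)
delays = retry_delays(
    initial_interval=timedelta(seconds=1),
    backoff_coefficient=2.0,
    maximum_interval=timedelta(seconds=5),
    maximum_attempts=5,
)
print([d.total_seconds() for d in delays])  # [1.0, 2.0, 4.0, 5.0]
print(sum(delays, timedelta()).total_seconds())  # 12.0
```

The total waiting time, together with the per-attempt timeout, gives a rough upper bound on how long a failing activity can keep a workflow busy.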
Activity timeouts detect failures in which an activity exceeds its maximum expected execution time. Temporal provides multiple timeouts. The attribute start_to_close_timeout specifies the timeout for a single activity execution attempt, i.e., the time from when a worker starts executing the activity until it finishes. For most cases, setting this alone is enough and recommended. Make sure that the timeout is longer than the maximum possible time for the activity execution to complete. For example, when setting one for an activity that makes an API call, determine the median call time and add some buffer to it.
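One way to follow this advice is to derive start_to_close_timeout from measured call durations. The hypothetical helper below multiplies the median of the observed durations by a buffer factor, never returns less than the slowest observed call, and rounds up to a whole second. The buffer factor of 2 and the sample durations are illustrative assumptions, not values recommended by NOMAD or Temporal; the resulting timedelta is what you would pass as start_to_close_timeout to workflow.execute_activity.

```python
import math
import statistics
from datetime import timedelta


def suggest_start_to_close_timeout(
    durations_s: list[float], buffer_factor: float = 2.0
) -> timedelta:
    """Suggest a start_to_close_timeout from observed durations (in seconds).

    Hypothetical heuristic: median duration times buffer_factor, but never
    less than the slowest observed duration, rounded up to a whole second.
    """
    if not durations_s:
        raise ValueError('need at least one observed duration')
    median_with_buffer = statistics.median(durations_s) * buffer_factor
    candidate = max(median_with_buffer, max(durations_s))
    return timedelta(seconds=math.ceil(candidate))


# Illustrative measurements of an API call, in seconds
observed = [0.8, 1.1, 0.9, 4.2, 1.0]
print(suggest_start_to_close_timeout(observed))  # 0:00:05
```

Taking the maximum with the slowest observed call keeps the suggestion consistent with the requirement that the timeout exceed the longest expected execution, even when a single outlier is far above the median.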
Important
The default retry policy has unlimited maximum_attempts. We strongly recommend always setting a custom retry policy with a finite maximum_attempts to avoid workflows that run forever.
In addition, always set appropriate timeouts for activities to prevent stuck executions.
Integrating action with schemas¶
After actions are defined, you can integrate their workflows with schemas and run them from NOMAD entries.
Every action has a unique name based on the package path of its entry point. We can run a workflow using the start_action function, which takes the action name and an instance of its input model:
from nomad.actions.utils import start_action

from nomad_example.actions.myaction.models import ExampleWorkflowInput

workflow_id = start_action(
    action_name='nomad_example.actions.myaction:my_action',
    data=ExampleWorkflowInput(
        user_id='NOMAD User ID',
        upload_id='NOMAD Upload ID',
        cid=962,  # CID for Water
    ),
)
start_action returns a string containing the unique workflow ID assigned to the triggered workflow run. This ID can be used to get the current status of the workflow with the get_action_status function, which takes the workflow ID as input and returns a temporalio.client.WorkflowExecutionStatus object:
from nomad.actions.utils import get_action_status
workflow_status = get_action_status(workflow_id)
print(workflow_status.name) # example output: RUNNING
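If you need to wait for a run to finish, for example in a script or a test, you can poll the status until it leaves RUNNING. The sketch below is a hypothetical helper, not part of nomad.actions.utils: it takes the status function as a parameter so it can be exercised with a stub, and it treats any status name other than RUNNING as final. With NOMAD you would pass get_action_status together with the ID returned by start_action.

```python
import time
from collections.abc import Callable


def wait_for_action(
    get_status: Callable[[str], object],
    workflow_id: str,
    poll_interval_s: float = 5.0,
    timeout_s: float = 600.0,
) -> str:
    """Poll get_status(workflow_id) until the run is no longer RUNNING.

    Returns the final status name (for example 'COMPLETED' or 'FAILED').
    Raises TimeoutError if the run is still RUNNING after timeout_s seconds.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        # The status is an enum member; .name is a string such as 'RUNNING'.
        name = get_status(workflow_id).name
        if name != 'RUNNING':
            return name
        if time.monotonic() >= deadline:
            raise TimeoutError(f'workflow {workflow_id} is still RUNNING')
        time.sleep(poll_interval_s)


# Usage with NOMAD (not executed here):
#   from nomad.actions.utils import get_action_status
#   final_status = wait_for_action(get_action_status, workflow_id)
```

Polling from inside a normalizer would block entry processing, so this pattern suits scripts and tests rather than schemas; the ELN approach of requesting the status on demand avoids that.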
You can add these functionalities in the normalize method of an ELN schema and trigger actions from the ELN entries. A schema that uses ELN quantities to trigger actions can look like this:
from nomad.actions.utils import get_action_status, start_action
from nomad.datamodel.data import EntryData
from nomad.datamodel.metainfo.annotations import ELNAnnotation, ELNComponentEnum
from nomad.datamodel.metainfo.basesections.v1 import PureSubstanceSection
from nomad.metainfo import Quantity, SchemaPackage, SubSection

from nomad_example.actions.myaction.models import ExampleWorkflowInput

m_package = SchemaPackage()


class ExampleWorkflow(EntryData):
    """A section to run an example workflow using a PubChem CID."""

    cid = Quantity(
        type=int,
        description='PubChem CID of the compound.',
        a_eln=ELNAnnotation(component=ELNComponentEnum.NumberEditQuantity),
    )
    workflow_id = Quantity(
        type=str,
        description='Unique ID of the workflow.',
    )
    workflow_status = Quantity(
        type=str,
        description='Status of the workflow based on the available workflow ID.',
    )
    pubchem_result = SubSection(
        section_def=PureSubstanceSection,
        description='Data populated based on PubChem API call for given CID.',
    )
    trigger_run_action = Quantity(
        type=bool,
        description='Starts an asynchronous run of the example action.',
        a_eln=ELNAnnotation(
            component=ELNComponentEnum.ActionEditQuantity,
            label='Run Example Action',
        ),
    )
    trigger_get_action_status = Quantity(
        type=bool,
        description='Fetches the status for the available workflow ID.',
        a_eln=ELNAnnotation(
            component=ELNComponentEnum.ActionEditQuantity,
            label='Get Action Status',
        ),
    )

    def run_action(self, archive, logger=None):
        """Run the action with the provided archive."""
        try:
            if not self.cid:
                logger.warning(
                    'No CID provided for the workflow. Cannot run the workflow.'
                )
                return
            # Reset the results of any previous run
            self.pubchem_result = None
            self.workflow_status = None
            self.workflow_id = None
            action_name = 'nomad_example.actions.myaction:my_action'
            input_data = ExampleWorkflowInput(
                user_id=archive.metadata.authors[0].user_id,
                upload_id=archive.metadata.upload_id,
                cid=self.cid,
            )
            self.workflow_id = start_action(action_name=action_name, data=input_data)
            # Request the status of the new run during the same normalization
            self.trigger_get_action_status = True
        except Exception as e:
            logger.error(f'Error running workflow: {e}')

    def normalize(self, archive, logger=None):
        super().normalize(archive, logger)
        if self.trigger_run_action:
            if self.workflow_status == 'RUNNING':
                logger.warning('Workflow is already running. Cannot start a new one.')
            else:
                self.run_action(archive, logger)
            self.trigger_run_action = False
        if self.trigger_get_action_status:
            if self.workflow_id:
                try:
                    status = get_action_status(self.workflow_id)
                    self.workflow_status = status.name
                except Exception as e:
                    logger.error(f'Error getting workflow status: {e}')
            self.trigger_get_action_status = False


m_package.__init_metainfo__()
Here we define an EntryData section with ELN quantities like cid, which takes an integer input, and trigger_run_action, which is an actionable button. When the trigger_run_action button is clicked, the start_action function is called from inside the run_action method.
The workflow_id quantity is populated as a result and is used in the next step to get the status of the workflow.
When trigger_get_action_status is clicked, the status for the available workflow_id is requested and saved as a string in the workflow_status quantity. The status is mainly one of RUNNING, COMPLETED, FAILED, CANCELED, or TERMINATED. Every time a workflow run is triggered, its status is also requested. It is also possible to re-trigger the workflow run if the status is not RUNNING. Of course, the new workflow run will have a different workflow ID.
After we run the workflow, we can also write back the results into the entry using the utilities described in the next section.
Utilities for actions¶
Interacting with your Oasis's database from actions is a powerful way of manipulating its contents. For example, once you run an action, you might want to save its output in an existing NOMAD entry, or even create new ones. We provide a curated set of utilities in the nomad.actions.utils module to perform these tasks.
Important
Since interacting with the database directly (bypassing the API endpoints) through actions is highly risky, we strongly recommend doing this only through the functions defined in the nomad.actions.utils module. If you need to perform a task that is not covered by the utilities, please use the available API endpoints and interact with the database over the network.
Note
This part of the documentation is under development.
Adding to your oasis¶
Make sure your Oasis repository is up to date with the template by following the update guide. This ensures that the necessary containers for Temporal are set up correctly.
In addition to configuring the Temporal service, you’ll also need to build new Docker images for both the GPU and CPU workers. The relevant extras for these workers can be set in the pyproject.toml of the distro:
[project]
name = "nomad-distro-template"
...

[project.optional-dependencies]
plugins = ["nomad-example"]
gpu-action = ["nomad-example[gpu-action]"]
cpu-action = ["nomad-example[cpu-action]"]
To implement the necessary changes, including image build steps and updates to docker-compose, the Dockerfile, and GitHub Actions, you can refer to this pull request as a guide.