Submitting an unofficial workflow
This section shows how to use the DatomaWorkflow module to submit unofficial (ephemeral) workflows to Datoma’s infrastructure.
Before you start, prepare a workflow file with a YAML extension (e.g. my_workflow.yml) and a layout file with a JSON extension (e.g. my_layout.json).
my_workflow.yml: This file contains the workflow definition. It defines:
The different jobs that will be executed.
Whether a job input depends on the output of another job (depends_on). You can filter the files that are passed to the dependent job using regex.
Which inputs are considered global (input_mapping).
Which parameters are considered global (parameter_mapping).
workflow:
  - id: my_first_step
    name: "First step title"
    tool: timsconvert
    task: timsconvert
    input_mapping:
      samples: my_global_input
  - id: my_second_step
    name: "Second step title"
    depends_on:
      - my_first_step
    tool: rmsistd
    task: rmsi
    parameter_mapping:
      "preprocessing:peakbinning:enable": my_global_param
    input_mapping:
      input:
        origin: my_first_step
        regex:
          - .imzML$
          - .ibd$
  - id: my_third_step
    name: "Third step title"
    tool: xcms
    task: xcmspeakpicking
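To illustrate how the regex list of my_second_step narrows down the files it receives from my_first_step, here is a minimal Python sketch. It is not part of the datoma package: the file names are hypothetical, and it assumes the patterns are applied with search semantics (a match anywhere in the file name), which this page does not confirm.

```python
import re

# Patterns copied from the `regex` list of my_second_step
patterns = [r".imzML$", r".ibd$"]

# Hypothetical output file names of my_first_step (illustrative only)
outputs = ["sample.imzML", "sample.ibd", "sample.log", "report.txt"]


def filter_outputs(names, patterns):
    """Keep the names matched by at least one pattern (re.search semantics assumed)."""
    compiled = [re.compile(p) for p in patterns]
    return [name for name in names if any(c.search(name) for c in compiled)]


selected = filter_outputs(outputs, patterns)
print(selected)  # ['sample.imzML', 'sample.ibd']
```

Note that an unescaped dot matches any character, so `.ibd$` would also accept a name such as `fooXibd`. Writing `\.ibd$` restricts the match to a literal dot.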
my_layout.json: This file contains the layout of the workflow. It defines:
The global inputs that will be used by the workflow (globalInputs).
The global parameters that will be used by the workflow (globalParameters).
The parameters that will be overridden for a specific job (parameterOverrides).
{
  "globalInputs": [
    {
      "key": "my_global_input",
      "label": "rMSI input files",
      "model": [],
      "useROI": true
    }
  ],
  "globalParameters": [
    {
      "label": "Column A",
      "parameters": [
        {
          "key": "my_global_param",
          "label": "Global param",
          "type": "InputSwitch",
          "model": false
        }
      ]
    }
  ],
  "parameterOverrides": [
    {
      "stepKey": "my_second_step",
      "parameterKey": "preprocessing:peakbinning:tolerance",
      "parameterModel": 500
    },
    {
      "stepKey": "my_second_step",
      "parameterKey": "preprocessing:peakbinning:tolerance_in_ppm",
      "parameterModel": true
    },
    {
      "stepKey": "my_second_step",
      "parameterKey": "isPeaklist",
      "parameterModel": true
    }
  ]
}
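Because the workflow file refers to global names that must be declared in the layout file, a quick local consistency check can catch typos before submitting. The sketch below is only an illustration using the standard library: `check_layout` is a hypothetical helper, not a datoma function, and the referenced names and step ids are typed in by hand from the example files rather than parsed from the YAML.

```python
import json

# Abbreviated copy of my_layout.json (one override kept for brevity)
layout_text = """
{
  "globalInputs": [
    {"key": "my_global_input", "label": "rMSI input files", "model": [], "useROI": true}
  ],
  "globalParameters": [
    {"label": "Column A", "parameters": [
      {"key": "my_global_param", "label": "Global param", "type": "InputSwitch", "model": false}
    ]}
  ],
  "parameterOverrides": [
    {"stepKey": "my_second_step", "parameterKey": "isPeaklist", "parameterModel": true}
  ]
}
"""

# Names used by input_mapping / parameter_mapping in my_workflow.yml (copied by hand)
referenced_inputs = {"my_global_input"}
referenced_params = {"my_global_param"}
step_ids = {"my_first_step", "my_second_step", "my_third_step"}


def check_layout(layout, referenced_inputs, referenced_params, step_ids):
    """Return a list of problems; an empty list means the two files agree."""
    problems = []
    input_keys = {entry["key"] for entry in layout.get("globalInputs", [])}
    param_keys = {
        param["key"]
        for group in layout.get("globalParameters", [])
        for param in group.get("parameters", [])
    }
    for name in sorted(referenced_inputs - input_keys):
        problems.append(f"global input '{name}' is not declared in globalInputs")
    for name in sorted(referenced_params - param_keys):
        problems.append(f"global parameter '{name}' is not declared in globalParameters")
    for override in layout.get("parameterOverrides", []):
        if override["stepKey"] not in step_ids:
            problems.append(f"override targets unknown step '{override['stepKey']}'")
    return problems


layout = json.loads(layout_text)
problems = check_layout(layout, referenced_inputs, referenced_params, step_ids)
print(problems or "layout is consistent with the workflow")
```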
In this example, we submit an unofficial workflow that executes TIMSCONVERT, RMSI and XCMSPEAKPICKING:
We first create the DatomaWorkflow object, specifying the paths to the necessary files mentioned above (path_yaml and path_json).
We set the global input. In this example, it is only used as input for the first job. You can modify the YAML file to use it for additional jobs.
We modify the global parameter’s value, which changes the value of every parameter mapped to it.
Then, we set the input for the third job, which is executed independently (unlike my_second_step, which depends on another job’s output).
Next, we set the parameters of my_third_step (the key identifying our xcmspeakpicking task) that we want to change from their default values.
After that, we submit the workflow to Datoma’s infrastructure and download the output files.
Finally, we print the running time of the workflow.
# Make the necessary imports
from datoma import DatomaWorkflow
# Create an ephemeral DatomaWorkflow object
dw = DatomaWorkflow(path_yaml="path/to/yaml/file.yml", path_json="path/to/json/file.json")
# Set the global input
dw.set_global_input({"my_global_input": ["/path/to/folder/tims.d/"]}, True)
# Modify the global parameter's value
dw.set_global_params({"my_global_param": True})
# Create a dictionary with the input files for the third step
input_dict = {"samples": ["path/to/file.mzML"]}
# Specify the step to which you want to set the input
dw.set_input('my_third_step', input_dict)
# Create a dictionary with the parameters to change from their default values
params_dict = {'do_group_features': False,
               'prefilter_n': 4}
# Set the parameters of the job you want to modify
dw.set_params('my_third_step', params_dict)
# Submit the workflow to Datoma's infrastructure; naming it is optional
dw.submit(name="custom_workflow_execution")
# Note: top-level `await` requires an async context (e.g. a Jupyter notebook)
# Check the status of the workflow; once it finishes, the output files are downloaded
await dw.download(output_path="path/to/output/folder")
# Check the status of the workflow; once it finishes, the matching output files are listed
# (raw string, so that `\.` is not treated as an invalid string escape)
print(await dw.list_outputs(regex=r".*\.pkmat"))
# Print the running time of the workflow (in seconds)
running_time = dw.finished_at - dw.running_at
print(running_time)
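The example above awaits download and list_outputs at the top level, which Python only accepts in an async context such as a notebook. In a plain script, one option is to group the awaited calls in a coroutine and run it with asyncio.run. The following is a sketch under that assumption: `download_and_list` is a hypothetical helper, it only uses the two calls shown above, and whether asyncio.run suits your environment has not been verified against the datoma package.

```python
import asyncio


async def download_and_list(dw, output_path, pattern):
    """Download the outputs of a submitted workflow, then list those matching `pattern`."""
    await dw.download(output_path=output_path)
    return await dw.list_outputs(regex=pattern)


# In a script, with `dw` being an already submitted DatomaWorkflow:
# outputs = asyncio.run(download_and_list(dw, "path/to/output/folder", r".*\.pkmat"))
# print(outputs)
```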
For a more complex use of DatomaWorkflow, refer to Leveraging Datoma’s potential.