Submitting an unofficial workflow

This section shows how to use the DatomaWorkflow module to submit unofficial (ephemeral) workflows to Datoma’s infrastructure.

Before you start, prepare a workflow file with a YAML extension (e.g. my_workflow.yml) and a layout file with a JSON extension (e.g. my_layout.json).

my_workflow.yml: This file contains the workflow definition. It defines:

  • The different jobs that will be executed.

  • Whether a job’s input depends on the output of another job (depends_on). You can filter the files that are passed to the dependent job using regular expressions (regex).

  • Which inputs are considered global (input_mapping).

  • Which parameters are considered global (parameter_mapping).

workflow:
  - id: my_first_step
    name: "First step title" 
    tool: timsconvert
    task: timsconvert
    input_mapping:
      samples: my_global_input
  - id: my_second_step
    name: "Second step title"
    depends_on:
      - my_first_step
    tool: rmsistd
    task: rmsi
    parameter_mapping:
      "preprocessing:peakbinning:enable": my_global_param
    input_mapping:
      input:
        origin: my_first_step
        regex:
          - .imzML$
          - .ibd$
  - id: my_third_step
    name: "Third step title"
    tool: xcms
    task: xcmspeakpicking
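
To illustrate how the regex entries of my_second_step select files, here is a minimal sketch using Python's re module. It assumes the patterns are applied with search semantics (a match anywhere in the file name), which this page does not confirm; the file names and the filter_outputs helper are hypothetical and not part of Datoma.

```python
import re

# Patterns copied from the `regex` list of my_second_step in my_workflow.yml.
patterns = [r".imzML$", r".ibd$"]

# Hypothetical output file names from my_first_step, for illustration only.
outputs = ["sample.imzML", "sample.ibd", "sample.log", "sample_imzML"]


def filter_outputs(names, patterns):
    """Keep the names that match at least one pattern.

    Assumption: patterns are applied with re.search semantics.
    """
    compiled = [re.compile(p) for p in patterns]
    return [n for n in names if any(c.search(n) for c in compiled)]


selected = filter_outputs(outputs, patterns)
print(selected)
```

Under that assumption, "sample_imzML" is also selected, because an unescaped dot matches any character. If only real file extensions should match, escaped patterns such as \.imzML$ and \.ibd$ are stricter.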

my_layout.json: This file contains the layout of the workflow. It defines:

  • The global inputs that will be used by the workflow (globalInputs).

  • The global parameters that will be used by the workflow (globalParameters).

  • The parameters that will be overridden for a specific job (parameterOverrides).

{
  "globalInputs": [
    {
      "key": "my_global_input",
      "label": "rMSI input files",
      "model": [],
      "useROI": true
    }
  ],
  "globalParameters": [
    {
      "label": "Column A",
      "parameters": [
        {
          "key": "my_global_param",
          "label": "Global param",
          "type": "InputSwitch",
          "model": false
        }
      ]
    }
  ],
  "parameterOverrides": [
    {
      "stepKey": "my_second_step",
      "parameterKey": "preprocessing:peakbinning:tolerance",
      "parameterModel": 500
    },
    {
      "stepKey": "my_second_step",
      "parameterKey": "preprocessing:peakbinning:tolerance_in_ppm",
      "parameterModel": true
    },
    {
      "stepKey": "my_second_step",
      "parameterKey": "isPeaklist",
      "parameterModel": true
    }
  ]
}
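
The two files refer to each other by key: the global names used in input_mapping and parameter_mapping appear as key entries in the layout, and each stepKey names a step id. The sketch below checks those references before submitting. It is not Datoma's own validation; check_consistency is a hypothetical helper, the layout is a trimmed copy of the example, and the workflow is written as a Python list that mirrors the YAML by hand so that no YAML parser is needed.

```python
import json

# Trimmed copy of the my_layout.json example.
layout = json.loads("""
{
  "globalInputs": [{"key": "my_global_input"}],
  "globalParameters": [
    {"label": "Column A", "parameters": [{"key": "my_global_param"}]}
  ],
  "parameterOverrides": [
    {"stepKey": "my_second_step",
     "parameterKey": "preprocessing:peakbinning:tolerance",
     "parameterModel": 500}
  ]
}
""")

# Hand-written mirror of the relevant parts of the my_workflow.yml example.
workflow = [
    {"id": "my_first_step", "input_mapping": {"samples": "my_global_input"}},
    {"id": "my_second_step",
     "depends_on": ["my_first_step"],
     "parameter_mapping": {"preprocessing:peakbinning:enable": "my_global_param"},
     "input_mapping": {"input": {"origin": "my_first_step"}}},
    {"id": "my_third_step"},
]


def check_consistency(workflow, layout):
    """Return a list of human-readable problems; an empty list means consistent."""
    problems = []
    step_ids = {step["id"] for step in workflow}
    input_keys = {entry["key"] for entry in layout.get("globalInputs", [])}
    param_keys = {
        param["key"]
        for group in layout.get("globalParameters", [])
        for param in group.get("parameters", [])
    }
    for step in workflow:
        for dep in step.get("depends_on", []):
            if dep not in step_ids:
                problems.append(f"{step['id']}: unknown dependency {dep!r}")
        for target in step.get("input_mapping", {}).values():
            # A string names a global input; a mapping points at another step.
            if isinstance(target, str) and target not in input_keys:
                problems.append(f"{step['id']}: unknown global input {target!r}")
            if isinstance(target, dict) and target.get("origin") not in step_ids:
                problems.append(f"{step['id']}: unknown origin {target.get('origin')!r}")
        for target in step.get("parameter_mapping", {}).values():
            if target not in param_keys:
                problems.append(f"{step['id']}: unknown global parameter {target!r}")
    for override in layout.get("parameterOverrides", []):
        if override["stepKey"] not in step_ids:
            problems.append(f"override: unknown step {override['stepKey']!r}")
    return problems


problems = check_consistency(workflow, layout)
print(problems)
```

For the example files the list is empty. A misspelled key in either file would show up as one entry per broken reference.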

In this example, we submit an unofficial workflow that executes TIMSCONVERT, RMSI and XCMSPEAKPICKING:

  • We first create the DatomaWorkflow object, specifying the paths to the necessary files mentioned above (path_yaml and path_json).

  • We set the global input. In this example, it is only used as input for the first job. You can modify the YAML file to use it for additional jobs.

  • We modify the global parameter’s value, which changes the value of every parameter mapped to it.

  • Then, we set the input for the third job, which is executed independently (unlike my_second_step, which depends on another job’s output).

  • Next, we set the parameters that we want to change from their default values for my_third_step (the key that identifies our xcmspeakpicking task).

  • After that, we submit the workflow to Datoma’s infrastructure and download the output files.

  • Finally, we print the running time of the workflow.

# Make the necessary imports
from datoma import DatomaWorkflow

# Create an ephemeral DatomaWorkflow object
dw = DatomaWorkflow(path_yaml="path/to/yaml/file.yml", path_json="path/to/json/file.json")

# Set the global input
dw.set_global_input({"my_global_input": ["/path/to/folder/tims.d/"]}, True)

# Modify the global parameter's value
dw.set_global_params({"my_global_param": True})

# Create a dictionary with the input files for the third step
input_dict = {"samples": ["path/to/file.mzML"]}

# Specify the step to which you want to set the input
dw.set_input('my_third_step', input_dict)

# Create a dictionary with parameters to modify from default values
params_dict = {'do_group_features': False, 
               'prefilter_n': 4}

# Set parameters of the job you want to modify
dw.set_params('my_third_step', params_dict)

# Submit the workflow to Datoma's infrastructure; naming the execution is optional
dw.submit(name="custom_workflow_execution")

# Check the status of the workflow; once it finishes, the output files are downloaded
await dw.download(output_path="path/to/output/folder")

# Check the status of the workflow; once it finishes, the output files are listed
print(await dw.list_outputs(regex=r".*\.pkmat"))

# Printing the running time of the workflow (in seconds)
running_time = dw.finished_at - dw.running_at
print(running_time)
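
The example uses await at the top level, which works in environments such as Jupyter notebooks but is a syntax error in a regular Python script. A minimal sketch of one way to run the awaited calls from a script follows, using asyncio.run. StubWorkflow is a hypothetical stand-in so the sketch runs without the datoma package; in practice you would pass the DatomaWorkflow object created earlier.

```python
import asyncio


class StubWorkflow:
    """Hypothetical stand-in for DatomaWorkflow; not part of the datoma package."""

    def __init__(self):
        self.downloaded_to = None

    async def download(self, output_path):
        await asyncio.sleep(0)  # pretend to wait for the workflow to finish
        self.downloaded_to = output_path


async def main(dw):
    # Awaited calls are only valid inside a coroutine when running as a script.
    await dw.download(output_path="path/to/output/folder")


stub = StubWorkflow()
asyncio.run(main(stub))
print(stub.downloaded_to)
```

The same wrapper can hold the other awaited call from the example (list_outputs) so that both run inside one event loop.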

To see a more complex usage of DatomaWorkflow, refer to Leveraging Datoma’s potential.