Workflow and task concepts

Tasks are the basic unit of the system: the automation framework records the state of each task for each CMS run. A workflow is simply a collection of tasks related to each other.

Each task is defined by a single instance of a TaskHandler class and is executed by the automation once it has been activated by adding it to a specific campaign with ecalrunctrl.py --db 'dbname' rtype-update --campaign 'mycamp' --add 'task1'. A task can be viewed as an atomic action performed on a set of data (e.g. running the event reconstruction, uploading to conddb, ...). At its core, a task is the execution of a single program (cmsRun, python myscript.py, etc.). The TaskHandler wraps the execution of that program and notifies the automation framework of every status change of the task (new, processing, done, failed, ...).
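As a rough illustration of the wrapping described above, the sketch below runs a single program and reports each status change through a callback. It does not use the real TaskHandler class: run_wrapped and the notify callback are hypothetical names introduced here, and only the status strings come from the text.

```python
import subprocess
import sys


def run_wrapped(cmd, notify):
    """Run one program and report every status change through notify().

    Hypothetical helper: `notify` stands in for whatever the framework
    uses to record the task status.
    """
    notify('processing')
    try:
        # check=True raises CalledProcessError on a non-zero exit code
        subprocess.run(cmd, check=True)
    except (subprocess.CalledProcessError, OSError):
        notify('failed')
        return False
    notify('done')
    return True


# Usage: collect the status changes in a plain list.
history = []
ok = run_wrapped([sys.executable, '-c', 'print("hello")'], history.append)
```

In this sketch the caller only sees the sequence of statuses, which is the property the text attributes to the TaskHandler: the wrapped program does not need to know about the bookkeeping.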

The definition of a workflow is looser. In general, a workflow is a collection of tasks related to each other by a chain of dependencies.

The translation between the automation concepts and their implementation through git, Jenkins and the Influxdb measurements is summarized in the table below:

Automation concept | Git | Jenkins | Influxdb
Task | a single python script within a workflow directory | stage(s); multiple stages in case submit/resubmit/check are run asynchronously | for each run, the status of the task is recorded in the "run" measurement
Workflow | a single Jenkinsfile within a workflow directory | item | workflows are not recorded in the db as single entities

Job concept

Each task can run a single job (e.g. a plotting script) or multiple jobs in parallel (e.g. running the reconstruction/ntuplizer). When the output of one job must be fed to a second one, the two jobs are generally split over two different tasks within the same workflow.

Warning

Nothing in the system prevents (yet) the implementation of a task that executes two (or more) jobs in series. All it takes is adding extra statuses (e.g. "done-job1", "done-job2") for the workflow besides the standard ones ("new", "processing", "done", "failed", "merged"). This is strongly discouraged, since it would require ad-hoc status changes and monitoring for each workflow, whereas chaining tasks in series within a workflow allows one to concatenate tasks flexibly using only the standard statuses.
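To make the argument for tasks in series concrete, here is a minimal sketch of how a chain of tasks can be advanced using only the standard statuses. The task names and the next_task helper are hypothetical, and treating every status other than "new" and "done" as blocking is a simplifying assumption, not a description of the framework's scheduling.

```python
# Hypothetical workflow: task names and their order are made up for illustration.
CHAIN = ['reco', 'ntuples', 'plots']


def next_task(statuses, chain=CHAIN):
    """Return the first task in the chain that can be started, or None.

    A task can start when it is 'new' and every upstream task is 'done'.
    Tasks missing from `statuses` are assumed to be 'new'.
    """
    for task in chain:
        status = statuses.get(task, 'new')
        if status == 'done':
            continue  # upstream finished, look at the next task
        # 'new' can be started; any other status blocks the chain here
        return task if status == 'new' else None
    return None  # every task in the chain is done


ready = next_task({'reco': 'done', 'ntuples': 'new'})
```

No per-workflow status such as "done-job1" is needed: the position in the chain, together with the standard statuses, carries the same information.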

Jobs are created by the TaskHandler using the JobCtrl class:

jctrl = JobCtrl(workflow=self.task,
                campaign=self.campaign,
                tags={'run_number': run, 'fill': run_dict['fill']},
                dbname=self.opts.dbname) # (1)
if not jctrl.taskExist():
    jctrl.createTask(jids=[0], fields=[{'group' : ','.join([r['run_number'] for r in group[:-1]])}]) # (2)
  1. Create the JobCtrl instance, setting tags, campaign and workflow to select the wanted jobs. Note that the campaign is read from self.campaign, since multiple campaigns can be specified (to handle multiple re-processing campaigns; for prompt processing there is usually a single campaign, prompt). The base class __call__ method takes care of switching the value of self.campaign before calling the method specified in the command line option (submit, resubmit, check, etc.). The method is called multiple times, once for each campaign.

  2. If the task does not exist, create it, adding extra information (in this case the group field contains a comma-separated list of the runs merged into a single task).

The snippet above is extracted from the HTCHandler class. The first statement (the JobCtrl constructor call) can also be used in scripts and interactive python sessions to connect to the influxdb and view or update job status information.

Warning

Updating the status of a job manually is a destructive action. It should only be performed by maintainers, in case of a major problem with the automation infrastructure that prevented the system from handling or recovering the job by itself.

When running a single job within the TaskHandler the user should notify the system that the processing has started:

try: 
    jctrl.running(jid=0)
    self.rctrl.updateStatus(run=run['run_number'], status={self.task : 'processing'})

Note that the status of the task for a given run and the status of the job are updated separately. It is convenient to enclose the actual job execution in a try-except block (see below).

When multiple runs are processed together, the convention is to set the status of the task to "processing" for one of the runs (usually the most recent) and to "merged" for all other runs in the group:

for r in group[:-1]:
    self.rctrl.updateStatus(run=r['run_number'], status={self.task : 'merged'})
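The convention can be sketched as a small standalone function that computes the status update for every run in a group. The group_statuses helper, the run numbers and the task name are hypothetical. The sketch assumes a non-empty group whose last element is the run that keeps the "processing" status, as the group[:-1] slice above suggests.

```python
def group_statuses(group, task):
    """Map each run number in the group to the status update for `task`.

    Assumes `group` is non-empty and that its last run is the one that
    stays 'processing'; all the others are marked 'merged'.
    """
    updates = {r['run_number']: {task: 'merged'} for r in group[:-1]}
    updates[group[-1]['run_number']] = {task: 'processing'}
    return updates


# Made-up run numbers and task name, for illustration only.
group = [{'run_number': 1001}, {'run_number': 1002}, {'run_number': 1003}]
updates = group_statuses(group, 'my-task')
```

Each entry of the resulting dictionary has the same shape as the status argument passed to updateStatus in the snippets above.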

Upon job completion the TaskHandler should again notify the system about the outcome of the job:

    # mark as completed
    jctrl.done(jid=0, fields={'output' : f'{eosdir}/pi0_fitMassInfo_{run["fill"]}.txt', 'plots' : plotsurl})
except Exception as e:
    jctrl.failed(jid=0)
    self.log.error(f'Failed producing the monitoring plots for fill {run["fill"]}: {e}')

The snippet above sets the status of the job to either "done" or "failed". The status of the task for the runs being processed is then set by a call to the check function of the TaskHandler.
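The two halves of the try-except pattern shown above can be put together in a self-contained sketch. FakeJobCtrl is an in-memory stand-in that only mimics the running, done and failed calls used in this page, and the "running" job status it stores is an assumption. The task_status function is a hypothetical illustration of what a check step might do; the actual logic of the TaskHandler check function is not described here.

```python
class FakeJobCtrl:
    """In-memory stand-in for JobCtrl (illustration only, not the real class)."""

    def __init__(self):
        self.status = {}
        self.fields = {}

    def running(self, jid):
        self.status[jid] = 'running'  # assumed name for the job status

    def done(self, jid, fields=None):
        self.status[jid] = 'done'
        self.fields[jid] = fields or {}

    def failed(self, jid):
        self.status[jid] = 'failed'


def run_job(jctrl, work, jid=0):
    """Run work() and record the outcome of the job, as in the snippets above."""
    try:
        jctrl.running(jid=jid)
        output = work()
        jctrl.done(jid=jid, fields={'output': output})
    except Exception:
        jctrl.failed(jid=jid)


def task_status(jctrl):
    """Hypothetical check: derive the task status from the job statuses."""
    states = set(jctrl.status.values())
    if 'failed' in states:
        return 'failed'
    if states == {'done'}:
        return 'done'
    return 'processing'


good, bad = FakeJobCtrl(), FakeJobCtrl()
run_job(good, lambda: 'out.txt')  # job succeeds
run_job(bad, lambda: 1 / 0)       # job raises ZeroDivisionError
```

Keeping the job bookkeeping (run_job) separate from the task bookkeeping (task_status) mirrors the point made earlier that the two statuses are updated separately.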