Worker tasks, as opposed to backend routines, are run by the Optimization Server. The execution of a task takes place in a worker microservice: each worker task is associated with a worker microservice, although a worker microservice can host several worker tasks.
Two types of workers are available:
Java worker tasks: those workers encapsulate an optimization engine that must be packaged in a Java library. For more details, refer to Section Implementing Java Worker Tasks.
Python worker tasks: those workers encapsulate an optimization engine that must be packaged in a Python library. For more details, refer to Section Implementing Python Worker Tasks.
The execution of a worker task happens as follows:
Data is prepared by the Data Service and sent in CSV format;
The CSV data is received by the worker;
An adapter inside the worker transforms this data into a format that the processing engine can understand;
The task action performs its work (optimization, etc.);
An adapter transforms the results of the task action back into CSV format;
Data is sent back to the Data Service for synchronization.
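To make these steps concrete, here is a minimal sketch of this flow for a Python worker, written with the `DataFrameDict` and `CsvCollectorArchive` utilities presented later in this chapter. The import path and the `my_solve_engine` function are illustrative assumptions, not part of the generated code.

```python
# Minimal sketch of the worker-side flow described above.
# Assumptions: the import path and my_solve_engine() are illustrative;
# the real utilities come from the dbgene-data library described later.
from dbgene.data import DataFrameDict, CsvCollectorArchive


def my_solve_engine(data: DataFrameDict) -> DataFrameDict:
    # hypothetical placeholder: your optimization logic goes here
    return data


def run_task(input_archive_path: str, output_archive_path: str) -> None:
    # 1-2. Receive the CSV data prepared by the Data Service (a collector archive)
    input_data = DataFrameDict()
    input_data.load_collector_archive(CsvCollectorArchive(input_archive_path))

    # 3. The adapter (DataFrameDict) exposes the CSV tables in a form the engine understands
    # 4. The task action performs its work (optimization, etc.)
    output_data = my_solve_engine(input_data)

    # 5-6. Transform the results back into CSV and hand them back for synchronization
    output_data.store_collector_archive(CsvCollectorArchive(output_archive_path, "w"))
```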
As a consequence, the implementation of a worker task includes:
Coding the task actions in a class (or a set of classes) located in a Gradle project under the `processing` folder.
The `processing` folder is the place where you put the code implementing your processing logic. To implement your task, you thus write one or several classes and package them as a Gradle project under this folder, producing a library that is used by the worker microservice responsible for executing the task.
The `processing` folder initially contains several projects that can be used as a starting point for your development. They are:
`engine` and `checker`, which are Java worker tasks. For more details, refer to Section Implementing Java Worker Tasks.
`python-engine`, which is a Python worker task. For more details, refer to Section Implementing Python Worker Tasks.
Testing this implementation using unit tests and data that is compatible with the specific type of worker. For instance, `engine` contains a JUnit test class that illustrates how to run unit tests against your code; data snapshots, for example, can be used as test data for Java worker tasks. For more details, refer to Sections Saving Data Snapshots, Implementing Java Worker Tasks and Implementing Python Worker Tasks.
Generating a worker microservice and adding the Optimization Server description of the task, in YAML format, to this microservice. Alternatively, adding the YAML description of the task to an existing worker microservice.
DOC includes a dedicated code generator for worker microservices.
Three predefined workers are generated:
`engine-worker` and `checker` for Java;
`python-engine` for Python.
Each generated worker is a new service that registers itself with the Optimization Server master service. It can be started as a Java process, like all the generated services, or as a Docker image.
It is possible to generate additional workers; for more details, refer to Part Getting Started. For details on how to author a worker microservice, refer to the Optimization Server documentation, Chapter Workers.
Adding the adapter code to the worker to process the input and output of the task actions, as well as the call to the processing implementation.
Each worker task is described by an entry in the `worker.yml` file, located in the resources of the associated worker microservice. The code generator for the `engine` and `checker` workers also generates this YAML section. For custom workers, however, you need to write the `worker.yml` file yourself. For more details, refer to the Optimization Server documentation, Chapter Workers.
Once these steps are completed, the execution of a worker can be triggered from a scripted task, as explained in Chapter Understanding the Execution Service.
Java worker tasks encapsulate Java processing libraries.
Two such libraries and workers are predefined:
The 'engine worker' (in `workers/engine-worker`) and its 'engine' library (in `processing/engine`).
The 'model checker' worker (in `workers/checker`) and its 'model checker' library (in `processing/checker`).
It is possible to generate additional workers, as described in Part Getting Started.
By default, Java processing libraries rely on the Data Object Model to handle their input and output. For more details, refer to Chapter Understanding the Data Object Model (DOM).
Each generated worker contains a `<workerName>Task` class, which is where the actual processing code is called in the worker context. The pattern is to call the processing library code with a collector as argument and to retrieve the modified collector as the return value of the execution.
The extract below shows how this happens:
```java
public class EngineTask implements Task {
    ...
    private OptimizationEngine optimizationEngine;

    @Override
    public void execute(Parameter input, Parameter output, ExecutionContext executionContext) {
        // retrieve collector built from scenario data
        CapacityPlanning inputColl = extractInputCollector(input);
        // execute engine (from library code) and retrieve a modified collector as output
        CapacityPlanning outputColl = optimizationEngine.execute(inputColl, executionContext);
        // if this list is empty, the whole output collector will be saved in the scenario
        // if you add classes in this list, only those classes will be saved
        List<Class<? extends DbDomObject>> consideredClasses = new ArrayList<>();
        // synchronize collector results with the original scenario data
        emitOutputCollector(outputColl, output, consideredClasses);
    }
}
```
Python worker tasks encapsulate Python processing libraries.
By default, DOC generates:
`workers/python-engine-worker`, a preconfigured Python worker project that encapsulates the `python-engine` module.
`processing/python-engine`, a Python module library with a `run()` function that takes a path to a collector archive and uses the `dbgene-data` library to read it. It also contains a pytest test that illustrates how to run unit tests against your code.
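As an illustration, a unit test for such a `run()` function could look like the sketch below. The module import, the test archive path, and the `SCHEDULE` table name are assumptions made for the example, not the generated test itself.

```python
# Hypothetical pytest sketch for the python-engine run() function.
# The import path, archive path and table name are illustrative assumptions;
# the generated project ships its own test and test data.
from python_engine import engine  # assumed module layout


def test_run_returns_a_dataframe_dict():
    # run() takes the path to a collector archive and returns a DataFrameDict
    result = engine.run("tests/data/sample_collector")

    assert result is not None
    # DataFrameDict is described as a dictionary of dataframes, so a
    # membership check on an expected output table is a natural assertion
    assert "SCHEDULE" in result  # "SCHEDULE" is an assumed table name
```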
The `dbgene-data` Python library, available in our Nexus PyPI repository, contains utilities for reading, writing and checking a collector archive. You can find help with:
```python
help(dbgene.data.utils)
help(dbgene.data.DataFrameDict)
help(dbgene.data.DataChecker)
help(dbgene.data.constants)
```
In addition, the Optimization Server Python Bridge library helps you communicate with the worker-shell within your Python task. For more details, refer to the Optimization Server documentation, Chapter Workers.
Python processing libraries rely on the DataFrame data structure to handle their input and output.
Each scenario table is represented as a dataframe. Each record is uniquely identified by a unique ID, `db_gene_internal_id`. Foreign key columns reference the unique ID of the record they point to.
In the example below, two records are presented, each holding a reference to a plant through the `plant_id` column.
```csv
db_gene_internal_id,plant_id,name,id,duration_in_hours
01ead727-c2be-1b7c-8cd8-90a2d2c6143c,01ead727-c2cf-1b7c-8cd8-90a2d2c6143c,Prepare,PRE,4
01ead727-c2bf-1b7c-8cd8-90a2d2c6143c,01ead727-c2d0-1b7c-8cd8-90a2d2c6143c,Unscrew,UNS,2
```
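Because each table is an ordinary dataframe, foreign keys can be resolved with standard dataframe operations. The sketch below assumes the tables are pandas dataframes and invents a small `plants` table purely for illustration, then joins the records above to their plant through `plant_id`.

```python
# Illustrative sketch: assumes pandas dataframes and an invented "plants" table.
import pandas as pd

# the two task records shown above
tasks = pd.DataFrame({
    "db_gene_internal_id": ["01ead727-c2be-1b7c-8cd8-90a2d2c6143c",
                            "01ead727-c2bf-1b7c-8cd8-90a2d2c6143c"],
    "plant_id": ["01ead727-c2cf-1b7c-8cd8-90a2d2c6143c",
                 "01ead727-c2d0-1b7c-8cd8-90a2d2c6143c"],
    "name": ["Prepare", "Unscrew"],
    "id": ["PRE", "UNS"],
    "duration_in_hours": [4, 2],
})

# hypothetical referenced table: plant_id above points at these unique IDs
plants = pd.DataFrame({
    "db_gene_internal_id": ["01ead727-c2cf-1b7c-8cd8-90a2d2c6143c",
                            "01ead727-c2d0-1b7c-8cd8-90a2d2c6143c"],
    "name": ["Plant A", "Plant B"],
})

# resolve the foreign key: tasks.plant_id -> plants.db_gene_internal_id
tasks_with_plant = tasks.merge(
    plants.rename(columns={"db_gene_internal_id": "plant_id", "name": "plant_name"}),
    on="plant_id",
    how="left",
)
print(tasks_with_plant[["name", "plant_name", "duration_in_hours"]])
```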
The handling of these dataframes can be performed using the DOC and Optimization Server libraries.
Processing library code typically relies on the following structure:
```python
# input data is received in the form of an archive_path
def run(archive_path: str) -> DataFrameDict:
    # data is extracted as a dictionary of dataframes
    input_data_frame_dict: DataFrameDict = DataFrameDict()
    input_data_frame_dict.load_collector_archive(CsvCollectorArchive(archive_path))
    # run the engine
    engine_result_dict = run_engine(input_data_frame_dict)
    # finalize data output preparation
    data_frame_dict = prepare_output(engine_result_dict)
    # optional data output validation
    validate_output(data_frame_dict)
    return data_frame_dict


# this function relies on DataChecker validation rules to ensure
# the output dictionary is consistent
def validate_output(data_frame_dict: DataFrameDict):
    print("Checking output...")
    checker = DataChecker()
    checker.internal_id(SCHEDULE)
    checker.uniqueness(SCHEDULE, INTERNAL_ID_FIELD)
    errors = checker.check_everything(data_frame_dict)
    if len(errors) > 0:
        print("There is(are) validation error(s):")
        for error in errors:
            print(" - " + error.get_message())
        sys.exit(1)
    return None
```
Concerning the generated microservice, each Python worker contains a `run-python-engine.py` file, which is where the actual processing code is called in the worker context. The pattern is to call the processing library code with the path to the input collector archive as argument and to retrieve a modified dataframe dictionary as the return value of the execution, which is then written to the output collector archive.
The extract below shows how this happens:
```python
# Init the worker bridge (ExecutionContext)
execution_context = ExecutionContext()
execution_context.start()

print("Reading input...")
input_collector_path = execution_context.get_input_path("inputCollector")

print("Processing...")
# If you need to emit metrics, you have to pass the execution_context to your engine.
data_frame_dict = engine.run(input_collector_path)

print("Writing output...")
output_path = execution_context.get_output_path("outputCollector")
output_archive = CsvCollectorArchive(output_path, "w")
data_frame_dict.store_collector_archive(output_archive)

# Stop the ExecutionContext properly
execution_context.stop()
```
DOC allows using IntelliJ to debug Python workers. For more details, refer to Section Using an Integrated Development Environment.