Chapter 21. Implementing Worker Tasks

Worker tasks, as opposed to backend routines, are run by the Optimization Server. The execution of a task takes place in a worker microservice: each worker task is associated with a worker microservice, although a worker microservice can host several worker tasks.

Two types of workers are available:

  • Java worker tasks: those workers encapsulate an optimization engine that must be packaged in a Java library. For more details, refer to Section Implementing Java Worker Tasks.

  • Python worker tasks: those workers encapsulate an optimization engine that must be packaged in a Python library. For more details, refer to Section Implementing Python Worker Tasks.

The execution of a worker task happens as follows:

  1. Data is prepared by the Data Service and sent in CSV format;

  2. The CSV data is received inside the worker;

  3. An adapter is used inside the worker to transform this data into a format that the processing engine can understand (see the sketch after this list);

  4. The task action performs its work (optimization, etc.);

  5. An adapter is used to transform the results of the task action into the CSV format;

  6. Data is sent back to the Data Service for synchronization.
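To make steps 3 and 5 concrete, here is a minimal, hand-written adapter sketch in Python. It is purely illustrative: the Task class, the table layout and the column names (borrowed from the CSV extract shown later in this chapter) are assumptions, and the predefined workers described below ship with ready-made adapter code.

# A hypothetical adapter: step 3 turns CSV rows into engine objects,
# step 5 turns engine results back into CSV for the Data Service.
import csv
from dataclasses import dataclass

@dataclass
class Task:  # hypothetical engine-side object
    id: str
    duration_in_hours: int

def read_tasks(csv_path: str) -> list[Task]:
    # step 3: transform CSV rows into objects the engine understands
    with open(csv_path, newline="") as f:
        return [Task(row["id"], int(row["duration_in_hours"]))
                for row in csv.DictReader(f)]

def write_tasks(tasks: list[Task], csv_path: str) -> None:
    # step 5: transform engine results back into the CSV format
    with open(csv_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["id", "duration_in_hours"])
        writer.writeheader()
        for t in tasks:
            writer.writerow({"id": t.id, "duration_in_hours": t.duration_in_hours})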

As a consequence, the implementation of a worker task includes:

  1. Coding the task actions in a class (or a set of classes) located in a Gradle project under the processing folder.

    The processing folder is where you put the code that implements your processing logic. To implement your task, you write one or several classes and package them as a Gradle project under this folder, producing a library that is used by the worker microservice responsible for executing the task.

    The processing folder initially contains several projects (such as engine, checker and python-engine) that can be used as a starting point for your development.

    Test this implementation using tests and data that are compatible with the specific type of worker. For instance, the engine project contains a JUnit test class that illustrates how to run unit tests against your code using data snapshots, which can be used for Java worker tasks. For more details, refer to Sections Saving Data Snapshots, Implementing Java Worker Tasks and Implementing Python Worker Tasks.

  2. Generating a worker microservice and adding the Optimization Server task description, in YAML format, to this microservice. Alternatively, adding the YAML description of the task to an existing worker microservice.

    DOC includes a dedicated code generator for worker microservices.

    Three predefined workers are generated:

    • engine-worker and checker for Java

    • python-engine for Python

    Each generated worker is a new service that registers itself with the Optimization Server master service. Like all the generated services, it can be started as a Java process or as a Docker image.

    It is possible to generate additional workers. For more details, refer to Part Getting Started. For more details on how to author a worker microservice, refer to the Optimization Server documentation, Chapter Workers.

  3. Adding the adapter code to the worker code to process the input and output of the task actions, as well as the call to the processing implementation.

    Each worker task is described by an entry in the worker.yml file, located in the resources of the associated worker microservice. The code generator for the engine and checker workers also generates this YAML section. For custom workers, however, you need to write the worker.yml file yourself. For more details, refer to the Optimization Server documentation, Chapter Workers.

Once these steps are completed, the execution of a worker can be triggered from a scripted task, as explained in Chapter Understanding the Execution Service.

1. Implementing Java Worker Tasks

Java worker tasks encapsulate Java processing libraries.

Two such libraries and workers are predefined:

  • The 'engine worker' (in workers/engine-worker) and its 'engine' library (in processing/engine).

  • The 'model checker' worker (in workers/checker) and its 'model checker' library (in processing/checker).

It is possible to generate additional workers, as described in Part Getting Started.

By default, Java processing libraries rely on the Data Object Model to handle their input and output. For more details, refer to Chapter Understanding the Data Object Model (DOM).

Figure 21.1. Understanding Java Workers

Each generated worker contains a <workerName>Task class, which is where the actual processing code is called in the worker context. The pattern is to call the processing library code with a collector as an argument and retrieve the modified collector as the return value of the execution.

The extract below shows how this works:

public class EngineTask implements Task {
  ...
  private OptimizationEngine optimizationEngine;

  @Override
  public void execute(Parameter input, Parameter output, ExecutionContext executionContext) {
    // retrieve collector built from scenario data
    CapacityPlanning inputColl = extractInputCollector(input);

    // execute engine (from library code) and retrieve a modified collector as output
    CapacityPlanning outputColl = optimizationEngine.execute(inputColl, executionContext);

    // if this list is empty, the whole output collector will be saved in the scenario;
    // if you add classes to this list, only those classes will be saved
    List<Class<? extends DbDomObject>> consideredClasses = new ArrayList<>();

    // synchronize collector results with the original scenario data
    emitOutputCollector(outputColl, output, consideredClasses);
  }
}

2. Implementing Python Worker Tasks

Python worker tasks encapsulate Python processing libraries.

By default, DOC generates:

  • workers/python-engine-worker, a preconfigured Python worker project that encapsulates the python-engine module.

  • processing/python-engine, a Python module library with a run() function that takes a path to a collector archive and uses the dbgene-data library to read it. It also contains a pytest test that illustrates how to run unit tests against your code.

    The dbgene-data Python library, available in our Nexus PyPI repository, contains utilities for reading, writing and checking a collector archive. You can find help with:

    • help(dbgene.data.utils)

    • help(dbgene.data.DataFrameDict)

    • help(dbgene.data.DataChecker)

    • help(dbgene.data.constants)

    In addition, the Optimization Server Python Bridge library helps you communicate with the worker-shell within your Python task. For more details, refer to the Optimization Server documentation, Chapter Workers.
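As a quick orientation, the sketch below loads a collector archive and prints the tables it contains. It is only a sketch: the import path is inferred from the help() entries above, and iterating over a DataFrameDict as a dictionary of pandas DataFrames is an assumption.

# A minimal exploration sketch; the import path and the dict-like iteration
# over DataFrameDict are assumptions based on the help() entries above.
from dbgene.data import DataFrameDict

def describe_archive(archive) -> None:
    # archive is a collector archive, e.g. a CsvCollectorArchive
    data_frame_dict = DataFrameDict()
    data_frame_dict.load_collector_archive(archive)
    # assuming DataFrameDict can be iterated like a dict of pandas DataFrames
    for table_name, df in data_frame_dict.items():
        print(f"{table_name}: {len(df)} rows, columns {list(df.columns)}")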

Python processing libraries rely on the DataFrame data structure to handle their input and output.

Figure 21.2. Understanding Python Workers

Each scenario table is represented as a DataFrame. Each record is identified by a unique ID, db_gene_internal_id. Foreign key columns reference the unique ID of the record they point to.

In the example below, two records are presented, each referencing a plant through its plant_id column:

db_gene_internal_id,plant_id,name,id,duration_in_hours
01ead727-c2be-1b7c-8cd8-90a2d2c6143c,01ead727-c2cf-1b7c-8cd8-90a2d2c6143c,Prepare,PRE,4
01ead727-c2bf-1b7c-8cd8-90a2d2c6143c,01ead727-c2d0-1b7c-8cd8-90a2d2c6143c,Unscrew,UNS,2

These DataFrames can be handled using the DOC and Optimization Server libraries.
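For example, assuming these are standard pandas DataFrames, a foreign key such as plant_id can be resolved with a merge on the referenced db_gene_internal_id column. In the sketch below, the table names (operations, plants) and the plant name column are hypothetical; the other column names follow the CSV extract above.

# A hedged sketch: resolve the plant_id foreign key with a pandas merge.
import pandas as pd

def resolve_plant_names(operations: pd.DataFrame, plants: pd.DataFrame) -> pd.DataFrame:
    # join the foreign key column (plant_id) to the referenced table's
    # unique ID column (db_gene_internal_id) to recover the plant name
    plant_names = plants[["db_gene_internal_id", "name"]].rename(
        columns={"db_gene_internal_id": "plant_id", "name": "plant_name"})
    return operations.merge(plant_names, on="plant_id", how="left")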

Processing library code typically relies on the following structure:

# imports: DataFrameDict, DataChecker and CsvCollectorArchive come from the
# dbgene-data library (the exact import paths may differ in your project)
import sys

# input data is received in the form of an archive_path
def run(archive_path: str) -> DataFrameDict:
    # data is extracted as a dictionary of DataFrames
    input_data_frame_dict: DataFrameDict = DataFrameDict()
    input_data_frame_dict.load_collector_archive(CsvCollectorArchive(archive_path))

    # run the engine
    engine_result_dict = run_engine(input_data_frame_dict)

    # finalize data output preparation
    data_frame_dict = prepare_output(engine_result_dict)

    # optional data output validation
    validate_output(data_frame_dict)

    return data_frame_dict

# this function relies on DataChecker validation rules to ensure
# the output dictionary is consistent
def validate_output(data_frame_dict: DataFrameDict):
    print("Checking output...")
    checker = DataChecker()
    checker.internal_id(SCHEDULE)
    checker.uniqueness(SCHEDULE, INTERNAL_ID_FIELD)
    errors = checker.check_everything(data_frame_dict)
    if len(errors) > 0:
        print("There is(are) validation error(s):")
        for error in errors:
            print(" - " + error.get_message())
        sys.exit(1)
    return None
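A unit test for run() can be as small as the pytest sketch below. It is only an illustration: the engine module alias and the snapshot path are placeholders, and the pytest test shipped in processing/python-engine remains the reference example.

# A minimal pytest sketch; "engine" and the snapshot path are placeholders.
import engine  # the processing module that exposes run()

def test_run_returns_a_data_frame_dict():
    data_frame_dict = engine.run("tests/data/sample_collector_archive")
    assert data_frame_dict is not None
    # DataChecker.check_everything() returns a list of errors that a test can
    # assert on, instead of exiting the process as validate_output() does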

On the microservice side, each generated Python worker contains a run-python-engine.py file, which is where the actual processing code is called in the worker context. The pattern is to call the processing library code with a DataFrame dictionary as an argument and retrieve the modified DataFrame dictionary as the return value of the execution.

The extract below shows how this works:

# Init the worker bridge (ExecutionContext)
execution_context = ExecutionContext()
execution_context.start()

print("Reading input...")
input_collector_path = execution_context.get_input_path("inputCollector")

print("Processing...")
# If you need to emit metrics, you have to pass the execution_context to your engine.
data_frame_dict = engine.run(input_collector_path)

print("Writing output...")
output_path = execution_context.get_output_path("outputCollector")
output_archive = CsvCollectorArchive(output_path, "w")
data_frame_dict.store_collector_archive(output_archive)

# Stop the ExecutionContext properly
execution_context.stop()
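If the engine or the output writing can raise exceptions, you may want to guarantee that the bridge is stopped in all cases. The sketch below is one possible variant; it only uses the ExecutionContext calls shown in the extract above.

# A sketch of the same pattern with explicit cleanup via try/finally.
execution_context = ExecutionContext()
execution_context.start()
try:
    input_collector_path = execution_context.get_input_path("inputCollector")
    data_frame_dict = engine.run(input_collector_path)

    output_path = execution_context.get_output_path("outputCollector")
    data_frame_dict.store_collector_archive(CsvCollectorArchive(output_path, "w"))
finally:
    # stop the ExecutionContext even if the engine or the output writing fails
    execution_context.stop()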

DOC allows using IntelliJ to debug Python workers. For more details, refer to Section Using an Integrated Development Environment.