Orchestrator Service Structure

The orchestrator service coordinates pipeline execution and manages the flow of data through the pipeline steps.

Orchestrator Application

The orchestrator implements Quarkus's QuarkusApplication interface as its entry point, and Callable<Integer> so that picocli can run it as a command-line command:

java
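// orchestrator-svc/src/main/java/com/example/app/orchestrator/OrchestratorApplication.java
import java.util.concurrent.Callable;

import io.quarkus.runtime.QuarkusApplication;
import io.smallrye.mutiny.Multi;
import jakarta.inject.Inject; // javax.inject.Inject on Quarkus 2.x
import picocli.CommandLine;
import picocli.CommandLine.Option;
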
@CommandLine.Command(
    name = "orchestrator",
    mixinStandardHelpOptions = true,
    version = "1.0.0",
    description = "{{appName}} Orchestrator Service")
public class OrchestratorApplication implements QuarkusApplication, Callable<Integer> {

    @Option(
        names = {"-i", "--input"}, 
        description = "Input value for the pipeline",
        defaultValue = "${sys:quarkus.pipeline.input:-${env:PIPELINE_INPUT}}"
    )
    String input;

    @Inject
    PipelineExecutionService pipelineExecutionService;

    @Override
    public int run(String... args) {
        return new CommandLine(this).execute(args);
    }

    @Override
    public Integer call() {
        if (input == null || input.trim().isEmpty()) {
            System.err.println("Input parameter is required");
            return CommandLine.ExitCode.USAGE;
        }
        
        Multi<{{firstInputTypeName}}> inputMulti = getInputMulti(input);

        // Execute the pipeline with the processed input using injected service
        pipelineExecutionService.executePipeline(inputMulti)
            .collect().asList()
            .await().indefinitely();

        System.out.println("Pipeline execution completed");
        return CommandLine.ExitCode.OK;
    }
    
    // This method needs to be implemented by the user after template generation
    // based on their specific input type and requirements
    private Multi<{{firstInputTypeName}}> getInputMulti(String input) {
        // TODO: User needs to implement this method after template generation
        // Create and return appropriate Multi based on the input and first step requirements
        // For example:
        // {{firstInputTypeName}} inputItem = new {{firstInputTypeName}}();
        // inputItem.setField(input);
        // return Multi.createFrom().item(inputItem);
        
        throw new UnsupportedOperationException("Method getInputMulti needs to be implemented by user after template generation");
    }
}
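
As an illustration of what a `getInputMulti` implementation might start from, the following is a minimal sketch that turns the raw `--input` string into a list of items. It assumes the input is a comma-separated list, which the template does not prescribe. `InputItem` and `parseInput` are hypothetical names; `InputItem` stands in for the generated `{{firstInputTypeName}}` type.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class InputParsingSketch {

    // Hypothetical stand-in for the generated {{firstInputTypeName}} type.
    static final class InputItem {
        final String value;

        InputItem(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return "InputItem(" + value + ")";
        }
    }

    // Assumption: the --input value is a comma-separated list,
    // and each non-blank entry becomes one pipeline item.
    static List<InputItem> parseInput(String input) {
        return Arrays.stream(input.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty()) // skip blank entries such as "a,,b"
                .map(InputItem::new)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Blank entries are dropped and surrounding whitespace is trimmed.
        System.out.println(parseInput("alpha, beta,,gamma"));
    }
}
```

Inside `getInputMulti`, a list like this could be wrapped with Mutiny's `Multi.createFrom().iterable(...)` so that each entry is emitted as a separate item. If the first step expects a single item, `Multi.createFrom().item(...)`, as in the commented example above, is enough.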

Pipeline Execution Service

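Service beans hold the orchestration logic. The example below is an application-scoped ProcessFolderService that reads CSV files from a folder and converts them to domain objects: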

java
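// orchestrator-svc/src/main/java/com/example/app/orchestrator/service/ProcessFolderService.java
import java.util.stream.Stream;

import jakarta.enterprise.context.ApplicationScoped; // javax.enterprise.context.ApplicationScoped on Quarkus 2.x
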
@ApplicationScoped
public class ProcessFolderService {
    
    public Stream<CsvPaymentsInputFile> process(String csvFolderPath) {
        // Logic to read CSV files and convert to domain objects
        return Stream.of(/* ... */);
    }
}
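
The body of `process` is elided above. One possible way to fill in the folder-reading part, using only `java.nio.file`, is sketched below. `CsvFile` is a hypothetical stand-in for `CsvPaymentsInputFile`, whose fields are not shown in this document, and `listCsvFiles` is an illustrative name rather than part of the generated service.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CsvFolderSketch {

    // Hypothetical stand-in for CsvPaymentsInputFile; it only records the file path.
    static final class CsvFile {
        final Path path;

        CsvFile(Path path) {
            this.path = path;
        }
    }

    // Lists regular files ending in ".csv" directly inside the folder, sorted by path.
    static List<CsvFile> listCsvFiles(Path folder) {
        // Files.list holds an open directory handle, so close it with try-with-resources.
        try (Stream<Path> entries = Files.list(folder)) {
            return entries
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().toLowerCase().endsWith(".csv"))
                    .sorted()
                    .map(CsvFile::new)
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException("Cannot list " + folder, e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("csv-sketch");
        Files.createFile(dir.resolve("payments.csv"));
        Files.createFile(dir.resolve("notes.txt"));
        // Only the .csv file is listed.
        listCsvFiles(dir).forEach(f -> System.out.println(f.path.getFileName()));
    }
}
```

The sketch returns a `List` rather than a `Stream` so that the directory handle is closed before the method returns. A caller that needs a `Stream`, as in the `process` signature above, can call `.stream()` on the result.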

Orchestrator Architecture