Observability
This guide explains how to monitor, trace, and observe pipeline applications built with The Pipeline Framework using industry-standard tools and practices.
Overview
Observability in The Pipeline Framework encompasses:
- Metrics: Collecting quantitative data about pipeline performance
- Tracing: Tracking requests as they flow through the pipeline
- Logging: Structured logging with context for debugging
- Health Checks: Monitoring service availability and readiness
The framework provides built-in support for metrics and health checks through Quarkus and Micrometer.
Metrics Collection
Built-in Metrics
The Pipeline Framework automatically collects various metrics:
// Automatic metrics collected:
// - Pipeline execution duration
// - Step execution time
// - Success/failure rates
// - Throughput counters
// - Error counts by type
// - Resource utilization
Custom Metrics
Add custom metrics to your services:
@PipelineStep(
    // ... configuration
)
@ApplicationScoped
public class ProcessPaymentService implements StepOneToOne<PaymentRecord, PaymentStatus> {

    private static final Logger LOG = LoggerFactory.getLogger(ProcessPaymentService.class);

    @Inject
    MeterRegistry meterRegistry;

    private Timer processingTimer;
    private Counter successCounter;
    private Counter failureCounter;
    private DistributionSummary paymentAmountSummary;

    @PostConstruct
    void initializeMetrics() {
        // Micrometer API, matching the framework's Micrometer integration below
        processingTimer = meterRegistry.timer("payment.processing.duration");
        successCounter = meterRegistry.counter("payment.processing.success");
        failureCounter = meterRegistry.counter("payment.processing.failure");
        paymentAmountSummary = meterRegistry.summary("payment.amount.distribution");
    }

    @Override
    public Uni<PaymentStatus> applyOneToOne(PaymentRecord paymentRecord) {
        Timer.Sample sample = Timer.start(meterRegistry);
        paymentAmountSummary.record(paymentRecord.getAmount().doubleValue());
        return processPayment(paymentRecord)
            .onItem().invoke(result -> {
                sample.stop(processingTimer);
                successCounter.increment();
                LOG.info("Payment processed successfully: amount={}", paymentRecord.getAmount());
            })
            .onFailure().invoke(error -> {
                sample.stop(processingTimer);
                failureCounter.increment();
                LOG.error("Payment processing failed: amount={}", paymentRecord.getAmount(), error);
            });
    }

    private Uni<PaymentStatus> processPayment(PaymentRecord record) {
        // Implementation
        return Uni.createFrom().item(/* ... */);
    }
}
Micrometer Integration
The framework integrates with Micrometer for metrics collection:
# application.properties
# Enable Micrometer
quarkus.micrometer.enabled=true
quarkus.micrometer.export.prometheus.enabled=true
quarkus.micrometer.export.prometheus.path=/q/metrics
quarkus.micrometer.binder.http-server.enabled=true
quarkus.micrometer.binder.http-client.enabled=true
# Configure histogram buckets
quarkus.micrometer.distribution.percentiles-histogram.http.server.requests=true
quarkus.micrometer.distribution.percentiles-histogram.http.client.requests=true
Distributed Tracing
OpenTelemetry Integration
The framework supports OpenTelemetry for distributed tracing:
# application.properties
# Enable OpenTelemetry
quarkus.otel.enabled=true
quarkus.otel.traces.enabled=true
quarkus.otel.exporter.otlp.endpoint=http://localhost:4317
quarkus.otel.exporter.otlp.protocol=grpc
# Configure tracing sampler
quarkus.otel.traces.sampler=always_on
Custom Spans
Add custom spans to your services:
@PipelineStep(
    // ... configuration
)
@ApplicationScoped
public class ProcessPaymentService implements StepOneToOne<PaymentRecord, PaymentStatus> {

    private static final Logger LOG = LoggerFactory.getLogger(ProcessPaymentService.class);

    @Inject
    Tracer tracer;

    @Override
    public Uni<PaymentStatus> applyOneToOne(PaymentRecord paymentRecord) {
        Span span = tracer.spanBuilder("process-payment")
            .setAttribute("payment.id", paymentRecord.getId().toString())
            .setAttribute("payment.amount", paymentRecord.getAmount().doubleValue())
            .setAttribute("payment.currency", paymentRecord.getCurrency().getCurrencyCode())
            .setAttribute("payment.recipient", paymentRecord.getRecipient())
            .startSpan();
        try (Scope scope = span.makeCurrent()) {
            return processPayment(paymentRecord)
                .onItem().invoke(result -> {
                    span.setStatus(StatusCode.OK);
                    LOG.info("Payment processed successfully");
                })
                .onFailure().invoke(error -> {
                    span.recordException(error);
                    span.setStatus(StatusCode.ERROR, error.getMessage());
                    LOG.error("Payment processing failed", error);
                })
                // End the span only when the asynchronous pipeline completes;
                // ending it in a finally block would close it before processing runs.
                .eventually(span::end);
        }
    }

    private Uni<PaymentStatus> processPayment(PaymentRecord record) {
        // Add sub-spans for individual synchronous operations
        Span validationSpan = tracer.spanBuilder("validate-payment")
            .setAttribute("validation.type", "business-rules")
            .startSpan();
        try (Scope scope = validationSpan.makeCurrent()) {
            validatePayment(record);
            validationSpan.setStatus(StatusCode.OK);
        } catch (Exception e) {
            validationSpan.recordException(e);
            validationSpan.setStatus(StatusCode.ERROR, e.getMessage());
            throw e;
        } finally {
            validationSpan.end();
        }
        // Continue with processing...
        return Uni.createFrom().item(/* ... */);
    }

    private void validatePayment(PaymentRecord record) {
        // Validation logic
    }
}
Context Propagation
Ensure trace context propagation across services:
@Override
public Uni<PaymentStatus> applyOneToOne(PaymentRecord paymentRecord) {
    // Trace context is automatically propagated through gRPC calls
    return grpcClient.remoteProcess(convertToGrpc(paymentRecord))
        .onItem().transform(this::convertFromGrpc)
        .onFailure().recoverWithUni(error -> {
            // Handle errors while preserving the trace context
            LOG.error("gRPC call failed", error);
            return Uni.createFrom().failure(error);
        });
}
Structured Logging
MDC Integration
Use MDC (Mapped Diagnostic Context) for structured logging:
@PipelineStep(
    // ... configuration
)
@ApplicationScoped
public class ProcessPaymentService implements StepOneToOne<PaymentRecord, PaymentStatus> {

    private static final Logger LOG = LoggerFactory.getLogger(ProcessPaymentService.class);

    @Override
    public Uni<PaymentStatus> applyOneToOne(PaymentRecord paymentRecord) {
        // Add context to the MDC for logs emitted on the calling thread
        MDC.put("paymentId", paymentRecord.getId().toString());
        MDC.put("customerId", paymentRecord.getCustomerId());
        MDC.put("amount", paymentRecord.getAmount().toString());
        MDC.put("currency", paymentRecord.getCurrency().getCurrencyCode());
        try {
            LOG.info("Starting payment processing");
            // Note: MDC values are thread-local. The reactive callbacks below may
            // run on a different thread, so their logs only carry this context if
            // an MDC/context-propagation integration is enabled.
            return processPayment(paymentRecord)
                .onItem().invoke(result ->
                    LOG.info("Payment processed successfully: status={}", result.getStatus()))
                .onFailure().invoke(error ->
                    LOG.error("Payment processing failed", error));
        } finally {
            // Clean up the MDC on the calling thread
            MDC.clear();
        }
    }
}
JSON Logging
Configure JSON logging for better log analysis:
# application.properties
# Enable JSON logging
quarkus.log.console.json=true
quarkus.log.console.json.pretty-print=false
quarkus.log.console.json.fields.timestamp.field-name=@timestamp
quarkus.log.console.json.fields.logger.field-name=loggerName
quarkus.log.console.json.fields.level.field-name=logLevel
Log Levels
Configure appropriate log levels for different environments:
# application-dev.properties
quarkus.log.category."org.pipelineframework".level=DEBUG
quarkus.log.category."com.example.app".level=DEBUG
# application-prod.properties
quarkus.log.category."org.pipelineframework".level=INFO
quarkus.log.category."com.example.app".level=WARN
Health Checks
Built-in Health Checks
The framework provides built-in health checks:
// Health check endpoints automatically available at:
// GET /q/health
// GET /q/health/live
// GET /q/health/ready
Custom Health Checks
Add custom health checks to your services:
@Readiness
@ApplicationScoped
public class PaymentProcessingHealthCheck implements HealthCheck {

    @Inject
    PaymentServiceClient paymentServiceClient;

    @Override
    public HealthCheckResponse call() {
        try {
            // Check whether the external payment service is reachable
            boolean isHealthy = paymentServiceClient.ping()
                .await().atMost(Duration.ofSeconds(5));
            return HealthCheckResponse.named("payment-service")
                .withData("endpoint", paymentServiceClient.getEndpoint())
                .status(isHealthy)
                .build();
        } catch (Exception e) {
            return HealthCheckResponse.named("payment-service")
                .withData("error", e.getMessage())
                .down()
                .build();
        }
    }
}
Health check beans annotated with @Liveness or @Readiness are discovered and registered automatically; no additional configuration is required.
Alerting and Monitoring
Prometheus Alerts
Define alerts in Prometheus:
# prometheus-alerts.yml
groups:
  - name: pipeline-alerts
    rules:
      - alert: HighErrorRate
        # Ratio of failures to total outcomes, so the 5% threshold holds at any traffic level
        expr: >
          sum(rate(payment_processing_failure_total[5m]))
          / sum(rate(payment_processing_success_total[5m]) + rate(payment_processing_failure_total[5m]))
          > 0.05
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "High error rate in payment processing"
          description: "Payment processing error rate is above 5% for the last 5 minutes"
      - alert: HighLatency
        expr: histogram_quantile(0.95, sum by (le) (rate(payment_processing_duration_seconds_bucket[5m]))) > 30
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "High latency in payment processing"
          description: "95th percentile payment processing latency exceeds 30 seconds"
Grafana Dashboards
Create comprehensive dashboards in Grafana:
{
  "dashboard": {
    "title": "Pipeline Application Metrics",
    "panels": [
      {
        "title": "Pipeline Execution Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(pipeline_execution_total[5m])",
            "legendFormat": "{{status}}"
          }
        ]
      },
      {
        "title": "Average Processing Time",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(pipeline_processing_duration_seconds_sum[5m]) / rate(pipeline_processing_duration_seconds_count[5m])",
            "legendFormat": "avg_processing_time"
          }
        ]
      },
      {
        "title": "Error Distribution",
        "type": "piechart",
        "targets": [
          {
            "expr": "pipeline_error_total",
            "legendFormat": "{{error_type}}"
          }
        ]
      }
    ]
  }
}
Monitoring Pipeline Steps
Step-Level Metrics
Monitor individual pipeline steps:
@PipelineStep(
    // ... configuration
)
@ApplicationScoped
public class ProcessPaymentService implements StepOneToOne<PaymentRecord, PaymentStatus> {

    private static final Logger LOG = LoggerFactory.getLogger(ProcessPaymentService.class);

    @Inject
    MeterRegistry meterRegistry;

    private Timer stepTimer;
    private Counter stepSuccess;
    private Counter stepFailure;
    private AtomicLong queueSize;

    @PostConstruct
    void initializeStepMetrics() {
        // Use a step tag rather than baking the step name into the metric name,
        // so dashboards can aggregate and filter across steps
        String stepName = this.getClass().getSimpleName();
        stepTimer = meterRegistry.timer("pipeline.step.duration", "step", stepName);
        stepSuccess = meterRegistry.counter("pipeline.step.success", "step", stepName);
        stepFailure = meterRegistry.counter("pipeline.step.failure", "step", stepName);
        queueSize = meterRegistry.gauge("pipeline.step.queue-size",
            Tags.of("step", stepName), new AtomicLong(0));
    }

    @Override
    public Uni<PaymentStatus> applyOneToOne(PaymentRecord paymentRecord) {
        Timer.Sample sample = Timer.start(meterRegistry);
        return processPayment(paymentRecord)
            .onItem().invoke(result -> {
                sample.stop(stepTimer);
                stepSuccess.increment();
                LOG.info("Step {} completed successfully", this.getClass().getSimpleName());
            })
            .onFailure().invoke(error -> {
                sample.stop(stepTimer);
                stepFailure.increment();
                LOG.error("Step {} failed", this.getClass().getSimpleName(), error);
            });
    }

    private Uni<PaymentStatus> processPayment(PaymentRecord record) {
        // Implementation
        return Uni.createFrom().item(/* ... */);
    }
}
Pipeline Flow Visualization
Create visualizations to monitor pipeline flow:
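As a starting point, a per-step throughput panel works well; the fragment below follows the dashboard format shown earlier, and the metric name is an assumption based on the step-level metrics example above (a Micrometer counter tagged with step, exported to Prometheus):

```json
{
  "title": "Per-Step Throughput",
  "type": "stat",
  "targets": [
    {
      "expr": "sum by (step) (rate(pipeline_step_success_total[5m]))",
      "legendFormat": "{{step}}"
    }
  ]
}
```

Plotting success and failure rates per step side by side makes it easy to spot where records stall or drop out of the pipeline.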
Best Practices
Metrics Design
- Use Meaningful Names: Follow naming conventions (e.g., pipeline.step.duration)
- Include Dimensions: Add labels for filtering and grouping
- Avoid High Cardinality: Don't use unique identifiers as label values
- Measure What Matters: Focus on business-relevant metrics
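The high-cardinality point can be made concrete. This JDK-only sketch (the metric and label names are illustrative, not framework APIs) counts how many distinct time series each labelling scheme produces:

```java
import java.util.HashSet;
import java.util.Set;

public class CardinalityDemo {

    // Returns the number of distinct time series created when recording
    // n payments under the given labelling scheme.
    static int seriesCount(int n, boolean labelByPaymentId) {
        Set<String> series = new HashSet<>();
        for (int i = 0; i < n; i++) {
            String label = labelByPaymentId
                ? "paymentId=" + i // unique identifier: one series per payment
                : "outcome=" + (i % 20 == 0 ? "failure" : "success"); // bounded values
            series.add("payment.processing{" + label + "}");
        }
        return series.size();
    }

    public static void main(String[] args) {
        System.out.println(seriesCount(10_000, false)); // 2 series
        System.out.println(seriesCount(10_000, true));  // 10000 series
    }
}
```

A bounded label such as outcome yields two series regardless of traffic, while labelling by payment id creates one series per payment and grows without bound, which is exactly what overwhelms a metrics backend.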
Tracing Strategy
- Span Coverage: Instrument all major operations
- Attribute Selection: Include relevant context as span attributes
- Error Recording: Record exceptions with full stack traces
- Context Propagation: Ensure trace context flows through all services
Logging Standards
- Structured Format: Use JSON logging in production
- Contextual Information: Include MDC with request context
- Appropriate Levels: Use DEBUG, INFO, WARN, ERROR appropriately
- Sensitive Data: Never log sensitive information
Health Check Design
- Fast Responses: Health checks should respond quickly
- Meaningful Status: Reflect actual service health
- Graceful Degradation: Indicate partial availability when appropriate
- External Dependencies: Consider dependency health in readiness checks
Alerting Principles
- Actionable Alerts: Only alert on issues requiring human intervention
- Clear Descriptions: Include enough context to understand the issue
- Appropriate Severity: Use warning vs critical levels appropriately
- Noise Reduction: Tune thresholds to minimize false positives
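When tuning a latency threshold like the HighLatency rule above, it helps to see what histogram_quantile actually estimates. The JDK-only sketch below (bucket bounds and counts are invented for illustration) applies the same linear interpolation over cumulative histogram buckets:

```java
import java.util.List;

public class QuantileEstimate {

    // A cumulative histogram bucket as exported to Prometheus:
    // upper bound in seconds, and the count of observations <= that bound.
    record Bucket(double le, long count) {}

    // Linear-interpolation quantile estimate over cumulative buckets,
    // the same idea histogram_quantile uses.
    static double histogramQuantile(double q, List<Bucket> buckets) {
        long total = buckets.get(buckets.size() - 1).count();
        double rank = q * total;
        long prevCount = 0;
        double prevBound = 0.0;
        for (Bucket b : buckets) {
            if (b.count() >= rank) {
                long inBucket = b.count() - prevCount;
                double fraction = inBucket == 0 ? 0 : (rank - prevCount) / inBucket;
                return prevBound + (b.le() - prevBound) * fraction;
            }
            prevCount = b.count();
            prevBound = b.le();
        }
        return prevBound;
    }

    public static void main(String[] args) {
        // 100 observations: 60 under 1s, 90 under 5s, all 100 under 30s
        List<Bucket> buckets = List.of(
            new Bucket(1.0, 60), new Bucket(5.0, 90), new Bucket(30.0, 100));
        System.out.println(histogramQuantile(0.95, buckets)); // 17.5
    }
}
```

With 90 of 100 observations under 5s and the rest under 30s, the estimated p95 lands midway through the last bucket at 17.5s. Because the estimate interpolates within bucket bounds, the bucket layout directly affects how accurate a quantile-based alert is.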
The Pipeline Framework's observability features provide comprehensive monitoring capabilities to ensure your pipeline applications are reliable, performant, and maintainable in production environments.