Workflow System Documentation
Overview
The Soliplex Ingester workflow system orchestrates multi-step document processing pipelines. Each document flows through a series of configurable steps, with automatic retry logic, status tracking, and parallel processing support.
Core Concepts
Workflow Definition
A WorkflowDefinition specifies the processing pipeline for documents. It defines:
- Unique ID and name
- Item steps (processing stages)
- Lifecycle event handlers
- Retry policies
Workflow definitions are stored as YAML files in config/workflows/*.yaml.
Workflow Run
A WorkflowRun represents a single execution of a workflow for one document. It tracks:
- Status (PENDING → RUNNING → COMPLETED/FAILED)
- Start and completion timestamps
- Priority level
- Associated document and batch
Run Group
A RunGroup aggregates multiple workflow runs that were started together (e.g., all documents in a batch). It provides:
- Group-level status tracking
- Aggregate statistics
- Batch coordination
Run Step
A RunStep is one stage of execution within a workflow run. Each step:
- Executes a specific handler method
- Has its own status and retry counter
- Produces artifacts stored in the file system
- Updates the database on completion or failure
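The relationships above can be pictured as plain data classes. This is a minimal sketch, not the ingester's actual models: the field names are illustrative, and the rule that rolls step statuses up into a run status (any FAILED step fails the run, all COMPLETED completes it) is an assumption.

```python
from dataclasses import dataclass, field
from enum import Enum


class RunStatus(str, Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    ERROR = "ERROR"
    FAILED = "FAILED"


@dataclass
class RunStep:
    step_type: str                      # e.g. "parse", "chunk" (illustrative)
    status: RunStatus = RunStatus.PENDING
    retry: int = 0                      # retries consumed so far


@dataclass
class WorkflowRun:
    doc_id: str
    priority: int = 0
    steps: list[RunStep] = field(default_factory=list)

    @property
    def status(self) -> RunStatus:
        # Assumed roll-up rule, not taken from the ingester source.
        statuses = [s.status for s in self.steps]
        if RunStatus.FAILED in statuses:
            return RunStatus.FAILED
        if statuses and all(s == RunStatus.COMPLETED for s in statuses):
            return RunStatus.COMPLETED
        if all(s == RunStatus.PENDING for s in statuses):
            return RunStatus.PENDING
        return RunStatus.RUNNING


@dataclass
class RunGroup:
    # All runs started together, e.g. one per document in a batch.
    runs: list[WorkflowRun] = field(default_factory=list)
```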
Workflow Step Types
The system supports these predefined step types:
INGEST
- Purpose: Load raw document into the system
- Input: File bytes or URI
- Output: Document stored in file system
- Artifact: ArtifactType.DOC
VALIDATE
- Purpose: Check document format and readability
- Input: Raw document
- Output: Validation result
- Handler: soliplex_ingester.lib.workflow.validate_document
PARSE
- Purpose: Extract text, structure, and metadata
- Input: Raw document
- Output: Markdown and JSON representations
- Artifacts: ArtifactType.PARSED_MD, ArtifactType.PARSED_JSON
- Handler: soliplex_ingester.lib.workflow.parse_document
- Service: Docling server
CHUNK
- Purpose: Split document into semantic chunks
- Input: Parsed markdown
- Output: Array of text chunks
- Artifact: ArtifactType.CHUNKS
- Handler: soliplex_ingester.lib.workflow.chunk_document
EMBED
- Purpose: Generate vector embeddings for chunks
- Input: Text chunks
- Output: Embedding vectors
- Artifact: ArtifactType.EMBEDDINGS
- Handler: soliplex_ingester.lib.workflow.embed_document
STORE
- Purpose: Save embeddings to RAG system
- Input: Embeddings
- Output: RAG document ID
- Artifact: ArtifactType.RAG
- Handler: soliplex_ingester.lib.workflow.save_to_rag
- Backend: LanceDB + HaikuRAG
ENRICH
- Purpose: Add metadata or perform additional processing
- Input: Document and existing artifacts
- Output: Enhanced metadata
- Handler: Custom (user-defined)
ROUTE
- Purpose: Conditional logic for workflow branching
- Input: Document state
- Output: Routing decision
- Handler: Custom (user-defined)
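The step list implies a chain of artifact dependencies: each step's input is an earlier step's output. The sketch below encodes that chain to check whether a step order makes sense. ArtifactType here is a stand-in enum with the member names listed above and made-up values. The PRODUCES and REQUIRES tables are read off the Input, Output and Artifact lines; VALIDATE is assumed to read the DOC artifact and produce none, and ENRICH and ROUTE are omitted because their inputs depend on the custom handler.

```python
from enum import Enum
from typing import Iterable


class ArtifactType(Enum):
    # Member names from the step list above; values are illustrative.
    DOC = "doc"
    PARSED_MD = "parsed_md"
    PARSED_JSON = "parsed_json"
    CHUNKS = "chunks"
    EMBEDDINGS = "embeddings"
    RAG = "rag"


# Artifacts each built-in step writes.
PRODUCES = {
    "ingest": {ArtifactType.DOC},
    "validate": set(),
    "parse": {ArtifactType.PARSED_MD, ArtifactType.PARSED_JSON},
    "chunk": {ArtifactType.CHUNKS},
    "embed": {ArtifactType.EMBEDDINGS},
    "store": {ArtifactType.RAG},
}

# Artifacts each built-in step reads (from the "Input" lines).
REQUIRES = {
    "ingest": set(),
    "validate": {ArtifactType.DOC},
    "parse": {ArtifactType.DOC},
    "chunk": {ArtifactType.PARSED_MD},
    "embed": {ArtifactType.CHUNKS},
    "store": {ArtifactType.EMBEDDINGS},
}


def check_order(steps: list[str], initial: Iterable[ArtifactType] = ()) -> list[str]:
    """Return the steps whose inputs are not produced by an earlier step."""
    available = set(initial)   # artifacts that exist before the first step
    missing = []
    for step in steps:
        if not REQUIRES[step] <= available:
            missing.append(step)
        available |= PRODUCES[step]
    return missing
```

The batch workflow below starts at validate, so checking its steps needs initial={ArtifactType.DOC} to represent a document that was already ingested.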
Workflow Configuration
Workflow Definition YAML
Example: config/workflows/batch.yaml
id: batch
name: Batch Workflow
meta: {}
item_steps:
  validate:
    name: docling validate
    retries: 3
    method: soliplex_ingester.lib.workflow.validate_document
    parameters: {}
  parse:
    name: docling parse
    retries: 3
    method: soliplex_ingester.lib.workflow.parse_document
    parameters: {}
  chunk:
    name: docling chunk
    retries: 3
    method: soliplex_ingester.lib.workflow.chunk_document
    parameters: {}
  embed:
    name: embeddings
    retries: 3
    method: soliplex_ingester.lib.workflow.embed_document
    parameters: {}
  store:
    name: save to rag
    retries: 3
    method: soliplex_ingester.lib.workflow.save_to_rag
    parameters: {}
lifecycle_events:
  group_start:
    - name: log group start
      method: soliplex_ingester.lib.workflow.log_event
      retries: 1
      parameters:
        message: "Starting workflow group"
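Once loaded, a definition like this is just nested mappings. A minimal sketch of sanity checks one might run before using it, assuming the keys used in batch.yaml are the required ones. validate_definition is a hypothetical helper, not part of the ingester, and the real loader may enforce different rules.

```python
from typing import Any


def validate_definition(defn: dict[str, Any]) -> list[str]:
    """Return a list of problems found in a parsed workflow definition."""
    errors = []
    for key in ("id", "name", "item_steps"):
        if key not in defn:
            errors.append(f"missing top-level key {key!r}")
    for step_id, step in defn.get("item_steps", {}).items():
        for key in ("name", "retries", "method"):
            if key not in step:
                errors.append(f"step {step_id!r}: missing {key!r}")
        retries = step.get("retries", 0)
        if not isinstance(retries, int) or retries < 0:
            errors.append(f"step {step_id!r}: retries must be a non-negative int")
        # method must look like "package.module.function"
        module, _, func = step.get("method", "").rpartition(".")
        if not module or not func:
            errors.append(f"step {step_id!r}: method must be a dotted path")
    return errors
```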
Parameter Set YAML
Parameters control step behavior without changing the workflow definition.
Example: config/params/default.yaml
id: default
name: Default Parameters
meta:
  description: Standard processing parameters
config:
  parse:
    format: markdown
    ocr_enabled: true
  chunk:
    chunk_size: 512
    chunk_overlap: 50
    separator: "\n\n"
  embed:
    model: text-embedding-3-small
    batch_size: 1000
  store:
    data_dir: lancedb
    collection_name: documents
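The top-level keys under config (parse, chunk, embed, store) match the step IDs in item_steps, which suggests each section feeds the step with the same name. The sketch below assumes exactly that, and further assumes that parameter-set values override a step's inline parameters. resolve_step_params is hypothetical, and the real precedence may differ.

```python
from typing import Any


def resolve_step_params(
    workflow_def: dict[str, Any], param_set: dict[str, Any], step_id: str
) -> dict[str, Any]:
    """Merge a step's inline parameters with its section of a parameter set."""
    inline = workflow_def["item_steps"][step_id].get("parameters") or {}
    overrides = param_set.get("config", {}).get(step_id) or {}
    # Assumption: parameter-set values win over inline workflow parameters.
    return {**inline, **overrides}
```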
Workflow Execution
Creating Workflows
For a Batch:
curl -X POST "http://localhost:8000/api/v1/batch/start-workflows" \
-d "batch_id=1" \
-d "workflow_definition_id=batch" \
-d "param_id=default"
For a Single Document:
curl -X POST "http://localhost:8000/api/v1/workflow/" \
-d "doc_id=sha256-abc123..." \
-d "workflow_definition_id=batch"
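curl -d sends the fields as an application/x-www-form-urlencoded POST body. The same requests can be built with Python's standard library. form_post is an illustrative helper, and the base URL is the local address used in the examples above.

```python
from urllib.parse import urlencode
from urllib.request import Request

BASE_URL = "http://localhost:8000/api/v1"  # host and port from the curl examples


def form_post(path: str, fields: dict[str, str]) -> Request:
    """Build a form-encoded POST, equivalent to `curl -X POST -d k=v ...`."""
    body = urlencode(fields).encode("ascii")
    return Request(
        BASE_URL + path,
        data=body,
        method="POST",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )


start_batch = form_post(
    "/batch/start-workflows",
    {"batch_id": "1", "workflow_definition_id": "batch", "param_id": "default"},
)
```

Passing the request to urllib.request.urlopen sends it; the response format is not covered here.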
Worker Processing
Workers continuously poll for pending steps:
- Query database for PENDING steps with highest priority
- Lock step with FOR UPDATE to prevent duplicate processing
- Transition status: PENDING → RUNNING
- Execute handler method
- Store artifacts in file system
- Update step status: RUNNING → COMPLETED/ERROR
- Update parent run status based on step results
- Repeat
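The claim phase is the part that keeps two workers from running the same step. Below is a sketch of the loop's first stages, using SQLite so it runs standalone. SQLite does not support SELECT ... FOR UPDATE, so this swaps in a different technique: a conditional UPDATE that only succeeds while the row is still PENDING (a compare-and-set). The priority and worker_id columns, and the assumption that a larger priority value means more urgent, are illustrative.

```python
import sqlite3
from typing import Optional

# Illustrative schema only; the real runstep table has more columns.
SCHEMA = """
CREATE TABLE runstep (
    id INTEGER PRIMARY KEY,
    status TEXT NOT NULL,
    priority INTEGER NOT NULL DEFAULT 0,
    worker_id TEXT
)
"""


def claim_next_step(conn: sqlite3.Connection, worker_id: str) -> Optional[int]:
    """Claim the highest-priority PENDING step, or return None if there is none."""
    while True:
        row = conn.execute(
            "SELECT id FROM runstep WHERE status = 'PENDING' "
            "ORDER BY priority DESC, id LIMIT 1"
        ).fetchone()
        if row is None:
            return None
        # Compare-and-set: only flips the row if nobody else claimed it first.
        cur = conn.execute(
            "UPDATE runstep SET status = 'RUNNING', worker_id = ? "
            "WHERE id = ? AND status = 'PENDING'",
            (worker_id, row[0]),
        )
        conn.commit()
        if cur.rowcount == 1:
            return row[0]
        # Another worker won this row; look for the next candidate.


def finish_step(conn: sqlite3.Connection, step_id: int, ok: bool) -> None:
    """Record the outcome: RUNNING → COMPLETED or RUNNING → ERROR."""
    conn.execute(
        "UPDATE runstep SET status = ? WHERE id = ?",
        ("COMPLETED" if ok else "ERROR", step_id),
    )
    conn.commit()
```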
Start a worker:
Or via server (starts worker automatically):
Status Transitions
Valid Transitions:
- PENDING → RUNNING
- RUNNING → COMPLETED
- RUNNING → ERROR
- ERROR → RUNNING (retry)
- ERROR → FAILED (after max retries)
Invalid Transitions:
- COMPLETED → RUNNING (no re-running completed steps)
- FAILED → RUNNING (use the retry endpoint instead)
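The transition rules fit in a lookup table. A minimal sketch of how a worker could guard status changes with it; transition and InvalidTransition are hypothetical names. The manual retry reset (FAILED → PENDING) is deliberately left out of this table, since it is an administrative action rather than a worker transition.

```python
# Allowed next states for each current state, per the lists above.
VALID_TRANSITIONS: dict[str, set[str]] = {
    "PENDING": {"RUNNING"},
    "RUNNING": {"COMPLETED", "ERROR"},
    "ERROR": {"RUNNING", "FAILED"},
    "COMPLETED": set(),  # completed steps are never re-run
    "FAILED": set(),     # only the retry endpoint moves a step out of FAILED
}


class InvalidTransition(ValueError):
    """Raised when a status change is not in VALID_TRANSITIONS."""


def transition(current: str, new: str) -> str:
    """Return `new` if current → new is allowed, otherwise raise."""
    if new not in VALID_TRANSITIONS.get(current, set()):
        raise InvalidTransition(f"{current} -> {new} is not allowed")
    return new
```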
Lifecycle Events
Lifecycle events allow custom logic at key points:
Event Types
- GROUP_START - Run group begins
- GROUP_END - Run group completes
- ITEM_START - Workflow run begins
- ITEM_END - Workflow run completes
- ITEM_FAILED - Workflow run fails
- STEP_START - Step begins
- STEP_END - Step completes
- STEP_FAILED - Step fails
Event Handler Example
lifecycle_events:
  item_failed:
    - name: notify on failure
      method: myapp.notifications.send_alert
      retries: 3
      parameters:
        channel: "#alerts"
        message: "Document processing failed"
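In the YAML, handlers are grouped under lifecycle_events by lowercase event name (group_start, item_failed). Below is a sketch of dispatching one event. It assumes three things that are not confirmed by the ingester: a handler receives a context dict plus its configured parameters, method strings are looked up in a registry rather than imported, and retries counts attempts after the first.

```python
from typing import Any, Awaitable, Callable

# Hypothetical handler signature for lifecycle events: (context, parameters).
EventHandler = Callable[[dict[str, Any], dict[str, Any]], Awaitable[None]]


async def fire_event(
    workflow_def: dict[str, Any],
    event: str,
    context: dict[str, Any],
    registry: dict[str, EventHandler],
) -> list[str]:
    """Run each handler configured for `event`; return names of handlers that gave up."""
    gave_up = []
    for entry in workflow_def.get("lifecycle_events", {}).get(event, []):
        handler = registry[entry["method"]]
        attempts = 1 + entry.get("retries", 0)  # assumption: retries after the first try
        for attempt in range(attempts):
            try:
                await handler(context, entry.get("parameters", {}))
                break
            except Exception:
                if attempt == attempts - 1:
                    gave_up.append(entry["name"])
    return gave_up
```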
Retry Logic
Automatic Retries
Steps automatically retry on error:
- Configurable retry count per step
- Exponential backoff (implementation dependent)
- Status: ERROR during retries, FAILED when exhausted
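A sketch of the retry behaviour described above. It assumes that retries counts attempts after the first and that the delay doubles each time (base_delay, then 2× that, then 4×). The real backoff policy is implementation dependent. The on_status callback marks where the ERROR and FAILED statuses would be recorded.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def run_with_retries(
    func: Callable[[], Awaitable[T]],
    retries: int,
    base_delay: float = 1.0,
    on_status: Callable[[str], None] = lambda status: None,
) -> T:
    """Await func() until it succeeds or `retries` retries are used up."""
    attempt = 0
    while True:
        try:
            return await func()
        except Exception:
            if attempt >= retries:
                on_status("FAILED")  # retries exhausted: re-raise to the caller
                raise
            on_status("ERROR")       # will be retried after a backoff delay
            await asyncio.sleep(base_delay * 2 ** attempt)
            attempt += 1
```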
Manual Retry
Reset failed steps for a run group:
This resets all FAILED steps to PENDING for re-processing.
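In SQL terms, the reset is a single UPDATE scoped to one group. Below is a sketch against an in-memory SQLite table. run_group_id is a hypothetical column; the real schema may reach the group through the workflow run. Whether the retry counter is also reset is not specified here, so the sketch does not touch it.

```python
import sqlite3

# Illustrative schema; run_group_id is a hypothetical column.
SCHEMA = "CREATE TABLE runstep (id INTEGER PRIMARY KEY, run_group_id INTEGER, status TEXT)"


def reset_failed_steps(conn: sqlite3.Connection, run_group_id: int) -> int:
    """Set every FAILED step in the group back to PENDING; return how many changed."""
    cur = conn.execute(
        "UPDATE runstep SET status = 'PENDING' "
        "WHERE status = 'FAILED' AND run_group_id = ?",
        (run_group_id,),
    )
    conn.commit()
    return cur.rowcount
```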
Monitoring
Check Run Group Status
Returns:
{
"total_runs": 100,
"completed": 95,
"running": 3,
"pending": 0,
"failed": 2,
"average_duration": 45.3,
"group_status": "RUNNING"
}
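The counts in this response can be derived from per-run records. Below is a sketch that assumes each run is a dict with a status and, once finished, a duration in seconds. Two parts are assumptions: the group_status rule (RUNNING while anything is unfinished, then FAILED if any run failed, else COMPLETED) and averaging the duration over every run that reports one.

```python
from statistics import mean
from typing import Any


def summarize_group(runs: list[dict[str, Any]]) -> dict[str, Any]:
    """Build a status summary shaped like the response above."""
    counts = {"COMPLETED": 0, "RUNNING": 0, "PENDING": 0, "FAILED": 0}
    for run in runs:
        counts[run["status"]] += 1
    durations = [r["duration"] for r in runs if r.get("duration") is not None]
    # Assumed group status rule.
    if counts["RUNNING"] or counts["PENDING"]:
        group_status = "RUNNING"
    elif counts["FAILED"]:
        group_status = "FAILED"
    else:
        group_status = "COMPLETED"
    return {
        "total_runs": len(runs),
        "completed": counts["COMPLETED"],
        "running": counts["RUNNING"],
        "pending": counts["PENDING"],
        "failed": counts["FAILED"],
        "average_duration": round(mean(durations), 1) if durations else None,
        "group_status": group_status,
    }
```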
Check Specific Workflow Run
Returns the workflow run with all of its steps and their statuses.
Query Steps by Status
Lists all failed steps for investigation.
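The Troubleshooting section uses GET /api/v1/workflow/steps?status=FAILED for this query. Below is a small standard-library sketch. The base URL matches the earlier curl examples. fetch_steps returns the decoded JSON as-is because the response shape is not documented here.

```python
import json
from typing import Any
from urllib.parse import urlencode
from urllib.request import urlopen

BASE_URL = "http://localhost:8000/api/v1"  # local deployment, as in the curl examples


def steps_by_status_url(status: str) -> str:
    """URL for listing steps in a given status."""
    return f"{BASE_URL}/workflow/steps?{urlencode({'status': status})}"


def fetch_steps(status: str = "FAILED") -> Any:
    # Performs the HTTP GET and decodes the JSON body.
    with urlopen(steps_by_status_url(status)) as resp:
        return json.load(resp)
```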
Custom Step Handlers
Handler Signature
from typing import Any

# RunStep, WorkflowRun, Document, ArtifactType and store_artifact are assumed to be
# importable from soliplex_ingester (exact module paths not shown here);
# process_document stands in for your own processing logic.


async def custom_handler(
    run_step: RunStep,
    workflow_run: WorkflowRun,
    doc: Document,
    step_params: dict[str, Any],
) -> dict[str, Any]:
    """
    Custom workflow step handler.

    Args:
        run_step: Current step being executed
        workflow_run: Parent workflow run
        doc: Document being processed
        step_params: Parameters from config

    Returns:
        Dictionary with result metadata

    Raises:
        Exception: On failure (will trigger retry)
    """
    # Your processing logic here
    result_data = await process_document(doc)

    # Store artifacts if needed, keyed by the document hash
    await store_artifact(
        doc.hash,
        ArtifactType.CUSTOM,
        result_data,
    )

    return {"status": "success", "items_processed": 42}
Registering Custom Handler
- Define the handler in your Python module
- Add to workflow YAML:
- Ensure module is importable by the worker process
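The method value is a dotted path (a module path plus a function name), which is why the module must be importable by the worker. Below is a sketch of how such a string can be resolved with importlib. resolve_method is hypothetical, and the ingester's own loader may behave differently.

```python
import importlib
from typing import Any, Callable


def resolve_method(path: str) -> Callable[..., Any]:
    """Turn a `method:` string such as 'pkg.module.func' into a callable."""
    module_path, _, attr = path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected 'module.function', got {path!r}")
    module = importlib.import_module(module_path)  # ImportError if not on sys.path
    try:
        return getattr(module, attr)
    except AttributeError as exc:
        raise ImportError(f"{module_path} has no attribute {attr!r}") from exc
```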
Worker Configuration
Environment Variables
- INGEST_WORKER_CONCURRENCY - Max concurrent workflow steps (default: 10)
- INGEST_QUEUE_CONCURRENCY - Max concurrent queue operations (default: 20)
- DOCLING_CONCURRENCY - Max concurrent Docling requests (default: 3)
- WORKER_TASK_COUNT - Steps to fetch per query (default: 5)
- WORKER_CHECKIN_INTERVAL - Heartbeat interval in seconds (default: 120)
- WORKER_CHECKIN_TIMEOUT - Worker timeout in seconds (default: 600)
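Caps such as INGEST_WORKER_CONCURRENCY and DOCLING_CONCURRENCY are commonly enforced with a semaphore. Below is a sketch using asyncio.Semaphore with the documented defaults. How the ingester actually applies these variables is not described here, and run_bounded is a hypothetical helper.

```python
import asyncio
import os
from typing import Any, Awaitable, Callable

# Documented defaults, overridable through the environment.
WORKER_CONCURRENCY = int(os.environ.get("INGEST_WORKER_CONCURRENCY", "10"))
DOCLING_CONCURRENCY = int(os.environ.get("DOCLING_CONCURRENCY", "3"))


async def run_bounded(jobs: list[Callable[[], Awaitable[Any]]], limit: int) -> list[Any]:
    """Run all jobs, never more than `limit` at once; results keep job order."""
    sem = asyncio.Semaphore(limit)

    async def guarded(job: Callable[[], Awaitable[Any]]) -> Any:
        async with sem:
            return await job()

    return await asyncio.gather(*(guarded(job) for job in jobs))
```

For example, Docling calls might be wrapped with limit=DOCLING_CONCURRENCY while whole steps use WORKER_CONCURRENCY.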
Multiple Workers
Run multiple workers for increased throughput:
Database locking ensures no duplicate processing.
Artifact Storage
Storage Paths
Artifacts are stored under FILE_STORE_DIR with subdirectories:
- document_store_dir/ - Raw documents
- parsed_markdown_store_dir/ - Parsed markdown
- parsed_json_store_dir/ - Parsed JSON
- chunks_store_dir/ - Text chunks
- embeddings_store_dir/ - Embedding vectors
File Naming
Files are named by document hash:
Example:
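The API examples use document IDs of the form sha256-<hex digest>. This suggests the hash is a SHA-256 digest with a sha256- prefix. Below is a sketch under that assumption. artifact_path joins the store directory, a subdirectory name from the list above, and the hash. The absence of a file extension is also an assumption.

```python
import hashlib
from pathlib import Path


def document_hash(data: bytes) -> str:
    # Assumed ID format, modelled on the doc_id in the API examples.
    return "sha256-" + hashlib.sha256(data).hexdigest()


def artifact_path(store_dir: str, subdir: str, doc_hash: str) -> Path:
    # e.g. <FILE_STORE_DIR>/chunks_store_dir/sha256-...
    return Path(store_dir) / subdir / doc_hash
```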
Storage Backends
Configure via FILE_STORE_TARGET:
- fs - Local filesystem (default)
- s3 - S3-compatible storage (via OpenDAL)
- Other OpenDAL-supported backends
Best Practices
Workflow Design
- Keep steps atomic - Each step should do one thing well
- Make steps idempotent - Re-running a step should be safe
- Use appropriate retries - 3 retries for transient errors, 1 for validation
- Store intermediate results - Save artifacts for debugging and recovery
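One common way to make an artifact-producing step idempotent is to skip the work when the output already exists. The output is written through a temporary file so an interrupted run never leaves a partial artifact that would later be mistaken for a finished one. Below is a sketch of that pattern. run_idempotent is hypothetical, and Path.replace is an atomic rename only on a single local POSIX filesystem, not on other storage backends.

```python
from pathlib import Path
from typing import Callable


def run_idempotent(out_path: Path, produce: Callable[[], bytes]) -> bool:
    """Write produce() to out_path unless it already exists; return True if work was done."""
    if out_path.exists():
        return False  # a previous run already finished this artifact
    out_path.parent.mkdir(parents=True, exist_ok=True)
    tmp = out_path.with_name(out_path.name + ".tmp")
    tmp.write_bytes(produce())
    tmp.replace(out_path)  # rename into place only after the write succeeded
    return True
```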
Error Handling
- Raise exceptions for retriable errors
- Log context before raising (worker ID, document hash, step)
- Use status_meta to store error details
- Monitor failed steps and investigate patterns
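Below is a sketch of collecting the context worth saving alongside a failure, in a JSON-friendly dict that could go into status_meta. error_meta and its key names are illustrative; the actual status_meta layout is not specified here.

```python
import traceback
from typing import Any


def error_meta(exc: BaseException, worker_id: str, doc_hash: str, step: str) -> dict[str, Any]:
    """Collect failure details as plain strings so they serialize cleanly."""
    return {
        "error_type": type(exc).__name__,
        "error_message": str(exc),
        "worker_id": worker_id,
        "doc_hash": doc_hash,
        "step": step,
        "traceback": "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        ),
    }
```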
Performance
- Tune concurrency based on available resources
- Batch operations where possible (embeddings)
- Use priorities for urgent documents
- Monitor worker health via heartbeat table
Testing
- Test handlers independently with mock data
- Use small batches for integration testing
- Verify artifact storage after each step
- Test retry logic by simulating failures
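Handlers that follow the signature shown under Custom Step Handlers can be called directly in a test, using lightweight stand-ins for RunStep, WorkflowRun and Document. Below is a sketch that uses SimpleNamespace for the inputs and unittest.mock.AsyncMock to simulate a service that fails once before succeeding. count_words and its doc.text attribute are hypothetical.

```python
from types import SimpleNamespace
from typing import Any
from unittest.mock import AsyncMock


async def count_words(run_step: Any, workflow_run: Any, doc: Any, step_params: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical handler: count words in doc.text that are at least `min_length` long."""
    min_length = step_params.get("min_length", 1)
    words = [w for w in doc.text.split() if len(w) >= min_length]
    return {"status": "success", "items_processed": len(words)}


def fake_inputs(text: str) -> tuple[SimpleNamespace, SimpleNamespace, SimpleNamespace]:
    # Stand-ins for RunStep, WorkflowRun and Document: only the attributes
    # the handler reads need to exist.
    return SimpleNamespace(id=1), SimpleNamespace(id=1), SimpleNamespace(hash="sha256-test", text=text)


async def call_flaky_service() -> tuple[Any, int]:
    # AsyncMock consumes one side effect per call: the first call raises,
    # the second returns a result, simulating a transient outage.
    service = AsyncMock(side_effect=[ConnectionError("docling down"), {"ok": True}])
    try:
        await service()
    except ConnectionError:
        pass
    return await service(), service.call_count
```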
Troubleshooting
Stuck Workflows
Symptom: Workflows remain in RUNNING status indefinitely
Solution:
1. Check worker logs for exceptions
2. Query stuck steps: SELECT * FROM runstep WHERE status='RUNNING' AND start_date < NOW() - INTERVAL '1 hour'
3. Check worker heartbeat: SELECT * FROM workercheckin
4. Restart workers if stale
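Step 3 amounts to comparing each worker's last heartbeat with WORKER_CHECKIN_TIMEOUT. Below is a sketch that assumes the workercheckin rows have already been read into a mapping of worker ID to timezone-aware last check-in time. The table's column names are not shown here, and stale_workers is a hypothetical helper.

```python
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

CHECKIN_TIMEOUT = int(os.environ.get("WORKER_CHECKIN_TIMEOUT", "600"))  # documented default


def stale_workers(
    last_checkins: dict[str, datetime],
    now: Optional[datetime] = None,
    timeout: int = CHECKIN_TIMEOUT,
) -> list[str]:
    """Return IDs of workers whose last heartbeat is older than `timeout` seconds."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(seconds=timeout)
    return sorted(worker for worker, seen in last_checkins.items() if seen < cutoff)
```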
Failed Steps
Symptom: Steps transition to FAILED status
Solution:
1. Query step details: GET /api/v1/workflow/steps?status=FAILED
2. Check status_message and status_meta for error info
3. Fix underlying issue (service down, invalid config, etc.)
4. Retry: POST /api/v1/workflow/retry
Slow Processing
Symptom: Low throughput, long durations
Solution:
1. Check Docling server response time
2. Increase INGEST_WORKER_CONCURRENCY
3. Run multiple workers
4. Verify database performance (add indexes if needed)
5. Check network latency to external services
Duplicate Processing
Symptom: Same document processed multiple times
Solution:
1. Verify database locking is working (FOR UPDATE)
2. Check for unique constraint violations in logs
3. Ensure workers use distinct worker IDs
4. Verify step status transitions are atomic