Skip to content

Soliplex Ingester Architecture

Overview

Soliplex Ingester is a document processing and RAG (Retrieval-Augmented Generation) ingestion system designed to handle large-scale document workflows. It provides a FastAPI-based REST API, workflow orchestration, and integration with document parsing and embedding services.

System Components

1. FastAPI Server

The server provides REST API endpoints for document and workflow management:

  • Document Routes (/api/v1/document/*) - Document upload, retrieval, and management
  • Batch Routes (/api/v1/batch/*) - Batch processing operations
  • Workflow Routes (/api/v1/workflow/*) - Workflow execution and monitoring
  • Stats Routes (/api/v1/stats/*) - System statistics and metrics

Server entry point: src/soliplex/ingester/server/__init__.py:30

2. Workflow System

The workflow system orchestrates multi-step document processing pipelines:

Document → Validate → Parse → Chunk → Embed → Store

Workflow Components:

  • WorkflowDefinition - Defines the steps and lifecycle events for a workflow
  • WorkflowRun - Represents a single execution instance for one document
  • RunGroup - Groups multiple workflow runs together
  • RunStep - Individual step execution within a workflow run

Step Types:

  • INGEST - Load document into system
  • VALIDATE - Validate document format and content
  • PARSE - Extract text and structure from document
  • CHUNK - Split document into semantic chunks
  • EMBED - Generate vector embeddings
  • STORE - Save to RAG system (LanceDB + HaikuRAG)
  • ENRICH - Add metadata or additional processing
  • ROUTE - Conditional routing logic

Implementation: src/soliplex/ingester/lib/wf/

3. Worker System

Async workers process workflow steps concurrently:

  • Workers poll for pending workflow steps
  • Configurable concurrency levels for different operations
  • Automatic retry logic with configurable retry counts
  • Health check/heartbeat system via WorkerCheckin

Worker implementation: src/soliplex/ingester/lib/wf/runner.py

4. Storage Layer

Database:

  • SQLModel + SQLAlchemy with async support
  • Supports SQLite (dev) and PostgreSQL (production)
  • Alembic for migrations

File Storage:

  • Configurable backends (filesystem, S3-compatible via OpenDAL)
  • Separate storage locations for different artifact types:
  • Raw documents
  • Parsed markdown
  • Parsed JSON
  • Chunks
  • Embeddings

Vector Storage:

  • LanceDB for vector embeddings
  • HaikuRAG client for retrieval operations

5. Document Processing Pipeline

graph LR
    A[Upload Document] --> B[Create DocumentURI]
    B --> C[Hash & Store as Document]
    C --> D[Queue Workflow Run]
    D --> E[Validate Step]
    E --> F[Parse with Docling]
    F --> G[Chunk Text]
    G --> H[Generate Embeddings]
    H --> I[Store in LanceDB]
    I --> J[Update RAG Index]

6. External Services

Docling Server:

  • Document parsing service
  • Extracts text, structure, and metadata
  • Configurable via DOCLING_SERVER_URL

HaikuRAG:

  • RAG backend for document retrieval
  • Vector search and document management
  • Optional (controlled by DO_RAG setting)

Data Flow

Document Ingestion Flow

  1. Upload - Client uploads document via /api/v1/document/upload
  2. Hash & Dedupe - System computes SHA256 hash, checks for duplicates
  3. Create URI - Maps source URI to document hash
  4. Batch Assignment - Associates document with processing batch
  5. Workflow Creation - Creates WorkflowRun and RunSteps
  6. Worker Processing - Workers pick up and execute steps
  7. Status Updates - Database tracks step and run status
  8. Completion - Document marked complete when all steps succeed

Workflow Execution Flow

  1. Worker Startup - Worker registers and starts polling
  2. Step Selection - Worker queries for PENDING steps with FOR UPDATE lock
  3. Status Transition - PENDING → RUNNING → COMPLETED/ERROR/FAILED
  4. Step Execution - Calls registered handler method
  5. Artifact Storage - Saves intermediate results
  6. Retry Logic - Automatic retry on ERROR status
  7. Run Completion - Aggregates step status to run status
  8. Group Completion - Aggregates run status to group status

Configuration

Configuration via environment variables with pydantic-settings:

  • Database connection
  • File storage paths
  • Worker concurrency settings
  • External service URLs
  • Workflow and parameter directories

See src/soliplex/ingester/lib/config.py:15 for full configuration schema.

Scalability

Horizontal Scaling:

  • Multiple workers can run concurrently
  • Database row-level locking prevents duplicate processing
  • Stateless API servers can be load balanced

Vertical Scaling:

  • Configurable concurrency per worker
  • Batch size controls for embedding operations
  • Connection pooling for database access

Workflow Parallelism:

  • Multiple workflows can process simultaneously
  • Steps within a workflow run sequentially
  • Different documents process independently

Technology Stack

  • Web Framework: FastAPI 0.120+
  • Database ORM: SQLModel 0.0.27+
  • Async Runtime: asyncio
  • CLI: Typer
  • Document Parsing: Docling
  • Vector DB: LanceDB 0.25+
  • RAG: HaikuRAG
  • Storage: OpenDAL (multi-backend support)

Extension Points

Custom Workflow Steps: Define custom step handlers by:

  1. Creating a new async function matching the EventHandler signature
  2. Registering in workflow YAML configuration
  3. Implementing retry logic and error handling

Custom Storage Backends: Configure via FILE_STORE_TARGET environment variable and OpenDAL configuration.

Custom Lifecycle Events: Add event handlers in workflow configuration to respond to:

  • GROUP_START / GROUP_END
  • ITEM_START / ITEM_END
  • STEP_START / STEP_END
  • ITEM_FAILED / STEP_FAILED

Monitoring

Database Tables:

  • workflowrun - Track run status and duration
  • runstep - Monitor individual step execution
  • workcheckin - Worker health and activity
  • lifecyclehistory - Audit trail of events

Metrics Available:

  • Document processing throughput
  • Step success/failure rates
  • Worker utilization
  • Processing durations
  • Batch completion times

Access via /api/v1/stats/* endpoints.