Database Models and Schema
Overview
Soliplex Ingester uses SQLModel (built on SQLAlchemy) for database modeling with async support. The system supports both SQLite (development) and PostgreSQL (production).
Database models defined in: src/soliplex/ingester/lib/models.py
Database Connection
Configuration
Set via environment variable:
DOC_DB_URL="sqlite+aiosqlite:///./db/documents.db"
# or
DOC_DB_URL="postgresql+asyncpg://user:pass@localhost/soliplex"
Database Manager
The Database class manages engine lifecycle and session creation with automatic connection pooling.
from sqlmodel import select
from soliplex.ingester.lib.models import Database, Document
# Initialize once at application startup
await Database.initialize()
# Or with custom URL (for testing)
await Database.initialize("sqlite+aiosqlite:///:memory:")
# Get sessions anywhere in the app
async with Database.session() as session:
    result = await session.exec(select(Document))
    # Transaction auto-commits on success, rolls back on exception
# Cleanup at shutdown
await Database.close()
# Reset and reinitialize (primarily for testing)
await Database.reset(url)
Backwards-Compatible Functions
from sqlmodel import select
from soliplex.ingester.lib.models import Document, get_session, get_engine
async with get_session() as session:
    result = await session.exec(select(Document))
engine = await get_engine()
Core Models
DocumentBatch
Represents a batch of documents ingested together.
Table: documentbatch
Fields:
- id (int, primary key) - Auto-increment batch ID
- name (str) - Human-readable batch name
- source (str) - Source system identifier
- start_date (datetime) - When batch processing started
- completed_date (datetime, nullable) - When batch completed
- batch_params (dict[str, str]) - JSON metadata
Computed Fields:
- duration (float) - Processing time in seconds (None if not completed)
Example:
{
  "id": 1,
  "name": "Q4 Financial Reports",
  "source": "sharepoint",
  "start_date": "2025-01-15T10:00:00",
  "completed_date": "2025-01-15T12:30:00",
  "batch_params": {"department": "finance"},
  "duration": 9000.0
}
Document
Represents a unique document identified by content hash.
Table: document
Fields:
- hash (str, primary key) - SHA256 content hash (format: "sha256-...")
- mime_type (str) - Document MIME type
- file_size (int, nullable) - Size in bytes
- doc_meta (dict[str, str]) - JSON metadata
Relationships:
- Multiple DocumentURI records can reference the same document
Deduplication: Documents are deduplicated by hash. If the same file is ingested multiple times, only one Document record exists.
Example:
{
  "hash": "sha256-a1b2c3d4e5f6...",
  "mime_type": "application/pdf",
  "file_size": 1024000,
  "doc_meta": {
    "author": "John Doe",
    "title": "Q4 Report"
  }
}
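Following the deduplication note above, a minimal sketch of a hash-based get-or-create, assuming the field names shown (the helper itself is hypothetical, not part of the library):

from sqlmodel import select
from soliplex.ingester.lib.models import Database, Document

async def get_or_create_document(doc_hash: str, mime_type: str, file_size: int | None = None) -> Document:
    """Hypothetical helper: reuse the existing Document for this hash, or create one."""
    async with Database.session() as session:
        result = await session.exec(select(Document).where(Document.hash == doc_hash))
        doc = result.first()
        if doc is not None:
            return doc  # already ingested; deduplicated by hash
        doc = Document(hash=doc_hash, mime_type=mime_type, file_size=file_size, doc_meta={})
        session.add(doc)
        return doc  # the session auto-commits on successful exit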
DocumentURI
Maps source URIs to document hashes, allowing multiple URIs to reference the same document.
Table: documenturi
Fields:
- id (int, primary key) - Auto-increment ID
- doc_hash (str, foreign key) - References document.hash
- uri (str) - Source system path/identifier
- source (str) - Source system name
- version (int) - Version number (increments on changes)
- batch_id (int, foreign key, nullable) - Associated batch
Constraints:
- Unique constraint on (uri, source) - One active URI per source
Use Cases:
- Track document locations across source systems
- Detect when a document at a URI has changed (hash mismatch)
- Support document versioning
Example:
{
  "id": 42,
  "doc_hash": "sha256-a1b2c3d4e5f6...",
  "uri": "/sharepoint/finance/q4-report.pdf",
  "source": "sharepoint",
  "version": 2,
  "batch_id": 1
}
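The hash-mismatch check from the use cases above might look like this sketch (illustrative only, not a library API; field names as documented):

from sqlmodel import select
from soliplex.ingester.lib.models import Database, DocumentURI

async def has_document_changed(uri: str, source: str, new_hash: str) -> bool:
    """Hypothetical check: does the stored hash at (uri, source) differ from new_hash?"""
    async with Database.session() as session:
        result = await session.exec(
            select(DocumentURI).where(DocumentURI.uri == uri, DocumentURI.source == source)
        )
        doc_uri = result.first()
        # An unknown URI counts as "changed" so the caller ingests it as new
        return doc_uri is None or doc_uri.doc_hash != new_hash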
DocumentURIHistory
Tracks historical versions of documents at specific URIs.
Table: documenturihistory
Fields:
- id (int, primary key) - Auto-increment ID
- doc_uri_id (int, foreign key) - References documenturi.id
- version (int) - Version number
- hash (str) - Document hash at this version
- process_date (datetime) - When this version was processed
- action (str) - Action taken ("created", "updated", "deleted")
- batch_id (int, foreign key, nullable) - Associated batch
- histmeta (dict[str, str]) - JSON metadata
Use Cases:
- Audit trail of document changes
- Rollback to previous versions
- Compliance and record-keeping
Example:
{
  "id": 100,
  "doc_uri_id": 42,
  "version": 1,
  "hash": "sha256-old-hash...",
  "process_date": "2025-01-10T10:00:00",
  "action": "created",
  "batch_id": 1,
  "histmeta": {"user": "importer"}
}
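A sketch of pulling the audit trail for one URI, assuming the fields above (hypothetical helper, not a library API):

from sqlmodel import select
from soliplex.ingester.lib.models import Database, DocumentURIHistory

async def uri_history(doc_uri_id: int) -> list[DocumentURIHistory]:
    """Hypothetical helper: all recorded versions for a DocumentURI, newest first."""
    async with Database.session() as session:
        result = await session.exec(
            select(DocumentURIHistory)
            .where(DocumentURIHistory.doc_uri_id == doc_uri_id)
            .order_by(DocumentURIHistory.version.desc())
        )
        return list(result.all())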
DocumentBytes
Stores raw file bytes and artifacts in the database.
Table: documentbytes
Fields:
- hash (str, primary key) - Document hash
- artifact_type (str, primary key) - Type of artifact
- storage_root (str, primary key) - Storage location identifier
- file_size (int, nullable) - Size in bytes (auto-computed from file_bytes)
- file_bytes (bytes) - Raw binary data
Artifact Types:
- document - Raw document
- parsed_markdown - Extracted markdown
- parsed_json - Structured JSON
- chunks - Text chunks
- embeddings - Vector embeddings
- rag - RAG metadata
Note: For production, consider using file storage instead of database storage for large binaries.
Example:
{
  "hash": "sha256-a1b2c3d4e5f6...",
  "artifact_type": "parsed_markdown",
  "storage_root": "db",
  "file_size": 50000,
  "file_bytes": "..."
}
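Because the table has a three-column composite key, a single artifact can be fetched with session.get and a key tuple. A minimal sketch, assuming the primary-key column order shown above (the helper is hypothetical):

from soliplex.ingester.lib.models import Database, DocumentBytes

async def load_artifact(doc_hash: str, artifact_type: str, storage_root: str = "db") -> bytes | None:
    """Hypothetical helper: fetch raw bytes for one (hash, artifact_type, storage_root)."""
    async with Database.session() as session:
        row = await session.get(DocumentBytes, (doc_hash, artifact_type, storage_root))
        return row.file_bytes if row is not None else None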
Workflow Models
RunGroup
Groups related workflow runs together.
Table: rungroup
Fields:
- id (int, primary key) - Auto-increment ID
- name (str, nullable) - Optional group name
- workflow_definition_id (str) - Workflow used
- param_definition_id (str) - Parameter set used
- batch_id (int, foreign key, nullable) - Associated batch
- created_date (datetime) - When group was created
- start_date (datetime) - When first run started
- completed_date (datetime, nullable) - When all runs completed
- status (RunStatus) - Overall group status
- status_date (datetime, nullable) - When status last changed
- status_message (str, nullable) - Status description
- status_meta (dict[str, str]) - JSON metadata
Relationships:
- Has many WorkflowRun records
- Has many LifecycleHistory records
Example:
{
  "id": 5,
  "name": "Batch 1 Processing",
  "workflow_definition_id": "batch",
  "param_definition_id": "default",
  "batch_id": 1,
  "created_date": "2025-01-15T10:00:00",
  "start_date": "2025-01-15T10:01:00",
  "completed_date": null,
  "status": "RUNNING",
  "status_date": "2025-01-15T10:30:00",
  "status_message": "Processing documents",
  "status_meta": {}
}
WorkflowRun
Represents a single workflow execution for one document.
Table: workflowrun
Fields:
- id (int, primary key) - Auto-increment ID
- workflow_definition_id (str) - Workflow definition ID
- run_group_id (int, foreign key) - Parent group
- batch_id (int, foreign key) - Associated batch
- doc_id (str) - Document hash being processed
- priority (int) - Processing priority (higher = more urgent)
- created_date (datetime) - When run was created
- start_date (datetime) - When first step started
- completed_date (datetime, nullable) - When all steps completed
- status (RunStatus) - Current status
- status_date (datetime, nullable) - When status last changed
- status_message (str, nullable) - Status description
- status_meta (dict[str, str]) - JSON metadata
- run_params (dict[str, str|int|bool]) - Runtime parameters
Computed Fields:
- duration (float) - Processing time in seconds (None if not completed)
Relationships:
- Has many RunStep records
- Belongs to RunGroup
- References Document via doc_id
Example:
{
  "id": 100,
  "workflow_definition_id": "batch",
  "run_group_id": 5,
  "batch_id": 1,
  "doc_id": "sha256-a1b2c3d4e5f6...",
  "priority": 0,
  "created_date": "2025-01-15T10:00:00",
  "start_date": "2025-01-15T10:01:00",
  "completed_date": null,
  "status": "RUNNING",
  "status_date": "2025-01-15T10:05:00",
  "status_message": "Processing step 3 of 5",
  "status_meta": {},
  "run_params": {},
  "duration": null
}
RunStep
Represents one step within a workflow run.
Table: runstep
Fields:
- id (int, primary key) - Auto-increment ID
- workflow_run_id (int, foreign key) - Parent workflow run
- workflow_step_number (int) - Step sequence number
- workflow_step_name (str) - Step name/identifier
- step_config_id (int, foreign key) - Configuration used
- step_type (WorkflowStepType) - Type of step
- is_last_step (bool) - Whether this is the final step
- created_date (datetime) - When step was created
- priority (int) - Processing priority
- start_date (datetime, nullable) - When step started executing
- status_date (datetime, nullable) - When status last changed
- completed_date (datetime, nullable) - When step completed
- retry (int) - Current retry attempt (0-indexed)
- retries (int) - Maximum retry attempts
- status (RunStatus) - Current status
- status_message (str, nullable) - Status description
- status_meta (dict[str, str]) - JSON metadata
- worker_id (str, nullable) - Worker processing this step
Computed Fields:
- duration (float) - Execution time in seconds (None if not completed)
Relationships:
- Belongs to WorkflowRun
- References StepConfig
Example:
{
  "id": 500,
  "workflow_run_id": 100,
  "workflow_step_number": 2,
  "workflow_step_name": "parse",
  "step_config_id": 10,
  "step_type": "parse",
  "is_last_step": false,
  "created_date": "2025-01-15T10:01:00",
  "priority": 0,
  "start_date": "2025-01-15T10:02:00",
  "status_date": "2025-01-15T10:05:00",
  "completed_date": null,
  "retry": 0,
  "retries": 1,
  "status": "RUNNING",
  "status_message": "Parsing with Docling",
  "status_meta": {},
  "worker_id": "worker-abc-123",
  "duration": null
}
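The status and priority fields support a work-queue pattern. A minimal sketch of fetching the next pending steps (ordering only; actually claiming a step would also need row locking or an atomic update, which is not shown):

from sqlmodel import select
from soliplex.ingester.lib.models import Database, RunStep, RunStatus

async def next_pending_steps(limit: int = 10) -> list[RunStep]:
    """Hypothetical query: highest-priority PENDING steps, oldest first within a priority."""
    async with Database.session() as session:
        result = await session.exec(
            select(RunStep)
            .where(RunStep.status == RunStatus.PENDING)
            .order_by(RunStep.priority.desc(), RunStep.created_date)
            .limit(limit)
        )
        return list(result.all())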
StepConfig
Stores step configuration for reuse and tracking.
Table: stepconfig
Fields:
- id (int, primary key) - Auto-increment ID
- created_date (datetime, nullable) - When config was created
- step_type (WorkflowStepType) - Type of step
- config_json (dict[str, str|int|bool], nullable) - Step parameters
- cuml_config_json (str, nullable) - Cumulative config from previous steps
Use Cases:
- Deduplicate identical configurations
- Track which configuration was used for each run
- Audit changes to processing parameters
Example:
{
  "id": 10,
  "created_date": "2025-01-15T09:00:00",
  "step_type": "parse",
  "config_json": {
    "format": "markdown",
    "ocr_enabled": true
  },
  "cuml_config_json": "{\"validate\":{...},\"parse\":{...}}"
}
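The "deduplicate identical configurations" use case might look like this sketch, comparing config_json in Python since JSON equality in SQL is backend-specific (hypothetical helper; a real implementation would likely hash the config instead of scanning):

from sqlmodel import select
from soliplex.ingester.lib.models import Database, StepConfig, WorkflowStepType

async def get_or_create_step_config(step_type: WorkflowStepType, config: dict) -> StepConfig:
    """Hypothetical helper: reuse an existing StepConfig with identical parameters."""
    async with Database.session() as session:
        result = await session.exec(select(StepConfig).where(StepConfig.step_type == step_type))
        for existing in result.all():
            if existing.config_json == config:
                return existing  # identical parameters already stored
        step_config = StepConfig(step_type=step_type, config_json=config)
        session.add(step_config)
        return step_config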
ConfigSet
Represents a complete parameter set configuration.
Table: configset
Fields:
- id (int, primary key) - Auto-increment ID
- yaml_id (str) - Parameter set ID from YAML
- yaml_contents (str) - Full YAML contents
- created_date (datetime, nullable) - When loaded
Relationships:
- Has many ConfigSetItem records (junction table)
- Links to multiple StepConfig records
Use Cases:
- Track which parameter sets were used
- Reproduce exact configurations
- Version control for processing parameters
ConfigSetItem
Junction table linking config sets to step configs.
Table: configsetitem
Fields:
- config_set_id (int, primary key, foreign key) - References configset.id
- config_id (int, primary key, foreign key) - References stepconfig.id
LifecycleHistory
Tracks lifecycle events during workflow execution.
Table: lifecyclehistory
Fields:
- id (int, primary key) - Auto-increment ID
- event (LifeCycleEvent) - Type of event
- handler_name (str, nullable) - Name of the handler processing the event
- run_group_id (int, foreign key) - Associated run group
- workflow_run_id (int, foreign key) - Associated workflow run
- step_id (int, nullable) - Associated step (if applicable)
- start_date (datetime) - When event started
- completed_date (datetime, nullable) - When event completed
- status (RunStatus) - Event status
- status_date (datetime, nullable) - When status changed
- status_message (str, nullable) - Status description
- status_meta (dict[str, str]) - JSON metadata
Event Types:
- group_start / group_end
- item_start / item_end / item_failed
- step_start / step_end / step_failed
Use Cases:
- Audit trail of workflow execution
- Performance monitoring
- Debugging workflow issues
WorkerCheckin
Tracks worker health and activity.
Table: workercheckin
Fields:
- id (str, primary key) - Worker identifier
- first_checkin (datetime) - When worker first registered
- last_checkin (datetime) - Most recent heartbeat
Constraints:
- Unique constraint on id
Use Cases:
- Monitor active workers
- Detect stale/crashed workers
- Worker load balancing
Example:
{
  "id": "worker-abc-123",
  "first_checkin": "2025-01-15T10:00:00",
  "last_checkin": "2025-01-15T10:30:00"
}
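Detecting stale workers reduces to comparing last_checkin against a cutoff. A sketch assuming naive timestamps as in the example above (the five-minute threshold is arbitrary):

from datetime import datetime, timedelta
from sqlmodel import select
from soliplex.ingester.lib.models import Database, WorkerCheckin

async def stale_workers(max_age: timedelta = timedelta(minutes=5)) -> list[WorkerCheckin]:
    """Hypothetical query: workers whose last heartbeat is older than max_age."""
    cutoff = datetime.now() - max_age
    async with Database.session() as session:
        result = await session.exec(
            select(WorkerCheckin).where(WorkerCheckin.last_checkin < cutoff)
        )
        return list(result.all())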
Enums
RunStatus
Workflow and step status values.
class RunStatus(str, Enum):
    PENDING = "PENDING"      # Not yet started
    RUNNING = "RUNNING"      # Currently executing
    COMPLETED = "COMPLETED"  # Finished successfully
    ERROR = "ERROR"          # Failed but still retrying
    FAILED = "FAILED"        # Permanently failed
WorkflowStepType
Types of workflow steps.
class WorkflowStepType(str, Enum):
    INGEST = "ingest"
    VALIDATE = "validate"
    PARSE = "parse"
    CHUNK = "chunk"
    EMBED = "embed"
    STORE = "store"
    ENRICH = "enrich"
    ROUTE = "route"
ArtifactType
Types of stored artifacts.
class ArtifactType(Enum):
    DOC = "document"
    PARSED_MD = "parsed_markdown"
    PARSED_JSON = "parsed_json"
    CHUNKS = "chunks"
    EMBEDDINGS = "embeddings"
    RAG = "rag"
LifeCycleEvent
Workflow lifecycle events.
class LifeCycleEvent(str, Enum):
    GROUP_START = "group_start"
    GROUP_END = "group_end"
    ITEM_START = "item_start"
    ITEM_END = "item_end"
    ITEM_FAILED = "item_failed"
    STEP_START = "step_start"
    STEP_END = "step_end"
    STEP_FAILED = "step_failed"
Artifact Mapping
Workflow Steps to Artifacts:
- INGEST - DOC
- PARSE - PARSED_MD, PARSED_JSON
- CHUNK - CHUNKS
- EMBED - EMBEDDINGS
- STORE - RAG
Relationships Diagram
DocumentBatch
  | (1:N)
DocumentURI --> Document (N:1)
  |
DocumentURIHistory

DocumentBatch
  | (1:N)
RunGroup
  | (1:N)
WorkflowRun --> Document (N:1)
  | (1:N)
RunStep --> StepConfig (N:1)

ConfigSet
  | (N:M via ConfigSetItem)
StepConfig

RunGroup --> LifecycleHistory (1:N)
WorkflowRun --> LifecycleHistory (1:N)
Response Models
DocumentInfo
API response model for document information.
class DocumentInfo(BaseModel):
    uri: str | None = None
    source: str | None = None
    file_size: int | None = None
    mime_type: str | None = None
WorkflowRunWithDetails
Response model for workflow run with optional steps and document info.
class WorkflowRunWithDetails(BaseModel):
    workflow_run: WorkflowRun
    steps: list[RunStep] | None = None
    document_info: DocumentInfo | None = None
PaginatedResponse
Generic paginated response model.
class PaginatedResponse[T](BaseModel):
    items: list[T]
    total: int
    page: int
    rows_per_page: int
    total_pages: int
Database Initialization
Using CLI
This creates tables and runs migrations.
Using Alembic Directly
Programmatic
from soliplex.ingester.lib.models import Database
# Initialize with default URL from settings
await Database.initialize()
# Or with custom URL
await Database.initialize("sqlite+aiosqlite:///:memory:")
Python Cascading Delete Functions
delete_run_group
Cascading deletion function for run groups and all dependent records.
Location: src/soliplex/ingester/lib/wf/operations.py
Signature:
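Inferred from the usage and return value below; the exact annotations in operations.py may differ:

async def delete_run_group(run_group_id: int) -> dict[str, int]: ...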
Database Compatibility:
- SQLite (via aiosqlite)
- PostgreSQL (via asyncpg)
Behavior:
- Verifies the RunGroup exists (raises NotFoundError if not found)
- Retrieves all WorkflowRun IDs for the RunGroup
- Deletes all RunStep records for those WorkflowRuns
- Deletes all LifecycleHistory records (for both RunGroup and WorkflowRuns)
- Deletes all WorkflowRun records in the group
- Deletes the RunGroup itself
- Returns deletion statistics
All operations occur within a single database transaction to ensure atomicity.
Usage:
from soliplex.ingester.lib.wf.operations import delete_run_group, NotFoundError
# Delete a run group and all dependent records
result = await delete_run_group(run_group_id=5)
print(f"Deleted {result['deleted_rungroups']} run group(s)")
print(f"Deleted {result['deleted_workflowruns']} workflow run(s)")
print(f"Deleted {result['deleted_runsteps']} run step(s)")
print(f"Deleted {result['deleted_lifecyclehistory']} lifecycle history record(s)")
print(f"Total records deleted: {result['total_deleted']}")
Returns:
{
  "deleted_runsteps": 150,
  "deleted_lifecyclehistory": 45,
  "deleted_workflowruns": 10,
  "deleted_rungroups": 1,
  "total_deleted": 206
}
Raises:
- NotFoundError - If the RunGroup with the specified ID does not exist
delete_document_uri_by_uri
Cascading deletion function for DocumentURI and all dependent records.
Location: src/soliplex/ingester/lib/operations.py
Signature:
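Inferred from the usage and return value below; the exact annotations in operations.py may differ:

async def delete_document_uri_by_uri(uri: str, source: str) -> dict[str, int]: ...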
Behavior:
- Finds the DocumentURI by uri and source
- Counts how many DocumentURIs reference the same document hash
- If only one URI references the document (cascade delete):
  - Deletes all RunStep records for WorkflowRuns with this doc_id
  - Deletes all LifecycleHistory records for those WorkflowRuns
  - Deletes all WorkflowRun records with this doc_id
  - Deletes all DocumentBytes artifacts for this hash
  - Deletes file artifacts from storage
  - Deletes the DocumentURIHistory records
  - Deletes the DocumentURI record
  - Deletes the Document record
- If multiple URIs reference the document (preserve document):
  - Deletes only the DocumentURIHistory records for this URI
  - Deletes only the DocumentURI record
  - Preserves the Document and all workflow-related records
Returns:
{
  "deleted_document_uris": 1,
  "deleted_uri_history": 3,
  "deleted_documents": 1,
  "deleted_workflow_runs": 2,
  "deleted_run_steps": 10,
  "deleted_lifecycle_history": 6,
  "total_deleted": 23
}
Usage:
from soliplex.ingester.lib.operations import delete_document_uri_by_uri
from soliplex.ingester.lib.operations import DocumentURINotFoundError
try:
    stats = await delete_document_uri_by_uri(
        uri="/documents/report.pdf",
        source="filesystem",
    )
    print(f"Total deleted: {stats['total_deleted']}")
except DocumentURINotFoundError as e:
    print(f"Error: {e}")
Notes:
- All deletions occur within a single transaction
- Works with both SQLite and PostgreSQL
- Raises DocumentURINotFoundError if the URI/source combination does not exist
- Used by the DELETE /api/v1/document/by-uri endpoint
Migrations
Location
src/soliplex/ingester/migrations/
Configuration
alembic.ini (project root)
Create Migration
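With alembic.ini at the project root (as configured above), the standard Alembic command applies; the message text is only an example:

alembic revision --autogenerate -m "describe the change"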
Apply Migration
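Apply all pending migrations with the standard Alembic command:

alembic upgrade head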
Rollback
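Step back one revision with the standard Alembic command:

alembic downgrade -1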
Indexes
Consider adding these indexes for production:
-- Workflow processing queries
CREATE INDEX idx_runstep_status ON runstep(status, priority DESC);
CREATE INDEX idx_workflowrun_status ON workflowrun(status, batch_id);
CREATE INDEX idx_rungroup_batch ON rungroup(batch_id);
-- Document lookups
CREATE INDEX idx_documenturi_source ON documenturi(source);
-- Worker monitoring
CREATE INDEX idx_runstep_worker ON runstep(worker_id);
CREATE INDEX idx_workercheckin_last ON workercheckin(last_checkin);
Backup and Maintenance
SQLite Backup
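For example, using sqlite3's online .backup command with the database path from the default DOC_DB_URL above (the backup filename is arbitrary):

sqlite3 db/documents.db ".backup db/documents.backup.db"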
PostgreSQL Backup
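For example, pg_dump in custom format, using the host, user, and database name from the example DOC_DB_URL above:

pg_dump -h localhost -U user -Fc -f soliplex.dump soliplex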
Vacuum (SQLite)
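VACUUM rebuilds the database file and reclaims free pages; for example:

sqlite3 db/documents.db "VACUUM;"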
Analyze (PostgreSQL)
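ANALYZE refreshes the query planner's statistics; for example, via psql with the example connection details:

psql -h localhost -U user -d soliplex -c "ANALYZE;"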
Query Examples
Find Failed Workflows
from soliplex.ingester.lib.models import WorkflowRun, RunStatus, get_session
from sqlmodel import select
async with get_session() as session:
    query = select(WorkflowRun).where(WorkflowRun.status == RunStatus.FAILED)
    results = await session.exec(query)
    failed_runs = results.all()
Get Batch Statistics
from sqlmodel import func, select
from soliplex.ingester.lib.models import WorkflowRun, get_session

batch_id = 1  # example batch ID

async with get_session() as session:
    query = (
        select(
            func.count(WorkflowRun.id).label("total"),
            WorkflowRun.status,
        )
        .where(WorkflowRun.batch_id == batch_id)
        .group_by(WorkflowRun.status)
    )
    results = await session.exec(query)
    stats = {row.status: row.total for row in results}