A map of the Python package under src/crystallise/, focused on what determines the shape of what you see at the HTTP boundary.
For the HTTP contract, see api-reference.html. This guide explains why those endpoints behave the way they do.
Audience: NetReady developers who have already read the API reference and now want to interpret response fields, tune retry behaviour, or debug integration issues.
Everything below mirrors the actual code in src/crystallise/. File and line references are clickable hints; the code itself is the authority.
Out of scope: Python-internal patterns, test fixtures, import hygiene. We only cover behaviour visible from outside the process.
The backend is a two-layer Python codebase:
- src/crystallise/: all the AI logic, LLM plumbing, database access, prompts, and pipelines. Can be imported and used without FastAPI.
- api/: HTTP routes, request/response schemas, auth middleware, async job bookkeeping.
This guide documents the package layer. The HTTP layer is already fully documented in api-reference.html. The two are intentionally separated: NetReady talks to api/, but many response fields carry metadata that originates inside src/crystallise/. Understanding the package layer helps NetReady interpret those fields correctly and debug issues when something doesn't go to plan.
Each section below follows the same shape:
All 10 subpackages at a glance, plus the top-level __init__.py. Line counts are approximate and included as a rough indicator of where the complexity sits.
    src/crystallise/
    ├── llm/               (~645 lines)   OpenAI client + retry + cost + error taxonomy
    ├── screening/         (~1400 lines)  4-stage screening pipeline + MockAIService
    ├── indexer/           (~545 lines)   AutoIndexer: function-calling extraction + helpers
    ├── criteria/          (~1800 lines)  Criteria AI: generate, PICOS, refine, consolidate, analyze-question
    ├── prompts/           (~1880 lines)  Centralised prompt library + metadata registry
    ├── db/                (~1470 lines)  SQLite/PostgreSQL dual backend, job persistence
    ├── config/            (~285 lines)   Settings (env), ServiceConfig (runtime), model_capabilities
    ├── batch/             (~155 lines)   Sync ThreadPoolExecutor + async Semaphore runners
    ├── common/            (~175 lines)   JSON parsing, export, HTTP session helpers
    ├── openai_resources/  (~575 lines)   OpenAI file/vector-store management (for Responses API)
    └── __init__.py                       Exports version info only
You primarily interact with this package indirectly via the HTTP API. The parts that matter most for integration are llm/ (error taxonomy, cost, retries — fields you see in every response), config/ (env-var boundary), and the pipeline-specific subpackages (screening/, indexer/, criteria/) that shape the results you receive.
Here's what happens inside the backend when NetReady sends a POST /v1/screening/jobs request. Steps that produce a visible response field name that field.
1. Auth middleware (api/auth_middleware.py) checks X-API-Key. On reject → HTTP 401 with {"detail": "Missing API key..."}. No package code runs yet.
2. The request body is validated against ScreeningRequest (api/schemas/screening.py). On reject → HTTP 400 with FastAPI's validation detail.
3. A cost pre-flight runs (in api/routers/screening.py, calling crystallise.llm.cost.estimate_cost). The result is stored on the job as estimated_cost_usd.
4. If max_estimated_cost_usd was supplied and the estimate exceeds it, the request is rejected with HTTP 400.
5. A job record is created with id, project_id, status="pending", config, created_at, model_version. It is saved to the database (src/crystallise/db/backend.py) and also cached in _active_jobs for fast polling.
6. The response {job_id, status: "pending", progress: 0.0, stage: ""} is returned; HTTP returns within milliseconds.
7. A background task (_run_screening_job) starts. It sets OPENAI_API_KEY from the X-OpenAI-API-Key header if present and updates status="running".
8. The pipeline runs via crystallise.screening.pipeline.screen_papers. Each stage's progress is written back to the job record, so polling GETs see live updates.
9. Each LLM call goes through crystallise.llm.client.async_chat_completion, which opens a fresh AsyncOpenAI client and wraps the call in async_retry_with_backoff (retries on RateLimitError, APITimeoutError, APIConnectionError, InternalServerError). An openai.AuthenticationError is re-raised immediately; other exceptions propagate up, retryable ones only once the retries are exhausted.
10. Errors are classified (crystallise.llm.errors.classify_openai_error) into one of 6 categories. For async jobs, the category is stored on the job as error_category rather than returned as an HTTP error, and the job finishes with status="failed".
11. Results are written as the results and clusters arrays on the job record, along with stage timings and the final duration.
12. Usage is tallied (crystallise.llm.cost.tally_usage) across all OpenAI calls; the final value replaces the initial estimate in estimated_cost_usd.
13. The job is marked status="completed" with a completed_at timestamp.
14. Polling GETs check _active_jobs first (in-memory), falling back to the DB on a miss. Once a job is terminal, it is evicted from _active_jobs.
Poll once every 1–2 seconds. If status == "failed", inspect error_category and error_retryable to decide whether to retry.
Never rely on the estimated_cost_usd returned immediately at job creation — it's replaced by the real tally once the job completes.
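The polling advice above, as a minimal client-side sketch. fetch_job is a hypothetical callable that performs the GET for a job and returns the decoded JSON record as a dict; the terminal statuses and the error_category / error_retryable fields come from the lifecycle above, while the 1.5 s interval and one-hour timeout are illustrative choices, not backend requirements.

```python
import time
from typing import Any, Callable, Dict

TERMINAL_STATUSES = {"completed", "failed"}


def wait_for_job(
    fetch_job: Callable[[str], Dict[str, Any]],  # hypothetical: GETs and decodes the job record
    job_id: str,
    interval_s: float = 1.5,        # inside the recommended 1–2 s polling window
    timeout_s: float = 3600.0,      # illustrative client-side budget
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Dict[str, Any]:
    """Poll until the job reaches a terminal status, then return the final record."""
    deadline = clock() + timeout_s
    while True:
        job = fetch_job(job_id)
        if job["status"] in TERMINAL_STATUSES:
            return job
        if clock() >= deadline:
            raise TimeoutError(f"job {job_id} still {job['status']!r} after {timeout_s}s")
        sleep(interval_s)


def describe_failure(job: Dict[str, Any]) -> str:
    """Summarise a failed job from the fields the backend sets on failure."""
    retryable = job.get("error_retryable", False)
    return f"{job.get('error_category', 'unknown')} (retryable={retryable})"
```

Injecting sleep and clock keeps the loop testable without real waiting; in production the defaults are used.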
src/crystallise/llm/
The most important subpackage from an integration perspective. Every response you receive carries fields (error_category, usage, estimated_cost_usd, model_version) whose semantics are defined here.
Client (client.py)
The top-level entry points are async_chat_completion() (single request), async_batch_chat_completions() (parallel with semaphore), and responses_api_call() (OpenAI's Responses API, used by AutoIndexer).
Key design choices visible from the outside:
- A fresh AsyncOpenAI client per request (via async with). This avoids connection-pool exhaustion when handling many concurrent jobs, at the cost of marginally more TCP setup per call, which is negligible next to LLM latency.
- API key resolution order: api_key argument → OPENAI_API_KEY env → CRYSTALLISE_OPENAI_API_KEY env. The router sets OPENAI_API_KEY from the X-OpenAI-API-Key header before invoking the pipeline, so per-request keys pass through automatically.
- Pre-flight validation (crystallise.config.model_capabilities) checks context window, output cap, and feature support (e.g. structured output, temperature handling) before the API call. It fails fast if the model can't serve the request; you'd see this as a validation error.
- Structured output: when a call passes output_schema, the client uses OpenAI's response_format parsing so the response comes back as a dict matching the schema.
Retries (retry.py)
Two retry helpers, async and sync, share the same error classification.
| Setting | Default | Used by |
|---|---|---|
| max_retries | 3 | Async (chat completions) |
| delays | [2.0, 8.0, 30.0] seconds | Async exponential backoff |
| max_retries (sync) | 5 | Sync (OpenAI file/vector-store ops) |
| base_backoff (sync) | 1.5 | Sync exponential base |
| jitter | 0.25s | Sync only, added per retry |
The async helper (used by screening and criteria) retries on RateLimitError, APITimeoutError, APIConnectionError, InternalServerError. It immediately re-raises AuthenticationError and LengthFinishReasonError (non-retryable). Anything else propagates unchanged.
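The async helper's behaviour can be sketched as below. This is an illustrative reimplementation, not the code in retry.py: RetryableAPIError and FatalAPIError are hypothetical stand-ins for the OpenAI SDK exception classes so the example runs without the openai package, and sleep is injectable so tests don't actually wait. The delays default to the [2.0, 8.0, 30.0] row of the table, one entry per retry.

```python
import asyncio
from typing import Awaitable, Callable, Sequence, TypeVar

T = TypeVar("T")


class RetryableAPIError(Exception):
    """Hypothetical stand-in for RateLimitError, APITimeoutError, APIConnectionError, InternalServerError."""


class FatalAPIError(Exception):
    """Hypothetical stand-in for AuthenticationError and LengthFinishReasonError."""


async def retry_with_backoff(
    call: Callable[[], Awaitable[T]],
    delays: Sequence[float] = (2.0, 8.0, 30.0),  # one delay per retry (max_retries = 3)
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Await call(), retrying retryable errors after each delay in turn.

    Exceptions that are neither retryable nor fatal propagate unchanged.
    """
    for attempt in range(len(delays) + 1):
        try:
            return await call()
        except FatalAPIError:
            raise  # non-retryable: surface immediately, no backoff
        except RetryableAPIError:
            if attempt == len(delays):
                raise  # retries exhausted: the last error propagates
            await sleep(delays[attempt])
    raise AssertionError("unreachable")
```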
Error taxonomy (errors.py)
ErrorCategory is the canonical taxonomy. classify_openai_error() maps OpenAI SDK exceptions to categories; this is what drives the error_code field in error responses and the error_category field on failed jobs.
| Category | Retryable | OpenAI exceptions | HTTP |
|---|---|---|---|
| transient | yes | APIConnectionError, InternalServerError, 5xx by status | 500 |
| rate_limit | yes | RateLimitError, 429 by status | 429 |
| timeout | yes | APITimeoutError | 500 |
| auth | no | AuthenticationError, PermissionDeniedError, 401/403 by status | 401 |
| validation | no | BadRequestError, NotFoundError, 400 by status | 400 |
| unknown | no (default) | anything unclassified | 500 |
LLMError is a wrapper exception with a .category attribute and a .retryable property. Clients never see it directly — the FastAPI layer translates it to {message, error_code, retryable} via api/utils.py:classify_and_raise().
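For orientation, a compact sketch of the taxonomy and the LLMError wrapper. The enum values and the retryable/HTTP columns are transcribed from the table above; the class layout and the hypothetical to_error_body() helper (standing in for what api/utils.py:classify_and_raise() sends to clients) are assumptions about shape, not the actual code.

```python
from enum import Enum
from typing import Dict, Union


class ErrorCategory(str, Enum):
    TRANSIENT = "transient"
    RATE_LIMIT = "rate_limit"
    TIMEOUT = "timeout"
    AUTH = "auth"
    VALIDATION = "validation"
    UNKNOWN = "unknown"


# The "Retryable" and "HTTP" columns of the table.
RETRYABLE = {ErrorCategory.TRANSIENT, ErrorCategory.RATE_LIMIT, ErrorCategory.TIMEOUT}
HTTP_STATUS = {
    ErrorCategory.TRANSIENT: 500,
    ErrorCategory.RATE_LIMIT: 429,
    ErrorCategory.TIMEOUT: 500,
    ErrorCategory.AUTH: 401,
    ErrorCategory.VALIDATION: 400,
    ErrorCategory.UNKNOWN: 500,
}


class LLMError(Exception):
    """Wrapper carrying a category; retryability is derived from it."""

    def __init__(self, message: str, category: ErrorCategory = ErrorCategory.UNKNOWN):
        super().__init__(message)
        self.category = category

    @property
    def retryable(self) -> bool:
        return self.category in RETRYABLE


def to_error_body(err: LLMError) -> Dict[str, Union[str, bool]]:
    """Hypothetical: the {message, error_code, retryable} body a client receives.

    Assumption: error_code carries the category string verbatim.
    """
    return {
        "message": str(err),
        "error_code": err.category.value,
        "retryable": err.retryable,
    }
```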
Cost (cost.py)
Two functions: estimate_cost(model, input_tokens, output_tokens) returns a USD float; tally_usage(model, usage_list) aggregates a list of OpenAI usage objects/dicts into the usage payload you see in responses.
The tally_usage return shape is exactly:
{
"input_tokens": 320,
"output_tokens": 85,
"total_tokens": 405,
"estimated_cost_usd": 0.0002
}
Prices come from DEFAULT_PRICING_PER_1M, a hardcoded dict keyed by model name with input / cached_input / output rates per million tokens. Current entries cover gpt-5-nano, gpt-5-mini, gpt-4.1, and two gpt-5.4-* entries.
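The arithmetic behind estimate_cost and tally_usage can be sketched as below. The rates are made-up placeholders for an invented model name, not the values in DEFAULT_PRICING_PER_1M, and the usage items are assumed to be dicts with input_tokens / output_tokens keys; the real code also accepts SDK usage objects and has a cached_input rate that this sketch ignores.

```python
from typing import Dict, Iterable, Mapping

# Placeholder USD rates per 1M tokens: NOT real prices, illustrative only.
PRICING_PER_1M: Dict[str, Dict[str, float]] = {
    "example-model": {"input": 1.00, "output": 4.00},
}


def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """USD cost = (input_tokens * input_rate + output_tokens * output_rate) / 1,000,000."""
    rates = PRICING_PER_1M[model]
    return (input_tokens * rates["input"] + output_tokens * rates["output"]) / 1_000_000


def tally_usage(model: str, usage_list: Iterable[Mapping[str, int]]) -> Dict[str, float]:
    """Aggregate per-call usage dicts into the response's usage payload."""
    usages = list(usage_list)  # materialise once so a generator isn't consumed twice
    input_tokens = sum(u.get("input_tokens", 0) for u in usages)
    output_tokens = sum(u.get("output_tokens", 0) for u in usages)
    return {
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "total_tokens": input_tokens + output_tokens,
        "estimated_cost_usd": estimate_cost(model, input_tokens, output_tokens),
    }
```

Summing tokens first and pricing once gives the same total as pricing each call separately, since the formula is linear in token counts.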
Because these rates are hardcoded, treat estimated_cost_usd as approximate: if cost caps matter to you, cross-check against OpenAI's current rates and don't treat this number as authoritative.
Build a retry policy keyed on error_category: exponential backoff for rate_limit, moderate delay for transient/timeout, surface immediately for auth/validation. Don't double-retry: the backend already retries up to 3 times on retryable categories. For usage/estimated_cost_usd, trust the completed-job value (real tally), not the pre-flight estimate.
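A client-side resubmission policy along those lines might look like the sketch below. It's a hypothetical helper for NetReady's side, not part of the backend: the base delay, cap, fixed 10 s delay and resubmission budget are illustrative values to tune, and the "server_restart" branch covers the job-level category described in the database section.

```python
import random
from typing import Callable, Optional

MAX_RESUBMITS = 2  # illustrative: the backend has already retried internally


def resubmit_delay(
    error_category: str,
    attempt: int,                      # 0 for the first resubmission
    base_s: float = 5.0,
    cap_s: float = 300.0,
    jitter: Callable[[], float] = random.random,
) -> Optional[float]:
    """Seconds to wait before resubmitting a failed job, or None to surface the error."""
    if attempt >= MAX_RESUBMITS:
        return None
    if error_category == "rate_limit":
        # Exponential backoff, capped, plus up to 1 s of random jitter.
        return min(cap_s, base_s * 2 ** attempt) + jitter()
    if error_category in ("transient", "timeout"):
        return 10.0  # moderate fixed delay
    if error_category == "server_restart":
        return 0.0   # job was lost mid-run: resubmit right away
    return None      # auth, validation, unknown: not worth retrying
```

Branching on the job's error_retryable flag alone would also work; keying on error_category lets each retryable category get its own delay.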
src/crystallise/screening/
The entry point is screen_papers() in pipeline.py. It accepts a DataFrame of papers plus criteria + questions, and runs a 4-stage process. Each stage contributes specific columns/fields to the final response.
| Stage | Module | Model | What it adds |
|---|---|---|---|
| 1. Labelling | labelling.py | default (gpt-5-nano) | Each paper is scored 1–5 against the criteria in repetitions separate AI calls. Columns added: score_1, ..., score_N (N = repetitions), mean_score. |
| 2. Reasoning | reasoning.py | default | One AI call per paper generates rating_reasoning — a human-readable explanation of the score. |
| 3. Clustering | clustering.py | clustering (gpt-4.1 — needs 400K context) | Groups similar reasoning texts into thematic clusters. Produces the clusters array with cluster_id, label, count, and description. |
| 4. Cluster selection | cluster_selection.py | clustering | Assigns each paper to one or more clusters. Adds cluster_id (or cluster_ids[]) per paper in results. |
The pipeline supports progress_callback and cancel_event arguments. The router wires progress into the DB so polling GETs see live stage updates. Cancellation isn't currently wired to HTTP — the mechanism exists but NetReady can't trigger it externally.
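The progress/cancel contract can be illustrated with a hypothetical stage runner. The signatures here (progress_callback(stage, fraction) and a threading.Event as cancel_event) are assumptions for illustration; screen_papers() may use different argument shapes.

```python
import threading
from typing import Callable, Optional, Sequence, Tuple

ProgressCallback = Callable[[str, float], None]


def run_stages(
    stages: Sequence[Tuple[str, Callable[[], None]]],
    progress_callback: Optional[ProgressCallback] = None,
    cancel_event: Optional[threading.Event] = None,
) -> bool:
    """Run stages in order, reporting progress; return False if cancelled."""
    total = len(stages)
    for index, (name, run) in enumerate(stages):
        # Cooperative cancellation: checked between stages only.
        if cancel_event is not None and cancel_event.is_set():
            return False
        if progress_callback:
            progress_callback(name, index / total)
        run()
    if progress_callback:
        progress_callback("done", 1.0)
    return True
```

In this sketch a stage that has already started runs to completion before cancellation takes effect; a router would pass a progress_callback that writes to the job record.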
Clusters type filter: the request field clusters_type: "include" | "exclude" | null controls which papers feed clustering. null (default) clusters both above- and below-threshold papers. Threshold is the threshold parameter, default 1.0.
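A sketch of stage 1's columns and the clusters_type selection, assuming papers are plain dicts and that "include" means mean_score ≥ threshold while "exclude" means mean_score < threshold. The boundary comparison and the helper names are assumptions; check the pipeline code for the exact rule.

```python
from statistics import mean
from typing import Dict, List, Optional, Sequence


def with_mean_score(paper: Dict, scores: Sequence[float]) -> Dict:
    """Attach score_1..score_N and mean_score, mirroring the stage-1 columns."""
    row = dict(paper)
    for i, score in enumerate(scores, start=1):
        row[f"score_{i}"] = score
    row["mean_score"] = mean(scores)
    return row


def papers_for_clustering(
    papers: List[Dict], clusters_type: Optional[str], threshold: float = 1.0
) -> List[Dict]:
    """Pick which papers feed clustering according to clusters_type."""
    if clusters_type is None:
        return list(papers)  # default: cluster both sides of the threshold
    if clusters_type == "include":
        return [p for p in papers if p["mean_score"] >= threshold]  # assumed >=
    if clusters_type == "exclude":
        return [p for p in papers if p["mean_score"] < threshold]
    raise ValueError(f"unexpected clusters_type: {clusters_type!r}")
```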
MockAIService (mock.py) implements every stage's interface with canned data. See Mock mode internals.
Two models are used per screening job: model (per-request, default gpt-5-nano) for labelling+reasoning, and the hardcoded clustering model (gpt-4.1) for stages 3–4. If you override model in the request, you're only changing stages 1–2. The clustering model is fixed because it needs a 400K context window that only gpt-4.1 provides in the current roster.
src/crystallise/indexer/
Unlike screening, AutoIndexer uses OpenAI function calling (not free-text output) so the model's response is validated against your field schema at the OpenAI boundary. This gives stronger guarantees on output shape.
The pipeline entry point is process_record() in pipeline.py. It:
- Converts IndexerField[] into an OpenAI function schema (schema_builder.py).
- Parses the tool_call response into per-field values.
- Attaches confidence, evidence (supporting text spans from the title/abstract), and reasoning to each value.
Helper endpoints (field_suggestion.py, refinement.py, grouping.py) are optional:
- field_suggestion.py: proposes a field set from the project description and research questions. Returns IndexerField[] + ExtractionWarning[].
- refinement.py: reviews an existing field set and proposes add/modify/remove/merge actions.
- grouping.py: clusters extracted tag values into meaningful categories (post-extraction normalisation).
Mode selection (test / sample / full) is applied by slicing the record list before dispatch: test = first 5, sample = first 20, full = all. This is useful for dry-running cost on a partial set.
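Two pieces of that flow are easy to sketch: turning a field list into a function (tool) definition, and the test/sample/full slicing. The IndexerField attributes used here (name, description, type), the tool name, and the per-field value/confidence/evidence/reasoning object are assumptions; schema_builder.py defines the real shape. The tool dict follows the Chat Completions tools format; the Responses API, which AutoIndexer calls, uses a flattened variant with name and parameters at the top level.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Sequence


@dataclass
class IndexerField:  # hypothetical subset of the real model
    name: str
    description: str
    type: str = "string"  # JSON Schema type of the extracted value


def build_tool(fields: Sequence[IndexerField], tool_name: str = "record_fields") -> Dict[str, Any]:
    """Build a function-calling tool whose arguments hold one object per field."""
    properties = {
        f.name: {
            "type": "object",
            "description": f.description,
            "properties": {
                "value": {"type": f.type},
                "confidence": {"type": "number", "minimum": 0, "maximum": 1},
                "evidence": {"type": "array", "items": {"type": "string"}},
                "reasoning": {"type": "string"},
            },
            "required": ["value", "confidence", "evidence", "reasoning"],
        }
        for f in fields
    }
    return {
        "type": "function",
        "function": {
            "name": tool_name,  # hypothetical tool name
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": [f.name for f in fields],
            },
        },
    }


MODE_LIMITS = {"test": 5, "sample": 20, "full": None}


def select_records(records: List[Any], mode: str) -> List[Any]:
    """Slice before dispatch; records[:None] keeps everything for "full"."""
    return records[: MODE_LIMITS[mode]]
```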
Because extraction is function-call driven, you can rely on the field names being exactly what you sent. confidence is the model's self-reported 0–1 score — treat it as a signal, not a calibrated probability. The helper endpoints (suggest/refine/group) are purely optional — you can drive AutoIndexer with hand-authored fields and skip them entirely.
src/crystallise/criteria/
The largest subpackage by line count, because it hosts the multi-step generation/refinement logic. All endpoints are synchronous (one request, one response).
| Endpoint | Module/function | Output shape |
|---|---|---|
| /analyze-question | ai_service.analyze_research_question | status + missing_elements[] + suggestion |
| /generate | ai_service.generate_criteria | CriterionResponse[] |
| /picos | ai_service.extract_pico | elements dict + gap_flags[] + contraindications[] |
| /refine-context | ai_service.refine_context | refined_description + refined_research_questions[] + explanation |
| /refine | ai_service.refine_criteria | CriterionResponse[] derived from conflict patterns |
| /consolidate | ai_service.consolidate_criteria | DuplicateGroup[] + ConsolidationProposal[] + warnings[] |
All functions follow the same pattern: build a system + user prompt via crystallise.prompts.criteria, call async_chat_completion, json.loads() the response, and hand the parsed dict back to the router for Pydantic validation.
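That shared pattern can be sketched with the model call injected as a parameter. The prompt text, the chat signature, the helper name and the error handling are hypothetical; the real functions build prompts from crystallise.prompts.criteria and call async_chat_completion.

```python
import json
from typing import Any, Awaitable, Callable, Dict

Chat = Callable[[str, str], Awaitable[str]]  # (system_prompt, user_prompt) -> raw model text


async def analyze_question_sketch(question: str, chat: Chat) -> Dict[str, Any]:
    """Hypothetical stand-in for one criteria function: prompt, call, parse."""
    system_prompt = "Assess whether the research question is well formed. Reply in JSON."
    user_prompt = f"Research question: {question}"
    raw = await chat(system_prompt, user_prompt)
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"model returned non-JSON output: {raw[:80]!r}") from exc
    if not isinstance(parsed, dict):
        raise ValueError("expected a JSON object")
    return parsed  # the router then validates this dict with its Pydantic model
```

Keeping parsing separate from validation means a malformed reply fails at json.loads, while a well-formed reply with wrong fields fails later at the Pydantic step.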
analyze-question is the most commonly called endpoint from NetReady's perspective (it mirrors the Streamlit demo shared earlier). It defaults to gpt-5-mini because the task is short and structured; the other criteria endpoints default to gpt-4.1 (better reasoning quality for schema design).
consolidate_criteria includes server-side safety guards that may reject LLM outputs before returning:
- LLM outputs with ai_confidence < 0.75 are dropped, and a message is added to warnings explaining why.
This is why you may see warnings populated even when duplicate_groups or consolidation_proposals are empty.
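The confidence guard amounts to a filter that records why items were dropped. A sketch, assuming each LLM suggestion is a dict with an ai_confidence key; the 0.75 threshold is from the text above, while the id key, the warning wording and treating a missing confidence as 0.0 are hypothetical.

```python
from typing import Dict, List, Tuple

MIN_AI_CONFIDENCE = 0.75


def apply_confidence_guard(suggestions: List[Dict]) -> Tuple[List[Dict], List[str]]:
    """Keep suggestions at or above the threshold; explain each dropped one."""
    kept: List[Dict] = []
    warnings: List[str] = []
    for item in suggestions:
        confidence = item.get("ai_confidence", 0.0)  # assumption: missing -> dropped
        if confidence < MIN_AI_CONFIDENCE:
            warnings.append(
                f"Dropped {item.get('id', '?')}: ai_confidence {confidence:.2f} < {MIN_AI_CONFIDENCE}"
            )
        else:
            kept.append(item)
    return kept, warnings
```

If every suggestion falls below the threshold, kept is empty while warnings is not, which is exactly the response pattern described above.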
Criteria endpoints are synchronous but still make real LLM calls — expect 2–10 second latency. Always check the warnings array on /consolidate: an empty result with populated warnings means the LLM returned suggestions that failed quality thresholds, not that there were no duplicates to find.
src/crystallise/prompts/
All system and user prompts are centralised here. Each prompt is registered with metadata in registry.py and exposed via GET /v1/config/prompts for discoverability.
PromptInfo shape:
| Field | Type | Meaning |
|---|---|---|
| name | string | Dotted identifier, e.g. criteria.question_analysis |
| service | string | One of screening, criteria, indexer |
| description | string | One-line purpose |
| has_variables | bool | Whether the prompt text is a format string (takes runtime parameters) |
| system_or_user | string | "system", "user", or "both" (the module exports both a system and a user prompt) |
Prompts themselves are plain Python strings or format-string functions in:
- prompts/screening.py: 10 prompts for labelling, reasoning, clustering, cluster selection, etc.
- prompts/criteria.py: 8 prompts for generate/picos/refine/consolidate/analyze-question
- prompts/indexer.py: 4 prompts for pipeline/refinement/grouping/field-suggestion
There is no version field on PromptInfo. The project shipped with a single current revision of each prompt.
If you need to audit which prompt was used for a given response, call GET /v1/config/prompts. The metadata is stable; the prompt text itself may evolve with the codebase. Don't hardcode prompt names in client-side logic — they're surfaced so you can display them to end-users for transparency, not to gate behaviour.
src/crystallise/db/
The backend uses a single database for job state only. There is no user data and no separate store of papers or extracted results: results live only inside the job record that produced them. From NetReady's perspective, the service is a stateless compute layer.
db/backend.py auto-selects between SQLite and PostgreSQL based on the CRYSTALLISE_DATABASE_URL environment variable:
- Empty → SQLite (a local .db file, or in-memory for tests)
- postgresql://... URI → PostgreSQL via psycopg2-binary
convert_query_placeholders() translates SQLite-style ? placeholders to the psycopg2-style %s at runtime so the same query strings work against both backends. Similarly, SQLite-only constructs like INSERT OR REPLACE INTO ... are rewritten to PostgreSQL INSERT ... ON CONFLICT ... DO UPDATE SET ....
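A naive version of the placeholder translation, for intuition only. It's an assumption about the approach, not a copy of convert_query_placeholders(): it skips ? characters inside single-quoted SQL string literals, but doesn't handle comments, literal % signs (which psycopg2 expects doubled as %% when parameters are passed), or the INSERT OR REPLACE rewrite.

```python
def convert_placeholders(query: str) -> str:
    """Translate SQLite-style ? placeholders to psycopg2-style %s (sketch)."""
    out = []
    in_string = False
    for ch in query:
        if ch == "'":
            # A doubled '' escape toggles twice, so string tracking stays balanced.
            in_string = not in_string
            out.append(ch)
        elif ch == "?" and not in_string:
            out.append("%s")
        else:
            out.append(ch)
    return "".join(out)
```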
Only two tables are currently used:
| Table | Populated by | Holds |
|---|---|---|
| screening_jobs | Async screening runs | job_id, status, progress, stage, results (JSON), clusters (JSON), timings, cost, error fields |
| indexer_jobs | Async indexer runs | job_id, status, progress, results (JSON), errors, usage, cost, error fields |
Neither table has user-identifying fields. project_id is an opaque integer NetReady passes in and gets back; the backend treats it as a correlation key, nothing more.
Neither table has a built-in retention policy. Jobs accumulate indefinitely unless manually cleaned. In-memory caches (_active_jobs dicts in both routers) hold running jobs for fast polling; a server restart loses them and pending jobs are marked as failed on next read with error_category: "server_restart".
Treat this service as a stateless compute engine. Once a job completes, pull the results and store them on your side — don't plan to query our database later. If a poll returns status: "failed" with error_category: "server_restart", the job was lost mid-run and must be re-submitted. This will be rare in practice but you should handle it gracefully.
src/crystallise/config/
Three distinct configuration layers, each controlling different things:
Settings (env vars, settings.py)
A pydantic_settings.BaseSettings subclass. Reads from CRYSTALLISE_* env vars. These are the "boot-time knobs" that NetReady (or whoever runs the service) sets once at startup.
| Field | Env var | Default |
|---|---|---|
| openai_api_key | CRYSTALLISE_OPENAI_API_KEY | empty (per-request header takes over) |
| database_url | CRYSTALLISE_DATABASE_URL | empty → SQLite |
| default_model | CRYSTALLISE_DEFAULT_MODEL | gpt-5-nano |
| clustering_model | CRYSTALLISE_CLUSTERING_MODEL | gpt-4.1 |
| max_concurrent_requests | CRYSTALLISE_MAX_CONCURRENT_REQUESTS | 10 |
| api_keys | CRYSTALLISE_API_KEYS | empty → dev mode |
| api_host / api_port | CRYSTALLISE_API_HOST / CRYSTALLISE_API_PORT | 0.0.0.0 / 8005 |
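To make the env-var mapping concrete, here's a stdlib-only sketch of a few rows of the table above. The real class is a pydantic_settings.BaseSettings subclass that performs this mapping and type coercion declaratively; SettingsSketch and from_env are hypothetical names used only to show the CRYSTALLISE_ prefix convention and the defaults.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

PREFIX = "CRYSTALLISE_"


@dataclass(frozen=True)
class SettingsSketch:
    database_url: str = ""  # empty -> SQLite
    default_model: str = "gpt-5-nano"
    clustering_model: str = "gpt-4.1"
    max_concurrent_requests: int = 10

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "SettingsSketch":
        """Read each field from CRYSTALLISE_<FIELD_NAME>, falling back to the default."""
        env = os.environ if env is None else env
        return cls(
            database_url=env.get(PREFIX + "DATABASE_URL", cls.database_url),
            default_model=env.get(PREFIX + "DEFAULT_MODEL", cls.default_model),
            clustering_model=env.get(PREFIX + "CLUSTERING_MODEL", cls.clustering_model),
            max_concurrent_requests=int(
                env.get(PREFIX + "MAX_CONCURRENT_REQUESTS", cls.max_concurrent_requests)
            ),
        )
```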
ServiceConfig + ConfigRegistry (runtime defaults, registry.py)
Per-service tunables that can be adjusted at runtime via PUT /v1/config/services/{id}. Each service has a model, temperature, max_output_tokens, optional system_prompt, and a free-form extra dict.
Service IDs currently defined: screening, screening.clustering, extraction, indexer, criteria, deduplication.
model_capabilities (preflight checks, model_capabilities.py)
Declarative table of what each model supports: context window size, max output tokens, temperature support, structured-output support. validate_request() is called inside async_chat_completion() before any API call, so a request that asks an incompatible model for a feature fails with a clear validation error rather than a cryptic OpenAI 400.
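The shape of such a preflight check, as a sketch: a declarative capability table consulted before any API call. The model name and limits below are invented placeholders (not the real table's entries), this validate_request signature is hypothetical, and raising on an unsupported temperature is an assumption (the real code may drop the parameter instead).

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class ModelCaps:
    context_window: int
    max_output_tokens: int
    supports_temperature: bool
    supports_structured_output: bool


# Placeholder entry: invented numbers for an invented model name.
CAPABILITIES: Dict[str, ModelCaps] = {
    "example-model": ModelCaps(100_000, 8_000, supports_temperature=False,
                               supports_structured_output=True),
}


class PreflightError(ValueError):
    """Raised before any API call when the model can't serve the request."""


def validate_request(
    model: str,
    input_tokens: int,
    max_output_tokens: int,
    temperature: Optional[float] = None,
    wants_structured_output: bool = False,
) -> None:
    caps = CAPABILITIES.get(model)
    if caps is None:
        raise PreflightError(f"unknown model {model!r}")
    if max_output_tokens > caps.max_output_tokens:
        raise PreflightError(f"{model} caps output at {caps.max_output_tokens} tokens")
    if input_tokens + max_output_tokens > caps.context_window:
        raise PreflightError(f"request exceeds {model}'s {caps.context_window}-token context window")
    if temperature is not None and not caps.supports_temperature:
        raise PreflightError(f"model {model} doesn't support feature 'temperature'")
    if wants_structured_output and not caps.supports_structured_output:
        raise PreflightError(f"model {model} doesn't support feature 'structured output'")
```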
Settings is the boot boundary — you set env vars once at deploy time. ServiceConfig is the runtime boundary — you can adjust model/temperature per service via the config API if you need to experiment. ModelCapabilities is invisible to you except when a pre-flight error message references it; if you see "model X doesn't support feature Y", that's this module.
src/crystallise/batch/
Two parallelism primitives: sync and async. Neither is exposed directly to NetReady; they're used inside the pipelines to control how many concurrent OpenAI calls the service makes.
| Module | Primitive | Where used |
|---|---|---|
| batch/runner.py | ThreadPoolExecutor with checkpointing | Sync screening stages (labelling loops over papers) |
| batch/async_runner.py | asyncio.Semaphore | Async batches (indexer /run uses this) |
| batch/progress.py | Callback types | Progress reporting into job records |
The sync runner accepts a worker function with the signature fn(item) -> (result, usage_or_none, error_or_none) and returns aggregated results. Checkpointing writes partial progress to disk so long jobs can resume.
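A minimal version of that contract, assuming the worker signature fn(item) -> (result, usage_or_none, error_or_none) from the text. The run_batch name and the aggregate dict keys are hypothetical, and checkpointing is omitted.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

WorkerResult = Tuple[Any, Optional[Dict[str, int]], Optional[str]]


def run_batch(
    items: Sequence[Any],
    fn: Callable[[Any], WorkerResult],
    max_workers: int = 4,
) -> Dict[str, List[Any]]:
    """Run fn over items in a thread pool; keep input order and collect usage/errors.

    fn is expected to return errors as values; an exception it raises would
    propagate out of pool.map() and abort the batch.
    """
    results: List[Any] = []
    usages: List[Dict[str, int]] = []
    errors: List[Tuple[int, str]] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields outputs in input order, whatever order the threads finish in.
        for index, (result, usage, error) in enumerate(pool.map(fn, items)):
            results.append(result)
            if usage is not None:
                usages.append(usage)
            if error is not None:
                errors.append((index, error))
    return {"results": results, "usages": usages, "errors": errors}
```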
Concurrency ceilings are controlled by Settings.max_concurrent_requests (default 10) for the async runner, and by IndexerRequest.max_workers (default 4) + batch_size (default 50) for the sync/indexer path.
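The async ceiling follows the standard asyncio.Semaphore pattern. In the sketch below, gather_bounded is a hypothetical helper and call stands in for one OpenAI request; the default limit of 10 mirrors max_concurrent_requests.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Sequence


async def gather_bounded(
    items: Sequence[Any],
    call: Callable[[Any], Awaitable[Any]],
    limit: int = 10,
) -> List[Any]:
    """Run call(item) for every item with at most `limit` calls in flight; keep order."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item: Any) -> Any:
        async with semaphore:  # waits here once `limit` calls are in flight
            return await call(item)

    return await asyncio.gather(*(guarded(item) for item in items))
```

All tasks are created up front, but only `limit` of them hold the semaphore at any moment, which is why submitting more papers lengthens a run rather than increasing the request rate.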
You don't call these directly — they're buried behind the pipelines. But they explain why throughput has a ceiling: the service won't hammer OpenAI with more than 10 concurrent requests by default, regardless of how many papers you submit. If you're seeing slower-than-expected screening runs, the bottleneck is usually OpenAI latency × serial pipeline stages, not our parallelism cap.
Every mutating endpoint accepts "mock": true in the request body. Mock responses are served without any OpenAI call:
- No OpenAI credentials are needed; requests still need a valid X-API-Key.
- Responses come from src/crystallise/screening/mock.py's MockAIService class, which replicates the real pipeline's interface with deterministic outputs.
The mock service provides methods like generate_mock_evidence(), generate_mock_reasoning(), generate_mock_clusters(), generate_exclusion_criteria_from_context(), generate_from_conflicts(). Each returns fixed or context-derived data that matches the shape of the real pipeline's output.
Crucially, the examples in api-reference.html are lifted verbatim from these mock return values, which keeps the docs and the mock in step: if someone changes a mock field name, a curl against mock mode stops matching the documented shape, and tests catch it.
Run all of NetReady's CI against mock mode. Every endpoint shape you depend on is exercised without OpenAI cost, without credentials, and deterministically. The mock-mode examples in the API reference are a stable contract: if you code against them, your client works regardless of whether OpenAI itself is reachable.
A checklist of what you can rely on vs. what may change.
You can rely on:
- Error response shapes: {message, error_code, retryable} for classified LLM errors, {detail: "..."} for validation/not-found errors.
- Async job fields: job_id, status, progress, stage. Terminal failures report via status: "failed" + error_category, never as HTTP errors.
- Health check: /health/ready.
- project_id is an opaque correlation key.
- X-OpenAI-API-Key passthrough: every request can carry a user-specific key; the backend never stores it.
May change:
- Cost figures from DEFAULT_PRICING_PER_1M: hardcoded, and will drift from OpenAI's public rates.
- ServiceConfig defaults may change between releases.
- Retry delays, currently [2s, 8s, 30s], may be tuned.
Your responsibility:
- Persisting results and maintaining your own mapping (project_id ↔ your internal identifiers).
When in doubt, read src/crystallise/ directly: it's small (~10k lines total) and every public entry point has a docstring.