Skip to content

autochecklist

autochecklist

autochecklist: A library of checklist generation and scoring methods for LLM evaluation.

Checklist

Bases: BaseModel

A collection of checklist items.

Source code in autochecklist/models.py
class Checklist(BaseModel):
    """A collection of checklist items."""

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    items: List[ChecklistItem]
    source_method: str  # e.g., "tick", "rlcf", "checkeval"
    generation_level: str  # "instance" or "corpus"
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    # Optional context
    input: Optional[str] = None  # For instance-level
    corpus_description: Optional[str] = None  # For corpus-level

    metadata: Dict[str, Any] = Field(default_factory=dict)

    def __len__(self) -> int:
        """Number of items in the checklist."""
        return len(self.items)

    def by_category(self) -> Dict[str, "Checklist"]:
        """Group items by category, returning a dict of sub-checklists.

        Items with ``category=None`` are placed under ``"ungrouped"``.
        Key order matches the first occurrence of each category.
        """
        # Bucket items by category, preserving first-seen order.
        grouped: OrderedDict[str, List[ChecklistItem]] = OrderedDict()
        for entry in self.items:
            grouped.setdefault(entry.category or "ungrouped", []).append(entry)

        # Build one sub-checklist per bucket, carrying over parent context.
        result: Dict[str, "Checklist"] = {}
        for category, members in grouped.items():
            result[category] = Checklist(
                id=f"{self.id}_{category}",
                items=members,
                source_method=self.source_method,
                generation_level=self.generation_level,
                input=self.input,
                corpus_description=self.corpus_description,
                metadata={**self.metadata, "parent_id": self.id, "category": category},
            )
        return result

    def save(self, path: str) -> None:
        """Save checklist to JSON file."""
        from .utils import save_json
        payload = self.model_dump(mode="json")
        save_json(path, payload)

    @classmethod
    def load(cls, path: str) -> "Checklist":
        """Load checklist from JSON file."""
        from .utils import load_json
        raw = load_json(path)
        return cls.model_validate(raw)

    def to_text(self, numbered: bool = True) -> str:
        """Format checklist as text for prompts."""
        if numbered:
            lines = [f"{idx}. {entry.question}" for idx, entry in enumerate(self.items, start=1)]
        else:
            lines = [f"- {entry.question}" for entry in self.items]
        return "\n".join(lines)

by_category()

Group items by category, returning a dict of sub-checklists.

Items with category=None are placed under "ungrouped". Key order matches the first occurrence of each category.

Source code in autochecklist/models.py
def by_category(self) -> Dict[str, "Checklist"]:
    """Group items by category, returning a dict of sub-checklists.

    Items with ``category=None`` are placed under ``"ungrouped"``.
    Key order matches the first occurrence of each category.
    """
    # Bucket items by category, preserving first-seen order.
    grouped: OrderedDict[str, List[ChecklistItem]] = OrderedDict()
    for entry in self.items:
        grouped.setdefault(entry.category or "ungrouped", []).append(entry)

    # Build one sub-checklist per bucket, carrying over parent context.
    result: Dict[str, "Checklist"] = {}
    for category, members in grouped.items():
        result[category] = Checklist(
            id=f"{self.id}_{category}",
            items=members,
            source_method=self.source_method,
            generation_level=self.generation_level,
            input=self.input,
            corpus_description=self.corpus_description,
            metadata={**self.metadata, "parent_id": self.id, "category": category},
        )
    return result

save(path)

Save checklist to JSON file.

Source code in autochecklist/models.py
def save(self, path: str) -> None:
    """Save checklist to JSON file."""
    from .utils import save_json
    payload = self.model_dump(mode="json")
    save_json(path, payload)

load(path) classmethod

Load checklist from JSON file.

Source code in autochecklist/models.py
@classmethod
def load(cls, path: str) -> "Checklist":
    """Load checklist from JSON file."""
    from .utils import load_json
    raw = load_json(path)
    return cls.model_validate(raw)

to_text(numbered=True)

Format checklist as text for prompts.

Source code in autochecklist/models.py
def to_text(self, numbered: bool = True) -> str:
    """Format checklist as text for prompts."""
    if numbered:
        lines = [f"{idx}. {entry.question}" for idx, entry in enumerate(self.items, start=1)]
    else:
        lines = [f"- {entry.question}" for entry in self.items]
    return "\n".join(lines)

ChecklistItem

Bases: BaseModel

A single checklist item (yes/no question).

Source code in autochecklist/models.py
class ChecklistItem(BaseModel):
    """A single checklist item (yes/no question)."""

    # Short 8-char id (truncated UUID4)
    id: str = Field(default_factory=lambda: str(uuid.uuid4())[:8])
    # The yes/no question text
    question: str
    weight: float = Field(default=100.0, ge=0.0, le=100.0)  # RLCF uses 0-100
    category: Optional[str] = None  # For grouping (e.g., "factuality", "format")
    # Free-form extras (e.g., unrecognized fields from LLM responses)
    metadata: Dict[str, Any] = Field(default_factory=dict)

    # Frozen: items are immutable and hashable once created
    model_config = ConfigDict(frozen=True)

Score

Bases: BaseModel

Complete scoring result.

Source code in autochecklist/models.py
class Score(BaseModel):
    """Complete scoring result."""

    checklist_id: str
    response_id: Optional[str] = None

    item_scores: List[ItemScore]

    # Aggregated scores
    total_score: float  # 0-1, proportion of "yes"
    weighted_score: Optional[float] = None  # If weights are used (RLCF-style)
    normalized_score: Optional[float] = None  # RocketEval-style

    # Which metric Score.primary_score aliases
    primary_metric: str = "pass"  # "pass", "weighted", "normalized"

    @field_validator("primary_metric")
    @classmethod
    def _validate_primary_metric(cls, v: str) -> str:
        """Reject any value outside the three recognized metric names."""
        if v in ("pass", "weighted", "normalized"):
            return v
        raise ValueError(
            f"primary_metric must be one of 'pass', 'weighted', 'normalized', got '{v}'"
        )

    # Metadata
    judge_model: str
    scoring_method: str  # e.g., "batch", "item", "normalized", "weighted"
    metadata: Dict[str, Any] = Field(default_factory=dict)

    @property
    def pass_rate(self) -> float:
        """Proportion of items answered 'yes'."""
        if not self.item_scores:
            return 0.0
        yes = sum(s.answer == ChecklistItemAnswer.YES for s in self.item_scores)
        return yes / len(self.item_scores)

    @property
    def primary_score(self) -> float:
        """Returns whichever metric the pipeline designated as primary."""
        if self.primary_metric == "weighted":
            chosen = self.weighted_score
        elif self.primary_metric == "normalized":
            chosen = self.normalized_score
        else:
            chosen = None
        # Fall back to the plain pass rate when the chosen metric is absent.
        return self.pass_rate if chosen is None else chosen

    @property
    def scaled_score_1_5(self) -> float:
        """Scale pass_rate to 1-5 range (InteractEval style).

        Formula: score = pass_rate * 4 + 1
        - pass_rate=0.0 → score=1.0
        - pass_rate=0.5 → score=3.0
        - pass_rate=1.0 → score=5.0
        """
        return 1 + 4 * self.pass_rate

pass_rate property

Proportion of items answered 'yes'.

primary_score property

Returns whichever metric the pipeline designated as primary.

scaled_score_1_5 property

Scale pass_rate to 1-5 range (InteractEval style).

Formula: score = pass_rate * 4 + 1. Examples: pass_rate=0.0 → score=1.0; pass_rate=0.5 → score=3.0; pass_rate=1.0 → score=5.0.

ItemScore

Bases: BaseModel

Score for a single checklist item.

Source code in autochecklist/models.py
class ItemScore(BaseModel):
    """Score for a single checklist item."""

    # Id of the ChecklistItem this score answers
    item_id: str
    # Yes/no verdict
    answer: ChecklistItemAnswer
    confidence: Optional[float] = None  # 0-1, for normalized scoring
    confidence_level: Optional[ConfidenceLevel] = None  # RocketEval-style
    # Optional judge rationale for the verdict
    reasoning: Optional[str] = None

ChecklistItemAnswer

Bases: str, Enum

Possible answers for a checklist item.

Source code in autochecklist/models.py
class ChecklistItemAnswer(str, Enum):
    """Possible answers for a checklist item."""
    # str subclass so values serialize/compare as plain strings
    YES = "yes"
    NO = "no"

ConfidenceLevel

Bases: str, Enum

Confidence levels for normalized scoring (RocketEval-style).

Source code in autochecklist/models.py
class ConfidenceLevel(str, Enum):
    """Confidence levels for normalized scoring (RocketEval-style)."""
    # Five-point verdict-plus-confidence scale, from confident "no" to confident "yes"
    NO_10 = "no_10"    # No (10% confidence)
    NO_30 = "no_30"    # No (30% confidence)
    UNSURE = "unsure"  # Unsure (50%)
    YES_70 = "yes_70"  # Yes (70% confidence)
    YES_90 = "yes_90"  # Yes (90% confidence)

GroupedScore

Bases: BaseModel

Scoring result grouped by category (e.g., per-dimension scores).

Produced when scoring sub-checklists from Checklist.by_category().

Source code in autochecklist/models.py
class GroupedScore(BaseModel):
    """Scoring result grouped by category (e.g., per-dimension scores).

    Produced when scoring sub-checklists from ``Checklist.by_category()``.
    """

    scores: Dict[str, Score]  # category_name -> Score
    metadata: Dict[str, Any] = Field(default_factory=dict)

    @property
    def per_group_pass_rates(self) -> Dict[str, float]:
        """Pass rate for each category."""
        return {category: result.pass_rate for category, result in self.scores.items()}

    @property
    def pass_rate(self) -> float:
        """Macro-averaged pass rate (each category weighted equally)."""
        rates = [result.pass_rate for result in self.scores.values()]
        return sum(rates) / len(rates) if rates else 0.0

    @property
    def micro_pass_rate(self) -> float:
        """Micro-averaged pass rate (pooled across all items)."""
        # Pool every item verdict across all categories into one flat list.
        verdicts = [
            item.answer == ChecklistItemAnswer.YES
            for result in self.scores.values()
            for item in result.item_scores
        ]
        return sum(verdicts) / len(verdicts) if verdicts else 0.0

    @property
    def mean_score(self) -> float:
        """Mean of Score.primary_score across all categories."""
        primaries = [result.primary_score for result in self.scores.values()]
        return sum(primaries) / len(primaries) if primaries else 0.0

    def flatten(self) -> Score:
        """Merge all sub-scores into a single Score."""
        merged: List[ItemScore] = []
        judge_model = "unknown"
        scoring_method = "batch"
        for result in self.scores.values():
            merged.extend(result.item_scores)
            # Last sub-score wins, following insertion order of ``scores``.
            judge_model = result.judge_model
            scoring_method = result.scoring_method
        yes_total = sum(item.answer == ChecklistItemAnswer.YES for item in merged)
        return Score(
            checklist_id="grouped",
            item_scores=merged,
            total_score=yes_total / len(merged) if merged else 0.0,
            judge_model=judge_model,
            scoring_method=scoring_method,
        )

per_group_pass_rates property

Pass rate for each category.

pass_rate property

Macro-averaged pass rate (each category weighted equally).

micro_pass_rate property

Micro-averaged pass rate (pooled across all items).

mean_score property

Mean of Score.primary_score across all categories.

flatten()

Merge all sub-scores into a single Score.

Source code in autochecklist/models.py
def flatten(self) -> Score:
    """Merge all sub-scores into a single Score."""
    merged: List[ItemScore] = []
    judge_model = "unknown"
    scoring_method = "batch"
    for result in self.scores.values():
        merged.extend(result.item_scores)
        # Last sub-score wins, following insertion order of ``scores``.
        judge_model = result.judge_model
        scoring_method = result.scoring_method
    yes_total = sum(item.answer == ChecklistItemAnswer.YES for item in merged)
    return Score(
        checklist_id="grouped",
        item_scores=merged,
        total_score=yes_total / len(merged) if merged else 0.0,
        judge_model=judge_model,
        scoring_method=scoring_method,
    )

DeductiveInput

Bases: BaseModel

Input for deductive (dimension-based) generation (CheckEval, InteractEval).

Source code in autochecklist/models.py
class DeductiveInput(BaseModel):
    """Input for deductive (dimension-based) generation (CheckEval, InteractEval)."""
    # Dimension name (presumably e.g. "coherence" — see generator usage)
    name: str
    # Natural-language definition of the dimension
    definition: str
    # Optional finer-grained aspects of the dimension
    sub_dimensions: Optional[List[str]] = None

FeedbackInput

Bases: BaseModel

Input for feedback-based checklist generation.

Source code in autochecklist/models.py
class FeedbackInput(BaseModel):
    """Input for feedback-based checklist generation."""
    # Raw feedback text the checklist is derived from
    feedback_text: str
    source: Optional[str] = None  # e.g., "user", "expert"
    # Optional category label to attach to generated items
    category: Optional[str] = None

InteractiveInput

Bases: BaseModel

Input for interactive (think-aloud based) generation (InteractEval).

Source code in autochecklist/models.py
class InteractiveInput(BaseModel):
    """Input for interactive (think-aloud based) generation (InteractEval)."""
    source: str  # "human" or "llm"
    # Evaluation dimension the attributes belong to
    dimension: str
    # Think-aloud attributes collected for this dimension
    attributes: List[str]
    # Optional sample text providing context for the attributes
    sample_context: Optional[str] = None

ChecklistResponse

Bases: BaseModel

LLM response schema for unweighted checklist generation (TICK, RocketEval).

Source code in autochecklist/models.py
class ChecklistResponse(BaseModel):
    """LLM response schema for unweighted checklist generation (TICK, RocketEval)."""
    # Parsed yes/no questions returned by the model
    questions: List[GeneratedQuestion]

WeightedChecklistResponse

Bases: BaseModel

LLM response schema for weighted checklist generation (RLCF).

Source code in autochecklist/models.py
class WeightedChecklistResponse(BaseModel):
    """LLM response schema for weighted checklist generation (RLCF)."""
    # Parsed questions, each carrying an importance weight
    questions: List[GeneratedWeightedQuestion]

CategorizedChecklistResponse

Bases: BaseModel

LLM response schema for categorized checklist generation (OpenRubrics CRG).

Source code in autochecklist/models.py
class CategorizedChecklistResponse(BaseModel):
    """LLM response schema for categorized checklist generation (OpenRubrics CRG)."""
    # Parsed questions, each carrying a category label
    questions: List[GeneratedCategorizedQuestion]

GeneratedCategorizedQuestion

Bases: GeneratedQuestion

A generated yes/no question with a category label.

Source code in autochecklist/models.py
class GeneratedCategorizedQuestion(GeneratedQuestion):
    """A generated yes/no question with a category label."""
    # Category assigned by the LLM (e.g., for by_category grouping)
    category: str

DirectGenerator

Bases: InstanceChecklistGenerator

Generate checklists using a prompt template + structured JSON output.

Can be configured via pipeline presets (built-in methods) or custom prompts.

Parameters:

Name Type Description Default
method_name str

Pipeline preset name (e.g., "tick") or custom name. If a known preset, loads config from PIPELINE_PRESETS.

'custom'
custom_prompt Optional[Union[str, Path]]

Custom prompt template. Pass a Path to load from file, or a str for raw prompt text. Overrides preset template.

None
response_schema Optional[type]

Pydantic model for JSON validation. Default: ChecklistResponse.

None
format_name Optional[str]

Format prompt file name (e.g., "checklist"). Default from preset.

None
max_items int

Maximum checklist items to return.

10
min_items int

Minimum expected items.

2
**kwargs Any

Passed to InstanceChecklistGenerator (model, temperature, etc.)

{}
Source code in autochecklist/generators/instance_level/direct.py
class DirectGenerator(InstanceChecklistGenerator):
    """Generate checklists using a prompt template + structured JSON output.

    Can be configured via pipeline presets (built-in methods) or custom prompts.

    Args:
        method_name: Pipeline preset name (e.g., "tick") or custom name.
            If a known preset, loads config from PIPELINE_PRESETS.
        custom_prompt: Custom prompt template. Pass a Path to load from file,
            or a str for raw prompt text. Overrides preset template.
        response_schema: Pydantic model for JSON validation. Default: ChecklistResponse.
        format_name: Format prompt file name (e.g., "checklist"). Default from preset.
        max_items: Maximum checklist items to return.
        min_items: Minimum expected items.
        **kwargs: Passed to InstanceChecklistGenerator (model, temperature, etc.)
    """

    def __init__(
        self,
        method_name: str = "custom",
        custom_prompt: Optional[Union[str, Path]] = None,
        response_schema: Optional[type] = None,
        format_name: Optional[str] = None,
        max_items: int = 10,
        min_items: int = 2,
        **kwargs: Any,
    ):
        # Load preset defaults if this is a known method
        from .pipeline_presets import PIPELINE_PRESETS

        # Unknown method names yield an empty preset; falls through to
        # custom_prompt handling below.
        preset = PIPELINE_PRESETS.get(method_name, {})

        # Apply preset defaults, allowing kwargs to override
        if "temperature" not in kwargs and "temperature" in preset:
            kwargs["temperature"] = preset["temperature"]

        super().__init__(**kwargs)

        self._method_name = method_name
        # NOTE(review): unlike temperature above, a preset's max_items/min_items
        # take precedence over the explicitly passed arguments — confirm intended.
        self.max_items = preset.get("max_items", max_items)
        self.min_items = preset.get("min_items", min_items)

        is_custom_schema = response_schema is not None
        self._response_schema = response_schema or preset.get(
            "response_schema", ChecklistResponse
        )
        # A custom schema without an explicit format_name disables the
        # built-in format instructions (None skips load_format in generate()).
        if format_name is not None:
            self._format_name = format_name
        elif is_custom_schema:
            self._format_name = None
        else:
            self._format_name = preset.get("format_name", "checklist")

        # Load template: explicit custom_prompt wins over the preset's template.
        if custom_prompt is not None:
            if isinstance(custom_prompt, Path):
                template_text = custom_prompt.read_text(encoding="utf-8")
            else:
                template_text = custom_prompt
        elif preset:
            template_text = load_template(
                preset["template_dir"], preset["template_name"]
            )
        else:
            raise ValueError(
                f"Unknown method '{method_name}' and no custom_prompt provided"
            )

        self._template = PromptTemplate(template_text)

    @property
    def method_name(self) -> str:
        """Preset or custom method name this generator was configured with."""
        return self._method_name

    @property
    def prompt_text(self) -> str:
        """The raw prompt template text."""
        return self._template.template

    def generate(
        self,
        input: str,
        target: Optional[str] = None,
        reference: Optional[str] = None,
        history: str = "",
        **kwargs: Any,
    ) -> Checklist:
        """Generate checklist from input using template + structured output.

        Automatically detects which placeholders the template needs and passes
        only those. This allows the same class to handle TICK (input only),
        RocketEval (input + reference + history), RLCF-direct
        (input + reference), etc.

        Raises:
            ValueError: if the template requires a {reference} placeholder
                but no reference was provided.
        """
        # Build format kwargs — only pass placeholders that exist in template
        format_kwargs: dict[str, str] = {"input": input}
        if "target" in self._template._placeholders and target is not None:
            format_kwargs["target"] = target
        if "reference" in self._template._placeholders:
            if reference is None:
                raise ValueError(
                    f"{self._method_name} requires a reference target."
                )
            format_kwargs["reference"] = reference
        if "history" in self._template._placeholders:
            format_kwargs["history"] = history

        # Load format instructions (skip for custom schemas)
        format_text = load_format(self._format_name) if self._format_name else ""

        # Inject format inline if template has {format_instructions} placeholder,
        # otherwise append after the prompt (default).
        if "format_instructions" in self._template._placeholders:
            format_kwargs["format_instructions"] = format_text
            full_prompt = self._template.format(**format_kwargs)
        else:
            prompt = self._template.format(**format_kwargs)
            full_prompt = prompt + "\n\n" + format_text

        # Call model with structured output
        response_format = to_response_format(
            self._response_schema, self._method_name
        )
        raw = self._call_model(full_prompt, response_format=response_format)

        # Parse structured response
        items = self._parse_structured(raw)

        # Raw model output is preserved in metadata for debugging/auditing.
        return Checklist(
            items=items,
            source_method=self.method_name,
            generation_level=self.generation_level,
            input=input,
            metadata={"raw_response": raw},
        )

    def _parse_structured(self, raw: str) -> list[ChecklistItem]:
        """Parse JSON response using Pydantic schema.

        Primary path: json.loads() succeeds (structured output).
        Fallback path: extract_json() extracts JSON from raw text.

        Auto-detects the list field and item fields from the schema,
        supporting both built-in and custom response schemas.
        """
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            data = extract_json(raw)
        validated = self._response_schema.model_validate(data)

        # Find the list field (first List[BaseModel] field)
        item_list = self._get_item_list(validated)

        items = []
        # Truncate to max_items; excess questions are silently dropped.
        for q in item_list[: self.max_items]:
            q_data = q.model_dump() if hasattr(q, "model_dump") else {}
            # Find question text: use 'question' field, or first str field
            question, question_key = self._get_question_text(q, q_data)
            # Items without a weight/category fall back to the defaults.
            weight = getattr(q, "weight", 100.0)
            category = getattr(q, "category", None)
            # Extra fields → metadata
            known = {question_key, "weight", "category"}
            extra = {k: v for k, v in q_data.items() if k not in known}
            items.append(
                ChecklistItem(
                    question=question,
                    weight=weight,
                    category=category,
                    metadata=extra if extra else {},
                )
            )
        return items

    @staticmethod
    def _get_item_list(validated: Any) -> list:
        """Extract the list of items from a validated response model."""
        # Try 'questions' first (built-in convention)
        if hasattr(validated, "questions"):
            return validated.questions
        # Auto-detect: first list attribute
        for field_name in type(validated).model_fields:
            value = getattr(validated, field_name)
            if isinstance(value, list):
                return value
        raise ValueError(
            f"Cannot find list field in {type(validated).__name__}. "
            "Schema must have a list field (e.g., 'questions', 'items')."
        )

    @staticmethod
    def _get_question_text(item: Any, item_data: dict) -> tuple[str, str]:
        """Extract question text and its field key from an item.

        Returns:
            (question_text, field_key) where field_key is the schema field
            the text came from — used to exclude it from item metadata.
        """
        if isinstance(item, str):
            return item, "question"
        if hasattr(item, "question"):
            return item.question, "question"
        # Fall back to first str field
        for key, value in item_data.items():
            if isinstance(value, str):
                return value, key
        raise ValueError(
            f"Cannot find question text in {type(item).__name__}. "
            "Item must have a 'question' field or at least one str field."
        )

prompt_text property

The raw prompt template text.

generate(input, target=None, reference=None, history='', **kwargs)

Generate checklist from input using template + structured output.

Automatically detects which placeholders the template needs and passes only those. This allows the same class to handle TICK (input only), RocketEval (input + reference + history), RLCF-direct (input + reference), etc.

Source code in autochecklist/generators/instance_level/direct.py
def generate(
    self,
    input: str,
    target: Optional[str] = None,
    reference: Optional[str] = None,
    history: str = "",
    **kwargs: Any,
) -> Checklist:
    """Generate checklist from input using template + structured output.

    Automatically detects which placeholders the template needs and passes
    only those. This allows the same class to handle TICK (input only),
    RocketEval (input + reference + history), RLCF-direct
    (input + reference), etc.

    Raises:
        ValueError: if the template requires a {reference} placeholder
            but no reference was provided.
    """
    # Build format kwargs — only pass placeholders that exist in template
    format_kwargs: dict[str, str] = {"input": input}
    if "target" in self._template._placeholders and target is not None:
        format_kwargs["target"] = target
    if "reference" in self._template._placeholders:
        if reference is None:
            raise ValueError(
                f"{self._method_name} requires a reference target."
            )
        format_kwargs["reference"] = reference
    if "history" in self._template._placeholders:
        format_kwargs["history"] = history

    # Load format instructions (skip for custom schemas)
    format_text = load_format(self._format_name) if self._format_name else ""

    # Inject format inline if template has {format_instructions} placeholder,
    # otherwise append after the prompt (default).
    if "format_instructions" in self._template._placeholders:
        format_kwargs["format_instructions"] = format_text
        full_prompt = self._template.format(**format_kwargs)
    else:
        prompt = self._template.format(**format_kwargs)
        full_prompt = prompt + "\n\n" + format_text

    # Call model with structured output
    response_format = to_response_format(
        self._response_schema, self._method_name
    )
    raw = self._call_model(full_prompt, response_format=response_format)

    # Parse structured response
    items = self._parse_structured(raw)

    # Raw model output is preserved in metadata for debugging/auditing.
    return Checklist(
        items=items,
        source_method=self.method_name,
        generation_level=self.generation_level,
        input=input,
        metadata={"raw_response": raw},
    )

ContrastiveGenerator

Bases: DirectGenerator

Generate checklists by comparing candidate responses (RLCF candidate modes).

Extends DirectGenerator with candidate auto-generation. Candidates are generated by smaller models and included in the prompt for contrastive analysis.

Two modes: rlcf_candidate (input + reference + candidates) and rlcf_candidates_only (input + candidates, no reference).

Source code in autochecklist/generators/instance_level/contrastive.py
class ContrastiveGenerator(DirectGenerator):
    """Generate checklists by comparing candidate responses (RLCF candidate modes).

    Extends DirectGenerator with candidate auto-generation. Candidates are
    generated by smaller models and included in the prompt for contrastive
    analysis.

    Two modes:
    - rlcf_candidate: input + reference + candidates
    - rlcf_candidates_only: input + candidates (no reference)
    """

    def __init__(
        self,
        candidate_models: Optional[List[str]] = None,
        num_candidates: int = 4,
        generate_candidates: Optional[bool] = None,
        candidate_provider: Optional[str] = None,
        candidate_base_url: Optional[str] = None,
        candidate_api_key: Optional[str] = None,
        candidate_api_format: Optional[str] = None,
        **kwargs: Any,
    ):
        """Initialize the generator.

        Args:
            candidate_models: Model IDs for candidate auto-generation. A single
                model is sampled ``num_candidates`` times; multiple models
                contribute one candidate each.
            num_candidates: Samples drawn when exactly one candidate model is set.
            generate_candidates: Auto-generate candidates when the caller passes
                none. ``None`` defers to the pipeline preset (default ``True``).
            candidate_provider: Optional provider override for candidate calls.
            candidate_base_url: Optional base-URL override for candidate calls.
            candidate_api_key: Optional API-key override for candidate calls.
            candidate_api_format: Optional API-format override for candidate calls.
            **kwargs: Forwarded to :class:`DirectGenerator`.
        """
        super().__init__(**kwargs)
        # Read generate_candidates from preset if not explicitly provided.
        # NOTE(review): assumes self._method_name is populated by
        # DirectGenerator.__init__ before this lookup — confirm in base class.
        from .pipeline_presets import PIPELINE_PRESETS
        preset = PIPELINE_PRESETS.get(self._method_name, {})
        if generate_candidates is None:
            self.generate_candidates = preset.get("generate_candidates", True)
        else:
            self.generate_candidates = generate_candidates
        self.candidate_models = candidate_models
        self.num_candidates = num_candidates
        # Provider overrides so candidates can come from a cheaper endpoint
        # than the main checklist-generation model.
        self._candidate_provider = candidate_provider
        self._candidate_base_url = candidate_base_url
        self._candidate_api_key = candidate_api_key
        self._candidate_api_format = candidate_api_format

    def generate(
        self,
        input: str,
        target: Optional[str] = None,
        reference: Optional[str] = None,
        candidates: Optional[Union[List[str], Dict[str, str]]] = None,
        **kwargs: Any,
    ) -> Checklist:
        """Generate checklist from input + candidates.

        Args:
            input: The instruction/query
            target: Alias for reference
            reference: Expert/reference target (optional for candidates_only)
            candidates: Candidate responses. Can be:
                - List[str]: multiple candidates (RLCF or listwise)
                - Dict with "chosen"/"rejected" keys (pairwise CRG)
                - None: auto-generated if candidate_models is set
            **kwargs: Additional arguments
        """
        # Honor the documented alias: ``target`` fills in for ``reference``
        # when the latter is omitted. (Previously ``target`` was accepted but
        # silently ignored, contradicting the docstring.)
        if reference is None:
            reference = target

        # Get or generate candidates
        if candidates is None:
            if self.generate_candidates and self.candidate_models:
                candidates = self._generate_candidates(input)
            else:
                # No auto-generation configured: the caller must supply them.
                raise ValueError(
                    f"{self.method_name} requires 'candidates' argument."
                )

        # Delegate to _generate_with_candidates with raw candidates
        checklist = self._generate_with_candidates(
            input=input,
            candidates=candidates,
            reference=reference,
            **kwargs,
        )
        # Store raw candidates and count in metadata. A dict is always the
        # pairwise chosen/rejected form, hence exactly two candidates.
        if isinstance(candidates, dict):
            checklist.metadata["candidates"] = list(candidates.values())
            checklist.metadata["num_candidates"] = 2
        else:
            checklist.metadata["candidates"] = candidates
            checklist.metadata["num_candidates"] = len(candidates)
        return checklist

    def _generate_with_candidates(
        self,
        input: str,
        candidates: Union[List[str], Dict[str, str]],
        reference: Optional[str] = None,
        **kwargs: Any,
    ) -> Checklist:
        """Build prompt with candidates and call model.

        Routes candidates to template placeholders based on type and template:
        - Dict → {chosen} + {rejected} placeholders (pairwise CRG)
        - List + {responses} placeholder → numbered Response blocks (listwise)
        - List + {candidates} placeholder → numbered Candidate blocks (RLCF)

        Raises:
            ValueError: On any mismatch between candidate type and template
                placeholders, or when a required ``reference`` is missing.
        """
        placeholders = self._template._placeholders
        format_kwargs: dict[str, str] = {"input": input}

        # --- Route candidates to placeholders ---
        if isinstance(candidates, dict):
            # Pairwise: dict must have chosen+rejected, template must have those placeholders
            if "candidates" in placeholders:
                raise ValueError(
                    "Template has {candidates} placeholder but received dict candidates. "
                    "Use {chosen}/{rejected} placeholders for pairwise, or pass a list."
                )
            if not {"chosen", "rejected"} <= placeholders:
                raise ValueError(
                    "Template must have {chosen} and {rejected} placeholders for dict candidates."
                )
            if set(candidates.keys()) != {"chosen", "rejected"}:
                raise ValueError(
                    "Dict candidates must have exactly 'chosen' and 'rejected' keys, "
                    f"got: {set(candidates.keys())}"
                )
            format_kwargs["chosen"] = candidates["chosen"]
            format_kwargs["rejected"] = candidates["rejected"]
        else:
            # List candidates
            if "chosen" in placeholders or "rejected" in placeholders:
                raise ValueError(
                    "Template has {chosen}/{rejected} placeholders but received list candidates. "
                    "Pass a dict with 'chosen' and 'rejected' keys instead."
                )
            if "responses" in placeholders:
                format_kwargs["responses"] = self._format_ordered_responses(candidates)
            elif "candidates" in placeholders:
                format_kwargs["candidates"] = self._format_candidates(candidates)
            else:
                raise ValueError(
                    "Template must have {candidates} or {responses} placeholder for list candidates."
                )

        # --- Handle optional placeholders ---
        # {context} defaults to empty string when the caller supplies none.
        if "context" in placeholders:
            format_kwargs["context"] = kwargs.pop("context", "")

        if "reference" in placeholders:
            if reference is None:
                raise ValueError(
                    f"{self.method_name} requires a reference target."
                )
            format_kwargs["reference"] = reference

        # Load format instructions (skip for custom schemas)
        format_text = load_format(self._format_name) if self._format_name else ""

        # Inject format inline if template has {format_instructions} placeholder,
        # otherwise append after the prompt (default).
        if "format_instructions" in placeholders:
            format_kwargs["format_instructions"] = format_text
            full_prompt = self._template.format(**format_kwargs)
        else:
            prompt = self._template.format(**format_kwargs)
            full_prompt = prompt + "\n\n" + format_text

        response_format = to_response_format(
            self._response_schema, self._method_name
        )
        raw = self._call_model(full_prompt, response_format=response_format)
        items = self._parse_structured(raw)

        return Checklist(
            items=items,
            source_method=self.method_name,
            generation_level=self.generation_level,
            input=input,
            metadata={"raw_response": raw},
        )

    def _get_candidate_client(self) -> Any:
        """Get client for candidate generation.

        If any candidate_* provider param is set, creates a separate client.
        Otherwise falls back to the main client via _get_or_create_client().

        Note: the client is created with the main ``self.model``; the actual
        candidate model is supplied per chat_completion call in
        ``_generate_candidates``.
        """
        if any([
            self._candidate_provider,
            self._candidate_base_url,
            self._candidate_api_key,
            self._candidate_api_format,
        ]):
            return get_client(
                provider=self._candidate_provider or self._provider,
                base_url=self._candidate_base_url,
                api_key=self._candidate_api_key,
                model=self.model,
                api_format=self._candidate_api_format,
            )
        return self._get_or_create_client()

    def _generate_candidates(self, input: str) -> List[str]:
        """Generate candidate responses using smaller models.

        Multiple models contribute one response each (temperature 0.7);
        a single model is sampled ``num_candidates`` times at a higher
        temperature (0.9) to diversify the pool.
        """
        candidates: List[str] = []
        client = self._get_candidate_client()

        if len(self.candidate_models) > 1:
            for model in self.candidate_models:
                resp = client.chat_completion(
                    model=model,
                    messages=[{"role": "user", "content": input}],
                    temperature=0.7,
                    max_tokens=1024,
                )
                candidates.append(resp["choices"][0]["message"]["content"])
        else:
            model = self.candidate_models[0]
            for _ in range(self.num_candidates):
                resp = client.chat_completion(
                    model=model,
                    messages=[{"role": "user", "content": input}],
                    temperature=0.9,
                    max_tokens=1024,
                )
                candidates.append(resp["choices"][0]["message"]["content"])

        return candidates

    def _format_ordered_responses(self, responses: List[str]) -> str:
        """Format responses as numbered Response blocks for listwise CRG."""
        formatted = []
        for i, response in enumerate(responses, 1):
            formatted.append(f"### Response {i}\n{response}")
        return "\n\n".join(formatted)

    def _format_candidates(self, candidates: List[str]) -> str:
        """Format candidate responses for prompt injection."""
        formatted = []
        for i, candidate in enumerate(candidates, 1):
            formatted.append(f"### Candidate {i}\n{candidate}")
        return "\n\n".join(formatted)

generate(input, target=None, reference=None, candidates=None, **kwargs)

Generate checklist from input + candidates.

Parameters:

Name Type Description Default
input str

The instruction/query

required
target Optional[str]

Alias for reference

None
reference Optional[str]

Expert/reference target (optional for candidates_only)

None
candidates Optional[Union[List[str], Dict[str, str]]]

Candidate responses. Can be: - List[str]: multiple candidates (RLCF or listwise) - Dict with "chosen"/"rejected" keys (pairwise CRG) - None: auto-generated if candidate_models is set

None
**kwargs Any

Additional arguments

{}
Source code in autochecklist/generators/instance_level/contrastive.py
def generate(
    self,
    input: str,
    target: Optional[str] = None,
    reference: Optional[str] = None,
    candidates: Optional[Union[List[str], Dict[str, str]]] = None,
    **kwargs: Any,
) -> Checklist:
    """Generate checklist from input + candidates.

    Args:
        input: The instruction/query
        target: Alias for reference
        reference: Expert/reference target (optional for candidates_only)
        candidates: Candidate responses. Can be:
            - List[str]: multiple candidates (RLCF or listwise)
            - Dict with "chosen"/"rejected" keys (pairwise CRG)
            - None: auto-generated if candidate_models is set
        **kwargs: Additional arguments
    """
    # NOTE(review): `target` is documented as an alias for `reference` but is
    # never read in this body — confirm the alias is wired up elsewhere.
    # Get or generate candidates
    if candidates is None:
        if self.generate_candidates and self.candidate_models:
            candidates = self._generate_candidates(input)
        else:
            # Auto-generation is off (or no candidate models configured), so
            # the caller must pass candidates explicitly.
            raise ValueError(
                f"{self.method_name} requires 'candidates' argument."
            )

    # Delegate to _generate_with_candidates with raw candidates
    checklist = self._generate_with_candidates(
        input=input,
        candidates=candidates,
        reference=reference,
        **kwargs,
    )
    # Store raw candidates and count in metadata
    # A dict is the pairwise chosen/rejected form, hence exactly 2 candidates.
    if isinstance(candidates, dict):
        checklist.metadata["candidates"] = list(candidates.values())
        checklist.metadata["num_candidates"] = 2
    else:
        checklist.metadata["candidates"] = candidates
        checklist.metadata["num_candidates"] = len(candidates)
    return checklist

InductiveGenerator

Bases: CorpusChecklistGenerator

Generator that induces checklists from observations.

Takes a collection of evaluative observations (e.g., reviewer feedback, user complaints, quality notes, strengths/weaknesses) and generates a comprehensive yes/no checklist that addresses them.

The pipeline applies multiple refinement steps: deduplication (merge similar questions), tagging (filter for applicability), and selection (beam search for a diverse subset).

Source code in autochecklist/generators/corpus_level/inductive.py
class InductiveGenerator(CorpusChecklistGenerator):
    """Generator that induces checklists from observations.

    Takes a collection of evaluative observations (e.g., reviewer feedback,
    user complaints, quality notes, strengths/weaknesses) and generates a
    comprehensive yes/no checklist that addresses them.

    The pipeline applies multiple refinement steps:
    - Deduplication (merge similar questions)
    - Tagging (filter for applicability)
    - Selection (beam search for diverse subset)
    """

    @property
    def method_name(self) -> str:
        """Identifier recorded as ``source_method`` on generated checklists."""
        return "feedback"

    def __init__(
        self,
        model: Optional[str] = None,
        temperature: float = 0.7,
        api_key: Optional[str] = None,
        # Refinement parameters
        dedup_threshold: float = 0.85,
        max_questions: int = 20,
        beam_width: int = 5,
        # Batching
        batch_size: int = 100,
        # Unit testing
        max_unit_test_references: int = 20,
        # API keys
        embedding_api_key: Optional[str] = None,
        # Custom prompt
        custom_prompt: Optional[Union[str, Path]] = None,
        **kwargs,
    ):
        """Initialize the generator.

        Args:
            model: Model ID used for question generation and refinement.
            temperature: Sampling temperature for question generation.
            api_key: API key for the main client.
            dedup_threshold: Similarity threshold for merging near-duplicates.
            max_questions: Maximum checklist size after subset selection.
            beam_width: Beam width for the diverse-subset search.
            batch_size: Observations per LLM call in question generation.
            max_unit_test_references: Cap on references used for unit testing.
            embedding_api_key: API key for embedding calls (dedup/selection).
            custom_prompt: Template string, or a Path to a template file, that
                overrides the default generation prompt.
            **kwargs: Forwarded to :class:`CorpusChecklistGenerator`.
        """
        super().__init__(model=model, api_key=api_key, **kwargs)
        self.temperature = temperature
        self.dedup_threshold = dedup_threshold
        self.max_questions = max_questions
        self.beam_width = beam_width
        self.batch_size = batch_size
        self.max_unit_test_references = max_unit_test_references
        self.embedding_api_key = embedding_api_key

        # Load generation prompt template (custom or default)
        if custom_prompt is not None:
            if isinstance(custom_prompt, Path):
                template_str = custom_prompt.read_text(encoding="utf-8")
            else:
                template_str = custom_prompt
        else:
            template_str = load_template("generators/feedback", "generate")
        self._generate_template = PromptTemplate(template_str)

    def generate(
        self,
        observations: List[str],
        domain: str = "general responses",
        skip_dedup: bool = False,
        skip_tagging: bool = False,
        skip_selection: bool = False,
        skip_unit_testing: bool = True,
        references: Optional[List[str]] = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> Checklist:
        """Generate a checklist from observations.

        Args:
            observations: List of evaluative observation strings (feedback,
                review comments, quality notes, etc.)
            domain: Domain description for the prompt
            skip_dedup: Skip deduplication step
            skip_tagging: Skip tagging/filtering step
            skip_selection: Skip subset selection step
            skip_unit_testing: Skip unit testing step (default True)
            references: Optional reference targets for unit testing
            verbose: Print progress at each pipeline stage
            **kwargs: Additional arguments

        Returns:
            Generated and refined Checklist
        """
        # Empty input: short-circuit to an empty corpus-level checklist.
        if not observations:
            return Checklist(
                items=[],
                source_method="feedback",
                generation_level="corpus",
                metadata={"observation_count": 0},
            )

        # Step 1: Generate initial questions from observations.
        # Forward `verbose` so per-batch progress actually prints (it was
        # previously dropped here, making the batch logging unreachable).
        raw_questions = self._generate_questions(
            observations, domain=domain, verbose=verbose
        )

        # Convert to checklist items, keeping provenance indices so each
        # question can be traced back to the observations it came from.
        items = []
        for i, q in enumerate(raw_questions):
            items.append(
                ChecklistItem(
                    id=f"fb-{i}",
                    question=q["question"],
                    metadata={
                        "source_feedback_indices": q.get("source_feedback_indices", []),
                    },
                )
            )

        checklist = Checklist(
            items=items,
            source_method="feedback",
            generation_level="corpus",
            metadata={
                "observation_count": len(observations),
                "raw_question_count": len(items),
            },
        )

        if verbose:
            print(f"[InductiveGenerator] Generated {len(items)} raw questions from {len(observations)} observations")

        # Step 2: Deduplicate (merge semantically similar questions)
        if not skip_dedup and len(checklist.items) > 1:
            before_count = len(checklist.items)
            dedup = Deduplicator(
                similarity_threshold=self.dedup_threshold,
                model=self.model,
                api_key=self.api_key,
                embedding_api_key=self.embedding_api_key,
            )
            checklist = dedup.refine(checklist)
            if verbose:
                after_count = len(checklist.items)
                clusters_merged = checklist.metadata.get("clusters_merged", 0)
                # "before → after": separator restored (counts previously
                # printed fused together, e.g. "2015" for 20 -> 15).
                print(f"[InductiveGenerator] Deduplication: {before_count} → {after_count} questions ({clusters_merged} clusters merged)")

        # Step 3: Tag and filter
        if not skip_tagging and len(checklist.items) > 0:
            before_count = len(checklist.items)
            tagger = Tagger(
                model=self.model,
                api_key=self.api_key,
            )
            checklist = tagger.refine(checklist)
            if verbose:
                after_count = len(checklist.items)
                filtered_count = checklist.metadata.get("filtered_count", 0)
                print(f"[InductiveGenerator] Tagging: {before_count} → {after_count} questions ({filtered_count} filtered out)")

        # Step 4: Unit test (optional, requires references)
        if not skip_unit_testing and references is not None and len(checklist.items) > 0:
            import random as _random
            before_count = len(checklist.items)
            refs = references
            if len(refs) > self.max_unit_test_references:
                # Deterministic subsample via a dedicated RNG instance: same
                # picks as seed(0)+sample, but without the side effect of
                # reseeding the process-global `random` state.
                refs = _random.Random(0).sample(refs, self.max_unit_test_references)
            raw_samples = [{"id": f"ref-{i}", "text": r} for i, r in enumerate(refs)]
            unit_tester = UnitTester(
                model=self.model,
                api_key=self.api_key,
            )
            checklist = unit_tester.refine(checklist, raw_samples=raw_samples)
            if verbose:
                after_count = len(checklist.items)
                print(f"[InductiveGenerator] Unit testing: {before_count} → {after_count} questions")

        # Step 5: Select diverse subset (only when over the size budget)
        if not skip_selection and len(checklist.items) > self.max_questions:
            before_count = len(checklist.items)
            selector = Selector(
                max_questions=self.max_questions,
                beam_width=self.beam_width,
                embedding_api_key=self.embedding_api_key,
                observations=observations,
            )
            checklist = selector.refine(checklist)
            if verbose:
                after_count = len(checklist.items)
                diversity_score = checklist.metadata.get("diversity_score", 0)
                print(f"[InductiveGenerator] Selection: {before_count} → {after_count} questions (diversity={diversity_score:.3f})")

        if verbose:
            print(f"[InductiveGenerator] Final checklist: {len(checklist.items)} questions")

        return checklist

    def _generate_questions(
        self,
        observations: List[str],
        domain: str = "general responses",
        verbose: bool = False,
    ) -> List[Dict[str, Any]]:
        """Generate questions from observations using LLM, batched for scalability.

        Splits observations into batches of ``self.batch_size`` items, makes one
        LLM call per batch, and pools results. Observation indices in each batch
        prompt use **global** numbering so ``source_feedback_indices`` are
        correct across batches.

        Args:
            observations: List of observation strings
            domain: Domain description
            verbose: Print per-batch progress

        Returns:
            List of question dicts with 'question' and 'source_feedback_indices'
        """
        # Strict JSON schema so the model must emit {"questions": [...]} with
        # per-question provenance indices.
        response_format = {
            "type": "json_schema",
            "json_schema": {
                "name": "generated_questions",
                "strict": True,
                "schema": {
                    "type": "object",
                    "properties": {
                        "questions": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "question": {"type": "string"},
                                    "source_feedback_indices": {
                                        "type": "array",
                                        "items": {"type": "integer"},
                                    },
                                },
                                "required": ["question", "source_feedback_indices"],
                                "additionalProperties": False,
                            },
                        },
                    },
                    "required": ["questions"],
                    "additionalProperties": False,
                },
            },
        }

        client = self._get_or_create_client()
        n_batches = math.ceil(len(observations) / self.batch_size)
        all_questions: List[Dict[str, Any]] = []

        for batch_idx in range(n_batches):
            start = batch_idx * self.batch_size
            end = min(start + self.batch_size, len(observations))
            batch = observations[start:end]

            # Format with global indices
            feedback_text = "\n".join(
                f"[{start + i}] {f}" for i, f in enumerate(batch)
            )

            prompt = self._generate_template.format(
                domain=domain,
                feedback=feedback_text,
            )

            response = client.chat_completion(
                model=self.model or "openai/gpt-4o",
                messages=[{"role": "user", "content": prompt}],
                temperature=self.temperature,
                max_tokens=self.max_tokens,
                response_format=response_format,
            )

            content = response["choices"][0]["message"]["content"]

            try:
                result = json.loads(content)
                batch_questions = result.get("questions", [])
            except json.JSONDecodeError:
                # Malformed model output contributes no questions rather than
                # aborting the whole run.
                batch_questions = []

            all_questions.extend(batch_questions)

            if verbose:
                print(
                    f"[InductiveGenerator] Batch {batch_idx + 1}/{n_batches}: "
                    f"items [{start}..{end - 1}] → {len(batch_questions)} questions"
                )

        return all_questions

generate(observations, domain='general responses', skip_dedup=False, skip_tagging=False, skip_selection=False, skip_unit_testing=True, references=None, verbose=False, **kwargs)

Generate a checklist from observations.

Parameters:

Name Type Description Default
observations List[str]

List of evaluative observation strings (feedback, review comments, quality notes, etc.)

required
domain str

Domain description for the prompt

'general responses'
skip_dedup bool

Skip deduplication step

False
skip_tagging bool

Skip tagging/filtering step

False
skip_selection bool

Skip subset selection step

False
skip_unit_testing bool

Skip unit testing step (default True)

True
references Optional[List[str]]

Optional reference targets for unit testing

None
verbose bool

Print progress at each pipeline stage

False
**kwargs Any

Additional arguments

{}

Returns:

Type Description
Checklist

Generated and refined Checklist

Source code in autochecklist/generators/corpus_level/inductive.py
def generate(
    self,
    observations: List[str],
    domain: str = "general responses",
    skip_dedup: bool = False,
    skip_tagging: bool = False,
    skip_selection: bool = False,
    skip_unit_testing: bool = True,
    references: Optional[List[str]] = None,
    verbose: bool = False,
    **kwargs: Any,
) -> Checklist:
    """Generate a checklist from observations.

    Args:
        observations: List of evaluative observation strings (feedback,
            review comments, quality notes, etc.)
        domain: Domain description for the prompt
        skip_dedup: Skip deduplication step
        skip_tagging: Skip tagging/filtering step
        skip_selection: Skip subset selection step
        skip_unit_testing: Skip unit testing step (default True)
        references: Optional reference targets for unit testing
        verbose: Print progress at each pipeline stage
        **kwargs: Additional arguments

    Returns:
        Generated and refined Checklist
    """
    # Empty input: short-circuit to an empty corpus-level checklist.
    if not observations:
        return Checklist(
            items=[],
            source_method="feedback",
            generation_level="corpus",
            metadata={"observation_count": 0},
        )

    # Step 1: Generate initial questions from observations
    # NOTE(review): `verbose` is not forwarded here, so the per-batch
    # progress inside _generate_questions never prints — confirm intended.
    raw_questions = self._generate_questions(observations, domain=domain)

    # Convert to checklist items, keeping provenance indices per question.
    items = []
    for i, q in enumerate(raw_questions):
        items.append(
            ChecklistItem(
                id=f"fb-{i}",
                question=q["question"],
                metadata={
                    "source_feedback_indices": q.get("source_feedback_indices", []),
                },
            )
        )

    checklist = Checklist(
        items=items,
        source_method="feedback",
        generation_level="corpus",
        metadata={
            "observation_count": len(observations),
            "raw_question_count": len(items),
        },
    )

    if verbose:
        print(f"[InductiveGenerator] Generated {len(items)} raw questions from {len(observations)} observations")

    # Step 2: Deduplicate
    if not skip_dedup and len(checklist.items) > 1:
        before_count = len(checklist.items)
        dedup = Deduplicator(
            similarity_threshold=self.dedup_threshold,
            model=self.model,
            api_key=self.api_key,
            embedding_api_key=self.embedding_api_key,
        )
        checklist = dedup.refine(checklist)
        if verbose:
            after_count = len(checklist.items)
            clusters_merged = checklist.metadata.get("clusters_merged", 0)
            # NOTE(review): {before_count}{after_count} prints the two counts
            # fused together (no separator) — likely a lost "→"; confirm.
            print(f"[InductiveGenerator] Deduplication: {before_count}{after_count} questions ({clusters_merged} clusters merged)")

    # Step 3: Tag and filter
    if not skip_tagging and len(checklist.items) > 0:
        before_count = len(checklist.items)
        tagger = Tagger(
            model=self.model,
            api_key=self.api_key,
        )
        checklist = tagger.refine(checklist)
        if verbose:
            after_count = len(checklist.items)
            filtered_count = checklist.metadata.get("filtered_count", 0)
            print(f"[InductiveGenerator] Tagging: {before_count}{after_count} questions ({filtered_count} filtered out)")

    # Step 4: Unit test (optional, requires references)
    if not skip_unit_testing and references is not None and len(checklist.items) > 0:
        import random as _random
        before_count = len(checklist.items)
        refs = references
        if len(refs) > self.max_unit_test_references:
            # NOTE(review): seeding the module-level RNG mutates global
            # `random` state for the whole process; a local Random(0)
            # instance would give the same sample without the side effect.
            _random.seed(0)
            refs = _random.sample(refs, self.max_unit_test_references)
        raw_samples = [{"id": f"ref-{i}", "text": r} for i, r in enumerate(refs)]
        unit_tester = UnitTester(
            model=self.model,
            api_key=self.api_key,
        )
        checklist = unit_tester.refine(checklist, raw_samples=raw_samples)
        if verbose:
            after_count = len(checklist.items)
            print(f"[InductiveGenerator] Unit testing: {before_count}{after_count} questions")

    # Step 5: Select diverse subset (only when over the size budget)
    if not skip_selection and len(checklist.items) > self.max_questions:
        before_count = len(checklist.items)
        selector = Selector(
            max_questions=self.max_questions,
            beam_width=self.beam_width,
            embedding_api_key=self.embedding_api_key,
            observations=observations,
        )
        checklist = selector.refine(checklist)
        if verbose:
            after_count = len(checklist.items)
            diversity_score = checklist.metadata.get("diversity_score", 0)
            print(f"[InductiveGenerator] Selection: {before_count}{after_count} questions (diversity={diversity_score:.3f})")

    if verbose:
        print(f"[InductiveGenerator] Final checklist: {len(checklist.items)} questions")

    return checklist

DeductiveGenerator

Bases: CorpusChecklistGenerator

Generate checklists from evaluation dimension definitions.

CheckEval creates binary yes/no evaluation questions organized by dimension and sub-dimension. Questions can be provided as seeds or generated from dimension definitions.

Parameters:

Name Type Description Default
model Optional[str]

OpenRouter model ID for generation

None
augmentation_mode str | AugmentationMode

One of seed, elaboration, or diversification

SEED
task_type str

Type of task being evaluated (e.g., "summarization", "dialog")

'general'
Example

checkeval = DeductiveGenerator(model="openai/gpt-4o-mini") dimensions = [ ... DeductiveInput( ... name="coherence", ... definition="The response should maintain logical flow.", ... sub_dimensions=["Logical Flow", "Consistency"] ... ) ... ] checklist = checkeval.generate(dimensions=dimensions)

Source code in autochecklist/generators/corpus_level/deductive.py
(Source listing spans lines 58–615 of autochecklist/generators/corpus_level/deductive.py; the line-number gutter has been collapsed.)
class DeductiveGenerator(CorpusChecklistGenerator):
    """Generate checklists from evaluation dimension definitions.

    CheckEval creates binary yes/no evaluation questions organized by
    dimension and sub-dimension. Questions can be provided as seeds
    or generated from dimension definitions.

    Args:
        model: OpenRouter model ID for generation
        augmentation_mode: One of seed, elaboration, diversification, or combined
        task_type: Type of task being evaluated (e.g., "summarization", "dialog")

    Example:
        >>> checkeval = DeductiveGenerator(model="openai/gpt-4o-mini")
        >>> dimensions = [
        ...     DeductiveInput(
        ...         name="coherence",
        ...         definition="The response should maintain logical flow.",
        ...         sub_dimensions=["Logical Flow", "Consistency"]
        ...     )
        ... ]
        >>> checklist = checkeval.generate(dimensions=dimensions)
    """

    def __init__(
        self,
        model: Optional[str] = None,
        augmentation_mode: str | AugmentationMode = AugmentationMode.SEED,
        task_type: str = "general",
        api_key: Optional[str] = None,
        **kwargs,
    ):
        super().__init__(model=model, api_key=api_key, **kwargs)

        # Handle string augmentation mode (raises ValueError on unknown mode)
        if isinstance(augmentation_mode, str):
            augmentation_mode = AugmentationMode(augmentation_mode.lower())
        self.augmentation_mode = augmentation_mode

        self.task_type = task_type

        # Load prompt templates
        self._generate_template = PromptTemplate(load_template("generators/checkeval", "generate"))
        self._augment_template = PromptTemplate(load_template("generators/checkeval", "augment"))
        self._filter_template = PromptTemplate(load_template("generators/checkeval", "filter"))

    @property
    def method_name(self) -> str:
        """Return the method name for this generator."""
        return "checkeval"

    def generate(
        self,
        dimensions: List[DeductiveInput],
        seed_questions: Optional[Dict[str, Dict[str, List[str]]]] = None,
        augment: bool = True,
        max_questions: Optional[int] = None,
        apply_filtering: bool = False,
        verbose: bool = False,
        **kwargs: Any,
    ) -> Checklist:
        """Generate a checklist from dimension definitions.

        Args:
            dimensions: List of DeductiveInput with name, definition, sub_dimensions
            seed_questions: Optional pre-defined questions by dimension/sub-aspect
                Format: {dimension: {sub_aspect: [questions]}}
            augment: Whether to augment seed questions (default True)
            max_questions: Maximum number of questions to include
            apply_filtering: Whether to apply alignment/consistency filtering and
                deduplication (CheckEval paper stages; default False)
            verbose: Print progress at each stage (default False)
            **kwargs: Additional parameters passed to generation

        Returns:
            Checklist with generated questions
        """
        if not dimensions:
            # Empty input: return an empty corpus-level checklist rather than erroring.
            return Checklist(
                id=str(uuid.uuid4()),
                items=[],
                source_method="checkeval",
                generation_level="corpus",
                metadata={
                    "dimension_count": 0,
                    "dimensions": [],
                    "augmentation_mode": self.augmentation_mode.value,
                },
            )

        all_questions = []

        for dimension in dimensions:
            # Check if seed questions provided for this dimension
            dim_seed_questions = (
                seed_questions.get(dimension.name) if seed_questions else None
            )

            if dim_seed_questions and not augment:
                # Use seed questions directly without augmentation
                questions = self._convert_seed_to_questions(
                    dim_seed_questions, dimension.name
                )
            elif self.augmentation_mode == AugmentationMode.COMBINED:
                # Paper-faithful: generate seeds, then run both augmentation
                # modes independently, then merge all three pools
                if dim_seed_questions:
                    seeds = self._convert_seed_to_questions(
                        dim_seed_questions, dimension.name
                    )
                else:
                    seeds = self._generate_questions(
                        dimension,
                        mode=AugmentationMode.SEED,
                        task_type=self.task_type,
                    )

                # Tag seeds
                for q in seeds:
                    q["augmentation_source"] = "seed"

                # Run elaboration independently from seeds
                elaborated = self._augment_questions(
                    seeds, dimension,
                    mode=AugmentationMode.ELABORATION,
                    task_type=self.task_type,
                )
                for q in elaborated:
                    q.setdefault("augmentation_source", "elaboration")

                # Run diversification independently from seeds
                diversified = self._augment_questions(
                    seeds, dimension,
                    mode=AugmentationMode.DIVERSIFICATION,
                    task_type=self.task_type,
                )
                for q in diversified:
                    q.setdefault("augmentation_source", "diversification")

                # Merge all three pools, deduplicating by question text
                questions = self._merge_and_deduplicate(seeds, elaborated, diversified)
            elif dim_seed_questions and augment:
                # Augment from seed questions
                seed_list = self._convert_seed_to_questions(
                    dim_seed_questions, dimension.name
                )
                questions = self._augment_questions(
                    seed_list,
                    dimension,
                    mode=self.augmentation_mode,
                    task_type=self.task_type,
                )
            else:
                # Generate questions from dimension definition
                questions = self._generate_questions(
                    dimension,
                    mode=self.augmentation_mode,
                    task_type=self.task_type,
                )

            all_questions.extend(questions)

        # Apply max_questions limit if specified
        if max_questions and len(all_questions) > max_questions:
            all_questions = all_questions[:max_questions]

        # Convert to ChecklistItems
        items = []
        for q in all_questions:
            meta = {
                "dimension": q.get("dimension"),
                "sub_aspect": q.get("sub_aspect"),
            }
            if "augmentation_source" in q:
                meta["augmentation_source"] = q["augmentation_source"]
            item = ChecklistItem(
                id=str(uuid.uuid4()),
                question=q["question"],
                category=q.get("dimension"),
                metadata=meta,
            )
            items.append(item)

        checklist = Checklist(
            id=str(uuid.uuid4()),
            items=items,
            source_method="checkeval",
            generation_level="corpus",
            metadata={
                "dimension_count": len(dimensions),
                "dimensions": [d.name for d in dimensions],
                "augmentation_mode": self.augmentation_mode.value,
                "task_type": self.task_type,
            },
        )

        if verbose:
            print(f"[DeductiveGenerator] Generated {len(checklist.items)} questions from {len(dimensions)} dimensions")

        # Apply filtering if enabled (CheckEval paper §4.2)
        if apply_filtering and len(checklist.items) > 0:
            # Build dimension lookup for consistency checks
            dim_lookup = {d.name: d.definition for d in dimensions}

            # Stage 1 & 2: Alignment + Dimension Consistency (CheckEval-specific)
            before_count = len(checklist.items)
            filtered_items, filter_stats = self._filter_questions(
                checklist.items, dim_lookup, verbose
            )

            # Update checklist with filtered items.
            # Carry over corpus_description as well as input so that no
            # corpus-level context is silently dropped by filtering.
            checklist = Checklist(
                id=checklist.id,
                items=filtered_items,
                source_method=checklist.source_method,
                generation_level=checklist.generation_level,
                input=checklist.input,
                corpus_description=checklist.corpus_description,
                metadata={
                    **checklist.metadata,
                    "alignment_filtered": filter_stats["alignment_filtered"],
                    "consistency_filtered": filter_stats["consistency_filtered"],
                },
            )

            if verbose:
                after_count = len(checklist.items)
                print(
                    f"[DeductiveGenerator] Filtering: {before_count} -> {after_count} questions "
                    f"({filter_stats['alignment_filtered']} alignment, "
                    f"{filter_stats['consistency_filtered']} consistency)"
                )

            # Stage 3: Redundancy Removal (via Deduplicator)
            if len(checklist.items) > 1:
                before_count = len(checklist.items)
                dedup = Deduplicator(
                    similarity_threshold=0.85,
                    model=self.model,
                    api_key=self.api_key,
                )
                checklist = dedup.refine(checklist)
                if verbose:
                    after_count = len(checklist.items)
                    merged = checklist.metadata.get("clusters_merged", 0)
                    print(f"[DeductiveGenerator] Deduplication: {before_count} -> {after_count} questions ({merged} clusters merged)")

        if verbose:
            print(f"[DeductiveGenerator] Final checklist: {len(checklist.items)} questions")

        return checklist

    def generate_grouped(
        self,
        dimensions: List[DeductiveInput],
        **kwargs: Any,
    ) -> Dict[str, "Checklist"]:
        """Generate a checklist and split it by dimension category.

        Convenience wrapper around ``generate().by_category()``.

        Args:
            dimensions: List of DeductiveInput with name, definition, sub_dimensions
            **kwargs: Additional arguments passed to generate()

        Returns:
            Dict mapping dimension name to sub-Checklist
        """
        checklist = self.generate(dimensions=dimensions, **kwargs)
        return checklist.by_category()

    def _generate_questions(
        self,
        dimension: DeductiveInput,
        mode: AugmentationMode,
        task_type: str,
    ) -> List[Dict[str, Any]]:
        """Generate questions from a dimension definition.

        Args:
            dimension: Dimension with name, definition, sub_dimensions
            mode: Augmentation mode (affects how many questions)
            task_type: Type of task being evaluated

        Returns:
            List of question dicts with question, sub_aspect, dimension
        """
        # Determine questions per sub-dimension based on mode
        questions_per_sub = {
            AugmentationMode.SEED: 2,
            AugmentationMode.ELABORATION: 5,
            AugmentationMode.DIVERSIFICATION: 4,
            AugmentationMode.COMBINED: 2,  # Seeds only; augmentation happens separately
        }.get(mode, 2)

        # Format sub-dimensions
        sub_dims = dimension.sub_dimensions or ["General"]
        sub_dims_text = "\n".join(f"- {sd}" for sd in sub_dims)

        # Render prompt
        prompt = self._generate_template.format(
            task_type=task_type,
            dimension_name=dimension.name,
            definition=dimension.definition,
            sub_dimensions=sub_dims_text,
            questions_per_sub=questions_per_sub,
        )

        # Call LLM (request JSON output for reliable parsing with reasoning models)
        client = self._get_or_create_client()
        response = client.chat_completion(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            response_format={"type": "json_object"},
        )

        # Parse response
        content = response["choices"][0]["message"]["content"]
        questions = self._parse_questions_response(content)

        # Ensure dimension is set
        for q in questions:
            q["dimension"] = dimension.name
            if "sub_aspect" not in q:
                q["sub_aspect"] = "General"

        return questions

    def _augment_questions(
        self,
        existing_questions: List[Dict[str, Any]],
        dimension: DeductiveInput,
        mode: AugmentationMode,
        task_type: str,
    ) -> List[Dict[str, Any]]:
        """Augment existing questions using the specified mode.

        Args:
            existing_questions: List of existing question dicts
            dimension: Dimension definition
            mode: Augmentation mode
            task_type: Type of task being evaluated

        Returns:
            List of augmented question dicts
        """
        # Format existing questions
        existing_text = "\n".join(
            f"- [{q.get('sub_aspect', 'General')}] {q['question']}"
            for q in existing_questions
        )

        # Get augmentation instructions
        instructions = AUGMENTATION_INSTRUCTIONS.get(mode, "")

        # Render prompt
        prompt = self._augment_template.format(
            task_type=task_type,
            dimension_name=dimension.name,
            definition=dimension.definition,
            existing_questions=existing_text,
            augmentation_mode=mode.value,
            augmentation_instructions=instructions,
        )

        # Call LLM (request JSON output for reliable parsing with reasoning models)
        client = self._get_or_create_client()
        response = client.chat_completion(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            response_format={"type": "json_object"},
        )

        # Parse response
        content = response["choices"][0]["message"]["content"]
        questions = self._parse_questions_response(content)

        # Ensure dimension is set
        for q in questions:
            q["dimension"] = dimension.name
            if "sub_aspect" not in q:
                q["sub_aspect"] = "General"

        return questions

    def _convert_seed_to_questions(
        self,
        seed_dict: Dict[str, List[str]],
        dimension_name: str,
    ) -> List[Dict[str, Any]]:
        """Convert seed question dict to list of question dicts.

        Args:
            seed_dict: {sub_aspect: [questions]}
            dimension_name: Name of the dimension

        Returns:
            List of question dicts
        """
        questions = []
        for sub_aspect, question_list in seed_dict.items():
            for q in question_list:
                questions.append(
                    {
                        "question": q,
                        "sub_aspect": sub_aspect,
                        "dimension": dimension_name,
                    }
                )
        return questions

    @staticmethod
    def _merge_and_deduplicate(
        *pools: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        """Merge multiple question pools, deduplicating by question text.

        Questions from earlier pools take priority (seeds first, then
        elaborated, then diversified). This preserves the augmentation_source
        tag from the first occurrence.

        Args:
            *pools: Variable number of question lists to merge

        Returns:
            Deduplicated list of question dicts
        """
        seen = set()
        merged = []
        for pool in pools:
            for q in pool:
                text = q["question"].strip()
                if text not in seen:
                    seen.add(text)
                    merged.append(q)
        return merged

    def _parse_questions_response(self, content: str) -> List[Dict[str, Any]]:
        """Parse LLM response to extract questions.

        Args:
            content: Raw LLM response

        Returns:
            List of well-formed question dicts (each has a string "question"
            key). Malformed entries are dropped so that downstream code can
            index ``q["question"]`` safely.
        """
        # Try to find JSON in the response
        try:
            # Look for JSON block
            if "```json" in content:
                start = content.find("```json") + 7
                end = content.find("```", start)
                json_str = content[start:end].strip()
            elif "```" in content:
                start = content.find("```") + 3
                end = content.find("```", start)
                json_str = content[start:end].strip()
            elif "{" in content:
                # Find JSON object
                start = content.find("{")
                end = content.rfind("}") + 1
                json_str = content[start:end]
            else:
                return []

            data = json.loads(json_str)

            if isinstance(data, dict) and "questions" in data:
                candidates = data["questions"]
            elif isinstance(data, list):
                candidates = data
            else:
                return []

            # Guard against non-dict entries or entries missing a usable
            # "question" field; callers index q["question"] directly.
            if not isinstance(candidates, list):
                return []
            return [
                q for q in candidates
                if isinstance(q, dict) and isinstance(q.get("question"), str)
            ]

        except (json.JSONDecodeError, ValueError):
            return []

    def _filter_questions(
        self,
        items: List[ChecklistItem],
        dim_lookup: Dict[str, str],
        verbose: bool = False,
    ) -> tuple[List[ChecklistItem], Dict[str, int]]:
        """Filter questions using CheckEval-specific criteria.

        Implements the CheckEval paper's filtering stages:
        1. Alignment: "YES" should indicate higher quality
        2. Dimension Consistency: Question matches its dimension definition

        Args:
            items: List of checklist items to filter
            dim_lookup: Dict mapping dimension name to definition
            verbose: Print per-item filtering details

        Returns:
            Tuple of (filtered items, stats dict)
        """
        filtered_items = []
        stats = {"alignment_filtered": 0, "consistency_filtered": 0}

        client = self._get_or_create_client()
        for item in items:
            # Get dimension info from item metadata
            dimension_name = item.metadata.get("dimension", "unknown") if item.metadata else "unknown"
            dimension_def = dim_lookup.get(dimension_name, "General evaluation criterion")

            # Build filter prompt
            prompt = self._filter_template.format(
                dimension_name=dimension_name,
                dimension_definition=dimension_def,
                question=item.question,
            )

            response_format = {
                "type": "json_schema",
                "json_schema": {
                    "name": "filter_result",
                    "strict": True,
                    "schema": {
                        "type": "object",
                        "properties": {
                            "reasoning": {"type": "string"},
                            "alignment_pass": {"type": "boolean"},
                            "dimension_consistent": {"type": "boolean"},
                        },
                        "required": ["reasoning", "alignment_pass", "dimension_consistent"],
                        "additionalProperties": False,
                    },
                },
            }

            try:
                response = client.chat_completion(
                    model=self.model,
                    messages=[{"role": "user", "content": prompt}],
                    temperature=0.0,
                    response_format=response_format,
                )
                result = json.loads(response["choices"][0]["message"]["content"])

                alignment_pass = result.get("alignment_pass", False)
                dimension_consistent = result.get("dimension_consistent", False)

                if alignment_pass and dimension_consistent:
                    # Question passes both checks
                    filtered_items.append(item)
                else:
                    # Track why it was filtered (an item can fail both checks,
                    # so the two counters may sum to more than items removed)
                    if not alignment_pass:
                        stats["alignment_filtered"] += 1
                    if not dimension_consistent:
                        stats["consistency_filtered"] += 1

            except (json.JSONDecodeError, KeyError, TypeError):
                # On parse error, be conservative and keep the question
                filtered_items.append(item)

        return filtered_items, stats

method_name property

Return the method name for this generator.

generate(dimensions, seed_questions=None, augment=True, max_questions=None, apply_filtering=False, verbose=False, **kwargs)

Generate a checklist from dimension definitions.

Parameters:

Name Type Description Default
dimensions List[DeductiveInput]

List of DeductiveInput with name, definition, sub_dimensions

required
seed_questions Optional[Dict[str, Dict[str, List[str]]]]

Optional pre-defined questions by dimension/sub-aspect Format: {dimension: {sub_aspect: [questions]}}

None
augment bool

Whether to augment seed questions (default True)

True
max_questions Optional[int]

Maximum number of questions to include

None
apply_filtering bool

Whether to apply Tagger and Deduplicator refiners (default False)

False
verbose bool

Print progress at each stage (default False)

False
**kwargs Any

Additional parameters passed to generation

{}

Returns:

Type Description
Checklist

Checklist with generated questions

Source code in autochecklist/generators/corpus_level/deductive.py
def generate(
    self,
    dimensions: List[DeductiveInput],
    seed_questions: Optional[Dict[str, Dict[str, List[str]]]] = None,
    augment: bool = True,
    max_questions: Optional[int] = None,
    apply_filtering: bool = False,
    verbose: bool = False,
    **kwargs: Any,
) -> Checklist:
    """Generate a checklist from dimension definitions.

    Args:
        dimensions: List of DeductiveInput with name, definition, sub_dimensions
        seed_questions: Optional pre-defined questions by dimension/sub-aspect
            Format: {dimension: {sub_aspect: [questions]}}
        augment: Whether to augment seed questions (default True)
        max_questions: Maximum number of questions to include
        apply_filtering: Whether to apply Tagger and Deduplicator refiners (default False)
        verbose: Print progress at each stage (default False)
        **kwargs: Additional parameters passed to generation

    Returns:
        Checklist with generated questions
    """
    if not dimensions:
        return Checklist(
            id=str(uuid.uuid4()),
            items=[],
            source_method="checkeval",
            generation_level="corpus",
            metadata={
                "dimension_count": 0,
                "dimensions": [],
                "augmentation_mode": self.augmentation_mode.value,
            },
        )

    all_questions = []

    for dimension in dimensions:
        # Check if seed questions provided for this dimension
        dim_seed_questions = (
            seed_questions.get(dimension.name) if seed_questions else None
        )

        if dim_seed_questions and not augment:
            # Use seed questions directly without augmentation
            questions = self._convert_seed_to_questions(
                dim_seed_questions, dimension.name
            )
        elif self.augmentation_mode == AugmentationMode.COMBINED:
            # Paper-faithful: generate seeds, then run both augmentation
            # modes independently, then merge all three pools
            if dim_seed_questions:
                seeds = self._convert_seed_to_questions(
                    dim_seed_questions, dimension.name
                )
            else:
                seeds = self._generate_questions(
                    dimension,
                    mode=AugmentationMode.SEED,
                    task_type=self.task_type,
                )

            # Tag seeds
            for q in seeds:
                q["augmentation_source"] = "seed"

            # Run elaboration independently from seeds
            elaborated = self._augment_questions(
                seeds, dimension,
                mode=AugmentationMode.ELABORATION,
                task_type=self.task_type,
            )
            for q in elaborated:
                q.setdefault("augmentation_source", "elaboration")

            # Run diversification independently from seeds
            diversified = self._augment_questions(
                seeds, dimension,
                mode=AugmentationMode.DIVERSIFICATION,
                task_type=self.task_type,
            )
            for q in diversified:
                q.setdefault("augmentation_source", "diversification")

            # Merge all three pools, deduplicating by question text
            questions = self._merge_and_deduplicate(seeds, elaborated, diversified)
        elif dim_seed_questions and augment:
            # Augment from seed questions
            seed_list = self._convert_seed_to_questions(
                dim_seed_questions, dimension.name
            )
            questions = self._augment_questions(
                seed_list,
                dimension,
                mode=self.augmentation_mode,
                task_type=self.task_type,
            )
        else:
            # Generate questions from dimension definition
            questions = self._generate_questions(
                dimension,
                mode=self.augmentation_mode,
                task_type=self.task_type,
            )

        all_questions.extend(questions)

    # Apply max_questions limit if specified
    if max_questions and len(all_questions) > max_questions:
        all_questions = all_questions[:max_questions]

    # Convert to ChecklistItems
    items = []
    for q in all_questions:
        meta = {
            "dimension": q.get("dimension"),
            "sub_aspect": q.get("sub_aspect"),
        }
        if "augmentation_source" in q:
            meta["augmentation_source"] = q["augmentation_source"]
        item = ChecklistItem(
            id=str(uuid.uuid4()),
            question=q["question"],
            category=q.get("dimension"),
            metadata=meta,
        )
        items.append(item)

    checklist = Checklist(
        id=str(uuid.uuid4()),
        items=items,
        source_method="checkeval",
        generation_level="corpus",
        metadata={
            "dimension_count": len(dimensions),
            "dimensions": [d.name for d in dimensions],
            "augmentation_mode": self.augmentation_mode.value,
            "task_type": self.task_type,
        },
    )

    if verbose:
        print(f"[DeductiveGenerator] Generated {len(checklist.items)} questions from {len(dimensions)} dimensions")

    # Apply filtering if enabled (CheckEval paper §4.2)
    if apply_filtering and len(checklist.items) > 0:
        # Build dimension lookup for consistency checks
        dim_lookup = {d.name: d.definition for d in dimensions}

        # Stage 1 & 2: Alignment + Dimension Consistency (CheckEval-specific)
        before_count = len(checklist.items)
        filtered_items, filter_stats = self._filter_questions(
            checklist.items, dim_lookup, verbose
        )

        # Update checklist with filtered items
        checklist = Checklist(
            id=checklist.id,
            items=filtered_items,
            source_method=checklist.source_method,
            generation_level=checklist.generation_level,
            input=checklist.input,
            metadata={
                **checklist.metadata,
                "alignment_filtered": filter_stats["alignment_filtered"],
                "consistency_filtered": filter_stats["consistency_filtered"],
            },
        )

        if verbose:
            after_count = len(checklist.items)
            print(
                f"[DeductiveGenerator] Filtering: {before_count} -> {after_count} questions "
                f"({filter_stats['alignment_filtered']} alignment, "
                f"{filter_stats['consistency_filtered']} consistency)"
            )

        # Stage 3: Redundancy Removal (via Deduplicator)
        if len(checklist.items) > 1:
            before_count = len(checklist.items)
            dedup = Deduplicator(
                similarity_threshold=0.85,
                model=self.model,
                api_key=self.api_key,
            )
            checklist = dedup.refine(checklist)
            if verbose:
                after_count = len(checklist.items)
                merged = checklist.metadata.get("clusters_merged", 0)
                print(f"[DeductiveGenerator] Deduplication: {before_count} -> {after_count} questions ({merged} clusters merged)")

    if verbose:
        print(f"[DeductiveGenerator] Final checklist: {len(checklist.items)} questions")

    return checklist

generate_grouped(dimensions, **kwargs)

Generate a checklist and split it by dimension category.

Convenience wrapper around generate().by_category().

Parameters:

Name Type Description Default
dimensions List[DeductiveInput]

List of DeductiveInput with name, definition, sub_dimensions

required
**kwargs Any

Additional arguments passed to generate()

{}

Returns:

Type Description
Dict[str, Checklist]

Dict mapping dimension name to sub-Checklist

Source code in autochecklist/generators/corpus_level/deductive.py
def generate_grouped(
    self,
    dimensions: List[DeductiveInput],
    **kwargs: Any,
) -> Dict[str, "Checklist"]:
    """Generate a checklist and split it by dimension category.

    Convenience wrapper around ``generate().by_category()``.

    Args:
        dimensions: List of DeductiveInput with name, definition, sub_dimensions
        **kwargs: Additional arguments passed to generate()

    Returns:
        Dict mapping dimension name to sub-Checklist
    """
    checklist = self.generate(dimensions=dimensions, **kwargs)
    return checklist.by_category()

AugmentationMode

Bases: str, Enum

Augmentation modes for question generation.

Source code in autochecklist/generators/corpus_level/deductive.py
class AugmentationMode(str, Enum):
    """Augmentation modes for question generation.

    A ``str``-mixin enum, so members compare equal to their string values
    (e.g. ``AugmentationMode.SEED == "seed"``) and serialize cleanly.
    """

    SEED = "seed"  # Minimal questions (1-3 per sub-dimension)
    ELABORATION = "elaboration"  # Expanded with more detail
    DIVERSIFICATION = "diversification"  # Alternative framings
    COMBINED = "combined"  # Both elaboration + diversification from seeds (paper-faithful)

InteractiveGenerator

Bases: CorpusChecklistGenerator

Generate checklists from interactive think-aloud attributes.

InteractEval takes pre-collected think-aloud attributes (considerations about evaluating a dimension) and transforms them through a 5-stage pipeline into a validated checklist of yes/no questions.

Parameters:

Name Type Description Default
model Optional[str]

OpenRouter model ID for generation

None
max_components int

Maximum number of components to extract (default 5)

5
Example

interacteval = InteractiveGenerator(model="openai/gpt-4o-mini") input = InteractiveInput( ... source="human_llm", ... dimension="coherence", ... attributes=["Check for logical flow", "Ensure consistency"], ... ) checklist = interacteval.generate( ... inputs=[input], ... rubric="Coherence measures the logical flow and consistency...", ... )

Source code in autochecklist/generators/corpus_level/interactive.py
class InteractiveGenerator(CorpusChecklistGenerator):
    """Generate checklists from interactive think-aloud attributes.

    InteractEval takes pre-collected think-aloud attributes (considerations
    about evaluating a dimension) and transforms them through a 5-stage
    pipeline into a validated checklist of yes/no questions.

    Args:
        model: OpenRouter model ID for generation
        max_components: Maximum number of components to extract (default 5)
        api_key: Provider API key (forwarded to the base class)

    Example:
        >>> interacteval = InteractiveGenerator(model="openai/gpt-4o-mini")
        >>> input = InteractiveInput(
        ...     source="human_llm",
        ...     dimension="coherence",
        ...     attributes=["Check for logical flow", "Ensure consistency"],
        ... )
        >>> checklist = interacteval.generate(
        ...     inputs=[input],
        ...     rubric="Coherence measures the logical flow and consistency...",
        ... )
    """

    def __init__(
        self,
        model: Optional[str] = None,
        max_components: int = 5,
        api_key: Optional[str] = None,
        **kwargs,
    ):
        super().__init__(model=model, api_key=api_key, **kwargs)
        self.max_components = max_components

        # Load prompt templates for each pipeline stage
        self._component_extraction_template = PromptTemplate(
            load_template("generators/interacteval", "component_extraction")
        )
        self._attributes_clustering_template = PromptTemplate(
            load_template("generators/interacteval", "attributes_clustering")
        )
        self._question_generation_template = PromptTemplate(
            load_template("generators/interacteval", "question_generation")
        )
        self._sub_question_generation_template = PromptTemplate(
            load_template("generators/interacteval", "sub_question_generation")
        )
        self._question_validation_template = PromptTemplate(
            load_template("generators/interacteval", "question_validation")
        )

    @property
    def method_name(self) -> str:
        """Return the method name for this generator."""
        return "interacteval"

    def generate(
        self,
        inputs: List[InteractiveInput],
        rubric: str = "",
        max_questions: Optional[int] = None,
        **kwargs: Any,
    ) -> Checklist:
        """Generate a checklist from think-aloud attributes.

        Args:
            inputs: List of InteractiveInput with attributes from human/LLM sources
            rubric: Definition/rubric for the evaluation dimension
            max_questions: Maximum number of questions to include
            **kwargs: Additional parameters

        Returns:
            Checklist with generated questions
        """
        if not inputs:
            return self._empty_checklist()

        # Combine attributes from all inputs
        all_attributes = []
        dimension = inputs[0].dimension
        source = inputs[0].source

        for inp in inputs:
            all_attributes.extend(inp.attributes)
            # Use the first non-None dimension
            if not dimension and inp.dimension:
                dimension = inp.dimension
            # Track combined sources: mixed provenance collapses to "human_llm"
            if inp.source != source:
                source = "human_llm"

        if not all_attributes:
            return self._empty_checklist(dimension=dimension)

        # Run the 5-stage pipeline
        validated_questions = self._run_pipeline(
            all_attributes, rubric, dimension or "quality"
        )

        # Apply max_questions limit if specified
        if max_questions and len(validated_questions) > max_questions:
            validated_questions = validated_questions[:max_questions]

        # Convert to ChecklistItems
        items = []
        for q in validated_questions:
            item = ChecklistItem(
                id=str(uuid.uuid4()),
                question=q,
                category=dimension,
                metadata={
                    "dimension": dimension,
                },
            )
            items.append(item)

        return Checklist(
            id=str(uuid.uuid4()),
            items=items,
            source_method="interacteval",
            generation_level="corpus",
            metadata={
                "dimension": dimension,
                "attribute_count": len(all_attributes),
                "source": source,
                "rubric": rubric[:200] if rubric else None,  # Truncate for metadata
            },
        )

    def _run_pipeline(
        self,
        attributes: List[str],
        rubric: str,
        dimension: str,
    ) -> List[str]:
        """Run the 5-stage pipeline to generate validated questions.

        Args:
            attributes: List of think-aloud attributes
            rubric: Dimension definition/rubric
            dimension: Name of the evaluation dimension

        Returns:
            List of validated question strings
        """
        # Stage 1: Extract components
        components = self._extract_components(attributes, rubric, dimension)
        if not components:
            return []

        # Stage 2: Cluster attributes under components
        clustered = self._cluster_attributes(components, attributes)

        # Stage 3: Generate key questions
        key_questions = self._generate_key_questions(clustered, rubric, dimension)

        # Stage 4: Generate sub-questions
        sub_questions = self._generate_sub_questions(key_questions, rubric, dimension)

        # Stage 5: Validate and refine
        validated = self._validate_questions(sub_questions, rubric, dimension)

        return validated

    def _extract_components(
        self,
        attributes: List[str],
        rubric: str,
        dimension: str,
    ) -> List[str]:
        """Stage 1: Extract recurring components/themes from attributes."""
        attributes_text = "\n".join(f"- {attr}" for attr in attributes)

        prompt = self._component_extraction_template.format(
            dimension=dimension,
            rubric=rubric,
            attributes=attributes_text,
            max_components=self.max_components,
        )

        client = self._get_or_create_client()
        response = client.chat_completion(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            response_format={"type": "json_object"},
        )

        content = response["choices"][0]["message"]["content"]
        return self._parse_list_response(content)

    def _cluster_attributes(
        self,
        components: List[str],
        attributes: List[str],
    ) -> Dict[str, List[str]]:
        """Stage 2: Cluster attributes under components."""
        components_text = "\n".join(f"- {c}" for c in components)
        attributes_text = "\n".join(f"- {attr}" for attr in attributes)

        prompt = self._attributes_clustering_template.format(
            components=components_text,
            attributes=attributes_text,
        )

        client = self._get_or_create_client()
        response = client.chat_completion(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            response_format={"type": "json_object"},
        )

        content = response["choices"][0]["message"]["content"]
        return self._parse_dict_response(content)

    def _generate_key_questions(
        self,
        clustered: Dict[str, List[str]],
        rubric: str,
        dimension: str,
    ) -> Dict[str, str]:
        """Stage 3: Generate key questions for each component."""
        components_attributes_text = ""
        for component, attrs in clustered.items():
            components_attributes_text += f"\n**{component}**:\n"
            for attr in attrs:
                components_attributes_text += f"  - {attr}\n"

        prompt = self._question_generation_template.format(
            dimension=dimension,
            rubric=rubric,
            components_attributes=components_attributes_text,
        )

        client = self._get_or_create_client()
        response = client.chat_completion(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            response_format={"type": "json_object"},
        )

        content = response["choices"][0]["message"]["content"]
        return self._parse_dict_response(content)

    def _generate_sub_questions(
        self,
        key_questions: Dict[str, str],
        rubric: str,
        dimension: str,
    ) -> Dict[str, List[str]]:
        """Stage 4: Generate sub-questions for each key question."""
        key_questions_text = ""
        for component, question in key_questions.items():
            key_questions_text += f"\n**{component}**: {question}\n"

        prompt = self._sub_question_generation_template.format(
            dimension=dimension,
            rubric=rubric,
            key_questions=key_questions_text,
        )

        client = self._get_or_create_client()
        response = client.chat_completion(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            response_format={"type": "json_object"},
        )

        content = response["choices"][0]["message"]["content"]
        return self._parse_dict_list_response(content)

    def _validate_questions(
        self,
        sub_questions: Dict[str, List[str]],
        rubric: str,
        dimension: str,
    ) -> List[str]:
        """Stage 5: Validate and refine questions."""
        # Flatten all questions (component names are not needed here)
        all_questions = [q for questions in sub_questions.values() for q in questions]

        all_questions_text = "\n".join(f"- {q}" for q in all_questions)

        prompt = self._question_validation_template.format(
            dimension=dimension,
            rubric=rubric,
            all_questions=all_questions_text,
        )

        client = self._get_or_create_client()
        response = client.chat_completion(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.5,  # Lower temperature for validation
            response_format={"type": "json_object"},
        )

        content = response["choices"][0]["message"]["content"]
        return self._parse_list_response(content)

    def _empty_checklist(self, dimension: Optional[str] = None) -> Checklist:
        """Return an empty checklist (used when there is nothing to process)."""
        return Checklist(
            id=str(uuid.uuid4()),
            items=[],
            source_method="interacteval",
            generation_level="corpus",
            metadata={
                "dimension": dimension,
                "attribute_count": 0,
                "source": None,
            },
        )

    @staticmethod
    def _strip_code_fences(content: str) -> str:
        """Return the text inside a ```json / ``` code fence, or *content* unchanged.

        Shared by the response parsers below so the fence-stripping logic
        lives in one place.
        """
        if "```json" in content:
            start = content.find("```json") + 7
            end = content.find("```", start)
            return content[start:end].strip()
        if "```" in content:
            start = content.find("```") + 3
            end = content.find("```", start)
            return content[start:end].strip()
        return content

    def _parse_list_response(self, content: str) -> List[str]:
        """Parse LLM response to extract a list of strings."""
        try:
            json_str = self._strip_code_fences(content).strip()

            data = json.loads(json_str)
            if isinstance(data, list):
                return [str(item) for item in data]
            # Handle JSON objects containing a list value (e.g., {"components": [...]})
            if isinstance(data, dict):
                for value in data.values():
                    if isinstance(value, list) and value:
                        return [str(item) for item in value]
            return []

        except (json.JSONDecodeError, ValueError):
            # Fallback: try to extract a JSON array via bracket matching
            if "[" in content:
                try:
                    start = content.find("[")
                    end = content.rfind("]") + 1
                    arr = json.loads(content[start:end])
                    if isinstance(arr, list):
                        return [str(item) for item in arr]
                except (json.JSONDecodeError, ValueError):
                    pass
            return []

    def _parse_dict_response(self, content: str) -> Dict[str, Any]:
        """Parse LLM response to extract a dictionary."""
        try:
            if "```" in content:
                json_str = self._strip_code_fences(content)
            elif "{" in content:
                start = content.find("{")
                end = content.rfind("}") + 1
                json_str = content[start:end]
            else:
                return {}

            data = json.loads(json_str)
            if isinstance(data, dict):
                return data
            return {}

        except (json.JSONDecodeError, ValueError):
            return {}

    def _parse_dict_list_response(self, content: str) -> Dict[str, List[str]]:
        """Parse LLM response to extract a dict of lists."""
        result = self._parse_dict_response(content)
        # Ensure all values are lists
        for key, value in result.items():
            if not isinstance(value, list):
                result[key] = [value] if value else []
        return result

method_name property

Return the method name for this generator.

generate(inputs, rubric='', max_questions=None, **kwargs)

Generate a checklist from think-aloud attributes.

Parameters:

Name Type Description Default
inputs List[InteractiveInput]

List of InteractiveInput with attributes from human/LLM sources

required
rubric str

Definition/rubric for the evaluation dimension

''
max_questions Optional[int]

Maximum number of questions to include

None
**kwargs Any

Additional parameters

{}

Returns:

Type Description
Checklist

Checklist with generated questions

Source code in autochecklist/generators/corpus_level/interactive.py
def generate(
    self,
    inputs: List[InteractiveInput],
    rubric: str = "",
    max_questions: Optional[int] = None,
    **kwargs: Any,
) -> Checklist:
    """Generate a checklist from think-aloud attributes.

    Args:
        inputs: List of InteractiveInput with attributes from human/LLM sources
        rubric: Definition/rubric for the evaluation dimension
        max_questions: Maximum number of questions to include
        **kwargs: Additional parameters

    Returns:
        Checklist with generated questions
    """
    if not inputs:
        return self._empty_checklist()

    # Merge attributes across inputs while tracking dimension and source.
    dimension = inputs[0].dimension
    source = inputs[0].source
    all_attributes: List[str] = []
    for entry in inputs:
        all_attributes.extend(entry.attributes)
        if not dimension and entry.dimension:
            dimension = entry.dimension  # first non-None dimension wins
        if entry.source != source:
            source = "human_llm"  # mixed provenance collapses to "human_llm"

    if not all_attributes:
        return self._empty_checklist(dimension=dimension)

    # 5-stage pipeline: extract -> cluster -> key questions -> sub-questions -> validate.
    validated_questions = self._run_pipeline(
        all_attributes, rubric, dimension or "quality"
    )

    # Respect the optional cap on checklist size.
    if max_questions and len(validated_questions) > max_questions:
        validated_questions = validated_questions[:max_questions]

    items = [
        ChecklistItem(
            id=str(uuid.uuid4()),
            question=question,
            category=dimension,
            metadata={
                "dimension": dimension,
            },
        )
        for question in validated_questions
    ]

    return Checklist(
        id=str(uuid.uuid4()),
        items=items,
        source_method="interacteval",
        generation_level="corpus",
        metadata={
            "dimension": dimension,
            "attribute_count": len(all_attributes),
            "source": source,
            "rubric": rubric[:200] if rubric else None,  # Truncate for metadata
        },
    )

ChecklistGenerator

Bases: ABC

Base class for all checklist generators.

Source code in autochecklist/generators/base.py
class ChecklistGenerator(ABC):
    """Base class for all checklist generators."""

    def __init__(
        self,
        model: Optional[str] = None,
        temperature: Optional[float] = None,
        max_tokens: int = 2048,
        api_key: Optional[str] = None,
        provider: Optional[str] = None,
        base_url: Optional[str] = None,
        client: Any = None,
        api_format: Optional[str] = None,
        reasoning_effort: Optional[str] = None,
    ):
        """Configure the generator's LLM settings.

        Args:
            model: Model identifier; defaults to the configured generator model.
            temperature: Sampling temperature; defaults to the configured value.
            max_tokens: Maximum response tokens.
            api_key: Provider API key.
            provider: LLM provider name (default "openrouter").
            base_url: Override base URL for the provider.
            client: Pre-configured LLM client (skips client creation).
            api_format: API format, "chat" (default) or "responses".
            reasoning_effort: Reasoning effort hint for supported models.
        """
        config = get_config()
        self.model = model or config.generator_model.model_id
        self.temperature = temperature if temperature is not None else config.generator_model.temperature
        self.max_tokens = max_tokens
        self.api_key = api_key
        self._client = client
        self._provider = provider or config.generator_model.provider or "openrouter"
        self._base_url = base_url
        self._api_format = api_format or "chat"
        self.reasoning_effort = reasoning_effort

    @property
    @abstractmethod
    def generation_level(self) -> str:
        """Return 'instance' or 'corpus'."""
        pass

    @property
    @abstractmethod
    def method_name(self) -> str:
        """Return the method name (e.g., 'tick', 'rlcf')."""
        pass

    @abstractmethod
    def generate(self, **kwargs: Any) -> Checklist:
        """Generate a checklist."""
        pass

    def generate_stream(self, **kwargs: Any) -> Iterator[str]:
        """Stream checklist generation (for UI).

        Default implementation just yields the final result.
        Override for true streaming support.
        """
        checklist = self.generate(**kwargs)
        yield checklist.to_text()

    def _get_or_create_client(self) -> Any:
        """Get injected client or create one from provider settings."""
        if self._client is not None:
            return self._client
        from ..providers.factory import get_client
        return get_client(
            provider=self._provider,
            api_key=self.api_key,
            base_url=self._base_url,
            model=self.model,
            api_format=self._api_format,
        )

    def _call_model(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        response_format: Optional[dict] = None,
    ) -> str:
        """Call the LLM and return the response text.

        Args:
            prompt: The user prompt text.
            system_prompt: Optional system prompt.
            response_format: Optional OpenAI-compatible response_format dict
                for structured JSON output. When provided, the call is attempted
                with ``response_format`` first; if the provider does not support
                it, the call is retried without it (fallback to schema-in-prompt).

        Returns:
            The model's response text.
        """
        messages: List[Dict[str, str]] = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": prompt})

        kwargs: Dict[str, Any] = {}
        if response_format is not None:
            kwargs["response_format"] = response_format
        if self.reasoning_effort is not None:
            kwargs["reasoning_effort"] = self.reasoning_effort

        client = self._get_or_create_client()

        try:
            response = client.chat_completion(
                model=self.model,
                messages=messages,
                temperature=self.temperature,
                max_tokens=self.max_tokens,
                **kwargs,
            )
        except (ValueError, KeyError, TypeError) as e:
            # Schema/parsing errors from response_format — fall back to schema-in-prompt
            if response_format is None:
                raise
            response = self._retry_without_structured_output(client, messages, kwargs, e)
        except Exception as e:
            # For HTTP errors (auth, rate limit, server), only fallback on 400 (bad schema)
            if response_format is not None and self._is_bad_request(e):
                response = self._retry_without_structured_output(client, messages, kwargs, e)
            else:
                raise

        return response["choices"][0]["message"]["content"]

    @staticmethod
    def _is_bad_request(exc: Exception) -> bool:
        """Return True if *exc* is an httpx 400 error (likely a rejected schema).

        Guards the httpx import so a missing optional dependency never masks
        the original exception with an ImportError.
        """
        try:
            import httpx
        except ImportError:
            return False
        return isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 400

    def _retry_without_structured_output(
        self,
        client: Any,
        messages: List[Dict[str, str]],
        kwargs: Dict[str, Any],
        err: Exception,
    ) -> Any:
        """Retry the chat call without ``response_format`` (schema-in-prompt fallback)."""
        logger.warning(
            "Structured output failed (%s), retrying without "
            "response_format (fallback to schema-in-prompt).",
            err,
        )
        fallback_kwargs = {k: v for k, v in kwargs.items() if k != "response_format"}
        return client.chat_completion(
            model=self.model,
            messages=messages,
            temperature=self.temperature,
            max_tokens=self.max_tokens,
            **fallback_kwargs,
        )

generation_level abstractmethod property

Return 'instance' or 'corpus'.

method_name abstractmethod property

Return the method name (e.g., 'tick', 'rlcf').

generate(**kwargs) abstractmethod

Generate a checklist.

Source code in autochecklist/generators/base.py
@abstractmethod
def generate(self, **kwargs: Any) -> Checklist:
    """Produce a checklist; concrete subclasses must implement this."""
    ...

generate_stream(**kwargs)

Stream checklist generation (for UI).

Default implementation just yields the final result. Override for true streaming support.

Source code in autochecklist/generators/base.py
def generate_stream(self, **kwargs: Any) -> Iterator[str]:
    """Yield the generated checklist as text (streaming facade for UIs).

    The base implementation produces the complete checklist and yields its
    text form in a single chunk; override for true incremental streaming.
    """
    yield self.generate(**kwargs).to_text()

InstanceChecklistGenerator

Bases: ChecklistGenerator

Base for instance-level generators (one checklist per input).

Source code in autochecklist/generators/base.py
class InstanceChecklistGenerator(ChecklistGenerator):
    """Base for instance-level generators (one checklist per input)."""

    @property
    def generation_level(self) -> str:
        # Instance-level: one checklist is produced per individual input.
        return "instance"

    @abstractmethod
    def generate(
        self,
        input: str,
        target: Optional[str] = None,
        reference: Optional[str] = None,
        **kwargs: Any,
    ) -> Checklist:
        """Produce a checklist for a single input.

        Args:
            input: The input/query/instruction text
            target: Optional target response to evaluate
            reference: Optional reference target for comparison
            **kwargs: Method-specific arguments

        Returns:
            Generated Checklist
        """
        ...

generate(input, target=None, reference=None, **kwargs) abstractmethod

Generate checklist for an input.

Parameters:

Name Type Description Default
input str

The input/query/instruction text

required
target Optional[str]

Optional target response to evaluate

None
reference Optional[str]

Optional reference target for comparison

None
**kwargs Any

Method-specific arguments

{}

Returns:

Type Description
Checklist

Generated Checklist

Source code in autochecklist/generators/base.py
@abstractmethod
def generate(
    self,
    input: str,
    target: Optional[str] = None,
    reference: Optional[str] = None,
    **kwargs: Any,
) -> Checklist:
    """Produce a checklist for a single input.

    Args:
        input: The input/query/instruction text
        target: Optional target response to evaluate
        reference: Optional reference target for comparison
        **kwargs: Method-specific arguments

    Returns:
        Generated Checklist
    """
    ...

CorpusChecklistGenerator

Bases: ChecklistGenerator

Base for corpus-level generators (one checklist for entire dataset).

Source code in autochecklist/generators/base.py
class CorpusChecklistGenerator(ChecklistGenerator):
    """Base for corpus-level generators (one checklist for entire dataset)."""

    @property
    def generation_level(self) -> str:
        # Corpus-level: a single checklist covers the whole dataset.
        return "corpus"

    @abstractmethod
    def generate(
        self,
        inputs: List[Dict[str, Any]],
        **kwargs: Any,
    ) -> Checklist:
        """Produce one checklist from a collection of corpus inputs.

        Args:
            inputs: List of input items (feedback, dimensions, think-aloud, etc.)
            **kwargs: Method-specific arguments

        Returns:
            Generated Checklist
        """
        ...

generate(inputs, **kwargs) abstractmethod

Generate checklist from corpus inputs.

Parameters:

Name Type Description Default
inputs List[Dict[str, Any]]

List of input items (feedback, dimensions, think-aloud, etc.)

required
**kwargs Any

Method-specific arguments

{}

Returns:

Type Description
Checklist

Generated Checklist

Source code in autochecklist/generators/base.py
@abstractmethod
def generate(
    self,
    inputs: List[Dict[str, Any]],
    **kwargs: Any,
) -> Checklist:
    """Produce one checklist from a collection of corpus inputs.

    Args:
        inputs: List of input items (feedback, dimensions, think-aloud, etc.)
        **kwargs: Method-specific arguments

    Returns:
        Generated Checklist
    """
    ...

ChecklistScorer

Configurable checklist scorer that supports batch and per-item modes.

Consolidates the former BatchScorer, ItemScorer, WeightedScorer, and NormalizedScorer into a single class. All three aggregate metrics (pass_rate, weighted_score, normalized_score) are always computed.

Parameters:

Name Type Description Default
mode str

"batch" (one LLM call) or "item" (one call per item).

'batch'
capture_reasoning bool

Item mode only — include per-item reasoning.

False
use_logprobs bool

Item mode only — use logprobs for confidence scoring.

False
primary_metric str

Which metric Score.primary_score aliases. One of "pass" (pass_rate), "weighted" (weighted_score), "normalized" (normalized_score).

'pass'
custom_prompt Optional[Union[str, Path]]

Override the default prompt template (str text or Path).

None
model Optional[str]

LLM model identifier.

None
temperature float

Sampling temperature.

0.0
api_key Optional[str]

Provider API key.

None
provider Optional[str]

LLM provider name.

None
base_url Optional[str]

Override base URL.

None
client Any

Pre-configured LLM client.

None
api_format Optional[str]

API format ("chat" or "responses").

None
max_tokens int

Maximum response tokens.

2048
reasoning_effort Optional[str]

Reasoning effort hint for supported models.

None
Example

>>> scorer = ChecklistScorer(mode="batch")
>>> score = scorer.score(checklist, target="The response text...")
>>> print(score.primary_score)  # uses primary_metric

Source code in autochecklist/scorers/base.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
class ChecklistScorer:
    """Configurable checklist scorer that supports batch and per-item modes.

    Consolidates the former BatchScorer, ItemScorer, WeightedScorer, and
    NormalizedScorer into a single class.  All three aggregate metrics
    (pass_rate, weighted_score, normalized_score) are always computed.

    Args:
        mode: ``"batch"`` (one LLM call) or ``"item"`` (one call per item).
        capture_reasoning: Item mode only — include per-item reasoning.
        use_logprobs: Item mode only — use logprobs for confidence scoring.
        primary_metric: Which metric ``Score.primary_score`` aliases.
            One of ``"pass"`` (pass_rate), ``"weighted"`` (weighted_score),
            ``"normalized"`` (normalized_score).
        custom_prompt: Override the default prompt template (str text or Path).
        model: LLM model identifier.
        temperature: Sampling temperature.
        api_key: Provider API key.
        provider: LLM provider name.
        base_url: Override base URL.
        client: Pre-configured LLM client.
        api_format: API format (``"chat"`` or ``"responses"``).
        max_tokens: Maximum response tokens.
        reasoning_effort: Reasoning effort hint for supported models.

    Example:
        >>> scorer = ChecklistScorer(mode="batch")
        >>> score = scorer.score(checklist, target="The response text...")
        >>> print(score.primary_score)  # uses primary_metric
    """

    def __init__(
        self,
        mode: str = "batch",
        capture_reasoning: bool = False,
        use_logprobs: bool = False,
        primary_metric: str = "pass",
        custom_prompt: Optional[Union[str, Path]] = None,
        # LLM config
        model: Optional[str] = None,
        temperature: float = 0.0,
        api_key: Optional[str] = None,
        provider: Optional[str] = None,
        base_url: Optional[str] = None,
        client: Any = None,
        api_format: Optional[str] = None,
        max_tokens: int = 2048,
        reasoning_effort: Optional[str] = None,
    ):
        # Validate configuration eagerly so misconfiguration fails at
        # construction time rather than on the first scoring call.
        if mode not in _VALID_MODES:
            raise ValueError(
                f"mode must be one of {_VALID_MODES!r}, got {mode!r}"
            )
        if primary_metric not in _VALID_PRIMARY_METRICS:
            raise ValueError(
                f"primary_metric must be one of {_VALID_PRIMARY_METRICS!r}, "
                f"got {primary_metric!r}"
            )

        self.mode = mode
        self.capture_reasoning = capture_reasoning
        self.use_logprobs = use_logprobs
        self.primary_metric = primary_metric

        # LLM settings
        config = get_config()
        self.model = model or config.scorer_model.model_id
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.api_key = api_key
        self._client = client
        self._provider = provider or config.scorer_model.provider or "openrouter"
        self._base_url = base_url
        self._api_format = api_format or "chat"
        self.reasoning_effort = reasoning_effort

        # Load prompt template and format
        self._template, self._format_text = self._load_prompt_and_format(
            custom_prompt
        )

        # Logprobs availability check (deferred to avoid API call unless needed)
        self._logprobs_available = False
        if self.use_logprobs:
            client_instance = self._get_or_create_client()
            if client_instance.supports_logprobs(self.model):
                self._logprobs_available = True
            else:
                warnings.warn(
                    f"Model '{self.model}' does not support logprobs. "
                    "Scorer will use text-based scoring with confidence=None. "
                    "normalized_score will equal pass_rate (unweighted).",
                    UserWarning,
                    stacklevel=2,
                )

    # ── Public properties ──────────────────────────────────────────────────

    @property
    def scoring_method(self) -> str:
        """Backward-compat scoring method string for Score metadata."""
        if self.mode == "batch":
            return "batch"
        if self.use_logprobs:
            return "normalized"
        if self.primary_metric == "weighted":
            return "weighted"
        # Plain per-item scoring (with or without reasoning capture).
        return "item"

    @property
    def prompt_text(self) -> str:
        """The raw prompt template text."""
        return self._template.template

    # ── Core scoring API ───────────────────────────────────────────────────

    def score(
        self,
        checklist: Checklist,
        target: str,
        input: Optional[str] = None,
        **kwargs: Any,
    ) -> Score:
        """Score a target response against a checklist.

        Args:
            checklist: The checklist to evaluate against.
            target: The target text to score.
            input: Optional input/context (falls back to checklist.input).
            **kwargs: Additional arguments (ignored).

        Returns:
            Score object with item-level and all aggregate scores.
        """
        if self.mode == "batch":
            return self._score_batch(checklist, target, input)
        else:
            return self._score_items_dispatch(checklist, target, input)

    def score_batch(
        self,
        checklist: Checklist,
        targets: List[str],
        inputs: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[Score]:
        """Score multiple targets sequentially."""
        results = []
        for i, target in enumerate(targets):
            input_text = inputs[i] if inputs else None
            results.append(self.score(checklist, target, input_text, **kwargs))
        return results

    # ── Shared parsing helpers ─────────────────────────────────────────────

    @staticmethod
    def _parse_structured(raw: str, model_cls: Any) -> Any:
        """Parse a raw LLM response into a validated pydantic model.

        Tries strict ``json.loads`` first; falls back to lenient
        ``extract_json`` for responses wrapped in prose or code fences.
        """
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            data = extract_json(raw)
        return model_cls.model_validate(data)

    @staticmethod
    def _to_answer(answer_str: str) -> ChecklistItemAnswer:
        """Map a model-emitted answer string to the answer enum.

        Anything other than the exact string "YES" counts as NO.
        """
        return (
            ChecklistItemAnswer.YES
            if answer_str == "YES"
            else ChecklistItemAnswer.NO
        )

    # ── Batch scoring ──────────────────────────────────────────────────────

    def _score_batch(
        self,
        checklist: Checklist,
        target: str,
        input: Optional[str],
    ) -> Score:
        """Evaluate ALL checklist items in a single LLM call."""
        inst = input or checklist.input or ""

        # Questions are numbered from 1 to match question_index in the response.
        checklist_text = "\n".join(
            f"Q{i}: {item.question}"
            for i, item in enumerate(checklist.items, 1)
        )

        prompt = self._template.format(
            input=inst, target=target, checklist=checklist_text,
        )
        full_prompt = prompt + "\n\n" + self._format_text

        if self.capture_reasoning:
            rf = to_response_format(
                BatchScoringResponseReasoned, "batch_scoring_reasoned"
            )
            raw = self._call_model(full_prompt, response_format=rf)
            validated = self._parse_structured(
                raw, BatchScoringResponseReasoned
            )
            answer_map = {a.question_index: a.answer for a in validated.answers}
            reasoning_map = {
                a.question_index: a.reasoning for a in validated.answers
            }
        else:
            rf = to_response_format(BatchScoringResponse, "batch_scoring")
            raw = self._call_model(full_prompt, response_format=rf)
            validated = self._parse_structured(raw, BatchScoringResponse)
            answer_map = {a.question_index: a.answer for a in validated.answers}
            reasoning_map = {}

        item_scores = []
        for i, item in enumerate(checklist.items):
            # Missing indices default to "NO" (conservative on dropped answers).
            answer = self._to_answer(answer_map.get(i + 1, "NO"))
            item_scores.append(
                ItemScore(
                    item_id=item.id,
                    answer=answer,
                    reasoning=reasoning_map.get(i + 1),
                )
            )

        return self._build_score(checklist, item_scores, raw_response=raw)

    # ── Item scoring ───────────────────────────────────────────────────────

    def _score_items_dispatch(
        self,
        checklist: Checklist,
        target: str,
        input: Optional[str],
    ) -> Score:
        """Evaluate each checklist item individually (one call per item)."""
        inst = input or checklist.input or ""
        item_scores = []

        client = self._get_or_create_client()

        for item in checklist.items:
            prompt = self._template.format(
                input=inst, target=target, question=item.question,
                # normalized template may use {history}
                history="",
            )

            if self.use_logprobs and self._logprobs_available:
                # Logprobs path — confidence scoring
                messages = [{"role": "user", "content": prompt}]
                probs = client.get_logprobs(
                    model=self.model,
                    messages=messages,
                    temperature=self.temperature,
                )
                confidence, answer, level = self._interpret_probs(probs)
                item_scores.append(
                    ItemScore(
                        item_id=item.id,
                        answer=answer,
                        confidence=confidence,
                        confidence_level=level,
                    )
                )
            else:
                # Structured output path
                full_prompt = prompt + "\n\n" + self._format_text

                if self.capture_reasoning:
                    rf = to_response_format(
                        ItemScoringResponseReasoned, "item_scoring_reasoned"
                    )
                    raw = self._call_model(full_prompt, response_format=rf)
                    validated = self._parse_structured(
                        raw, ItemScoringResponseReasoned
                    )
                    item_scores.append(
                        ItemScore(
                            item_id=item.id,
                            answer=self._to_answer(validated.answer),
                            reasoning=validated.reasoning,
                        )
                    )
                else:
                    rf = to_response_format(
                        ItemScoringResponse, "item_scoring"
                    )
                    raw = self._call_model(full_prompt, response_format=rf)
                    validated = self._parse_structured(
                        raw, ItemScoringResponse
                    )
                    item_scores.append(
                        ItemScore(
                            item_id=item.id,
                            answer=self._to_answer(validated.answer),
                        )
                    )

        return self._build_score(
            checklist,
            item_scores,
            num_calls=len(checklist.items),
        )

    # ── Score assembly ─────────────────────────────────────────────────────

    def _build_score(
        self,
        checklist: Checklist,
        item_scores: List[ItemScore],
        raw_response: Optional[str] = None,
        num_calls: Optional[int] = None,
    ) -> Score:
        """Build a Score with all three aggregate metrics computed."""
        yes_count = sum(
            1 for s in item_scores if s.answer == ChecklistItemAnswer.YES
        )
        total = len(item_scores)
        total_score = yes_count / total if total > 0 else 0.0

        # Weighted score
        weighted_score = self._calculate_weighted_score(item_scores, checklist)

        # Normalized score: mean confidence when logprobs produced any,
        # otherwise falls back to the unweighted pass rate.
        confidences = [
            s.confidence for s in item_scores if s.confidence is not None
        ]
        normalized_score = (
            sum(confidences) / len(confidences) if confidences else total_score
        )

        metadata: Dict[str, Any] = {}
        if raw_response is not None:
            metadata["raw_response"] = raw_response
        if num_calls is not None:
            metadata["num_calls"] = num_calls

        return Score(
            checklist_id=checklist.id,
            item_scores=item_scores,
            total_score=total_score,
            weighted_score=weighted_score,
            normalized_score=normalized_score,
            primary_metric=self.primary_metric,
            judge_model=self.model,
            scoring_method=self.scoring_method,
            metadata=metadata,
        )

    # ── Weighted score calculation ─────────────────────────────────────────

    def _calculate_weighted_score(
        self,
        item_scores: List[ItemScore],
        checklist: Checklist,
    ) -> float:
        """Calculate weighted score: sum(weight_i * score_i) / sum(weight_i)."""
        item_weights = {item.id: item.weight for item in checklist.items}

        weighted_sum = 0.0
        total_weight = 0.0

        for s in item_scores:
            # Unknown item ids (e.g. merged items) default to weight 1.0.
            weight = item_weights.get(s.item_id, 1.0)
            total_weight += weight
            if s.answer == ChecklistItemAnswer.YES:
                weighted_sum += weight

        return weighted_sum / total_weight if total_weight > 0 else 0.0

    # ── Logprobs helpers ───────────────────────────────────────────────────

    def _interpret_probs(
        self,
        probs: Dict[str, float],
    ) -> tuple:
        """Interpret Yes/No probabilities into confidence and answer.

        Confidence = P(Yes) / (P(Yes) + P(No))
        """
        yes_prob = probs.get("yes", 0.0)
        no_prob = probs.get("no", 0.0)

        # Neither token observed: treat as maximally unsure, default NO.
        if yes_prob + no_prob < 1e-10:
            return 0.5, ChecklistItemAnswer.NO, ConfidenceLevel.UNSURE

        confidence = yes_prob / (yes_prob + no_prob)
        answer, level = self._confidence_to_level(confidence)
        return confidence, answer, level

    def _confidence_to_level(
        self,
        confidence: float,
    ) -> tuple:
        """Map confidence value to answer and confidence level."""
        if confidence < 0.2:
            return ChecklistItemAnswer.NO, ConfidenceLevel.NO_10
        elif confidence < 0.4:
            return ChecklistItemAnswer.NO, ConfidenceLevel.NO_30
        elif confidence < 0.6:
            return ChecklistItemAnswer.NO, ConfidenceLevel.UNSURE
        elif confidence < 0.8:
            return ChecklistItemAnswer.YES, ConfidenceLevel.YES_70
        else:
            return ChecklistItemAnswer.YES, ConfidenceLevel.YES_90

    # ── Prompt loading ─────────────────────────────────────────────────────

    def _load_prompt_and_format(
        self,
        custom_prompt: Optional[Union[str, Path]],
    ) -> tuple:
        """Load prompt template and format instruction based on mode."""
        # Determine template text
        if custom_prompt is not None:
            if isinstance(custom_prompt, Path):
                template_text = custom_prompt.read_text(encoding="utf-8")
            else:
                template_text = custom_prompt
        else:
            template_text = self._default_template_text()

        template = PromptTemplate(template_text)

        # Determine format text
        format_text = self._default_format_text()

        return template, format_text

    def _default_template_text(self) -> str:
        """Load the default prompt template for the current mode config.

        Simplified: just batch vs item. Pipeline presets override via
        ``scorer_prompt`` key for paper-specific prompts (rlcf, rocketeval).
        """
        if self.mode == "batch":
            return load_template("scoring", "batch")
        return load_template("scoring", "item")

    def _default_format_text(self) -> str:
        """Load the default format instruction for the current mode config."""
        if self.mode == "batch":
            if self.capture_reasoning:
                return load_format("batch_scoring_reasoned")
            return load_format("batch_scoring")
        if self.capture_reasoning:
            return load_format("item_scoring_reasoned")
        return load_format("item_scoring")

    # ── LLM client management ─────────────────────────────────────────────

    def _get_or_create_client(self) -> Any:
        """Get injected client or create one from provider settings."""
        if self._client is not None:
            return self._client
        from ..providers.factory import get_client
        return get_client(
            provider=self._provider,
            api_key=self.api_key,
            base_url=self._base_url,
            model=self.model,
            api_format=self._api_format,
        )

    def _call_model(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        response_format: Optional[dict] = None,
    ) -> str:
        """Call the LLM and return the response text.

        Handles fallback from structured output to schema-in-prompt on
        400 errors.
        """
        messages: List[Dict[str, str]] = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": prompt})

        kwargs: Dict[str, Any] = {}
        if response_format is not None:
            kwargs["response_format"] = response_format
        if self.reasoning_effort is not None:
            kwargs["reasoning_effort"] = self.reasoning_effort

        client = self._get_or_create_client()

        def _complete(call_kwargs: Dict[str, Any]) -> Any:
            # Single place that issues the chat completion request.
            return client.chat_completion(
                model=self.model,
                messages=messages,
                temperature=self.temperature,
                max_tokens=self.max_tokens,
                **call_kwargs,
            )

        def _retry_without_format(e: Exception) -> Any:
            # Shared fallback: drop response_format and rely on the
            # schema-in-prompt instructions already appended to the prompt.
            logger.warning(
                "Structured output failed (%s), retrying without "
                "response_format (fallback to schema-in-prompt).",
                e,
            )
            return _complete(
                {k: v for k, v in kwargs.items() if k != "response_format"}
            )

        try:
            response = _complete(kwargs)
        except (ValueError, KeyError, TypeError) as e:
            # Client-side rejection of response_format (e.g. unsupported
            # provider) — only retriable when structured output was requested.
            if response_format is None:
                raise
            response = _retry_without_format(e)
        except Exception as e:
            # Server-side rejection: only a 400 with structured output
            # requested is treated as a format problem; anything else bubbles.
            import httpx
            if (
                response_format is not None
                and isinstance(e, httpx.HTTPStatusError)
                and e.response.status_code == 400
            ):
                response = _retry_without_format(e)
            else:
                raise

        return response["choices"][0]["message"]["content"]

scoring_method property

Backward-compat scoring method string for Score metadata.

prompt_text property

The raw prompt template text.

score(checklist, target, input=None, **kwargs)

Score a target response against a checklist.

Parameters:

Name Type Description Default
checklist Checklist

The checklist to evaluate against.

required
target str

The target text to score.

required
input Optional[str]

Optional input/context (falls back to checklist.input).

None
**kwargs Any

Additional arguments (ignored).

{}

Returns:

Type Description
Score

Score object with item-level and all aggregate scores.

Source code in autochecklist/scorers/base.py
def score(
    self,
    checklist: Checklist,
    target: str,
    input: Optional[str] = None,
    **kwargs: Any,
) -> Score:
    """Score a target response against a checklist.

    Args:
        checklist: The checklist to evaluate against.
        target: The target text to score.
        input: Optional input/context (falls back to checklist.input).
        **kwargs: Additional arguments (ignored).

    Returns:
        Score object with item-level and all aggregate scores.
    """
    if self.mode == "batch":
        return self._score_batch(checklist, target, input)
    else:
        return self._score_items_dispatch(checklist, target, input)

score_batch(checklist, targets, inputs=None, **kwargs)

Score multiple targets sequentially.

Source code in autochecklist/scorers/base.py
def score_batch(
    self,
    checklist: Checklist,
    targets: List[str],
    inputs: Optional[List[str]] = None,
    **kwargs: Any,
) -> List[Score]:
    """Score multiple targets sequentially."""
    results = []
    for i, target in enumerate(targets):
        input_text = inputs[i] if inputs else None
        results.append(self.score(checklist, target, input_text, **kwargs))
    return results

ChecklistRefiner

Bases: ABC

Base class for all checklist refiners.

Refiners take a checklist and improve it through various operations:

- Deduplication (merge similar questions)
- Filtering (remove low-quality questions)
- Selection (choose optimal subset)
- Testing (validate discriminativeness)

Source code in autochecklist/refiners/base.py
class ChecklistRefiner(ABC):
    """Base class for all checklist refiners.

    Refiners take a checklist and improve it through various operations:
    - Deduplication (merge similar questions)
    - Filtering (remove low-quality questions)
    - Selection (choose optimal subset)
    - Testing (validate discriminativeness)
    """

    def __init__(
        self,
        model: Optional[str] = None,
        temperature: Optional[float] = None,
        api_key: Optional[str] = None,
        provider: Optional[str] = None,
        base_url: Optional[str] = None,
        client: Any = None,
        api_format: Optional[str] = None,
        reasoning_effort: Optional[str] = None,
    ):
        # Unset options fall back to the shared generator-model config.
        config = get_config()
        self.model = model or config.generator_model.model_id
        self.temperature = temperature if temperature is not None else config.generator_model.temperature
        self.api_key = api_key
        self._client = client
        self._provider = provider or config.generator_model.provider or "openrouter"
        self._base_url = base_url
        self._api_format = api_format or "chat"
        self.reasoning_effort = reasoning_effort

    @property
    @abstractmethod
    def refiner_name(self) -> str:
        """Return the refiner name (e.g., 'deduplicator', 'tagger')."""
        pass

    @abstractmethod
    def refine(self, checklist: Checklist, **kwargs: Any) -> Checklist:
        """Refine the checklist.

        Args:
            checklist: Input checklist to refine
            **kwargs: Refiner-specific arguments

        Returns:
            Refined checklist
        """
        pass

    def _get_or_create_client(self) -> Any:
        """Get injected client or create one from provider settings."""
        if self._client is not None:
            return self._client
        from ..providers.factory import get_client
        return get_client(
            provider=self._provider,
            api_key=self.api_key,
            base_url=self._base_url,
            model=self.model,
            api_format=self._api_format,
        )

    def _call_model(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        response_format: Optional[Dict] = None,
    ) -> str:
        """Call the LLM and return the response text."""
        messages: List[Dict[str, str]] = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": prompt})

        client = self._get_or_create_client()
        kwargs: Dict[str, Any] = {
            "model": self.model,
            "messages": messages,
            "temperature": self.temperature,
            "max_tokens": 2048,
        }
        if response_format:
            kwargs["response_format"] = response_format
        if self.reasoning_effort is not None:
            kwargs["reasoning_effort"] = self.reasoning_effort

        response = client.chat_completion(**kwargs)
        return response["choices"][0]["message"]["content"]

    def _create_refined_checklist(
        self,
        original: Checklist,
        items: List[ChecklistItem],
        metadata_updates: Optional[Dict[str, Any]] = None,
    ) -> Checklist:
        """Create a new checklist with refined items.

        Args:
            original: Original checklist to base metadata on
            items: New refined items
            metadata_updates: Additional metadata to add

        Returns:
            New Checklist instance
        """
        metadata = dict(original.metadata) if original.metadata else {}
        metadata["refined_by"] = self.refiner_name
        metadata["original_count"] = len(original.items)
        if metadata_updates:
            metadata.update(metadata_updates)

        return Checklist(
            items=items,
            source_method=original.source_method,
            generation_level=original.generation_level,
            input=original.input,
            # Propagate corpus_description so corpus-level checklists
            # don't lose their context on refinement (was dropped before).
            corpus_description=original.corpus_description,
            metadata=metadata,
        )

refiner_name abstractmethod property

Return the refiner name (e.g., 'deduplicator', 'tagger').

refine(checklist, **kwargs) abstractmethod

Refine the checklist.

Parameters:

Name Type Description Default
checklist Checklist

Input checklist to refine

required
**kwargs Any

Refiner-specific arguments

{}

Returns:

Type Description
Checklist

Refined checklist

Source code in autochecklist/refiners/base.py
@abstractmethod
def refine(self, checklist: Checklist, **kwargs: Any) -> Checklist:
    """Refine the checklist.

    Args:
        checklist: Input checklist to refine
        **kwargs: Refiner-specific arguments

    Returns:
        Refined checklist
    """
    pass

Deduplicator

Bases: ChecklistRefiner

Refiner that merges semantically similar checklist questions.

Pipeline: 1. Compute embeddings for all questions 2. Build similarity graph (edge if cosine >= threshold) 3. Find connected components (clusters) 4. Keep isolated nodes (unique questions) as-is 5. Use LLM to merge multi-node clusters into single questions

Source code in autochecklist/refiners/deduplicator.py
class Deduplicator(ChecklistRefiner):
    """Refiner that merges semantically similar checklist questions.

    Pipeline:
    1. Compute embeddings for all questions
    2. Build similarity graph (edge if cosine >= threshold)
    3. Find connected components (clusters)
    4. Keep isolated nodes (unique questions) as-is
    5. Use LLM to merge multi-node clusters into single questions
    """

    def __init__(
        self,
        similarity_threshold: float = 0.85,
        model: Optional[str] = None,
        temperature: Optional[float] = None,
        api_key: Optional[str] = None,
        embedding_api_key: Optional[str] = None,
        custom_prompt: Optional[Union[str, Path]] = None,
        **kwargs,
    ):
        super().__init__(model=model, temperature=temperature, api_key=api_key, **kwargs)
        self.similarity_threshold = similarity_threshold
        self.embedding_api_key = embedding_api_key
        # Load merge prompt template
        if custom_prompt is not None:
            if isinstance(custom_prompt, Path):
                template_str = custom_prompt.read_text(encoding="utf-8")
            else:
                template_str = custom_prompt
        else:
            template_str = load_template("generators/feedback", "merge")
        self._merge_template = PromptTemplate(template_str)

    @property
    def refiner_name(self) -> str:
        return "deduplicator"

    def refine(self, checklist: Checklist, **kwargs: Any) -> Checklist:
        """Deduplicate the checklist by merging similar questions.

        Args:
            checklist: Input checklist to deduplicate

        Returns:
            Checklist with similar questions merged
        """
        # 0 or 1 items: nothing to deduplicate.
        if len(checklist.items) <= 1:
            return self._create_refined_checklist(
                checklist,
                list(checklist.items),
                metadata_updates={"clusters_merged": 0},
            )

        # Get questions and compute embeddings
        questions = [item.question for item in checklist.items]
        embeddings = get_embeddings(questions, api_key=self.embedding_api_key)

        # Build similarity graph
        graph, clusters = self._build_similarity_graph(
            checklist.items, embeddings
        )

        # Process clusters
        refined_items: List[ChecklistItem] = []
        clusters_merged = 0

        for cluster_ids in clusters:
            # Preserve original item order within each cluster.
            cluster_items = [
                item for item in checklist.items if item.id in cluster_ids
            ]

            if len(cluster_items) == 1:
                # Isolated node - keep as-is
                refined_items.append(cluster_items[0])
            else:
                # Multi-node cluster - merge with LLM
                merged_item = self._merge_cluster(cluster_items)
                refined_items.append(merged_item)
                clusters_merged += 1

        return self._create_refined_checklist(
            checklist,
            refined_items,
            metadata_updates={
                "clusters_merged": clusters_merged,
                "similarity_threshold": self.similarity_threshold,
            },
        )

    def _build_similarity_graph(
        self,
        items: List[ChecklistItem],
        embeddings,
    ) -> Tuple["nx.Graph", List[Set[str]]]:
        # NOTE: the return annotation is quoted because networkx is imported
        # lazily below (optional dependency); an unquoted ``nx.Graph`` would
        # raise NameError at class-definition time when networkx is absent.
        """Build similarity graph and find connected components.

        Args:
            items: Checklist items
            embeddings: Numpy array of embeddings

        Returns:
            Tuple of (graph, list of component sets)
        """
        try:
            import networkx as nx
        except ImportError:
            raise ImportError(
                "networkx is required for checklist deduplication. "
                "Install with: pip install 'autochecklist[ml]'"
            ) from None

        G = nx.Graph()

        # Add all items as nodes
        for item in items:
            G.add_node(item.id)

        # Compute similarity matrix and add edges
        similarity_matrix = cosine_similarity(embeddings)

        for i in range(len(items)):
            for j in range(i + 1, len(items)):
                if similarity_matrix[i, j] >= self.similarity_threshold:
                    G.add_edge(items[i].id, items[j].id)

        # Find connected components
        components = list(nx.connected_components(G))

        return G, components

    def _merge_cluster(self, items: List[ChecklistItem]) -> ChecklistItem:
        """Merge a cluster of similar questions into one.

        Args:
            items: List of similar checklist items

        Returns:
            Single merged ChecklistItem
        """
        # Format questions for the prompt
        questions_text = "\n".join(f"- {item.question}" for item in items)

        # Call LLM to merge
        prompt = self._merge_template.format(questions=questions_text)

        response_format = {
            "type": "json_schema",
            "json_schema": {
                "name": "merged_question",
                "strict": True,
                "schema": {
                    "type": "object",
                    "properties": {
                        "question": {"type": "string"}
                    },
                    "required": ["question"],
                    "additionalProperties": False,
                },
            },
        }

        response = self._call_model(prompt, response_format=response_format)

        # Parse response
        try:
            result = json.loads(response)
            merged_question = result["question"]
        except (json.JSONDecodeError, KeyError):
            # Fallback: try to extract question from response
            merged_question = response.strip()
            if merged_question.startswith('"') and merged_question.endswith('"'):
                merged_question = merged_question[1:-1]

        # Create merged item
        # Use first item's ID with -merged suffix (like reference impl)
        merged_id = f"{items[0].id}-merged"

        # Average weights if items have weights
        avg_weight = sum(item.weight for item in items) / len(items)

        return ChecklistItem(
            id=merged_id,
            question=merged_question,
            weight=avg_weight,
            metadata={
                "merged_from": [item.id for item in items],
                "original_questions": [item.question for item in items],
            },
        )

refine(checklist, **kwargs)

Deduplicate the checklist by merging similar questions.

Parameters:

Name Type Description Default
checklist Checklist

Input checklist to deduplicate

required

Returns:

Type Description
Checklist

Checklist with similar questions merged

Source code in autochecklist/refiners/deduplicator.py
def refine(self, checklist: Checklist, **kwargs: Any) -> Checklist:
    """Deduplicate the checklist by merging similar questions.

    Args:
        checklist: Input checklist to deduplicate

    Returns:
        Checklist with similar questions merged
    """
    # With zero or one item there is nothing to merge.
    if len(checklist.items) <= 1:
        return self._create_refined_checklist(
            checklist,
            list(checklist.items),
            metadata_updates={"clusters_merged": 0},
        )

    # Embed every question so pairwise similarity can be measured.
    embeddings = get_embeddings(
        [item.question for item in checklist.items],
        api_key=self.embedding_api_key,
    )

    # Cluster questions whose similarity exceeds the threshold.
    _, clusters = self._build_similarity_graph(
        checklist.items, embeddings
    )

    kept: List[ChecklistItem] = []
    merged_count = 0

    for ids in clusters:
        members = [it for it in checklist.items if it.id in ids]

        if len(members) == 1:
            # Isolated node - keep the original question untouched.
            kept.append(members[0])
            continue

        # Multi-node cluster - collapse it into one LLM-merged question.
        kept.append(self._merge_cluster(members))
        merged_count += 1

    return self._create_refined_checklist(
        checklist,
        kept,
        metadata_updates={
            "clusters_merged": merged_count,
            "similarity_threshold": self.similarity_threshold,
        },
    )

Tagger

Bases: ChecklistRefiner

Refiner that filters checklist items based on applicability and specificity.

Uses LLM (default: gpt-5-mini) with zero-shot CoT to classify each question:

- Generally applicable: Can be answered Yes/No for any input (no N/A scenarios)
- Section specific: Evaluates single aspect without cross-references

Source code in autochecklist/refiners/tagger.py
class Tagger(ChecklistRefiner):
    """Refiner that filters checklist items based on applicability and specificity.

    Uses LLM (default: gpt-5-mini) with zero-shot CoT to classify each question:
    - Generally applicable: Can be answered Yes/No for any input (no N/A scenarios)
    - Section specific: Evaluates single aspect without cross-references
    """

    def __init__(
        self,
        model: Optional[str] = None,
        temperature: Optional[float] = None,
        api_key: Optional[str] = None,
        custom_prompt: Optional[Union[str, Path]] = None,
        **kwargs,
    ):
        """Initialize the tagger.

        Args:
            model: LLM identifier; defaults to "openai/gpt-5-mini".
            temperature: Sampling temperature for the LLM.
            api_key: API key forwarded to the base refiner.
            custom_prompt: Optional override for the tagging prompt,
                either a template string or a Path to a template file.
        """
        # gpt-5-mini is the default here (the paper used o3-mini).
        super().__init__(
            model=model or "openai/gpt-5-mini",
            temperature=temperature,
            api_key=api_key,
            **kwargs,
        )
        # Resolve the tagging prompt: an explicit override (string or file)
        # wins; otherwise fall back to the packaged template.
        if custom_prompt is None:
            raw_template = load_template("generators/feedback", "tag")
        elif isinstance(custom_prompt, Path):
            raw_template = custom_prompt.read_text(encoding="utf-8")
        else:
            raw_template = custom_prompt
        self._tag_template = PromptTemplate(raw_template)

    @property
    def refiner_name(self) -> str:
        """Short identifier used in refinement metadata."""
        return "tagger"

    def refine(self, checklist: Checklist, **kwargs: Any) -> Checklist:
        """Filter checklist items based on tagging criteria.

        Args:
            checklist: Input checklist to filter

        Returns:
            Checklist with only items that pass both criteria
        """
        if not checklist.items:
            return self._create_refined_checklist(
                checklist,
                [],
                metadata_updates={
                    "filtered_count": 0,
                    "filtered_items": [],
                },
            )

        kept: List[ChecklistItem] = []
        rejected: List[Dict[str, Any]] = []

        for item in checklist.items:
            verdict = self._tag_question(item)
            applicable = verdict["generally_applicable"]
            specific = verdict["section_specific"]

            if applicable and specific:
                # Passed both criteria - re-emit the item with tag metadata.
                kept.append(
                    ChecklistItem(
                        id=item.id,
                        question=item.question,
                        weight=item.weight,
                        category=item.category,
                        metadata={
                            **(item.metadata or {}),
                            "generally_applicable": True,
                            "section_specific": True,
                            "tag_reasoning": verdict.get("reasoning", ""),
                        },
                    )
                )
            else:
                # Filtered out - record which criterion failed and why.
                rejected.append({
                    "id": item.id,
                    "question": item.question,
                    "generally_applicable": applicable,
                    "section_specific": specific,
                    "reasoning": verdict.get("reasoning", ""),
                })

        return self._create_refined_checklist(
            checklist,
            kept,
            metadata_updates={
                "filtered_count": len(rejected),
                "filtered_items": rejected,
            },
        )

    def _tag_question(self, item: ChecklistItem) -> Dict[str, Any]:
        """Tag a single question for applicability and specificity.

        Args:
            item: Checklist item to tag

        Returns:
            Dict with keys: generally_applicable, section_specific, reasoning
        """
        prompt = self._tag_template.format(question=item.question)

        # Strict JSON schema so the LLM response is machine-parseable.
        tag_schema = {
            "type": "object",
            "properties": {
                "reasoning": {"type": "string"},
                "generally_applicable": {"type": "boolean"},
                "section_specific": {"type": "boolean"},
            },
            "required": ["reasoning", "generally_applicable", "section_specific"],
            "additionalProperties": False,
        }
        response_format = {
            "type": "json_schema",
            "json_schema": {
                "name": "tag_result",
                "strict": True,
                "schema": tag_schema,
            },
        }

        try:
            parsed = json.loads(
                self._call_model(prompt, response_format=response_format)
            )
            return {
                "generally_applicable": parsed.get("generally_applicable", False),
                "section_specific": parsed.get("section_specific", False),
                "reasoning": parsed.get("reasoning", ""),
            }
        except (json.JSONDecodeError, KeyError, TypeError):
            # Malformed response - default to filtering out
            return {
                "generally_applicable": False,
                "section_specific": False,
                "reasoning": "Failed to parse LLM response",
            }

refine(checklist, **kwargs)

Filter checklist items based on tagging criteria.

Parameters:

Name Type Description Default
checklist Checklist

Input checklist to filter

required

Returns:

Type Description
Checklist

Checklist with only items that pass both criteria

Source code in autochecklist/refiners/tagger.py
def refine(self, checklist: Checklist, **kwargs: Any) -> Checklist:
    """Filter checklist items based on tagging criteria.

    Args:
        checklist: Input checklist to filter

    Returns:
        Checklist with only items that pass both criteria
    """
    if not checklist.items:
        return self._create_refined_checklist(
            checklist,
            [],
            metadata_updates={
                "filtered_count": 0,
                "filtered_items": [],
            },
        )

    kept: List[ChecklistItem] = []
    rejected: List[Dict[str, Any]] = []

    for item in checklist.items:
        verdict = self._tag_question(item)
        applicable = verdict["generally_applicable"]
        specific = verdict["section_specific"]

        if applicable and specific:
            # Passed both criteria - re-emit the item with tag metadata.
            kept.append(
                ChecklistItem(
                    id=item.id,
                    question=item.question,
                    weight=item.weight,
                    category=item.category,
                    metadata={
                        **(item.metadata or {}),
                        "generally_applicable": True,
                        "section_specific": True,
                        "tag_reasoning": verdict.get("reasoning", ""),
                    },
                )
            )
        else:
            # Filtered out - record which criterion failed and why.
            rejected.append({
                "id": item.id,
                "question": item.question,
                "generally_applicable": applicable,
                "section_specific": specific,
                "reasoning": verdict.get("reasoning", ""),
            })

    return self._create_refined_checklist(
        checklist,
        kept,
        metadata_updates={
            "filtered_count": len(rejected),
            "filtered_items": rejected,
        },
    )

UnitTester

Bases: ChecklistRefiner

Refiner that validates questions via unit test rewrites.

Pipeline:

1. For each question, find samples that pass (answer=Yes)
2. LLM rewrites each sample to fail the criterion
3. Score rewritten samples - should get "No"
4. Enforceability rate = proportion of rewrites correctly failing
5. Filter questions below threshold

Source code in autochecklist/refiners/unit_tester.py
class UnitTester(ChecklistRefiner):
    """Refiner that validates questions via unit test rewrites.

    Pipeline:
    1. For each question, find samples that pass (answer=Yes)
    2. LLM rewrites each sample to fail the criterion
    3. Score rewritten samples - should get "No"
    4. Enforceability rate = proportion of rewrites correctly failing
    5. Filter questions below threshold
    """

    def __init__(
        self,
        enforceability_threshold: float = 0.7,
        max_samples: int = 10,
        model: Optional[str] = None,
        scorer_model: Optional[str] = None,
        temperature: Optional[float] = None,
        api_key: Optional[str] = None,
        custom_prompt: Optional[Union[str, Path]] = None,
        **kwargs,
    ):
        """Initialize the unit tester.

        Args:
            enforceability_threshold: Minimum fraction of rewritten samples
                that must score "No" for a question to be kept.
            max_samples: Cap on passing samples tested per question.
            model: LLM used for rewriting and (by default) scoring.
            scorer_model: Optional separate model for scoring.
                NOTE(review): this is stored but the visible scoring path
                (_score_sample) calls self._call_model - confirm the base
                class actually routes scoring to scorer_model.
            temperature: Sampling temperature for the LLM.
            api_key: API key forwarded to the base refiner.
            custom_prompt: Optional override for the rewrite prompt,
                either a template string or a Path to a template file.
        """
        super().__init__(model=model, temperature=temperature, api_key=api_key, **kwargs)
        self.enforceability_threshold = enforceability_threshold
        self.max_samples = max_samples
        self.scorer_model = scorer_model or model
        # Load rewrite prompt template: explicit override (string or file)
        # wins; otherwise fall back to the packaged default.
        if custom_prompt is not None:
            if isinstance(custom_prompt, Path):
                template_str = custom_prompt.read_text(encoding="utf-8")
            else:
                template_str = custom_prompt
        else:
            template_str = load_template("generators/feedback", "rewrite_fail")
        self._rewrite_template = PromptTemplate(template_str)

    @property
    def refiner_name(self) -> str:
        """Short identifier used in refinement metadata."""
        return "unit_tester"

    def refine(
        self,
        checklist: Checklist,
        samples: Optional[List[Dict[str, Any]]] = None,
        sample_scores: Optional[Dict[str, Dict[str, str]]] = None,
        raw_samples: Optional[List[Dict[str, Any]]] = None,
        **kwargs: Any,
    ) -> Checklist:
        """Filter checklist based on LLM enforceability.

        Args:
            checklist: Input checklist to validate
            samples: List of sample dicts with 'id' and 'text' keys
            sample_scores: Dict mapping sample_id -> {question_id -> "Yes"/"No"}
            raw_samples: Samples to auto-score when sample_scores not provided.
                         Each dict must have 'id' and 'text' keys.

        Returns:
            Checklist with only enforceable questions
        """
        # Auto-score raw_samples if provided without sample_scores
        if raw_samples and not sample_scores:
            sample_scores = self._auto_score_samples(checklist, raw_samples)
            samples = raw_samples
        else:
            samples = samples or []
            sample_scores = sample_scores or {}

        # Without items or samples there is nothing to test; everything is
        # filtered (no evidence of enforceability).
        if len(checklist.items) == 0 or len(samples) == 0:
            return self._create_refined_checklist(
                checklist,
                [],
                metadata_updates={
                    "filtered_count": len(checklist.items),
                    "enforceability_rates": {},
                },
            )

        passing_items: List[ChecklistItem] = []
        enforceability_rates: Dict[str, float] = {}
        filtered_count = 0

        for item in checklist.items:
            # Find samples that pass this question
            passing_samples = self._get_passing_samples(
                item.id, samples, sample_scores
            )

            if len(passing_samples) == 0:
                # No passing samples - can't test enforceability
                filtered_count += 1
                enforceability_rates[item.id] = 0.0
                continue

            # Limit samples. Use a local fixed-seed RNG for reproducibility
            # instead of random.seed(0), which would reseed the process-global
            # RNG as a side effect visible to unrelated code.
            if len(passing_samples) > self.max_samples:
                passing_samples = random.Random(0).sample(
                    passing_samples, self.max_samples
                )

            # Test enforceability
            rate = self._compute_enforceability(item, passing_samples)
            enforceability_rates[item.id] = rate

            if rate >= self.enforceability_threshold:
                # Question passes - add enforceability metadata
                item_with_metadata = ChecklistItem(
                    id=item.id,
                    question=item.question,
                    weight=item.weight,
                    category=item.category,
                    metadata={
                        **(item.metadata or {}),
                        "enforceability_rate": rate,
                        "samples_tested": len(passing_samples),
                    },
                )
                passing_items.append(item_with_metadata)
            else:
                filtered_count += 1

        return self._create_refined_checklist(
            checklist,
            passing_items,
            metadata_updates={
                "filtered_count": filtered_count,
                "enforceability_rates": enforceability_rates,
                "enforceability_threshold": self.enforceability_threshold,
            },
        )

    def _get_passing_samples(
        self,
        question_id: str,
        samples: List[Dict[str, Any]],
        sample_scores: Dict[str, Dict[str, str]],
    ) -> List[Dict[str, Any]]:
        """Get samples that pass a specific question.

        A sample passes when its recorded score for ``question_id`` is
        exactly "Yes"; missing scores count as not passing.
        """
        passing = []
        for sample in samples:
            sample_id = sample["id"]
            if sample_id in sample_scores:
                if sample_scores[sample_id].get(question_id) == "Yes":
                    passing.append(sample)
        return passing

    def _compute_enforceability(
        self,
        item: ChecklistItem,
        passing_samples: List[Dict[str, Any]],
    ) -> float:
        """Compute enforceability rate for a question.

        Args:
            item: Checklist item to test
            passing_samples: Samples that pass this question

        Returns:
            Enforceability rate (0.0 to 1.0)
        """
        if len(passing_samples) == 0:
            return 0.0

        correct_failures = 0
        for sample in passing_samples:
            # Rewrite sample to fail
            rewritten = self._rewrite_sample(item.question, sample["text"])

            # Score rewritten sample - a correct rewrite should now fail
            score = self._score_sample(item.id, item.question, rewritten)

            if score == "No":
                correct_failures += 1

        return correct_failures / len(passing_samples)

    def _rewrite_sample(self, question: str, sample_text: str) -> str:
        """Rewrite a passing sample to fail the criterion.

        Args:
            question: The checklist question
            sample_text: The sample text to rewrite

        Returns:
            Rewritten sample that should fail
        """
        prompt = self._rewrite_template.format(
            question=question,
            sample=sample_text,
        )

        response = self._call_model(prompt)
        return response.strip()

    def _score_sample(
        self,
        question_id: str,
        question: str,
        sample_text: str,
    ) -> str:
        """Score a sample against a question.

        Args:
            question_id: ID of the question (unused here; kept for
                interface compatibility with callers)
            question: The checklist question
            sample_text: The sample to score

        Returns:
            "Yes" or "No"
        """
        prompt = f"""Evaluate the following sample against the criterion.

Question: {question}

Sample:
{sample_text}

Does this sample meet the criterion? Answer only "Yes" or "No"."""

        response = self._call_model(prompt)
        response = response.strip().lower()

        # Normalize response. Note "yes" is checked first, so a response
        # containing both words is treated as "Yes".
        if "yes" in response:
            return "Yes"
        elif "no" in response:
            return "No"
        else:
            # Ambiguous - treat as failure (conservative)
            return "No"

    def _auto_score_samples(
        self,
        checklist: Checklist,
        raw_samples: List[Dict[str, Any]],
    ) -> Dict[str, Dict[str, str]]:
        """Score each sample against every checklist question.

        Builds the sample_scores dict that ``refine()`` normally expects as
        a pre-computed input.

        Args:
            checklist: Checklist with items to score against
            raw_samples: Sample dicts with 'id' and 'text' keys

        Returns:
            Dict mapping sample_id -> {question_id -> "Yes"/"No"}
        """
        scores: Dict[str, Dict[str, str]] = {}
        for sample in raw_samples:
            sample_id = sample["id"]
            sample_text = sample["text"]
            scores[sample_id] = {}
            for item in checklist.items:
                result = self._score_sample(item.id, item.question, sample_text)
                scores[sample_id][item.id] = result
        return scores

refine(checklist, samples=None, sample_scores=None, raw_samples=None, **kwargs)

Filter checklist based on LLM enforceability.

Parameters:

- checklist (Checklist, required): Input checklist to validate
- samples (Optional[List[Dict[str, Any]]], default None): List of sample dicts with 'id' and 'text' keys
- sample_scores (Optional[Dict[str, Dict[str, str]]], default None): Dict mapping sample_id -> {question_id -> "Yes"/"No"}
- raw_samples (Optional[List[Dict[str, Any]]], default None): Samples to auto-score when sample_scores is not provided. Each dict must have 'id' and 'text' keys.

Returns:

- Checklist: Checklist with only enforceable questions

Source code in autochecklist/refiners/unit_tester.py
def refine(
    self,
    checklist: Checklist,
    samples: Optional[List[Dict[str, Any]]] = None,
    sample_scores: Optional[Dict[str, Dict[str, str]]] = None,
    raw_samples: Optional[List[Dict[str, Any]]] = None,
    **kwargs: Any,
) -> Checklist:
    """Filter checklist based on LLM enforceability.

    Args:
        checklist: Input checklist to validate
        samples: List of sample dicts with 'id' and 'text' keys
        sample_scores: Dict mapping sample_id -> {question_id -> "Yes"/"No"}
        raw_samples: Samples to auto-score when sample_scores not provided.
                     Each dict must have 'id' and 'text' keys.

    Returns:
        Checklist with only enforceable questions
    """
    # Auto-score raw_samples if provided without sample_scores
    if raw_samples and not sample_scores:
        sample_scores = self._auto_score_samples(checklist, raw_samples)
        samples = raw_samples
    else:
        samples = samples or []
        sample_scores = sample_scores or {}

    # Without items or samples there is nothing to test; everything is
    # filtered (no evidence of enforceability).
    if len(checklist.items) == 0 or len(samples) == 0:
        return self._create_refined_checklist(
            checklist,
            [],
            metadata_updates={
                "filtered_count": len(checklist.items),
                "enforceability_rates": {},
            },
        )

    passing_items: List[ChecklistItem] = []
    enforceability_rates: Dict[str, float] = {}
    filtered_count = 0

    for item in checklist.items:
        # Find samples that pass this question
        passing_samples = self._get_passing_samples(
            item.id, samples, sample_scores
        )

        if len(passing_samples) == 0:
            # No passing samples - can't test enforceability
            filtered_count += 1
            enforceability_rates[item.id] = 0.0
            continue

        # Limit samples. Use a local fixed-seed RNG for reproducibility
        # instead of random.seed(0), which would reseed the process-global
        # RNG as a side effect visible to unrelated code.
        if len(passing_samples) > self.max_samples:
            passing_samples = random.Random(0).sample(
                passing_samples, self.max_samples
            )

        # Test enforceability
        rate = self._compute_enforceability(item, passing_samples)
        enforceability_rates[item.id] = rate

        if rate >= self.enforceability_threshold:
            # Question passes - add enforceability metadata
            item_with_metadata = ChecklistItem(
                id=item.id,
                question=item.question,
                weight=item.weight,
                category=item.category,
                metadata={
                    **(item.metadata or {}),
                    "enforceability_rate": rate,
                    "samples_tested": len(passing_samples),
                },
            )
            passing_items.append(item_with_metadata)
        else:
            filtered_count += 1

    return self._create_refined_checklist(
        checklist,
        passing_items,
        metadata_updates={
            "filtered_count": filtered_count,
            "enforceability_rates": enforceability_rates,
            "enforceability_threshold": self.enforceability_threshold,
        },
    )

Selector

Bases: ChecklistRefiner

Refiner that selects optimal diverse subset via beam search.

Since we lack source feedback mapping, uses embedding diversity only. Beam search explores multiple candidate subsets to find optimal score.

Source code in autochecklist/refiners/selector.py
class Selector(ChecklistRefiner):
    """Refiner that selects optimal diverse subset via beam search.

    Since we lack source feedback mapping, uses embedding diversity only.
    Beam search explores multiple candidate subsets to find optimal score.
    """

    def __init__(
        self,
        max_questions: int = 20,
        beam_width: int = 5,
        length_penalty: float = 0.0005,
        model: Optional[str] = None,
        temperature: Optional[float] = None,
        api_key: Optional[str] = None,
        embedding_api_key: Optional[str] = None,
        # Coverage-aware selection
        observations: Optional[List[str]] = None,
        classifier_model: Optional[str] = None,
        alpha: float = 0.5,
        custom_prompt: Optional[Union[str, Path]] = None,
        **kwargs,
    ):
        """Initialize the selector.

        Args:
            max_questions: Maximum subset size the beam search builds up to.
            beam_width: Number of candidate subsets kept per beam step.
            length_penalty: Per-question penalty subtracted from the subset
                score to discourage needlessly large subsets.
            model: LLM identifier forwarded to the base refiner.
            temperature: Sampling temperature forwarded to the base refiner.
            api_key: API key forwarded to the base refiner.
            embedding_api_key: Separate API key for the embedding service.
            observations: Optional feedback strings; when provided, selection
                also maximizes coverage of these items (see _score_subset).
            classifier_model: Model name stored for feedback classification.
                NOTE(review): _classify_feedback calls self._call_model -
                confirm this setting is actually routed to that call.
            alpha: Weight of coverage vs. diversity when observations exist.
            custom_prompt: Override for the classify prompt (string or Path).
        """
        super().__init__(model=model, temperature=temperature, api_key=api_key, **kwargs)
        self.max_questions = max_questions
        self.beam_width = beam_width
        self.length_penalty = length_penalty
        self.embedding_api_key = embedding_api_key
        # Coverage
        self.observations = observations
        self.classifier_model = classifier_model or "openai/gpt-4o-mini"
        self.alpha = alpha
        # Populated during refine() when observations are provided
        self._feedback_assignment: Dict[int, Set[int]] = {}
        self._total_feedback_count: int = 0
        # Load classify template
        if custom_prompt is not None:
            if isinstance(custom_prompt, Path):
                template_str = custom_prompt.read_text(encoding="utf-8")
            else:
                template_str = custom_prompt
        else:
            template_str = load_template("generators/feedback", "classify")
        self._classify_template = PromptTemplate(template_str)

    @property
    def refiner_name(self) -> str:
        # Short identifier used in refinement metadata.
        return "selector"

    def refine(self, checklist: Checklist, **kwargs: Any) -> Checklist:
        """Select optimal diverse subset of questions.

        Args:
            checklist: Input checklist to select from

        Returns:
            Checklist with selected subset
        """
        if len(checklist.items) == 0:
            return self._create_refined_checklist(
                checklist,
                [],
                metadata_updates={
                    "diversity_score": 0.0,
                    "beam_width": self.beam_width,
                },
            )

        # Classify feedback if provided (before beam search)
        # This populates self._feedback_assignment so _score_subset can
        # blend coverage into the beam-search objective.
        if self.observations:
            self._classify_feedback(checklist)

        # If already small enough, return as-is
        if len(checklist.items) <= self.max_questions:
            diversity = self._compute_diversity(
                list(range(len(checklist.items))),
                self._get_similarity_matrix(checklist),
            )
            return self._create_refined_checklist(
                checklist,
                list(checklist.items),
                metadata_updates={
                    "diversity_score": diversity,
                    "beam_width": self.beam_width,
                },
            )

        # Get embeddings and similarity matrix
        similarity_matrix = self._get_similarity_matrix(checklist)

        # Run beam search
        selected_indices, diversity_score = self._beam_search(
            len(checklist.items),
            similarity_matrix,
        )

        # Build selected items with metadata
        selected_items = []
        for order, idx in enumerate(selected_indices):
            item = checklist.items[idx]
            selected_items.append(
                ChecklistItem(
                    id=item.id,
                    question=item.question,
                    weight=item.weight,
                    category=item.category,
                    metadata={
                        **(item.metadata or {}),
                        # Position within the (index-sorted) selection.
                        "selection_order": order,
                    },
                )
            )

        return self._create_refined_checklist(
            checklist,
            selected_items,
            metadata_updates={
                "diversity_score": diversity_score,
                "beam_width": self.beam_width,
                "length_penalty": self.length_penalty,
            },
        )

    def _get_similarity_matrix(self, checklist: Checklist) -> np.ndarray:
        """Compute similarity matrix for checklist questions.

        Returns an n-by-n cosine-similarity matrix over question embeddings,
        where n is the number of checklist items.
        """
        questions = [item.question for item in checklist.items]
        embeddings = get_embeddings(questions, api_key=self.embedding_api_key)
        return cosine_similarity(embeddings)

    def _beam_search(
        self,
        n_items: int,
        similarity_matrix: np.ndarray,
    ) -> Tuple[List[int], float]:
        """Run beam search to find optimal subset.

        Args:
            n_items: Total number of items
            similarity_matrix: Pairwise similarity matrix

        Returns:
            Tuple of (selected indices, diversity score)
        """
        # Initialize beam with empty set
        # Each candidate is (frozenset of indices, score)
        beam: List[Tuple[frozenset, float]] = [(frozenset(), 0.0)]

        for step in range(self.max_questions):
            candidates = []

            for current_set, _ in beam:
                # Try adding each available item
                available = set(range(n_items)) - current_set

                for idx in available:
                    new_set = current_set | {idx}
                    score = self._score_subset(list(new_set), similarity_matrix)
                    candidates.append((new_set, score))

            if not candidates:
                break

            # Keep top beam_width candidates
            # NOTE(review): the same subset can be reached from different
            # beam entries and appear multiple times here, so duplicates may
            # occupy beam slots; ties are broken by list order.
            candidates.sort(key=lambda x: x[1], reverse=True)
            beam = candidates[: self.beam_width]

        # Return best candidate
        if beam:
            best_set, best_score = max(beam, key=lambda x: x[1])
            selected = sorted(best_set)  # Sort for consistent ordering
            # The reported score is raw diversity (no length penalty or
            # coverage term), recomputed for the final selection.
            diversity = self._compute_diversity(selected, similarity_matrix)
            return selected, diversity
        else:
            return [], 0.0

    def _score_subset(
        self,
        indices: List[int],
        similarity_matrix: np.ndarray,
    ) -> float:
        """Score a subset based on diversity, coverage, and length penalty.

        Without observations: Score = Diversity - λ·k
        With observations:    Score = α·Coverage + (1-α)·Diversity - λ·k
        """
        if len(indices) == 0:
            return 0.0

        diversity = self._compute_diversity(indices, similarity_matrix)
        penalty = self.length_penalty * len(indices)

        # Coverage term is only active once _classify_feedback() has
        # populated the assignment (i.e. observations were provided).
        if self._feedback_assignment:
            coverage = self._compute_coverage(indices)
            return (
                self.alpha * coverage
                + (1 - self.alpha) * diversity
                - penalty
            )

        return diversity - penalty

    def _compute_diversity(
        self,
        indices: List[int],
        similarity_matrix: np.ndarray,
    ) -> float:
        """Compute diversity score for a subset.

        Diversity = 1 - average pairwise similarity
        """
        if len(indices) <= 1:
            return 1.0  # Single item is maximally diverse

        total_sim = 0.0
        count = 0

        # Average over all unordered pairs in the subset.
        for i in range(len(indices)):
            for j in range(i + 1, len(indices)):
                total_sim += similarity_matrix[indices[i], indices[j]]
                count += 1

        avg_sim = total_sim / count if count > 0 else 0.0
        return 1.0 - avg_sim

    def _classify_feedback(self, checklist: Checklist) -> Dict[int, Set[int]]:
        """Classify each feedback item against the current checklist questions.

        For each feedback item, asks an LLM which questions cover it.
        Builds a mapping from question index to set of feedback indices.

        Args:
            checklist: Current checklist to classify against

        Returns:
            Dict mapping question index -> set of feedback indices it covers
        """
        if not self.observations:
            return {}

        # Format questions as numbered list (0-indexed)
        questions_text = "\n".join(
            f"{i}. {item.question}" for i, item in enumerate(checklist.items)
        )

        assignment: Dict[int, Set[int]] = {
            i: set() for i in range(len(checklist.items))
        }

        for fb_idx, fb_item in enumerate(self.observations):
            prompt = self._classify_template.format(
                questions=questions_text,
                feedback_item=fb_item,
            )
            response = self._call_model(prompt)

            # Parse question numbers from response
            # NOTE(review): this extracts every digit run in the reply, so
            # numbers embedded in prose are also picked up; out-of-range
            # indices are discarded below.
            nums = re.findall(r"\d+", response)
            for num_str in nums:
                q_idx = int(num_str)
                if 0 <= q_idx < len(checklist.items):
                    assignment[q_idx].add(fb_idx)

        # Cache results for _score_subset / _compute_coverage.
        self._feedback_assignment = assignment
        self._total_feedback_count = len(self.observations)
        return assignment

    def _compute_coverage(self, indices: List[int]) -> float:
        """Compute fraction of feedback items covered by selected questions.

        Args:
            indices: Indices of selected questions

        Returns:
            Coverage score between 0.0 and 1.0
        """
        if self._total_feedback_count == 0:
            return 0.0

        # Union of feedback indices covered by any selected question.
        covered: Set[int] = set()
        for idx in indices:
            covered |= self._feedback_assignment.get(idx, set())

        return len(covered) / self._total_feedback_count

refine(checklist, **kwargs)

Select optimal diverse subset of questions.

Parameters:

Name Type Description Default
checklist Checklist

Input checklist to select from

required

Returns:

Type Description
Checklist

Checklist with selected subset

Source code in autochecklist/refiners/selector.py
def refine(self, checklist: Checklist, **kwargs: Any) -> Checklist:
    """Select optimal diverse subset of questions.

    Args:
        checklist: Input checklist to select from

    Returns:
        Checklist with selected subset
    """
    n_items = len(checklist.items)

    # Nothing to select from: emit an empty refined checklist.
    if n_items == 0:
        return self._create_refined_checklist(
            checklist,
            [],
            metadata_updates={
                "diversity_score": 0.0,
                "beam_width": self.beam_width,
            },
        )

    # Classify feedback if provided (must happen before beam search).
    if self.observations:
        self._classify_feedback(checklist)

    # Already within budget: keep every item, just report its diversity.
    if n_items <= self.max_questions:
        diversity = self._compute_diversity(
            list(range(n_items)),
            self._get_similarity_matrix(checklist),
        )
        return self._create_refined_checklist(
            checklist,
            list(checklist.items),
            metadata_updates={
                "diversity_score": diversity,
                "beam_width": self.beam_width,
            },
        )

    # Embed questions and run beam search over the similarity matrix.
    similarity_matrix = self._get_similarity_matrix(checklist)
    selected_indices, diversity_score = self._beam_search(
        n_items,
        similarity_matrix,
    )

    # Rebuild the kept items, recording the order each was selected in.
    selected_items = [
        ChecklistItem(
            id=checklist.items[idx].id,
            question=checklist.items[idx].question,
            weight=checklist.items[idx].weight,
            category=checklist.items[idx].category,
            metadata={
                **(checklist.items[idx].metadata or {}),
                "selection_order": order,
            },
        )
        for order, idx in enumerate(selected_indices)
    ]

    return self._create_refined_checklist(
        checklist,
        selected_items,
        metadata_updates={
            "diversity_score": diversity_score,
            "beam_width": self.beam_width,
            "length_penalty": self.length_penalty,
        },
    )

ChecklistPipeline

Chains: Generator → Refiners → Scorer.

A composable pipeline for checklist-based evaluation. Three construction modes:

  1. Preset: ChecklistPipeline(from_preset="tick") — resolves generator AND auto-attaches the preset's default scorer.
  2. Explicit components: ChecklistPipeline(generator="tick", scorer="batch") — resolves each component by name. No auto scorer.
  3. Pre-configured instances: ChecklistPipeline(generator=my_gen, scorer=my_scorer)

The pipeline() factory is equivalent to mode 1.

Parameters:

Name Type Description Default
generator Optional[Any]

Generator name string (e.g., "tick", "rlcf_direct") or a pre-configured generator instance.

None
refiners Optional[List[Union[str, Any]]]

Optional list of refiner instances or name strings.

None
scorer Optional[Union[str, Any]]

Optional scorer instance or name string (e.g., "batch", "weighted"). Not auto-resolved unless using from_preset.

None
generator_model Optional[str]

Model for the generator (used when generator is a string).

None
scorer_model Optional[str]

Model for the scorer (used when scorer is a string).

None
provider Optional[str]

LLM provider ("openrouter", "openai", "vllm").

None
base_url Optional[str]

Override base URL for the LLM provider.

None
client Any

Injected LLM client instance.

None
api_key Optional[str]

API key for the provider.

None
api_format Optional[str]

API format ("chat" or "responses").

None
generator_kwargs Optional[Dict[str, Any]]

Extra kwargs passed to generator constructor.

None
scorer_kwargs Optional[Dict[str, Any]]

Extra kwargs passed to scorer constructor.

None
from_preset Optional[str]

Pipeline preset name (e.g., "tick"). Resolves generator and auto-attaches default scorer. Mutually exclusive with generator.

None
Example

pipe = ChecklistPipeline(from_preset="tick", ... generator_model="gpt-4o", scorer_model="gpt-4o-mini")

pipe = ChecklistPipeline(generator="tick", scorer="batch")

gen = DirectGenerator(method_name="tick", model="gpt-4o") pipe = ChecklistPipeline(generator=gen, scorer="batch")

Source code in autochecklist/pipeline.py
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
class ChecklistPipeline:
    """Chains: Generator → Refiners → Scorer.

    A composable pipeline for checklist-based evaluation. Three construction
    modes:

    1. **Preset**: ``ChecklistPipeline(from_preset="tick")`` — resolves
       generator AND auto-attaches the preset's default scorer.
    2. **Explicit components**: ``ChecklistPipeline(generator="tick", scorer="batch")``
       — resolves each component by name. No auto scorer.
    3. **Pre-configured instances**: ``ChecklistPipeline(generator=my_gen, scorer=my_scorer)``

    The :func:`pipeline` factory is equivalent to mode 1.

    Args:
        generator: Generator name string (e.g., ``"tick"``, ``"rlcf_direct"``)
            or a pre-configured generator instance.
        refiners: Optional list of refiner instances or name strings.
        scorer: Optional scorer instance or name string (e.g., ``"batch"``,
            ``"weighted"``). Not auto-resolved unless using ``from_preset``.
        generator_model: Model for the generator (used when generator is a string).
        scorer_model: Model for the scorer (used when scorer is a string).
        provider: LLM provider ("openrouter", "openai", "vllm").
        base_url: Override base URL for the LLM provider.
        client: Injected LLM client instance.
        api_key: API key for the provider.
        api_format: API format ("chat" or "responses").
        reasoning_effort: Reasoning-effort setting shared with every component
            that is resolved from a name string (via the provider kwargs).
        generator_kwargs: Extra kwargs passed to generator constructor.
        scorer_kwargs: Extra kwargs passed to scorer constructor.
        from_preset: Pipeline preset name (e.g., ``"tick"``). Resolves generator
            and auto-attaches default scorer. Mutually exclusive with
            ``generator``.
        custom_prompt: Custom generator prompt (text or path); threaded into
            ``generator_kwargs["custom_prompt"]`` before construction.

    Example:
        >>> pipe = ChecklistPipeline(from_preset="tick",
        ...     generator_model="gpt-4o", scorer_model="gpt-4o-mini")

        >>> pipe = ChecklistPipeline(generator="tick", scorer="batch")

        >>> gen = DirectGenerator(method_name="tick", model="gpt-4o")
        >>> pipe = ChecklistPipeline(generator=gen, scorer="batch")
    """

    def __init__(
        self,
        generator: Optional[Any] = None,
        refiners: Optional[List[Union[str, Any]]] = None,
        scorer: Optional[Union[str, Any]] = None,
        # Model config
        generator_model: Optional[str] = None,
        scorer_model: Optional[str] = None,
        # Provider config (shared)
        provider: Optional[str] = None,
        base_url: Optional[str] = None,
        client: Any = None,
        api_key: Optional[str] = None,
        api_format: Optional[str] = None,
        reasoning_effort: Optional[str] = None,
        # Component-specific kwargs passthrough
        generator_kwargs: Optional[Dict[str, Any]] = None,
        scorer_kwargs: Optional[Dict[str, Any]] = None,
        # Preset loading (resolves generator + default scorer)
        from_preset: Optional[str] = None,
        # Custom prompt override for generator
        custom_prompt: Optional[Union[str, Path]] = None,
    ):
        # Thread custom_prompt into generator_kwargs
        if custom_prompt is not None:
            generator_kwargs = generator_kwargs or {}
            generator_kwargs["custom_prompt"] = custom_prompt

        # from_preset: resolve generator string AND auto-attach default scorer
        if from_preset is not None:
            if generator is not None:
                raise ValueError("Cannot specify both from_preset and generator")
            generator = from_preset
            # Auto-resolve scorer from preset if not explicitly provided
            if scorer is None:
                scorer = DEFAULT_SCORERS.get(from_preset, "batch")

        # Shared provider kwargs
        # (None values are dropped so component constructors keep their own
        # defaults rather than being overridden with None)
        provider_kwargs = {k: v for k, v in {
            "provider": provider, "base_url": base_url,
            "client": client, "api_key": api_key, "api_format": api_format,
            "reasoning_effort": reasoning_effort,
        }.items() if v is not None}

        # -- Generator resolution --
        if isinstance(generator, str):
            gen_cls = get_generator(generator)
            # generator_kwargs take precedence over shared provider kwargs
            gen_kw = {**provider_kwargs, **(generator_kwargs or {})}
            if generator_model:
                gen_kw["model"] = generator_model
            self.generator = gen_cls(**gen_kw)
            self._generator_name = generator
        elif generator is not None:
            self.generator = generator
            self._generator_name = getattr(generator, 'method_name', 'custom')
        else:
            raise ValueError("Must provide generator (name string or instance)")

        # -- Refiner resolution --
        self.refiners: List[Any] = []
        if refiners:
            for refiner in refiners:
                if isinstance(refiner, str):
                    ref_cls = get_refiner(refiner)
                    ref_kw = {**provider_kwargs}
                    self.refiners.append(ref_cls(**ref_kw))
                else:
                    self.refiners.append(refiner)

        # -- Scorer resolution --
        if isinstance(scorer, dict):
            # Config dict from pipeline presets (new style)
            # Resolve scorer_prompt key → custom_prompt before constructing
            scorer = dict(scorer)  # avoid mutating the preset dict
            if "scorer_prompt" in scorer:
                prompt_ref = scorer.pop("scorer_prompt")
                try:
                    from .prompts import load_template
                    scorer["custom_prompt"] = load_template(
                        "scoring", prompt_ref
                    )
                except FileNotFoundError:
                    # Not a built-in name — treat as inline prompt text
                    scorer["custom_prompt"] = prompt_ref
            from .scorers import ChecklistScorer
            scorer_kw = {**provider_kwargs, **scorer, **(scorer_kwargs or {})}
            if scorer_model:
                scorer_kw["model"] = scorer_model
            self.scorer = ChecklistScorer(**scorer_kw)
        elif isinstance(scorer, str):
            scorer_cls = get_scorer(scorer)
            scorer_kw = {**provider_kwargs, **(scorer_kwargs or {})}
            if scorer_model:
                scorer_kw["model"] = scorer_model
            self.scorer = scorer_cls(**scorer_kw)
        elif scorer is not None:
            self.scorer = scorer  # pre-configured instance
        else:
            self.scorer = None  # generator instance without scorer

    @property
    def is_instance_level(self) -> bool:
        """Check if the generator is instance-level."""
        return self.generator.generation_level == "instance"

    @property
    def is_corpus_level(self) -> bool:
        """Check if the generator is corpus-level."""
        return self.generator.generation_level == "corpus"

    def __call__(
        self,
        input: Optional[str] = None,
        target: Optional[str] = None,
        **kwargs: Any,
    ) -> PipelineResult:
        """Run the full pipeline: generate → refine → score.

        For instance-level generators, pass input and target.
        For corpus-level generators, pass the appropriate inputs via kwargs
        (e.g., feedback=..., dimensions=...).

        Args:
            input: Input instruction/query (for instance-level)
            target: Target response to evaluate (optional for generation-only)
            **kwargs: Additional arguments passed to generator

        Returns:
            PipelineResult with checklist and optional score
        """
        checklist = self.generate(input=input, **kwargs)

        # Scoring is optional: only runs when a target response is supplied.
        score = None
        if target is not None:
            score = self.score(checklist, target, input=input)

        return PipelineResult(checklist=checklist, score=score)

    def generate(
        self,
        input: Optional[str] = None,
        **kwargs: Any,
    ) -> Checklist:
        """Generate a checklist (without scoring).

        Args:
            input: Input instruction/query (for instance-level)
            **kwargs: Additional arguments for generator

        Returns:
            Generated and refined checklist

        Raises:
            RuntimeError: If no generator is configured.
            ValueError: If input is missing for an instance-level generator.
        """
        if self.generator is None:
            raise RuntimeError("No generator configured. Provide generator (name or instance) to use generate().")

        # Instance-level generators require an input; corpus-level generators
        # take their material entirely through kwargs.
        if self.is_instance_level:
            if input is None:
                raise ValueError("input is required for instance-level generators")
            checklist = self.generator.generate(input=input, **kwargs)
        else:
            checklist = self.generator.generate(**kwargs)

        return self.refine(checklist)

    def refine(self, checklist: Checklist) -> Checklist:
        """Apply refiners to a checklist."""
        # Refiners run in order; each receives the previous refiner's output.
        for refiner in self.refiners:
            checklist = refiner.refine(checklist)
        return checklist

    def score(
        self,
        checklist: Checklist,
        target: str,
        input: Optional[str] = None,
    ) -> Score:
        """Score a target response against a checklist.

        Args:
            checklist: Checklist to evaluate against
            target: Target response to score
            input: Optional input for context

        Returns:
            Score object

        Raises:
            RuntimeError: If no scorer is configured.
        """
        if self.scorer is None:
            raise RuntimeError(
                "No scorer configured. Provide scorer (name or instance) to use score(). Use pipeline() for automatic scorer defaults."
            )
        return self.scorer.score(
            checklist=checklist,
            target=target,
            input=input,
        )

    def score_group(
        self,
        sub_checklists: Dict[str, Checklist],
        target: str,
        input: Optional[str] = None,
    ) -> "GroupedScore":
        """Score a target against sub-checklists (one per category).

        Typically used with ``checklist.by_category()`` output.

        Args:
            sub_checklists: Dict mapping category name to sub-Checklist
            target: Target response to score
            input: Optional input for context

        Returns:
            GroupedScore with per-category Score objects
        """
        if self.scorer is None:
            raise RuntimeError("No scorer configured.")
        # One independent scoring call per category.
        scores = {}
        for name, sub_cl in sub_checklists.items():
            scores[name] = self.score(sub_cl, target, input=input)
        return GroupedScore(scores=scores)

    def score_batch(
        self,
        checklist: Checklist,
        targets: List[str],
        inputs: Optional[List[str]] = None,
        show_progress: bool = False,
        on_progress: Optional[Callable[[int, int], None]] = None,
    ) -> List[Score]:
        """Score multiple targets against a single checklist."""
        # Thin wrapper around the internal batch-scoring loop.
        return self._run_batch_scoring(
            checklist=checklist,
            targets=targets,
            inputs=inputs,
            show_progress=show_progress,
            on_progress=on_progress,
        )

    def generate_batch(
        self,
        data: Optional[List[Dict[str, Any]]] = None,
        inputs: Optional[List[str]] = None,
        show_progress: bool = False,
        on_progress: Optional[Callable[[int, int], None]] = None,
        output_path: Optional[str] = None,
        overwrite: bool = False,
    ) -> List[Checklist]:
        """Generate checklists for a batch of inputs (no scoring).

        Only works for instance-level generators (1:1 input → checklist).
        For corpus-level generators, call generator.generate() directly.

        Args:
            data: List of dicts with "input" key
            inputs: List of input strings (convenience alternative to data)
            show_progress: Show progress bar
            on_progress: Callback(completed, total) fired after each item
            output_path: Path to JSONL file for incremental writes + resume
            overwrite: If True, delete existing output_path before starting

        Returns:
            List of Checklist objects
        """
        if self.generator is None:
            raise RuntimeError("No generator configured.")
        if not self.is_instance_level:
            raise RuntimeError(
                "generate_batch() only works for instance-level generators. "
                "Corpus-level generators produce one checklist from all inputs — "
                "call generator.generate() directly."
            )

        if data is None:
            if inputs is None:
                raise ValueError("Provide either 'data' or 'inputs'")
            data = [{"input": inp} for inp in inputs]

        total = len(data)

        # Resume logic
        # `completed` maps item index -> previously-written JSONL record
        if overwrite and output_path and os.path.exists(output_path):
            os.remove(output_path)
        completed = _load_completed_indices(output_path) if output_path else {}

        checklists: List[Optional[Checklist]] = [None] * total
        for idx, record in completed.items():
            if 0 <= idx < total:
                checklists[idx] = _checklist_from_record(record)

        # Append mode so a resumed run extends the existing JSONL file.
        out_file = open(output_path, "a") if output_path else None
        try:
            for i, item in enumerate(data):
                if i in completed:
                    if on_progress:
                        on_progress(i + 1, total)
                    continue
                # Forward any extra per-item keys (except input/target) to the
                # generator as kwargs.
                checklist = self.generate(input=item.get("input", ""), **{
                    k: v for k, v in item.items() if k not in ("input", "target")
                })
                checklists[i] = checklist
                if out_file:
                    # Flush per item so progress survives interruption.
                    record = _checklist_record(i, item, checklist)
                    out_file.write(json.dumps(record) + "\n")
                    out_file.flush()
                if on_progress:
                    on_progress(i + 1, total)
        finally:
            if out_file:
                out_file.close()

        return [c for c in checklists if c is not None]

    def run_batch(
        self,
        data: Optional[List[Dict[str, Any]]] = None,
        checklist: Optional[Checklist] = None,
        inputs: Optional[List[str]] = None,
        targets: Optional[List[str]] = None,
        show_progress: bool = False,
        on_progress: Optional[Callable[[int, int], None]] = None,
        output_path: Optional[str] = None,
        overwrite: bool = False,
    ) -> BatchResult:
        """Run batch evaluation on a corpus.

        Can be called with either:
        1. data: List of dicts with "input" and "target" keys
        2. inputs + targets: Separate lists

        Args:
            data: List of dicts with input/target pairs
            checklist: Optional shared checklist to use for all evaluations
            inputs: Optional list of inputs (alternative to data)
            targets: Optional list of targets (alternative to data)
            show_progress: Show progress bar
            on_progress: Optional callback
            output_path: Path to JSONL file for incremental writes + resume
            overwrite: If True, delete existing output_path before starting

        Returns:
            BatchResult with scores and aggregated metrics
        """
        if data is None:
            if inputs is None or targets is None:
                raise ValueError("Provide either 'data' or both 'inputs' and 'targets'")
            if len(inputs) != len(targets):
                raise ValueError("inputs and targets must have same length")
            data = [
                {"input": inp, "target": tgt}
                for inp, tgt in zip(inputs, targets)
            ]

        if output_path is not None and checklist is not None:
            raise ValueError(
                "output_path with shared checklist is not supported. "
                "Use score_batch() for shared-checklist scoring."
            )

        # Shared-checklist mode: score every target against the one checklist;
        # otherwise generate a fresh checklist per item and score it.
        if checklist is not None:
            target_list = [d.get("target", "") for d in data]
            input_list = [d.get("input") for d in data]
            scores = self._run_batch_scoring(
                checklist=checklist,
                targets=target_list,
                inputs=input_list,
                show_progress=show_progress,
                on_progress=on_progress,
            )
            return BatchResult(
                scores=scores,
                data=data,
                checklist=checklist,
            )
        else:
            return self._run_batch_generation_and_scoring(
                data=data,
                show_progress=show_progress,
                on_progress=on_progress,
                output_path=output_path,
                overwrite=overwrite,
            )

    def _run_batch_generation_and_scoring(
        self,
        data: List[Dict[str, Any]],
        show_progress: bool = False,
        on_progress: Optional[Callable[[int, int], None]] = None,
        output_path: Optional[str] = None,
        overwrite: bool = False,
    ) -> BatchResult:
        """Generate and score for each item in the batch."""
        total = len(data)

        # Resume logic
        # `completed` maps item index -> previously-written JSONL record
        if overwrite and output_path and os.path.exists(output_path):
            os.remove(output_path)
        completed = _load_completed_indices(output_path) if output_path else {}

        # tqdm is optional; progress bars are silently skipped if unavailable.
        gen_pbar = None
        score_pbar = None
        if show_progress:
            try:
                from tqdm import tqdm
                num_completed = len(completed)
                gen_pbar = tqdm(total=total, desc="Generating", initial=num_completed)
                score_pbar = tqdm(total=total, desc="Scoring", initial=num_completed)
            except ImportError:
                pass

        checklists: List[Optional[Checklist]] = [None] * total
        scores: List[Optional[Score]] = [None] * total

        # Load completed results
        for idx, record in completed.items():
            if 0 <= idx < total:
                checklists[idx] = _checklist_from_record(record)
                scores[idx] = _score_from_record(record)

        # Append mode so a resumed run extends the existing JSONL file.
        out_file = open(output_path, "a") if output_path else None
        try:
            for i, item in enumerate(data):
                if i in completed:
                    if on_progress:
                        on_progress(i + 1, total)
                    continue

                inp = item.get("input", "")
                tgt = item.get("target", "")

                cl = self.generate(input=inp, **{k: v for k, v in item.items() if k not in ("input", "target")})
                checklists[i] = cl
                if gen_pbar:
                    gen_pbar.update(1)

                sc = self.score(cl, tgt, input=inp)
                scores[i] = sc
                if score_pbar:
                    score_pbar.update(1)

                if out_file:
                    # Flush per item so progress survives interruption.
                    record = _record_from_result(i, item, cl, sc)
                    out_file.write(json.dumps(record) + "\n")
                    out_file.flush()

                if on_progress:
                    on_progress(i + 1, total)

            return BatchResult(
                scores=[s for s in scores if s is not None],
                data=data,
                checklists=[c for c in checklists if c is not None],
            )
        finally:
            if out_file:
                out_file.close()
            if gen_pbar:
                gen_pbar.close()
            if score_pbar:
                score_pbar.close()

    def _run_batch_scoring(
        self,
        checklist: Checklist,
        targets: List[str],
        inputs: Optional[List[str]] = None,
        show_progress: bool = False,
        on_progress: Optional[Callable[[int, int], None]] = None,
    ) -> List[Score]:
        """Score multiple targets with concurrency support."""
        # tqdm is optional; progress bars are silently skipped if unavailable.
        pbar = None
        total = len(targets)

        if show_progress:
            try:
                from tqdm import tqdm
                pbar = tqdm(total=total, desc="Scoring")
            except ImportError:
                pass

        try:
            scores = []
            for i, target in enumerate(targets):
                inp = inputs[i] if inputs else None
                score = self.score(checklist, target, input=inp)
                scores.append(score)
                if pbar:
                    pbar.update(1)
                if on_progress:
                    on_progress(i + 1, total)
            return scores
        finally:
            if pbar:
                pbar.close()

    def run_batch_from_file(
        self,
        path: str,
        checklist: Optional[Checklist] = None,
        input_key: str = "input",
        target_key: str = "target",
        show_progress: bool = False,
    ) -> BatchResult:
        """Run batch evaluation from a JSONL file."""
        # Each line is one JSON object; missing keys default to "".
        data: List[Dict[str, Any]] = []
        with open(path) as f:
            for line in f:
                obj = json.loads(line)
                data.append({
                    "input": obj.get(input_key, ""),
                    "target": obj.get(target_key, ""),
                })

        return self.run_batch(
            data=data,
            checklist=checklist,
            show_progress=show_progress,
        )

is_instance_level property

Check if the generator is instance-level.

is_corpus_level property

Check if the generator is corpus-level.

__call__(input=None, target=None, **kwargs)

Run the full pipeline: generate → refine → score.

For instance-level generators, pass input and target. For corpus-level generators, pass the appropriate inputs via kwargs (e.g., feedback=..., dimensions=...).

Parameters:

Name Type Description Default
input Optional[str]

Input instruction/query (for instance-level)

None
target Optional[str]

Target response to evaluate (optional for generation-only)

None
**kwargs Any

Additional arguments passed to generator

{}

Returns:

Type Description
PipelineResult

PipelineResult with checklist and optional score

Source code in autochecklist/pipeline.py
def __call__(
    self,
    input: Optional[str] = None,
    target: Optional[str] = None,
    **kwargs: Any,
) -> PipelineResult:
    """Run the full pipeline: generate → refine → score.

    For instance-level generators, pass input and target.
    For corpus-level generators, pass the appropriate inputs via kwargs
    (e.g., feedback=..., dimensions=...).

    Args:
        input: Input instruction/query (for instance-level)
        target: Target response to evaluate (optional for generation-only)
        **kwargs: Additional arguments passed to generator

    Returns:
        PipelineResult with checklist and optional score
    """
    checklist = self.generate(input=input, **kwargs)

    # Scoring is optional: only runs when a target response is supplied.
    score = None
    if target is not None:
        score = self.score(checklist, target, input=input)

    return PipelineResult(checklist=checklist, score=score)

generate(input=None, **kwargs)

Generate a checklist (without scoring).

Parameters:

Name Type Description Default
input Optional[str]

Input instruction/query (for instance-level)

None
**kwargs Any

Additional arguments for generator

{}

Returns:

Type Description
Checklist

Generated and refined checklist

Source code in autochecklist/pipeline.py
def generate(
    self,
    input: Optional[str] = None,
    **kwargs: Any,
) -> Checklist:
    """Generate a checklist (without scoring).

    Args:
        input: Input instruction/query (for instance-level)
        **kwargs: Additional arguments for generator

    Returns:
        Generated and refined checklist

    Raises:
        RuntimeError: If no generator is configured.
        ValueError: If input is missing for an instance-level generator.
    """
    if self.generator is None:
        raise RuntimeError("No generator configured. Provide generator (name or instance) to use generate().")

    # Instance-level generators require an input; corpus-level generators
    # take their material entirely through kwargs.
    if self.is_instance_level:
        if input is None:
            raise ValueError("input is required for instance-level generators")
        checklist = self.generator.generate(input=input, **kwargs)
    else:
        checklist = self.generator.generate(**kwargs)

    return self.refine(checklist)

refine(checklist)

Apply refiners to a checklist.

Source code in autochecklist/pipeline.py
def refine(self, checklist: Checklist) -> Checklist:
    """Apply refiners to a checklist."""
    # Refiners run in order; each receives the previous refiner's output.
    for refiner in self.refiners:
        checklist = refiner.refine(checklist)
    return checklist

score(checklist, target, input=None)

Score a target response against a checklist.

Parameters:

Name Type Description Default
checklist Checklist

Checklist to evaluate against

required
target str

Target response to score

required
input Optional[str]

Optional input for context

None

Returns:

Type Description
Score

Score object

Source code in autochecklist/pipeline.py
def score(
    self,
    checklist: Checklist,
    target: str,
    input: Optional[str] = None,
) -> Score:
    """Score a target response against a checklist.

    Args:
        checklist: Checklist to evaluate against
        target: Target response to score
        input: Optional input for context

    Returns:
        Score object

    Raises:
        RuntimeError: If no scorer is configured.
    """
    if self.scorer is None:
        raise RuntimeError(
            "No scorer configured. Provide scorer (name or instance) to use score(). Use pipeline() for automatic scorer defaults."
        )
    # Delegate to the configured scorer instance.
    return self.scorer.score(
        checklist=checklist,
        target=target,
        input=input,
    )

score_group(sub_checklists, target, input=None)

Score a target against sub-checklists (one per category).

Typically used with checklist.by_category() output.

Parameters:

Name Type Description Default
sub_checklists Dict[str, Checklist]

Dict mapping category name to sub-Checklist

required
target str

Target response to score

required
input Optional[str]

Optional input for context

None

Returns:

Type Description
GroupedScore

GroupedScore with per-category Score objects

Source code in autochecklist/pipeline.py
def score_group(
    self,
    sub_checklists: Dict[str, Checklist],
    target: str,
    input: Optional[str] = None,
) -> "GroupedScore":
    """Score a target against several sub-checklists, one per category.

    Typically used with the output of ``checklist.by_category()``.

    Args:
        sub_checklists: Dict mapping category name to sub-Checklist
        target: Target response to score
        input: Optional input for context

    Returns:
        GroupedScore with per-category Score objects
    """
    if self.scorer is None:
        raise RuntimeError("No scorer configured.")
    # One Score per category, keyed exactly like the input dict.
    return GroupedScore(
        scores={
            name: self.score(sub_cl, target, input=input)
            for name, sub_cl in sub_checklists.items()
        }
    )

score_batch(checklist, targets, inputs=None, show_progress=False, on_progress=None)

Score multiple targets against a single checklist.

Source code in autochecklist/pipeline.py
def score_batch(
    self,
    checklist: Checklist,
    targets: List[str],
    inputs: Optional[List[str]] = None,
    show_progress: bool = False,
    on_progress: Optional[Callable[[int, int], None]] = None,
) -> List[Score]:
    """Score multiple targets against a single shared checklist."""
    # Pure delegation: the shared batch-scoring engine handles
    # iteration, progress reporting, and callbacks.
    batch_kwargs = dict(
        checklist=checklist,
        targets=targets,
        inputs=inputs,
        show_progress=show_progress,
        on_progress=on_progress,
    )
    return self._run_batch_scoring(**batch_kwargs)

generate_batch(data=None, inputs=None, show_progress=False, on_progress=None, output_path=None, overwrite=False)

Generate checklists for a batch of inputs (no scoring).

Only works for instance-level generators (1:1 input → checklist). For corpus-level generators, call generator.generate() directly.

Parameters:

Name Type Description Default
data Optional[List[Dict[str, Any]]]

List of dicts with "input" key

None
inputs Optional[List[str]]

List of input strings (convenience alternative to data)

None
show_progress bool

Show progress bar

False
on_progress Optional[Callable[[int, int], None]]

Callback(completed, total) fired after each item

None
output_path Optional[str]

Path to JSONL file for incremental writes + resume

None
overwrite bool

If True, delete existing output_path before starting

False

Returns:

Type Description
List[Checklist]

List of Checklist objects

Source code in autochecklist/pipeline.py
def generate_batch(
    self,
    data: Optional[List[Dict[str, Any]]] = None,
    inputs: Optional[List[str]] = None,
    show_progress: bool = False,
    on_progress: Optional[Callable[[int, int], None]] = None,
    output_path: Optional[str] = None,
    overwrite: bool = False,
) -> List[Checklist]:
    """Generate checklists for a batch of inputs (no scoring).

    Only works for instance-level generators (1:1 input → checklist).
    For corpus-level generators, call generator.generate() directly.

    Args:
        data: List of dicts with "input" key
        inputs: List of input strings (convenience alternative to data)
        show_progress: Show progress bar
        on_progress: Callback(completed, total) fired after each item
        output_path: Path to JSONL file for incremental writes + resume
        overwrite: If True, delete existing output_path before starting

    Returns:
        List of Checklist objects
    """
    # NOTE(review): show_progress is accepted but never read in this body —
    # confirm whether a progress bar was meant to be rendered here.
    if self.generator is None:
        raise RuntimeError("No generator configured.")
    if not self.is_instance_level:
        raise RuntimeError(
            "generate_batch() only works for instance-level generators. "
            "Corpus-level generators produce one checklist from all inputs — "
            "call generator.generate() directly."
        )

    # Normalize the convenience 'inputs' form into the canonical 'data' form.
    if data is None:
        if inputs is None:
            raise ValueError("Provide either 'data' or 'inputs'")
        data = [{"input": inp} for inp in inputs]

    total = len(data)

    # Resume logic
    # overwrite=True discards any previous partial run; otherwise records
    # already present in output_path are reloaded instead of regenerated.
    if overwrite and output_path and os.path.exists(output_path):
        os.remove(output_path)
    completed = _load_completed_indices(output_path) if output_path else {}

    # Pre-size the result list so resumed records land at their original index.
    checklists: List[Optional[Checklist]] = [None] * total
    for idx, record in completed.items():
        # Ignore stale records whose index falls outside this run's range.
        if 0 <= idx < total:
            checklists[idx] = _checklist_from_record(record)

    out_file = open(output_path, "a") if output_path else None
    try:
        for i, item in enumerate(data):
            if i in completed:
                # Already generated in a previous run; still report progress.
                if on_progress:
                    on_progress(i + 1, total)
                continue
            # Forward extra per-item keys (except input/target) to generate().
            checklist = self.generate(input=item.get("input", ""), **{
                k: v for k, v in item.items() if k not in ("input", "target")
            })
            checklists[i] = checklist
            if out_file:
                # Persist each result immediately so a crash can be resumed.
                record = _checklist_record(i, item, checklist)
                out_file.write(json.dumps(record) + "\n")
                out_file.flush()
            if on_progress:
                on_progress(i + 1, total)
    finally:
        if out_file:
            out_file.close()

    # Drop slots that were never filled (e.g. out-of-range resume records).
    return [c for c in checklists if c is not None]

run_batch(data=None, checklist=None, inputs=None, targets=None, show_progress=False, on_progress=None, output_path=None, overwrite=False)

Run batch evaluation on a corpus.

Can be called in either of two ways: (1) with data, a list of dicts each containing "input" and "target" keys; or (2) with inputs and targets as separate, equal-length lists.

Parameters:

Name Type Description Default
data Optional[List[Dict[str, Any]]]

List of dicts with input/target pairs

None
checklist Optional[Checklist]

Optional shared checklist to use for all evaluations

None
inputs Optional[List[str]]

Optional list of inputs (alternative to data)

None
targets Optional[List[str]]

Optional list of targets (alternative to data)

None
show_progress bool

Show progress bar

False
on_progress Optional[Callable[[int, int], None]]

Optional callback

None
output_path Optional[str]

Path to JSONL file for incremental writes + resume

None
overwrite bool

If True, delete existing output_path before starting

False

Returns:

Type Description
BatchResult

BatchResult with scores and aggregated metrics

Source code in autochecklist/pipeline.py
def run_batch(
    self,
    data: Optional[List[Dict[str, Any]]] = None,
    checklist: Optional[Checklist] = None,
    inputs: Optional[List[str]] = None,
    targets: Optional[List[str]] = None,
    show_progress: bool = False,
    on_progress: Optional[Callable[[int, int], None]] = None,
    output_path: Optional[str] = None,
    overwrite: bool = False,
) -> BatchResult:
    """Run batch evaluation on a corpus.

    Accepts either ``data`` (a list of dicts with "input" and "target"
    keys) or the pair ``inputs``/``targets`` as separate lists.

    Args:
        data: List of dicts with input/target pairs
        checklist: Optional shared checklist to use for all evaluations
        inputs: Optional list of inputs (alternative to data)
        targets: Optional list of targets (alternative to data)
        show_progress: Show progress bar
        on_progress: Optional callback
        output_path: Path to JSONL file for incremental writes + resume
        overwrite: If True, delete existing output_path before starting

    Returns:
        BatchResult with scores and aggregated metrics
    """
    # Normalize the two-list calling convention into 'data'.
    if data is None:
        if inputs is None or targets is None:
            raise ValueError("Provide either 'data' or both 'inputs' and 'targets'")
        if len(inputs) != len(targets):
            raise ValueError("inputs and targets must have same length")
        data = [{"input": inp, "target": tgt} for inp, tgt in zip(inputs, targets)]

    if output_path is not None and checklist is not None:
        raise ValueError(
            "output_path with shared checklist is not supported. "
            "Use score_batch() for shared-checklist scoring."
        )

    # No shared checklist: generate a checklist per item, then score.
    if checklist is None:
        return self._run_batch_generation_and_scoring(
            data=data,
            show_progress=show_progress,
            on_progress=on_progress,
            output_path=output_path,
            overwrite=overwrite,
        )

    # Shared checklist: score every target against the same checklist.
    scores = self._run_batch_scoring(
        checklist=checklist,
        targets=[row.get("target", "") for row in data],
        inputs=[row.get("input") for row in data],
        show_progress=show_progress,
        on_progress=on_progress,
    )
    return BatchResult(
        scores=scores,
        data=data,
        checklist=checklist,
    )

run_batch_from_file(path, checklist=None, input_key='input', target_key='target', show_progress=False)

Run batch evaluation from a JSONL file.

Source code in autochecklist/pipeline.py
def run_batch_from_file(
    self,
    path: str,
    checklist: Optional[Checklist] = None,
    input_key: str = "input",
    target_key: str = "target",
    show_progress: bool = False,
) -> BatchResult:
    """Run batch evaluation from a JSONL file.

    Args:
        path: Path to a JSONL file (one JSON object per line).
        checklist: Optional shared checklist for all evaluations.
        input_key: Key holding the input text in each record.
        target_key: Key holding the target text in each record.
        show_progress: Show progress bar.

    Returns:
        BatchResult with scores and aggregated metrics.
    """
    data = []
    # Read as UTF-8 explicitly so parsing does not depend on the platform's
    # default encoding, and skip blank lines (common in hand-edited or
    # concatenated JSONL files), which would otherwise crash json.loads.
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            obj = json.loads(line)
            data.append({
                "input": obj.get(input_key, ""),
                "target": obj.get(target_key, ""),
            })

    return self.run_batch(
        data=data,
        checklist=checklist,
        show_progress=show_progress,
    )

PipelineResult dataclass

Result from a single pipeline execution.

Attributes:

Name Type Description
checklist Checklist

Generated (and optionally refined) checklist

score Optional[Score]

Score object if target was provided, None otherwise

Source code in autochecklist/pipeline.py
@dataclass
class PipelineResult:
    """Result from a single pipeline execution.

    Attributes:
        checklist: Generated (and optionally refined) checklist
        score: Score object if target was provided, None otherwise
    """
    checklist: Checklist
    score: Optional[Score] = None

    def _score_attr(self, name: str) -> Optional[float]:
        # Shared implementation for the shortcut properties below; mirrors
        # the truthiness check on self.score so a missing score yields None.
        return getattr(self.score, name) if self.score else None

    @property
    def pass_rate(self) -> Optional[float]:
        """Shortcut to score.pass_rate."""
        return self._score_attr("pass_rate")

    @property
    def weighted_score(self) -> Optional[float]:
        """Shortcut to score.weighted_score."""
        return self._score_attr("weighted_score")

    @property
    def normalized_score(self) -> Optional[float]:
        """Shortcut to score.normalized_score."""
        return self._score_attr("normalized_score")

pass_rate property

Shortcut to score.pass_rate.

weighted_score property

Shortcut to score.weighted_score.

normalized_score property

Shortcut to score.normalized_score.

BatchResult dataclass

Result from batch corpus evaluation.

Attributes:

Name Type Description
checklist Optional[Checklist]

The checklist used for evaluation (shared if provided, otherwise each score references its own checklist)

scores List[Score]

List of Score objects, one per input

data List[Dict[str, Any]]

Original input data

checklists List[Checklist]

Individual checklists when not using shared checklist

Source code in autochecklist/pipeline.py
@dataclass
class BatchResult:
    """Result from batch corpus evaluation.

    Attributes:
        checklist: The checklist used for evaluation (shared if provided,
            otherwise each score references its own checklist)
        scores: List of Score objects, one per input
        data: Original input data
        checklists: Individual checklists when not using shared checklist
    """
    scores: List[Score]
    data: List[Dict[str, Any]]
    checklist: Optional[Checklist] = None
    checklists: List[Checklist] = field(default_factory=list)

    @property
    def macro_pass_rate(self) -> float:
        """Macro-averaged pass rate across all scored examples.

        Computes pass_rate for each example independently, then averages.
        Each example contributes equally regardless of checklist size.

        Example: If example A scores 2/4 (0.5) and example B scores 3/3 (1.0),
        macro_pass_rate = (0.5 + 1.0) / 2 = 0.75
        """
        if not self.scores:
            return 0.0
        return sum(s.pass_rate for s in self.scores) / len(self.scores)

    @property
    def micro_pass_rate(self) -> float:
        """Micro-averaged pass rate (DFPR: Decomposed Requirements Following Ratio).

        Pools all checklist items across all examples into a single count.
        Examples with more checklist items have proportionally more influence.

        Example: If example A scores 2/4 and example B scores 3/3,
        micro_pass_rate = (2 + 3) / (4 + 3) = 5/7 ≈ 0.714
        """
        total_yes = 0
        total = 0
        for score in self.scores:
            # Every item counts once, so larger checklists weigh more.
            total += len(score.item_scores)
            total_yes += sum(
                1
                for item_score in score.item_scores
                if item_score.answer == ChecklistItemAnswer.YES
            )
        return total_yes / total if total > 0 else 0.0

    @property
    def mean_score(self) -> float:
        """Mean of Score.primary_score across all examples.

        Respects each Score's primary_metric — averages weighted_score
        for weighted pipelines, normalized_score for normalized, pass_rate for pass.
        """
        if not self.scores:
            return 0.0
        return sum(s.primary_score for s in self.scores) / len(self.scores)

    def per_category_pass_rates(self) -> List[Dict[str, float]]:
        """Compute per-category pass rates for each example.

        Uses the checklist(s) to map item IDs to categories, then computes
        pass rates per category for each scored example.

        Returns:
            List of dicts, one per example, mapping category -> pass_rate
        """
        results = []
        for i, score in enumerate(self.scores):
            # Get the checklist for this example: the shared one if set,
            # otherwise the per-example checklist at the same index.
            if self.checklist is not None:
                cl = self.checklist
            elif i < len(self.checklists):
                cl = self.checklists[i]
            else:
                # No checklist available -> no category information.
                results.append({})
                continue

            # Build item_id -> category mapping (None maps to "ungrouped").
            id_to_cat = {item.id: (item.category or "ungrouped") for item in cl.items}

            # Group item scores by category
            cat_yes: Dict[str, int] = {}
            cat_total: Dict[str, int] = {}
            for item_score in score.item_scores:
                cat = id_to_cat.get(item_score.item_id, "ungrouped")
                cat_total[cat] = cat_total.get(cat, 0) + 1
                if item_score.answer == ChecklistItemAnswer.YES:
                    cat_yes[cat] = cat_yes.get(cat, 0) + 1

            rates = {}
            for cat, total in cat_total.items():
                rates[cat] = cat_yes.get(cat, 0) / total if total > 0 else 0.0
            results.append(rates)

        return results

    def to_dataframe(self) -> "pd.DataFrame":
        """Export results to a pandas DataFrame, one row per example.

        Optional metrics (weighted_score, normalized_score) become columns
        only when present; each item answer becomes an ``item_<id>`` column.
        """
        try:
            import pandas as pd
        except ImportError:
            raise ImportError("pandas is required for to_dataframe(). Install with: pip install pandas")

        rows = []
        for i, (item, score) in enumerate(zip(self.data, self.scores)):
            row = {
                "index": i,
                "input": item.get("input", ""),
                "target": item.get("target", ""),
                "pass_rate": score.pass_rate,
            }
            if score.weighted_score is not None:
                row["weighted_score"] = score.weighted_score
            if score.normalized_score is not None:
                row["normalized_score"] = score.normalized_score

            for item_score in score.item_scores:
                row[f"item_{item_score.item_id}"] = item_score.answer.value

            rows.append(row)

        return pd.DataFrame(rows)

    def to_jsonl(self, path: str) -> None:
        """Export results to a JSONL file, one record per example.

        Args:
            path: Destination file path; overwritten if it exists.
        """
        # Write UTF-8 explicitly so output does not depend on the
        # platform's default encoding.
        with open(path, "w", encoding="utf-8") as f:
            for item, score in zip(self.data, self.scores):
                record = {
                    "input": item.get("input", ""),
                    "target": item.get("target", ""),
                    "pass_rate": score.pass_rate,
                    "item_scores": [
                        {
                            "item_id": s.item_id,
                            "answer": s.answer.value,
                            "reasoning": s.reasoning,
                        }
                        for s in score.item_scores
                    ],
                }
                if score.weighted_score is not None:
                    record["weighted_score"] = score.weighted_score
                if score.normalized_score is not None:
                    record["normalized_score"] = score.normalized_score
                f.write(json.dumps(record) + "\n")

macro_pass_rate property

Macro-averaged pass rate across all scored examples.

Computes pass_rate for each example independently, then averages. Each example contributes equally regardless of checklist size.

Example: If example A scores 2/4 (0.5) and example B scores 3/3 (1.0), macro_pass_rate = (0.5 + 1.0) / 2 = 0.75

micro_pass_rate property

Micro-averaged pass rate (DFPR: Decomposed Requirements Following Ratio).

Pools all checklist items across all examples into a single count. Examples with more checklist items have proportionally more influence.

Example: If example A scores 2/4 and example B scores 3/3, micro_pass_rate = (2 + 3) / (4 + 3) = 5/7 ≈ 0.714

mean_score property

Mean of Score.primary_score across all examples.

Respects each Score's primary_metric — averages weighted_score for weighted pipelines, normalized_score for normalized, pass_rate for pass.

per_category_pass_rates()

Compute per-category pass rates for each example.

Uses the checklist(s) to map item IDs to categories, then computes pass rates per category for each scored example.

Returns:

Type Description
List[Dict[str, float]]

List of dicts, one per example, mapping category -> pass_rate

Source code in autochecklist/pipeline.py
def per_category_pass_rates(self) -> List[Dict[str, float]]:
    """Compute per-category pass rates for each example.

    Uses the checklist(s) to map item IDs to categories, then computes
    pass rates per category for each scored example.

    Returns:
        List of dicts, one per example, mapping category -> pass_rate
    """
    results: List[Dict[str, float]] = []
    for idx, score in enumerate(self.scores):
        # Pick the checklist behind this example's score: the shared one
        # if present, otherwise the per-example checklist at this index.
        if self.checklist is not None:
            source_cl = self.checklist
        elif idx < len(self.checklists):
            source_cl = self.checklists[idx]
        else:
            # No checklist available -> no category information.
            results.append({})
            continue

        # Map each item id to its category; None becomes "ungrouped".
        categories = {
            item.id: (item.category or "ungrouped") for item in source_cl.items
        }

        # Tally YES answers and totals per category.
        yes_counts: Dict[str, int] = {}
        totals: Dict[str, int] = {}
        for item_score in score.item_scores:
            bucket = categories.get(item_score.item_id, "ungrouped")
            totals[bucket] = totals.get(bucket, 0) + 1
            if item_score.answer == ChecklistItemAnswer.YES:
                yes_counts[bucket] = yes_counts.get(bucket, 0) + 1

        results.append({
            cat: (yes_counts.get(cat, 0) / n if n > 0 else 0.0)
            for cat, n in totals.items()
        })

    return results

to_dataframe()

Export results to pandas DataFrame.

Source code in autochecklist/pipeline.py
def to_dataframe(self) -> "pd.DataFrame":
    """Export results to a pandas DataFrame, one row per example."""
    try:
        import pandas as pd
    except ImportError:
        raise ImportError("pandas is required for to_dataframe(). Install with: pip install pandas")

    rows = []
    for idx, (record, score) in enumerate(zip(self.data, self.scores)):
        row = {
            "index": idx,
            "input": record.get("input", ""),
            "target": record.get("target", ""),
            "pass_rate": score.pass_rate,
        }
        # Optional metrics appear as columns only when they are set.
        for metric in ("weighted_score", "normalized_score"):
            value = getattr(score, metric)
            if value is not None:
                row[metric] = value

        # One column per checklist item, holding the raw answer value.
        row.update(
            (f"item_{s.item_id}", s.answer.value) for s in score.item_scores
        )
        rows.append(row)

    return pd.DataFrame(rows)

to_jsonl(path)

Export results to JSONL file.

Source code in autochecklist/pipeline.py
def to_jsonl(self, path: str) -> None:
    """Export results to a JSONL file, one record per example.

    Each record carries input, target, pass_rate, and per-item answers;
    weighted_score / normalized_score are included only when present.

    Args:
        path: Destination file path; overwritten if it exists.
    """
    # Write UTF-8 explicitly so output does not depend on the platform's
    # default encoding (json.dumps emits ASCII by default, but an explicit
    # encoding keeps the file well-defined if that ever changes).
    with open(path, "w", encoding="utf-8") as f:
        for item, score in zip(self.data, self.scores):
            record = {
                "input": item.get("input", ""),
                "target": item.get("target", ""),
                "pass_rate": score.pass_rate,
                "item_scores": [
                    {
                        "item_id": s.item_id,
                        "answer": s.answer.value,
                        "reasoning": s.reasoning,
                    }
                    for s in score.item_scores
                ],
            }
            if score.weighted_score is not None:
                record["weighted_score"] = score.weighted_score
            if score.normalized_score is not None:
                record["normalized_score"] = score.normalized_score
            f.write(json.dumps(record) + "\n")

LLMClient

Bases: Protocol

Protocol that all LLM providers must satisfy.

Returns OpenAI-format dicts everywhere so existing parsing code works unchanged regardless of provider.

Source code in autochecklist/providers/base.py
@runtime_checkable
class LLMClient(Protocol):
    """Protocol that all LLM providers must satisfy.

    Returns OpenAI-format dicts everywhere so existing parsing code
    works unchanged regardless of provider.
    """

    # Single chat request; returns an OpenAI-style response dict.
    def chat_completion(
        self,
        model: str,
        messages: List[Dict[str, str]],
        temperature: float = 0.7,
        max_tokens: int = 2048,
        **kwargs: Any,
    ) -> Dict[str, Any]: ...

    # Log-probabilities for a request as a str -> float mapping.
    # NOTE(review): exact key semantics (token vs. label) are not visible
    # here — confirm against concrete implementations.
    def get_logprobs(
        self,
        model: str,
        messages: List[Dict[str, str]],
        **kwargs: Any,
    ) -> Dict[str, float]: ...

    # Whether the given model can return logprobs.
    def supports_logprobs(self, model: str) -> bool: ...

    # Execute many request dicts, with bounded concurrency and an optional
    # progress callback; returns one response dict per request.
    def batch_completions(
        self,
        requests: List[Dict[str, Any]],
        concurrency: int = 5,
        progress_callback: Optional[Callable[[int], None]] = None,
    ) -> List[Dict[str, Any]]: ...

    # Release underlying transport resources.
    def close(self) -> None: ...

    # Context-manager support so clients can be used in a `with` block.
    def __enter__(self) -> "LLMClient": ...

    def __exit__(self, *args: Any) -> None: ...

LLMHTTPClient

HTTP client for OpenAI-compatible LLM APIs.

Works with OpenRouter, OpenAI, and vLLM server mode. Supports both Chat Completions and Responses API formats.

Source code in autochecklist/providers/http_client.py
(line-number gutter for source lines 77–513 omitted)
class LLMHTTPClient:
    """HTTP client for OpenAI-compatible LLM APIs.

    Works with OpenRouter, OpenAI, and vLLM server mode. Supports both
    Chat Completions and Responses API formats. Synchronous calls go
    through a persistent ``httpx.Client``; responses on the sync path are
    normalized to the Chat Completions shape via ``_normalize_response``.
    """

    # Class-level cache for model capabilities, keyed by (provider, base_url)
    _models_cache: Dict[tuple, List[Dict[str, Any]]] = {}
    # Epoch timestamp of when each cache entry was fetched (see _get_models).
    _models_cache_time: Dict[tuple, float] = {}
    _CACHE_TTL = 3600  # 1 hour

    def __init__(
        self,
        provider: str = "openrouter",
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        timeout: int = 60,
        api_format: str = "chat",
    ):
        """Create a client bound to one provider.

        Args:
            provider: Provider name understood by ``get_provider_config``
                (e.g. "openrouter", "openai", "vllm").
            api_key: Explicit API key; takes precedence over env vars and
                global config (see ``_resolve_api_key``).
            base_url: Override for the provider's default base URL.
            timeout: Request timeout in seconds for the underlying httpx client.
            api_format: "chat" for Chat Completions, "responses" for the
                Responses API (routes ``chat_completion`` accordingly).
        """
        config = get_provider_config(provider, base_url=base_url)
        self.provider = provider
        self.api_format = api_format

        # Resolve base URL
        self.base_url = base_url or config.base_url or ""

        # Resolve API key: explicit > env var > config
        self.api_key = self._resolve_api_key(api_key, config)

        self.timeout = timeout
        self._provider_config = config

        # Build headers
        headers = {
            "Content-Type": "application/json",
            **config.default_headers,
        }
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"

        # Persistent sync client; released by close() / context-manager exit.
        self._client = httpx.Client(
            base_url=self.base_url,
            headers=headers,
            timeout=self.timeout,
        )

    def _resolve_api_key(
        self,
        explicit_key: Optional[str],
        config: Any,
    ) -> Optional[str]:
        """Resolve API key from explicit param, env var, or global config.

        Precedence: explicit argument, then the provider's configured env
        var, then (OpenRouter only) the package-global config.

        Raises:
            ValueError: If the provider requires a key and none was found.
        """
        if explicit_key:
            return explicit_key

        # Try provider-specific env var
        if config.api_key_env_var:
            env_key = os.getenv(config.api_key_env_var)
            if env_key:
                return env_key

        # Try global config for backward compat
        if self.provider == "openrouter":
            from ..config import get_config
            cfg = get_config()
            if cfg.openrouter_api_key:
                return cfg.openrouter_api_key

        if config.requires_api_key:
            env_var = config.api_key_env_var or "API_KEY"
            raise ValueError(
                f"{self.provider.title()} API key required. "
                f"Set {env_var} env var or pass api_key parameter."
            )

        return None

    def close(self) -> None:
        """Close the HTTP client."""
        self._client.close()

    def __enter__(self) -> "LLMHTTPClient":
        return self

    def __exit__(self, *args: Any) -> None:
        self.close()

    # Retries up to 3 attempts with exponential backoff (1s..10s) when
    # _is_retryable classifies the raised exception as transient.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception(_is_retryable),
    )
    def chat_completion(
        self,
        model: str,
        messages: List[Dict[str, str]],
        temperature: float = 0.7,
        max_tokens: int = 2048,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """Make a chat completion request.

        For api_format="responses", translates to/from the Responses API format.
        Always returns normalized OpenAI Chat Completions format.

        Args:
            model: Model name; normalized per-provider before use.
            messages: Chat messages as ``{"role": ..., "content": ...}`` dicts.
            temperature: Sampling temperature; omitted for reasoning models.
            max_tokens: Completion budget; sent as ``max_completion_tokens``
                for reasoning models.
            **kwargs: Extra body fields passed through to the API verbatim.
        """
        model = _normalize_model_name(model, self.provider)

        if self.api_format == "responses":
            return self._chat_completion_responses(
                model, messages, temperature, max_tokens, **kwargs
            )

        # Reasoning models require max_completion_tokens and only support temperature=1
        is_reasoning = _is_reasoning_model(model)
        token_key = "max_completion_tokens" if is_reasoning else "max_tokens"
        body = {
            "model": model,
            "messages": messages,
            token_key: max_tokens,
            **kwargs,
        }
        if not is_reasoning:
            body["temperature"] = temperature
        response = self._client.post("/chat/completions", json=body)
        _raise_with_detail(response)
        raw = response.json()
        return self._normalize_response(raw)

    def _chat_completion_responses(
        self,
        model: str,
        messages: List[Dict[str, str]],
        temperature: float = 0.7,
        max_tokens: int = 2048,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """Make a Responses API request and normalize to Chat Completions format."""
        # Build Responses API request body
        body: Dict[str, Any] = {
            "model": model,
            "input": messages,
            "max_output_tokens": max_tokens,
        }
        if not _is_reasoning_model(model):
            body["temperature"] = temperature

        # Handle response_format for Responses API
        # Chat Completions uses: {"response_format": {"type": "json_schema", "json_schema": {"name": ..., "schema": ...}}}
        # Responses API uses:    {"text": {"format": {"type": "json_schema", "name": ..., "schema": ...}}}
        response_format = kwargs.pop("response_format", None)
        if response_format is not None:
            if (
                isinstance(response_format, dict)
                and response_format.get("type") == "json_schema"
                and "json_schema" in response_format
            ):
                # Flatten: unwrap json_schema wrapper for Responses API
                inner = response_format["json_schema"]
                body["text"] = {"format": {
                    "type": "json_schema",
                    **inner,
                }}
            else:
                body["text"] = {"format": response_format}

        # Map Chat Completions kwargs to Responses API equivalents
        if kwargs.get("logprobs"):
            # logprobs require an explicit text format block in this API shape.
            if "text" not in body:
                body["text"] = {"format": {"type": "text"}}
            body["include"] = ["message.output_text.logprobs"]
            # Responses API uses top_logprobs at request level
            if "top_logprobs" in kwargs:
                body["top_logprobs"] = kwargs.pop("top_logprobs")
            kwargs.pop("logprobs", None)

        # Pass through remaining kwargs
        # ("max_tokens" can only arrive via the named parameter, so this
        # filter is purely defensive.)
        for k, v in kwargs.items():
            if k not in ("max_tokens",):
                body[k] = v

        response = self._client.post("/responses", json=body)
        _raise_with_detail(response)
        raw = response.json()
        return self._normalize_response(raw)

    def _normalize_response(self, raw: Dict[str, Any]) -> Dict[str, Any]:
        """Normalize response to Chat Completions format.

        Handles both Chat Completions (passthrough) and Responses API
        (translation) formats.
        """
        # Chat Completions format — already has "choices"
        if "choices" in raw:
            return raw

        # Responses API format — has "output"
        if "output" in raw:
            return self._normalize_responses_api(raw)

        # Unknown format — return as-is
        return raw

    def _normalize_responses_api(self, raw: Dict[str, Any]) -> Dict[str, Any]:
        """Translate Responses API format to Chat Completions format.

        Concatenates all output_text segments into a single assistant
        message and flattens their logprobs (if any) into one list.
        """
        # Find message outputs
        text_parts = []
        all_logprobs = []

        for item in raw.get("output", []):
            if item.get("type") != "message":
                continue
            for content in item.get("content", []):
                if content.get("type") == "output_text":
                    text_parts.append(content.get("text", ""))
                    if "logprobs" in content and content["logprobs"]:
                        all_logprobs.extend(content["logprobs"])

        combined_text = "".join(text_parts)

        # finish_reason is hard-coded: the Responses payload's own stop
        # status is not inspected here.
        choice: Dict[str, Any] = {
            "message": {
                "role": "assistant",
                "content": combined_text,
            },
            "finish_reason": "stop",
        }

        # Include logprobs if present
        if all_logprobs:
            choice["logprobs"] = {"content": all_logprobs}

        result: Dict[str, Any] = {
            "choices": [choice],
            "model": raw.get("model", ""),
        }

        if "usage" in raw:
            result["usage"] = raw["usage"]

        return result

    def chat_completion_stream(
        self,
        model: str,
        messages: List[Dict[str, str]],
        **kwargs: Any,
    ) -> Iterator[str]:
        """Stream chat completion. Yields content chunks.

        Parses server-sent-event lines ("data: {...}") from the
        /chat/completions stream; malformed or non-content chunks are
        silently skipped.
        """
        model = _normalize_model_name(model, self.provider)
        with self._client.stream(
            "POST",
            "/chat/completions",
            json={
                "model": model,
                "messages": messages,
                "stream": True,
                **kwargs,
            },
        ) as response:
            for line in response.iter_lines():
                if line.startswith("data: "):
                    data = line[6:]
                    if data != "[DONE]":
                        try:
                            chunk = json.loads(data)
                            if content := chunk["choices"][0]["delta"].get("content"):
                                yield content
                        except (json.JSONDecodeError, KeyError, IndexError):
                            pass

    def get_logprobs(
        self,
        model: str,
        messages: List[Dict[str, str]],
        **kwargs: Any,
    ) -> Dict[str, float]:
        """Get Yes/No log probabilities for normalized scoring.

        Returns dict with "yes" and "no" probability values. Both are 0.0
        when the response carries no usable logprobs.

        NOTE(review): a caller passing max_tokens via kwargs would collide
        with the explicit max_tokens=512 below (TypeError) — confirm
        callers never do.
        """
        response = self.chat_completion(
            model=model,
            messages=messages,
            logprobs=True,
            top_logprobs=5,
            max_tokens=512,
            **kwargs,
        )

        try:
            logprobs_data = response["choices"][0].get("logprobs")
            if logprobs_data is None:
                return {"yes": 0.0, "no": 0.0}

            # Only the first generated token's top_logprobs are inspected.
            logprobs = logprobs_data["content"][0]["top_logprobs"]
            probs = {
                lp["token"].lower().strip(): math.exp(lp["logprob"])
                for lp in logprobs
            }
            return {
                "yes": probs.get("yes", 0.0),
                "no": probs.get("no", 0.0),
            }
        except (KeyError, IndexError, TypeError):
            return {"yes": 0.0, "no": 0.0}

    def _get_models(self) -> List[Dict[str, Any]]:
        """Get models list (OpenRouter only), using cache if fresh.

        Results are cached at class level per (provider, base_url) for
        _CACHE_TTL seconds. Any fetch failure yields an empty list
        (best-effort; capability checks then fall back to defaults).
        """
        cache_key = (self.provider, self.base_url)
        now = time.time()

        cached = LLMHTTPClient._models_cache.get(cache_key)
        cached_time = LLMHTTPClient._models_cache_time.get(cache_key, 0)

        if cached is not None and now - cached_time < self._CACHE_TTL:
            return cached

        try:
            response = self._client.get("/models")
            response.raise_for_status()
            models = response.json().get("data", [])
            LLMHTTPClient._models_cache[cache_key] = models
            LLMHTTPClient._models_cache_time[cache_key] = now
            return models
        except Exception:
            return []

    def supports_logprobs(self, model: str) -> bool:
        """Check if a model supports logprobs.

        vLLM and OpenAI always support logprobs.
        OpenRouter queries the /models endpoint.
        """
        if self.provider in ("vllm", "openai"):
            return True

        # OpenRouter: query models endpoint
        models = self._get_models()
        for m in models:
            if m.get("id") == model:
                supported = m.get("supported_parameters", [])
                return "logprobs" in supported
        return False

    async def chat_completion_async(
        self,
        model: str,
        messages: List[Dict[str, str]],
        temperature: float = 0.7,
        max_tokens: int = 2048,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """Async chat completion request.

        Opens a fresh AsyncClient per call (the sync self._client is not
        reused across event loops).

        NOTE(review): unlike chat_completion, this path ignores
        self.api_format and returns the raw provider JSON without
        _normalize_response — confirm whether that asymmetry is intended.
        """
        headers = {
            "Content-Type": "application/json",
            **self._provider_config.default_headers,
        }
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"

        model = _normalize_model_name(model, self.provider)

        async with httpx.AsyncClient(
            base_url=self.base_url,
            headers=headers,
            timeout=self.timeout,
        ) as client:
            # Same reasoning-model handling as the sync path.
            is_reasoning = _is_reasoning_model(model)
            token_key = "max_completion_tokens" if is_reasoning else "max_tokens"
            body = {
                "model": model,
                "messages": messages,
                token_key: max_tokens,
                **kwargs,
            }
            if not is_reasoning:
                body["temperature"] = temperature
            response = await client.post("/chat/completions", json=body)
            _raise_with_detail(response)
            return response.json()

    async def _batch_completions_async(
        self,
        requests: List[Dict[str, Any]],
        concurrency: int = 5,
        progress_callback: Optional[Callable[[int], None]] = None,
    ) -> List[Dict[str, Any]]:
        """Process multiple requests concurrently (async implementation).

        Results keep input order. A failed request yields
        {"error": str(exception)} in its slot rather than raising.
        """
        semaphore = asyncio.Semaphore(concurrency)
        # Pre-sized so each task can write its own slot by index.
        results: List[Optional[Dict[str, Any]]] = [None] * len(requests)
        completed = 0

        async def limited_request(idx: int, req: Dict[str, Any]):
            nonlocal completed
            async with semaphore:
                try:
                    result = await self.chat_completion_async(**req)
                    results[idx] = result
                except Exception as e:
                    results[idx] = {"error": str(e)}
                finally:
                    # Progress counts completions (success or failure),
                    # not input order.
                    completed += 1
                    if progress_callback:
                        progress_callback(completed)

        await asyncio.gather(*[
            limited_request(i, req) for i, req in enumerate(requests)
        ])

        return results  # type: ignore

    def batch_completions(
        self,
        requests: List[Dict[str, Any]],
        concurrency: int = 5,
        progress_callback: Optional[Callable[[int], None]] = None,
    ) -> List[Dict[str, Any]]:
        """Process multiple requests concurrently (sync wrapper).

        NOTE(review): asyncio.get_event_loop() is deprecated when no loop
        is running (Python 3.10+); the RuntimeError fallback below covers
        that case — confirm behavior on the target Python versions.
        """
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                # Already inside a running loop (e.g. Jupyter): run the
                # coroutine on a fresh loop in a worker thread.
                import concurrent.futures
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    future = executor.submit(
                        asyncio.run,
                        self._batch_completions_async(requests, concurrency, progress_callback)
                    )
                    return future.result()
            else:
                return loop.run_until_complete(
                    self._batch_completions_async(requests, concurrency, progress_callback)
                )
        except RuntimeError:
            # No usable event loop — create one.
            return asyncio.run(
                self._batch_completions_async(requests, concurrency, progress_callback)
            )

close()

Close the HTTP client.

Source code in autochecklist/providers/http_client.py
def close(self) -> None:
    """Close the HTTP client."""
    self._client.close()

chat_completion(model, messages, temperature=0.7, max_tokens=2048, **kwargs)

Make a chat completion request.

For api_format="responses", translates to/from the Responses API format. Always returns normalized OpenAI Chat Completions format.

Source code in autochecklist/providers/http_client.py
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    retry=retry_if_exception(_is_retryable),
)
def chat_completion(
    self,
    model: str,
    messages: List[Dict[str, str]],
    temperature: float = 0.7,
    max_tokens: int = 2048,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Make a chat completion request.

    For api_format="responses", translates to/from the Responses API format.
    Always returns normalized OpenAI Chat Completions format.
    """
    model = _normalize_model_name(model, self.provider)

    if self.api_format == "responses":
        return self._chat_completion_responses(
            model, messages, temperature, max_tokens, **kwargs
        )

    # Reasoning models require max_completion_tokens and only support temperature=1
    is_reasoning = _is_reasoning_model(model)
    token_key = "max_completion_tokens" if is_reasoning else "max_tokens"
    body = {
        "model": model,
        "messages": messages,
        token_key: max_tokens,
        **kwargs,
    }
    if not is_reasoning:
        body["temperature"] = temperature
    response = self._client.post("/chat/completions", json=body)
    _raise_with_detail(response)
    raw = response.json()
    return self._normalize_response(raw)

chat_completion_stream(model, messages, **kwargs)

Stream chat completion. Yields content chunks.

Source code in autochecklist/providers/http_client.py
def chat_completion_stream(
    self,
    model: str,
    messages: List[Dict[str, str]],
    **kwargs: Any,
) -> Iterator[str]:
    """Stream chat completion. Yields content chunks."""
    model = _normalize_model_name(model, self.provider)
    with self._client.stream(
        "POST",
        "/chat/completions",
        json={
            "model": model,
            "messages": messages,
            "stream": True,
            **kwargs,
        },
    ) as response:
        for line in response.iter_lines():
            if line.startswith("data: "):
                data = line[6:]
                if data != "[DONE]":
                    try:
                        chunk = json.loads(data)
                        if content := chunk["choices"][0]["delta"].get("content"):
                            yield content
                    except (json.JSONDecodeError, KeyError, IndexError):
                        pass

get_logprobs(model, messages, **kwargs)

Get Yes/No log probabilities for normalized scoring.

Returns dict with "yes" and "no" probability values.

Source code in autochecklist/providers/http_client.py
def get_logprobs(
    self,
    model: str,
    messages: List[Dict[str, str]],
    **kwargs: Any,
) -> Dict[str, float]:
    """Get Yes/No log probabilities for normalized scoring.

    Returns dict with "yes" and "no" probability values.
    """
    response = self.chat_completion(
        model=model,
        messages=messages,
        logprobs=True,
        top_logprobs=5,
        max_tokens=512,
        **kwargs,
    )

    try:
        logprobs_data = response["choices"][0].get("logprobs")
        if logprobs_data is None:
            return {"yes": 0.0, "no": 0.0}

        logprobs = logprobs_data["content"][0]["top_logprobs"]
        probs = {
            lp["token"].lower().strip(): math.exp(lp["logprob"])
            for lp in logprobs
        }
        return {
            "yes": probs.get("yes", 0.0),
            "no": probs.get("no", 0.0),
        }
    except (KeyError, IndexError, TypeError):
        return {"yes": 0.0, "no": 0.0}

supports_logprobs(model)

Check if a model supports logprobs.

vLLM and OpenAI always support logprobs. OpenRouter queries the /models endpoint.

Source code in autochecklist/providers/http_client.py
def supports_logprobs(self, model: str) -> bool:
    """Check if a model supports logprobs.

    vLLM and OpenAI always support logprobs.
    OpenRouter queries the /models endpoint.
    """
    if self.provider in ("vllm", "openai"):
        return True

    # OpenRouter: query models endpoint
    models = self._get_models()
    for m in models:
        if m.get("id") == model:
            supported = m.get("supported_parameters", [])
            return "logprobs" in supported
    return False

chat_completion_async(model, messages, temperature=0.7, max_tokens=2048, **kwargs) async

Async chat completion request.

Source code in autochecklist/providers/http_client.py
async def chat_completion_async(
    self,
    model: str,
    messages: List[Dict[str, str]],
    temperature: float = 0.7,
    max_tokens: int = 2048,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Async chat completion request."""
    headers = {
        "Content-Type": "application/json",
        **self._provider_config.default_headers,
    }
    if self.api_key:
        headers["Authorization"] = f"Bearer {self.api_key}"

    model = _normalize_model_name(model, self.provider)

    async with httpx.AsyncClient(
        base_url=self.base_url,
        headers=headers,
        timeout=self.timeout,
    ) as client:
        is_reasoning = _is_reasoning_model(model)
        token_key = "max_completion_tokens" if is_reasoning else "max_tokens"
        body = {
            "model": model,
            "messages": messages,
            token_key: max_tokens,
            **kwargs,
        }
        if not is_reasoning:
            body["temperature"] = temperature
        response = await client.post("/chat/completions", json=body)
        _raise_with_detail(response)
        return response.json()

batch_completions(requests, concurrency=5, progress_callback=None)

Process multiple requests concurrently (sync wrapper).

Source code in autochecklist/providers/http_client.py
def batch_completions(
    self,
    requests: List[Dict[str, Any]],
    concurrency: int = 5,
    progress_callback: Optional[Callable[[int], None]] = None,
) -> List[Dict[str, Any]]:
    """Process multiple requests concurrently (sync wrapper)."""
    try:
        loop = asyncio.get_event_loop()
        if loop.is_running():
            import concurrent.futures
            with concurrent.futures.ThreadPoolExecutor() as executor:
                future = executor.submit(
                    asyncio.run,
                    self._batch_completions_async(requests, concurrency, progress_callback)
                )
                return future.result()
        else:
            return loop.run_until_complete(
                self._batch_completions_async(requests, concurrency, progress_callback)
            )
    except RuntimeError:
        return asyncio.run(
            self._batch_completions_async(requests, concurrency, progress_callback)
        )

VLLMOfflineClient

Offline inference client using vLLM's Python API.

Loads a model once at init and reuses it for all calls. The model parameter in method signatures is ignored — the model is fixed at construction time.

Context manager is a no-op: the model stays loaded. This is critical because existing code does with Client() as client: in tight loops.

Source code in autochecklist/providers/vllm_offline.py
class VLLMOfflineClient:
    """Offline inference client using vLLM's Python API.

    Loads a model once at __init__ and reuses it for all calls.
    The model parameter in method signatures is ignored — the model
    is fixed at construction time.

    Context manager is a no-op: the model stays loaded. This is critical
    because existing code does `with Client() as client:` in tight loops.
    """

    def __init__(
        self,
        model: str,
        tensor_parallel_size: int = 1,
        gpu_memory_utilization: float = 0.9,
        max_model_len: Optional[int] = None,
        **vllm_kwargs: Any,
    ):
        """Load the model into memory once for the client's lifetime.

        Args:
            model: Model name or local path handed to ``vllm.LLM``.
            tensor_parallel_size: Number of GPUs to shard across.
            gpu_memory_utilization: Fraction of GPU memory vLLM may claim.
            max_model_len: Optional context-length cap.
            **vllm_kwargs: Extra keyword args forwarded to ``vllm.LLM``.

        Raises:
            ImportError: If the ``vllm`` package is not installed.
        """
        try:
            from vllm import LLM, SamplingParams
        except (ImportError, ModuleNotFoundError):
            raise ImportError(
                "vllm is required for offline inference. "
                "Install with: pip install vllm"
            )

        # Keep the class handle so methods can build params without
        # re-importing vllm.
        self._SamplingParams = SamplingParams
        self._model_name = model
        self._llm = LLM(
            model=model,
            tensor_parallel_size=tensor_parallel_size,
            gpu_memory_utilization=gpu_memory_utilization,
            max_model_len=max_model_len,
            **vllm_kwargs,
        )
        self._tokenizer = self._llm.get_tokenizer()

    def _apply_chat_template(self, messages: List[Dict[str, str]]) -> str:
        """Convert chat messages to a prompt string using the model's template."""
        if hasattr(self._tokenizer, "apply_chat_template"):
            return self._tokenizer.apply_chat_template(
                messages, tokenize=False, add_generation_prompt=True
            )
        # Fallback for models without chat templates
        parts = []
        for msg in messages:
            role = msg["role"]
            content = msg["content"]
            if role == "system":
                parts.append(f"System: {content}")
            elif role == "user":
                parts.append(f"User: {content}")
            elif role == "assistant":
                parts.append(f"Assistant: {content}")
        parts.append("Assistant:")
        return "\n\n".join(parts)

    def chat_completion(
        self,
        model: str,
        messages: List[Dict[str, str]],
        temperature: float = 0.7,
        max_tokens: int = 2048,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """Generate completion, returning OpenAI-format dict.

        The model parameter is ignored — uses the model loaded at init.
        """
        prompt = self._apply_chat_template(messages)

        # Pop OpenAI-style options so they don't leak into SamplingParams.
        logprobs_count = kwargs.pop("top_logprobs", None)
        request_logprobs = kwargs.pop("logprobs", False)
        kwargs.pop("reasoning_effort", None)  # Not supported by vLLM

        # Handle response_format → guided decoding
        response_format = kwargs.pop("response_format", None)
        guided_params = self._build_guided_params(response_format)

        # Remaining kwargs are silently dropped (not passed to vLLM).
        sampling_params = self._SamplingParams(
            temperature=temperature,
            max_tokens=max_tokens,
            logprobs=logprobs_count if request_logprobs else None,
            guided_decoding=guided_params,
        )

        outputs = self._llm.generate([prompt], sampling_params)
        output = outputs[0].outputs[0]

        # Build OpenAI-format response
        response: Dict[str, Any] = {
            "choices": [{
                "message": {
                    "role": "assistant",
                    "content": output.text,
                },
                "finish_reason": output.finish_reason,
            }],
            "model": self._model_name,
            "usage": {
                "prompt_tokens": len(outputs[0].prompt_token_ids),
                "completion_tokens": len(output.token_ids),
            },
        }

        # Include logprobs if requested
        if request_logprobs and output.logprobs:
            response["choices"][0]["logprobs"] = self._format_logprobs(
                output.logprobs, logprobs_count or 5
            )

        return response

    @staticmethod
    def _build_guided_params(response_format: Optional[Dict[str, Any]]) -> Any:
        """Convert OpenAI-style response_format to vLLM GuidedDecodingParams.

        Returns None when no response_format is provided.

        NOTE(review): imports GuidedDecodingParams from the vllm top level —
        confirm this import path exists in the pinned vLLM version.
        """
        if response_format is None:
            return None
        from vllm import GuidedDecodingParams

        # Unwrap {"type": "json_schema", "json_schema": {"schema": ...}} if
        # present; otherwise pass the dict through as the JSON schema.
        json_schema = response_format
        if isinstance(response_format, dict) and "json_schema" in response_format:
            json_schema = response_format["json_schema"].get("schema", response_format)
        return GuidedDecodingParams(json=json_schema)

    def _format_logprobs(
        self,
        vllm_logprobs: List,
        top_n: int,
    ) -> Dict[str, Any]:
        """Convert vLLM logprobs to OpenAI format.

        vLLM returns: List[Dict[int, Logprob]] per token
        OpenAI format: {"content": [{"token": str, "logprob": float, "top_logprobs": [...]}]}
        """
        content = []
        for token_logprobs in vllm_logprobs:
            if token_logprobs is None:
                continue

            # Highest-probability candidates first.
            sorted_lps = sorted(
                token_logprobs.values(),
                key=lambda lp: lp.logprob,
                reverse=True,
            )[:top_n]

            top_logprobs_list = []
            for lp in sorted_lps:
                # Prefer the decoded token text; otherwise fall back to the
                # stringified rank — presumably a placeholder, not the real
                # token (TODO confirm against vLLM's Logprob fields).
                token_str = (
                    lp.decoded_token
                    if hasattr(lp, "decoded_token") and lp.decoded_token
                    else str(lp.rank if hasattr(lp, "rank") else "")
                )
                top_logprobs_list.append({
                    "token": token_str,
                    "logprob": lp.logprob,
                })

            if top_logprobs_list:
                # The top candidate doubles as the chosen token entry.
                content.append({
                    "token": top_logprobs_list[0]["token"],
                    "logprob": top_logprobs_list[0]["logprob"],
                    "top_logprobs": top_logprobs_list,
                })

        return {"content": content}

    def get_logprobs(
        self,
        model: str,
        messages: List[Dict[str, str]],
        **kwargs: Any,
    ) -> Dict[str, float]:
        """Get Yes/No log probabilities.

        Returns {"yes": p, "no": p}; both 0.0 when no usable logprobs
        come back.
        """
        response = self.chat_completion(
            model=model,
            messages=messages,
            logprobs=True,
            top_logprobs=5,
            max_tokens=512,
            **kwargs,
        )

        try:
            logprobs_data = response["choices"][0].get("logprobs")
            if logprobs_data is None:
                return {"yes": 0.0, "no": 0.0}

            # Only the first generated token's candidates are inspected.
            logprobs = logprobs_data["content"][0]["top_logprobs"]
            probs = {
                lp["token"].lower().strip(): math.exp(lp["logprob"])
                for lp in logprobs
            }
            return {
                "yes": probs.get("yes", 0.0),
                "no": probs.get("no", 0.0),
            }
        except (KeyError, IndexError, TypeError):
            return {"yes": 0.0, "no": 0.0}

    def supports_logprobs(self, model: str) -> bool:
        """vLLM always supports logprobs."""
        return True

    def batch_completions(
        self,
        requests: List[Dict[str, Any]],
        progress_callback: Optional[Callable[[int], None]] = None,
    ) -> List[Dict[str, Any]]:
        """Process batch using vLLM's native batching.

        Each request dict supplies "messages" and optionally "temperature",
        "max_tokens", and "response_format"; other request keys are ignored.
        Results keep input order; progress_callback (if given) is invoked
        with the 1-based count after each result is assembled.
        """
        prompts = []
        sampling_params_list = []

        for req in requests:
            msgs = req["messages"]
            prompt = self._apply_chat_template(msgs)
            prompts.append(prompt)
            guided_params = self._build_guided_params(
                req.get("response_format")
            )
            # One SamplingParams per prompt, paired by position.
            sampling_params_list.append(self._SamplingParams(
                temperature=req.get("temperature", 0.7),
                max_tokens=req.get("max_tokens", 2048),
                guided_decoding=guided_params,
            ))

        all_outputs = self._llm.generate(prompts, sampling_params_list)

        results = []
        for i, output in enumerate(all_outputs):
            # Minimal OpenAI-shaped result: no usage/finish_reason here,
            # unlike chat_completion.
            results.append({
                "choices": [{
                    "message": {
                        "role": "assistant",
                        "content": output.outputs[0].text,
                    },
                }],
                "model": self._model_name,
            })
            if progress_callback:
                progress_callback(i + 1)

        return results

    def close(self) -> None:
        """No-op — model stays loaded until garbage collection."""
        pass

    def __enter__(self) -> "VLLMOfflineClient":
        return self

    def __exit__(self, *args: Any) -> None:
        # Do NOT unload model on context exit
        pass

chat_completion(model, messages, temperature=0.7, max_tokens=2048, **kwargs)

Generate completion, returning OpenAI-format dict.

The model parameter is ignored — uses the model loaded at init.

Source code in autochecklist/providers/vllm_offline.py
def chat_completion(
    self,
    model: str,
    messages: List[Dict[str, str]],
    temperature: float = 0.7,
    max_tokens: int = 2048,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Generate completion, returning OpenAI-format dict.

    The model parameter is ignored — uses the model loaded at init.

    Args:
        model: Ignored; present for interface compatibility.
        messages: Chat messages in OpenAI format.
        temperature: Sampling temperature.
        max_tokens: Maximum completion tokens.
        **kwargs: May include ``logprobs``, ``top_logprobs`` and
            ``response_format``; ``reasoning_effort`` is dropped.

    Returns:
        OpenAI-format response dict with ``choices``, ``model``,
        ``usage`` and, when requested, per-token ``logprobs``.
    """
    prompt = self._apply_chat_template(messages)

    logprobs_count = kwargs.pop("top_logprobs", None)
    request_logprobs = kwargs.pop("logprobs", False)
    kwargs.pop("reasoning_effort", None)  # Not supported by vLLM

    # Handle response_format → guided decoding
    response_format = kwargs.pop("response_format", None)
    guided_params = self._build_guided_params(response_format)

    # Bug fix: logprobs=True without top_logprobs previously passed
    # logprobs=None to vLLM, so no logprobs were captured even though
    # the caller asked for them. Default to 5, matching the
    # `logprobs_count or 5` fallback used when formatting below.
    if request_logprobs and logprobs_count is None:
        logprobs_count = 5

    sampling_params = self._SamplingParams(
        temperature=temperature,
        max_tokens=max_tokens,
        logprobs=logprobs_count if request_logprobs else None,
        guided_decoding=guided_params,
    )

    outputs = self._llm.generate([prompt], sampling_params)
    output = outputs[0].outputs[0]

    # Build OpenAI-format response
    response: Dict[str, Any] = {
        "choices": [{
            "message": {
                "role": "assistant",
                "content": output.text,
            },
            "finish_reason": output.finish_reason,
        }],
        "model": self._model_name,
        "usage": {
            "prompt_tokens": len(outputs[0].prompt_token_ids),
            "completion_tokens": len(output.token_ids),
        },
    }

    # Include logprobs if requested
    if request_logprobs and output.logprobs:
        response["choices"][0]["logprobs"] = self._format_logprobs(
            output.logprobs, logprobs_count or 5
        )

    return response

get_logprobs(model, messages, **kwargs)

Get Yes/No log probabilities.

Source code in autochecklist/providers/vllm_offline.py
def get_logprobs(
    self,
    model: str,
    messages: List[Dict[str, str]],
    **kwargs: Any,
) -> Dict[str, float]:
    """Return the probabilities of a Yes/No answer for *messages*.

    Issues a completion with top-5 logprobs enabled and folds the first
    generated token's candidates into ``{"yes": p, "no": p}``. A missing
    or malformed logprob payload yields zero for both keys.
    """
    response = self.chat_completion(
        model=model,
        messages=messages,
        logprobs=True,
        top_logprobs=5,
        max_tokens=512,
        **kwargs,
    )

    try:
        payload = response["choices"][0].get("logprobs")
        if payload is None:
            return {"yes": 0.0, "no": 0.0}

        scores: Dict[str, float] = {}
        # Later duplicates (e.g. "Yes" vs "yes") overwrite earlier ones,
        # matching the original dict-comprehension semantics.
        for candidate in payload["content"][0]["top_logprobs"]:
            token = candidate["token"].lower().strip()
            scores[token] = math.exp(candidate["logprob"])
        return {
            "yes": scores.get("yes", 0.0),
            "no": scores.get("no", 0.0),
        }
    except (KeyError, IndexError, TypeError):
        return {"yes": 0.0, "no": 0.0}

supports_logprobs(model)

vLLM always supports logprobs.

Source code in autochecklist/providers/vllm_offline.py
def supports_logprobs(self, model: str) -> bool:
    """Report logprob support; always True for vLLM offline.

    The ``model`` argument is accepted for interface compatibility
    only and is never inspected.
    """
    return True

batch_completions(requests, progress_callback=None)

Process batch using vLLM's native batching.

Source code in autochecklist/providers/vllm_offline.py
def batch_completions(
    self,
    requests: List[Dict[str, Any]],
    progress_callback: Optional[Callable[[int], None]] = None,
) -> List[Dict[str, Any]]:
    """Process a batch of chat requests using vLLM's native batching.

    Args:
        requests: Each dict holds ``messages`` plus optional
            ``temperature``, ``max_tokens`` and ``response_format``.
        progress_callback: Invoked with the 1-based count of finished
            requests after each result is collected.

    Returns:
        One OpenAI-format response dict per request, in input order.
        Each result now includes ``finish_reason`` and ``usage`` so
        batch responses have the same shape as ``chat_completion``.
    """
    prompts = []
    sampling_params_list = []

    for req in requests:
        prompts.append(self._apply_chat_template(req["messages"]))
        guided_params = self._build_guided_params(
            req.get("response_format")
        )
        sampling_params_list.append(self._SamplingParams(
            temperature=req.get("temperature", 0.7),
            max_tokens=req.get("max_tokens", 2048),
            guided_decoding=guided_params,
        ))

    all_outputs = self._llm.generate(prompts, sampling_params_list)

    results = []
    for i, request_output in enumerate(all_outputs):
        completion = request_output.outputs[0]
        results.append({
            "choices": [{
                "message": {
                    "role": "assistant",
                    "content": completion.text,
                },
                # Consistency fix: chat_completion reports finish_reason;
                # batch results previously omitted it.
                "finish_reason": completion.finish_reason,
            }],
            "model": self._model_name,
            # Consistency fix: include token usage like chat_completion.
            "usage": {
                "prompt_tokens": len(request_output.prompt_token_ids),
                "completion_tokens": len(completion.token_ids),
            },
        })
        if progress_callback:
            progress_callback(i + 1)

    return results

close()

No-op — model stays loaded until garbage collection.

Source code in autochecklist/providers/vllm_offline.py
def close(self) -> None:
    """No-op; the loaded model stays resident until garbage collection."""
    return None

configure(**kwargs)

Update configuration.

Example

configure(
    openrouter_api_key="sk-...",
    generator_model=ModelConfig(model_id="anthropic/claude-3-sonnet"),
)

Source code in autochecklist/config.py
def configure(**kwargs) -> None:
    """Update the global configuration with the given field overrides.

    Example:
        configure(
            openrouter_api_key="sk-...",
            generator_model=ModelConfig(model_id="anthropic/claude-3-sonnet")
        )
    """
    global _config
    # Copy-on-write: derive a new config from the current one.
    _config = get_config().model_copy(update=kwargs)

get_config()

Get current configuration.

Source code in autochecklist/config.py
def get_config() -> AutoChecklistsConfig:
    """Return the process-wide configuration, creating it on first use."""
    global _config
    if _config is not None:
        return _config

    # First access: pick up a .env file before reading the environment.
    load_dotenv()
    _config = AutoChecklistsConfig()
    api_key = os.getenv("OPENROUTER_API_KEY")
    if api_key:
        _config.openrouter_api_key = api_key
    return _config

BatchScorer(**kwargs)

Deprecated: use ChecklistScorer(mode='batch').

Source code in autochecklist/scorers/__init__.py
def BatchScorer(**kwargs):
    """Deprecated: use ``ChecklistScorer(mode='batch')``."""
    # Warn at the caller's frame (stacklevel=2), then delegate.
    message = "BatchScorer is deprecated, use ChecklistScorer(mode='batch')"
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return ChecklistScorer(mode="batch", **kwargs)

WeightedScorer(**kwargs)

Deprecated: use ChecklistScorer(mode='item', primary_metric='weighted').

Source code in autochecklist/scorers/__init__.py
def WeightedScorer(**kwargs):
    """Deprecated: use ``ChecklistScorer(mode='item', primary_metric='weighted')``."""
    warnings.warn(
        "WeightedScorer is deprecated, use ChecklistScorer(mode='item', primary_metric='weighted')",
        DeprecationWarning,
        stacklevel=2,
    )
    # Keep an explicit caller-supplied primary_metric intact.
    if "primary_metric" not in kwargs:
        kwargs["primary_metric"] = "weighted"
    return ChecklistScorer(mode="item", **kwargs)

NormalizedScorer(**kwargs)

Deprecated: use ChecklistScorer(mode='item', use_logprobs=True, primary_metric='normalized').

Source code in autochecklist/scorers/__init__.py
def NormalizedScorer(**kwargs):
    """Deprecated: use ``ChecklistScorer(mode='item', use_logprobs=True, primary_metric='normalized')``."""
    warnings.warn(
        "NormalizedScorer is deprecated, use ChecklistScorer(mode='item', use_logprobs=True, primary_metric='normalized')",
        DeprecationWarning,
        stacklevel=2,
    )
    # Fill the legacy defaults without clobbering caller overrides.
    for key, value in (("primary_metric", "normalized"), ("use_logprobs", True)):
        kwargs.setdefault(key, value)
    return ChecklistScorer(mode="item", **kwargs)

ItemScorer(**kwargs)

Deprecated: use ChecklistScorer(mode='item', capture_reasoning=True).

Source code in autochecklist/scorers/__init__.py
def ItemScorer(**kwargs):
    """Deprecated: use ``ChecklistScorer(mode='item', capture_reasoning=True)``."""
    warnings.warn(
        "ItemScorer is deprecated, use ChecklistScorer(mode='item', capture_reasoning=True)",
        DeprecationWarning,
        stacklevel=2,
    )
    # Keep an explicit caller-supplied capture_reasoning intact.
    if "capture_reasoning" not in kwargs:
        kwargs["capture_reasoning"] = True
    return ChecklistScorer(mode="item", **kwargs)

get_client(provider='openrouter', base_url=None, api_key=None, model=None, api_format=None, **kwargs)

Create an LLM client for the given provider.

Parameters:

Name Type Description Default
provider str

Provider name ("openrouter", "openai", "vllm")

'openrouter'
base_url Optional[str]

Override the default base URL. For vLLM, None means offline mode.

None
api_key Optional[str]

API key (resolved from env if not provided)

None
model Optional[str]

Model name (required for vLLM offline mode)

None
api_format Optional[str]

API format ("chat" or "responses"). Defaults to "responses" for OpenAI, "chat" for other providers.

None
**kwargs Any

Additional kwargs passed to the client constructor

{}

Returns:

Type Description
LLMClient

An LLMClient instance

Raises:

Type Description
ValueError

If provider is unknown or vLLM offline mode missing model

Source code in autochecklist/providers/factory.py
def get_client(
    provider: str = "openrouter",
    base_url: Optional[str] = None,
    api_key: Optional[str] = None,
    model: Optional[str] = None,
    api_format: Optional[str] = None,
    **kwargs: Any,
) -> LLMClient:
    """Create an LLM client for the given provider.

    Args:
        provider: Provider name ("openrouter", "openai", "vllm")
        base_url: Override the default base URL. For vLLM, None means offline mode.
        api_key: API key (resolved from env if not provided)
        model: Model name (required for vLLM offline mode)
        api_format: API format ("chat" or "responses"). Defaults to "responses"
            for OpenAI, "chat" for other providers.
        **kwargs: Additional kwargs passed to the client constructor

    Returns:
        An LLMClient instance

    Raises:
        ValueError: If provider is unknown or vLLM offline mode missing model
    """
    use_offline = provider == "vllm" and base_url is None
    if use_offline:
        # No server URL → run inference directly in-process.
        from .vllm_offline import VLLMOfflineClient

        if model is None:
            raise ValueError(
                "model is required for vLLM offline mode. "
                "Pass model='your-model-name' or set base_url for server mode."
            )
        return VLLMOfflineClient(model=model, **kwargs)

    # Otherwise talk HTTP (OpenRouter, OpenAI, or a vLLM server).
    from .http_client import LLMHTTPClient

    if api_format is None:
        # OpenAI prefers the Responses API (reasoning-model friendly);
        # everyone else speaks plain chat completions.
        api_format = "responses" if provider == "openai" else "chat"

    return LLMHTTPClient(
        provider=provider,
        api_key=api_key,
        base_url=base_url,
        api_format=api_format,
        **kwargs,
    )

list_generators()

List all registered generator names.

Source code in autochecklist/registry.py
def list_generators() -> List[str]:
    """Return the names of every registered generator."""
    return [key for key in _generators]

list_scorers()

List all registered scorer names.

Source code in autochecklist/registry.py
def list_scorers() -> List[str]:
    """Return the names of every registered scorer."""
    return [key for key in _scorers]

list_refiners()

List all registered refiner names.

Source code in autochecklist/registry.py
def list_refiners() -> List[str]:
    """Return the names of every registered refiner."""
    return [key for key in _refiners]

get_generator(name)

Get generator class by name.

Source code in autochecklist/registry.py
def get_generator(name: str) -> Type:
    """Look up a generator class by its registered name."""
    if name in _generators:
        return _generators[name]
    raise KeyError(f"Unknown generator: {name}. Available: {list_generators()}")

get_scorer(name)

Get scorer class by name.

Source code in autochecklist/registry.py
def get_scorer(name: str) -> Type:
    """Look up a scorer class by its registered name."""
    if name in _scorers:
        return _scorers[name]
    raise KeyError(f"Unknown scorer: {name}. Available: {list_scorers()}")

get_refiner(name)

Get refiner class by name.

Source code in autochecklist/registry.py
def get_refiner(name: str) -> Type:
    """Look up a refiner class by its registered name."""
    if name in _refiners:
        return _refiners[name]
    raise KeyError(f"Unknown refiner: {name}. Available: {list_refiners()}")

register_generator(name)

Decorator to register a generator class.

Source code in autochecklist/registry.py
def register_generator(name: str):
    """Decorator that registers a generator class under *name*."""
    def _register(cls: Type) -> Type:
        # Record the class, then hand it back unchanged.
        _generators[name] = cls
        return cls
    return _register

register_scorer(name)

Decorator to register a scorer class.

Source code in autochecklist/registry.py
def register_scorer(name: str):
    """Decorator that registers a scorer class under *name*."""
    def _register(cls: Type) -> Type:
        # Record the class, then hand it back unchanged.
        _scorers[name] = cls
        return cls
    return _register

register_refiner(name)

Decorator to register a refiner class.

Source code in autochecklist/registry.py
def register_refiner(name: str):
    """Decorator that registers a refiner class under *name*."""
    def _register(cls: Type) -> Type:
        # Record the class, then hand it back unchanged.
        _refiners[name] = cls
        return cls
    return _register

register_custom_generator(name, prompt_path)

Register a custom generator from a .md prompt file.

Once registered, the generator can be used by name in pipeline() or get_generator().

Parameters:

Name Type Description Default
name str

Name to register the generator under

required
prompt_path str

Path to .md file containing the prompt template

required
Example

register_custom_generator("my_eval", "prompts/my_eval.md")
pipe = pipeline("my_eval")

Source code in autochecklist/registry.py
def register_custom_generator(name: str, prompt_path: str) -> None:
    """Register a custom generator from a .md prompt file.

    Once registered, the generator can be used by name in pipeline()
    or get_generator().

    Args:
        name: Name to register the generator under
        prompt_path: Path to .md file containing the prompt template

    Example:
        >>> register_custom_generator("my_eval", "prompts/my_eval.md")
        >>> pipe = pipeline("my_eval")
    """
    from .generators.instance_level.direct import DirectGenerator

    # The file's text becomes the generator's custom prompt.
    template = Path(prompt_path).read_text(encoding="utf-8")

    class _CustomFactory(DirectGenerator):
        # Subclass pins the prompt and method name; other kwargs pass through.
        def __init__(self, **kwargs):
            super().__init__(custom_prompt=template, method_name=name, **kwargs)

    _CustomFactory.__doc__ = f"Custom generator '{name}' loaded from {prompt_path}"
    _CustomFactory.__name__ = f"DirectGenerator({name})"
    _generators[name] = _CustomFactory

register_custom_scorer(name, prompt_path, mode='batch', primary_metric='pass', capture_reasoning=False)

Register a custom scorer from a .md prompt file.

Creates a ChecklistScorer with the custom prompt. Once registered, the scorer can be used by name in pipeline() or get_scorer().

When primary_metric="normalized", logprobs are automatically enabled (logprobs are required for confidence-calibrated scoring).

Parameters:

Name Type Description Default
name str

Name to register the scorer under

required
prompt_path str

Path to .md file containing the scoring prompt template

required
mode str

Scoring mode — "batch" (all items in one call) or "item" (one item per call). Default: "batch".

'batch'
primary_metric str

Which metric Score.primary_score aliases — "pass", "weighted", or "normalized". Default: "pass". Setting "normalized" auto-enables logprobs.

'pass'
capture_reasoning bool

Include per-item reasoning in output.

False
Example

register_custom_scorer("strict", "prompts/strict_scorer.md")
pipe = pipeline("tick", scorer="strict")

register_custom_scorer(
    "weighted_strict", "prompts/strict_scorer.md",
    mode="item", primary_metric="weighted",
)

Source code in autochecklist/registry.py
def register_custom_scorer(
    name: str,
    prompt_path: str,
    mode: str = "batch",
    primary_metric: str = "pass",
    capture_reasoning: bool = False,
) -> None:
    """Register a custom scorer from a .md prompt file.

    Creates a ChecklistScorer with the custom prompt. Once registered,
    the scorer can be used by name in pipeline() or get_scorer().

    When ``primary_metric="normalized"``, logprobs are automatically enabled
    (logprobs are required for confidence-calibrated scoring).

    Args:
        name: Name to register the scorer under
        prompt_path: Path to .md file containing the scoring prompt template
        mode: Scoring mode — "batch" (all items in one call) or "item"
            (one item per call). Default: "batch".
        primary_metric: Which metric ``Score.primary_score`` aliases —
            "pass", "weighted", or "normalized". Default: "pass".
            Setting "normalized" auto-enables logprobs.
        capture_reasoning: Include per-item reasoning in output.

    Example:
        >>> register_custom_scorer("strict", "prompts/strict_scorer.md")
        >>> pipe = pipeline("tick", scorer="strict")

        >>> register_custom_scorer(
        ...     "weighted_strict", "prompts/strict_scorer.md",
        ...     mode="item", primary_metric="weighted",
        ... )
    """
    custom_prompt = Path(prompt_path).read_text(encoding="utf-8")
    # Registration-time settings become the factory's defaults.
    defaults = {
        "custom_prompt": custom_prompt,
        "mode": mode,
        "primary_metric": primary_metric,
        "use_logprobs": primary_metric == "normalized",
        "capture_reasoning": capture_reasoning,
    }

    def _factory(**kwargs):
        from .scorers import ChecklistScorer
        # Bug fix: the previous factory passed mode/primary_metric/
        # use_logprobs/capture_reasoning explicitly alongside **kwargs,
        # so any call-time override raised TypeError ("got multiple
        # values for keyword argument"). setdefault lets call-time
        # kwargs win over registration defaults instead of crashing.
        for key, value in defaults.items():
            kwargs.setdefault(key, value)
        return ChecklistScorer(**kwargs)

    _factory.__name__ = f"CustomScorer({name})"
    _factory.__doc__ = f"Custom scorer '{name}' loaded from {prompt_path}"
    _scorers[name] = _factory

register_custom_pipeline(name, pipeline=None, generator_prompt=None, generator_class='direct', scorer=None, scorer_mode=None, scorer_prompt=None, primary_metric=None, capture_reasoning=None, force=False)

Register a custom pipeline as a reusable preset.

Can register from either an instantiated pipeline or from config values. Once registered, the pipeline can be used by name: pipeline("my_eval", generator_model="openai/gpt-4o")

When primary_metric="normalized", logprobs are automatically enabled (logprobs are required for confidence-calibrated scoring).

Parameters:

Name Type Description Default
name str

Name to register the pipeline under.

required
pipeline Optional[Any]

An instantiated ChecklistPipeline to extract config from. Mutually exclusive with generator_prompt.

None
generator_prompt Optional[Union[str, Path]]

Custom generator prompt text, or Path to a prompt file. Mutually exclusive with pipeline.

None
generator_class str

Generator class to use ("direct" or "contrastive"). Only used with generator_prompt. Default: "direct".

'direct'
scorer Optional[str]

Deprecated scorer name (e.g., "batch", "weighted"). Use scorer_mode and primary_metric instead.

None
scorer_mode Optional[str]

Scoring mode — "batch" or "item". None means no default scorer is attached.

None
scorer_prompt Optional[Union[str, Path]]

Custom scorer prompt text, built-in name ("rlcf", "rocketeval"), or Path to a prompt file. None means use the default prompt for the mode.

None
primary_metric Optional[str]

Which metric Score.primary_score aliases — "pass" (default), "weighted", or "normalized". Setting "normalized" auto-enables logprobs.

None
capture_reasoning Optional[bool]

Include per-item reasoning in output.

None
force bool

If True, allow overriding built-in pipelines (with a warning).

False

Raises:

Type Description
ValueError

If overriding a built-in name without force=True, if neither pipeline nor generator_prompt is provided, or if both scorer and scorer_mode are provided.

Example

From config with scorer settings

register_custom_pipeline( ... "my_eval", ... generator_prompt="Generate yes/no questions for:\n\n{input}", ... scorer_mode="item", ... primary_metric="weighted", ... ) pipe = pipeline("my_eval", generator_model="openai/gpt-4o-mini")

From instantiated pipeline

pipe = ChecklistPipeline( ... generator=DirectGenerator(custom_prompt="...", model="gpt-4o-mini"), ... scorer=ChecklistScorer(mode="item", primary_metric="weighted", ... model="gpt-4o-mini"), ... ) register_custom_pipeline("my_eval", pipe)

Source code in autochecklist/registry.py
def register_custom_pipeline(
    name: str,
    pipeline: Optional[Any] = None,
    generator_prompt: Optional[Union[str, Path]] = None,
    generator_class: str = "direct",
    scorer: Optional[str] = None,
    scorer_mode: Optional[str] = None,
    scorer_prompt: Optional[Union[str, Path]] = None,
    primary_metric: Optional[str] = None,
    capture_reasoning: Optional[bool] = None,
    force: bool = False,
) -> None:
    """Register a custom pipeline as a reusable preset.

    Can register from either an instantiated pipeline or from config values.
    Once registered, the pipeline can be used by name:
        ``pipeline("my_eval", generator_model="openai/gpt-4o")``

    When ``primary_metric="normalized"``, logprobs are automatically enabled
    (logprobs are required for confidence-calibrated scoring).

    Args:
        name: Name to register the pipeline under.
        pipeline: An instantiated ChecklistPipeline to extract config from.
            Mutually exclusive with generator_prompt.
        generator_prompt: Custom generator prompt text, or Path to a prompt file.
            Mutually exclusive with pipeline.
        generator_class: Generator class to use ("direct" or "contrastive").
            Only used with generator_prompt. Default: "direct".
        scorer: Deprecated scorer name (e.g., "batch", "weighted"). Use
            ``scorer_mode`` and ``primary_metric`` instead.
        scorer_mode: Scoring mode — "batch" or "item". None means no
            default scorer is attached.
        scorer_prompt: Custom scorer prompt text, built-in name ("rlcf",
            "rocketeval"), or Path to a prompt file. None means use the
            default prompt for the mode.
        primary_metric: Which metric ``Score.primary_score`` aliases —
            "pass" (default), "weighted", or "normalized".
            Setting "normalized" auto-enables logprobs.
        capture_reasoning: Include per-item reasoning in output.
        force: If True, allow overriding built-in pipelines (with a warning).

    Raises:
        ValueError: If overriding a built-in name without force=True, if
            neither pipeline nor generator_prompt is provided, or if both
            ``scorer`` and ``scorer_mode`` are provided.

    Example:
        >>> # From config with scorer settings
        >>> register_custom_pipeline(
        ...     "my_eval",
        ...     generator_prompt="Generate yes/no questions for:\\n\\n{input}",
        ...     scorer_mode="item",
        ...     primary_metric="weighted",
        ... )
        >>> pipe = pipeline("my_eval", generator_model="openai/gpt-4o-mini")

        >>> # From instantiated pipeline
        >>> pipe = ChecklistPipeline(
        ...     generator=DirectGenerator(custom_prompt="...", model="gpt-4o-mini"),
        ...     scorer=ChecklistScorer(mode="item", primary_metric="weighted",
        ...                            model="gpt-4o-mini"),
        ... )
        >>> register_custom_pipeline("my_eval", pipe)
    """
    # Imports are deferred to avoid import cycles at module load time.
    from .generators.instance_level.direct import DirectGenerator
    from .generators.instance_level.contrastive import ContrastiveGenerator

    # Validate: scorer (legacy name) and scorer_mode are mutually exclusive
    if scorer is not None and scorer_mode is not None:
        raise ValueError(
            "Cannot specify both 'scorer' (legacy name) and 'scorer_mode'. "
            "Use scorer_mode with primary_metric/capture_reasoning instead."
        )

    # Override protection
    if name in _builtin_generators and not force:
        raise ValueError(
            f"Cannot override built-in pipeline '{name}'. "
            f"Use force=True to override."
        )
    if name in _builtin_generators and force:
        warnings.warn(
            f"Overriding built-in pipeline '{name}'.",
            UserWarning,
            stacklevel=2,
        )

    # Extract config from pipeline instance or direct args
    if pipeline is not None and generator_prompt is not None:
        raise ValueError(
            "Provide either 'pipeline' or 'generator_prompt', not both."
        )

    scorer_config = None  # Will hold the config dict for DEFAULT_SCORERS

    if pipeline is not None:
        # Extract from instantiated pipeline
        gen = pipeline.generator
        # NOTE(review): assumes every generator exposes .prompt_text —
        # confirm on the generator base class.
        gen_prompt_text = gen.prompt_text

        # Detect generator class
        if isinstance(gen, ContrastiveGenerator):
            gen_class_name = "contrastive"
        else:
            gen_class_name = "direct"

        # Extract full scorer config from pipeline instance
        if pipeline.scorer is not None:
            s = pipeline.scorer
            scorer_config = {
                "mode": s.mode,
                "primary_metric": s.primary_metric,
            }
            if s.use_logprobs:
                scorer_config["use_logprobs"] = True
            if s.capture_reasoning:
                scorer_config["capture_reasoning"] = True

            # Check if scorer has a custom prompt (differs from default)
            # by diffing against a freshly built default-config scorer.
            scorer_prompt_text = s.prompt_text
            try:
                from .scorers import ChecklistScorer as _SC
                default_instance = _SC(mode=s.mode)
                if default_instance.prompt_text != scorer_prompt_text:
                    scorer_config["scorer_prompt"] = scorer_prompt_text
            except Exception:
                # Can't build a baseline — conservatively persist the prompt.
                scorer_config["scorer_prompt"] = scorer_prompt_text

    elif generator_prompt is not None:
        # Read from path if needed
        if isinstance(generator_prompt, Path):
            gen_prompt_text = generator_prompt.read_text(encoding="utf-8")
        else:
            gen_prompt_text = generator_prompt
        gen_class_name = generator_class

        # Build scorer config from flat kwargs or legacy scorer name
        if scorer_mode is not None or primary_metric is not None or \
           capture_reasoning is not None:
            scorer_config = {}
            if scorer_mode is not None:
                scorer_config["mode"] = scorer_mode
            if primary_metric is not None:
                scorer_config["primary_metric"] = primary_metric
                # Auto-enable logprobs for normalized metric
                if primary_metric == "normalized":
                    scorer_config["use_logprobs"] = True
            if capture_reasoning is not None and capture_reasoning:
                scorer_config["capture_reasoning"] = True
            # Handle scorer_prompt
            if scorer_prompt is not None:
                scorer_config["scorer_prompt"] = _read_scorer_prompt(scorer_prompt)
        elif scorer is not None:
            # Legacy path: map old scorer name string to config dict
            warnings.warn(
                "The 'scorer' parameter is deprecated. Use 'scorer_mode' and "
                "'primary_metric' instead (e.g., scorer_mode='item', "
                "primary_metric='weighted').",
                DeprecationWarning,
                stacklevel=2,
            )
            scorer_config = _scorer_name_to_config(scorer)
            # Add scorer_prompt if provided
            if scorer_prompt is not None:
                scorer_config["scorer_prompt"] = _read_scorer_prompt(scorer_prompt)
        elif scorer_prompt is not None:
            # scorer_prompt alone (no mode/metric kwargs) — default batch mode
            scorer_config = {"scorer_prompt": _read_scorer_prompt(scorer_prompt)}
    else:
        raise ValueError(
            "Must provide either 'pipeline' (ChecklistPipeline instance) "
            "or 'generator_prompt' (str or Path)."
        )

    # Create generator factory
    gen_cls_map = {
        "direct": DirectGenerator,
        "contrastive": ContrastiveGenerator,
    }
    gen_cls = gen_cls_map.get(gen_class_name)
    if gen_cls is None:
        raise ValueError(
            f"Unknown generator_class '{gen_class_name}'. "
            f"Available: {list(gen_cls_map.keys())}"
        )

    # Capture in closure
    _prompt = gen_prompt_text

    class _CustomFactory(gen_cls):
        # Subclass pins the prompt and method name; other kwargs pass through.
        def __init__(self, **kwargs):
            super().__init__(custom_prompt=_prompt, method_name=name, **kwargs)

    _CustomFactory.__name__ = f"{gen_cls.__name__}({name})"
    _CustomFactory.__doc__ = f"Custom pipeline '{name}'"
    _generators[name] = _CustomFactory

    # Store scorer config in DEFAULT_SCORERS
    if scorer_config is not None:
        from .pipeline import DEFAULT_SCORERS
        DEFAULT_SCORERS[name] = scorer_config
save_pipeline_config(name, path)

Export a registered pipeline's config to JSON.

The saved JSON uses flat scorer config keys (scorer_mode, primary_metric, capture_reasoning, scorer_prompt) extracted from the DEFAULT_SCORERS entry.

Logprobs are auto-derived from primary_metric="normalized" and not stored separately in the config.

In the output JSON:

  • scorer_mode: null if no default scorer is configured.
  • scorer_prompt: null means use the default prompt for the mode.
  • primary_metric: null defaults to "pass" when loaded.

Parameters:

Name Type Description Default
name str

Name of a registered pipeline.

required
path Union[str, Path]

Path to write the JSON config file.

required

Raises:

Type Description
KeyError

If the pipeline name is not registered.

Source code in autochecklist/registry.py
def save_pipeline_config(name: str, path: Union[str, Path]) -> None:
    """Export a registered pipeline's config to JSON.

    The saved JSON uses flat scorer config keys (``scorer_mode``,
    ``primary_metric``, ``capture_reasoning``, ``scorer_prompt``)
    extracted from the ``DEFAULT_SCORERS`` entry.

    Logprobs are auto-derived from ``primary_metric="normalized"``
    and not stored separately in the config.

    In the output JSON:

    - ``scorer_mode``: ``null`` if no default scorer is configured.
    - ``scorer_prompt``: ``null`` means use the default prompt for the mode.
    - ``primary_metric``: ``null`` defaults to ``"pass"`` when loaded.

    Args:
        name: Name of a registered pipeline.
        path: Path to write the JSON config file.

    Raises:
        KeyError: If the pipeline name is not registered.
        ValueError: If the generator cannot be instantiated without
            arguments, so its config cannot be extracted.
    """
    from .generators.instance_level.contrastive import ContrastiveGenerator
    from .pipeline import DEFAULT_SCORERS

    gen_cls = get_generator(name)
    # Instantiate to extract prompt text and class type
    try:
        gen_instance = gen_cls()
    except Exception as exc:
        # Bug fix: chain the original exception so the real construction
        # failure isn't hidden behind the generic ValueError.
        raise ValueError(
            f"Cannot save config for '{name}': generator requires arguments."
        ) from exc

    if isinstance(gen_instance, ContrastiveGenerator):
        gen_class_name = "contrastive"
    else:
        gen_class_name = "direct"

    config = {
        "name": name,
        "generator_class": gen_class_name,
        "generator_prompt": gen_instance.prompt_text,
    }

    # Extract scorer config from DEFAULT_SCORERS
    # use_logprobs is omitted — it's auto-derived from primary_metric="normalized"
    scorer_entry = DEFAULT_SCORERS.get(name)
    if isinstance(scorer_entry, dict):
        config["scorer_mode"] = scorer_entry.get("mode")
        config["scorer_prompt"] = scorer_entry.get("scorer_prompt")
        config["primary_metric"] = scorer_entry.get("primary_metric")
        config["capture_reasoning"] = scorer_entry.get("capture_reasoning", False)
    elif isinstance(scorer_entry, str):
        # Legacy: stored as a name string — convert to flat keys
        legacy_config = _scorer_name_to_config(scorer_entry)
        config["scorer_mode"] = legacy_config.get("mode")
        config["scorer_prompt"] = None
        config["primary_metric"] = legacy_config.get("primary_metric")
        config["capture_reasoning"] = legacy_config.get("capture_reasoning", False)
    else:
        # No scorer configured
        config["scorer_mode"] = None
        config["scorer_prompt"] = None
        config["primary_metric"] = None
        config["capture_reasoning"] = False

    path = Path(path)
    path.write_text(json.dumps(config, indent=2), encoding="utf-8")

load_pipeline_config(path, force=False)

Load and register a pipeline from a JSON config file.

Supports both the new format (flat scorer config keys: scorer_mode, primary_metric, etc.) and the old format (scorer name string + scorer_prompt text).

Parameters:

Name Type Description Default
path Union[str, Path]

Path to the JSON config file.

required
force bool

If True, allow overriding built-in pipelines.

False

Returns:

Type Description
str

The pipeline name (for use with pipeline(name)).

Source code in autochecklist/registry.py
def load_pipeline_config(path: Union[str, Path], force: bool = False) -> str:
    """Load and register a pipeline from a JSON config file.

    Supports both the new format (flat scorer config keys: ``scorer_mode``,
    ``primary_metric``, etc.) and the old format (``scorer`` name string +
    ``scorer_prompt`` text).

    Args:
        path: Path to the JSON config file.
        force: If True, allow overriding built-in pipelines.

    Returns:
        The pipeline name (for use with ``pipeline(name)``).
    """
    raw = json.loads(Path(path).read_text(encoding="utf-8"))
    pipeline_name = raw["name"]

    # Keyword arguments shared by both config formats.
    kwargs: Dict[str, Any] = {
        "name": pipeline_name,
        "generator_prompt": raw.get("generator_prompt"),
        "generator_class": raw.get("generator_class", "direct"),
        "scorer_prompt": raw.get("scorer_prompt"),
        "force": force,
    }

    # Format detection: the new style carries "scorer_mode", the old
    # style carries a "scorer" registry-name string instead.
    if "scorer_mode" in raw:
        # New format — flat scorer config keys.
        # A stray "use_logprobs" key in older JSON files is silently
        # ignored (it's auto-derived from primary_metric="normalized").
        kwargs["scorer_mode"] = raw.get("scorer_mode")
        kwargs["primary_metric"] = raw.get("primary_metric")
        kwargs["capture_reasoning"] = raw.get("capture_reasoning")
    else:
        # Old format — backward compatibility via the scorer name string.
        kwargs["scorer"] = raw.get("scorer")

    register_custom_pipeline(**kwargs)
    return pipeline_name

list_generators_with_info()

List all generators with metadata (for UI dropdowns).

Source code in autochecklist/registry.py
def list_generators_with_info() -> List[Dict[str, Any]]:
    """Return metadata for every registered generator (for UI dropdowns)."""
    return list(map(get_generator_info, list_generators()))

list_scorers_with_info()

List all scorers with metadata (for UI dropdowns).

Source code in autochecklist/registry.py
def list_scorers_with_info() -> List[Dict[str, Any]]:
    """Return metadata for every registered scorer (for UI dropdowns)."""
    return list(map(get_scorer_info, list_scorers()))

list_refiners_with_info()

List all refiners with metadata (for UI dropdowns).

Source code in autochecklist/registry.py
def list_refiners_with_info() -> List[Dict[str, Any]]:
    """Return metadata for every registered refiner (for UI dropdowns)."""
    return list(map(get_refiner_info, list_refiners()))