refiners

Checklist refiners for improving generated checklists.

Refiners are optional building blocks that can be used to improve corpus-level checklists through deduplication, filtering, and optimization.
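
The refiners on this page compose naturally: each one takes a Checklist and returns a new Checklist, so they can be chained. A rough sketch of such a pipeline follows (the import path and the upstream checklist object are assumptions, and every refine() call issues real LLM or embedding requests):

# Hypothetical chaining of the refiners documented below.
from autochecklist.refiners import Deduplicator, Tagger, Selector  # assumed import path

checklist = ...  # a Checklist produced earlier by a checklist generator

checklist = Deduplicator(similarity_threshold=0.85).refine(checklist)  # merge near-duplicates
checklist = Tagger().refine(checklist)                                 # drop non-applicable questions
checklist = Selector(max_questions=20).refine(checklist)               # keep a diverse subset

print(len(checklist.items), checklist.metadata.get("refined_by"))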

ChecklistRefiner

Bases: ABC

Base class for all checklist refiners.

Refiners take a checklist and improve it through various operations:

- Deduplication (merge similar questions)
- Filtering (remove low-quality questions)
- Selection (choose optimal subset)
- Testing (validate discriminativeness)

Source code in autochecklist/refiners/base.py
class ChecklistRefiner(ABC):
    """Base class for all checklist refiners.

    Refiners take a checklist and improve it through various operations:
    - Deduplication (merge similar questions)
    - Filtering (remove low-quality questions)
    - Selection (choose optimal subset)
    - Testing (validate discriminativeness)
    """

    def __init__(
        self,
        model: Optional[str] = None,
        temperature: Optional[float] = None,
        api_key: Optional[str] = None,
        provider: Optional[str] = None,
        base_url: Optional[str] = None,
        client: Any = None,
        api_format: Optional[str] = None,
        reasoning_effort: Optional[str] = None,
    ):
        config = get_config()
        self.model = model or config.generator_model.model_id
        self.temperature = temperature if temperature is not None else config.generator_model.temperature
        self.api_key = api_key
        self._client = client
        self._provider = provider or config.generator_model.provider or "openrouter"
        self._base_url = base_url
        self._api_format = api_format or "chat"
        self.reasoning_effort = reasoning_effort

    @property
    @abstractmethod
    def refiner_name(self) -> str:
        """Return the refiner name (e.g., 'deduplicator', 'tagger')."""
        pass

    @abstractmethod
    def refine(self, checklist: Checklist, **kwargs: Any) -> Checklist:
        """Refine the checklist.

        Args:
            checklist: Input checklist to refine
            **kwargs: Refiner-specific arguments

        Returns:
            Refined checklist
        """
        pass

    def _get_or_create_client(self) -> Any:
        """Get injected client or create one from provider settings."""
        if self._client is not None:
            return self._client
        from ..providers.factory import get_client
        return get_client(
            provider=self._provider,
            api_key=self.api_key,
            base_url=self._base_url,
            model=self.model,
            api_format=self._api_format,
        )

    def _call_model(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        response_format: Optional[Dict] = None,
    ) -> str:
        """Call the LLM and return the response text."""
        messages: List[Dict[str, str]] = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": prompt})

        client = self._get_or_create_client()
        kwargs: Dict[str, Any] = {
            "model": self.model,
            "messages": messages,
            "temperature": self.temperature,
            "max_tokens": 2048,
        }
        if response_format:
            kwargs["response_format"] = response_format
        if self.reasoning_effort is not None:
            kwargs["reasoning_effort"] = self.reasoning_effort

        response = client.chat_completion(**kwargs)
        return response["choices"][0]["message"]["content"]

    def _create_refined_checklist(
        self,
        original: Checklist,
        items: List[ChecklistItem],
        metadata_updates: Optional[Dict[str, Any]] = None,
    ) -> Checklist:
        """Create a new checklist with refined items.

        Args:
            original: Original checklist to base metadata on
            items: New refined items
            metadata_updates: Additional metadata to add

        Returns:
            New Checklist instance
        """
        metadata = dict(original.metadata) if original.metadata else {}
        metadata["refined_by"] = self.refiner_name
        metadata["original_count"] = len(original.items)
        if metadata_updates:
            metadata.update(metadata_updates)

        return Checklist(
            items=items,
            source_method=original.source_method,
            generation_level=original.generation_level,
            input=original.input,
            metadata=metadata,
        )

refiner_name abstractmethod property

Return the refiner name (e.g., 'deduplicator', 'tagger').

refine(checklist, **kwargs) abstractmethod

Refine the checklist.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| checklist | Checklist | Input checklist to refine | required |
| **kwargs | Any | Refiner-specific arguments | {} |

Returns:

| Type | Description |
| --- | --- |
| Checklist | Refined checklist |

Source code in autochecklist/refiners/base.py
@abstractmethod
def refine(self, checklist: Checklist, **kwargs: Any) -> Checklist:
    """Refine the checklist.

    Args:
        checklist: Input checklist to refine
        **kwargs: Refiner-specific arguments

    Returns:
        Refined checklist
    """
    pass
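
The abstract surface is small: a subclass only needs refiner_name and refine(), and can reuse _create_refined_checklist() to carry metadata forward. A minimal sketch of a custom refiner (the LengthFilter class is hypothetical and makes no LLM calls):

from autochecklist.refiners.base import ChecklistRefiner  # assumed import path

class LengthFilter(ChecklistRefiner):
    """Illustrative refiner that drops questions longer than max_chars."""

    def __init__(self, max_chars: int = 200, **kwargs):
        super().__init__(**kwargs)
        self.max_chars = max_chars

    @property
    def refiner_name(self) -> str:
        return "length_filter"

    def refine(self, checklist, **kwargs):
        kept = [item for item in checklist.items if len(item.question) <= self.max_chars]
        return self._create_refined_checklist(
            checklist,
            kept,
            metadata_updates={"filtered_count": len(checklist.items) - len(kept)},
        )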

Deduplicator

Bases: ChecklistRefiner

Refiner that merges semantically similar checklist questions.

Pipeline:

1. Compute embeddings for all questions
2. Build similarity graph (edge if cosine >= threshold)
3. Find connected components (clusters)
4. Keep isolated nodes (unique questions) as-is
5. Use LLM to merge multi-node clusters into single questions

Source code in autochecklist/refiners/deduplicator.py
class Deduplicator(ChecklistRefiner):
    """Refiner that merges semantically similar checklist questions.

    Pipeline:
    1. Compute embeddings for all questions
    2. Build similarity graph (edge if cosine >= threshold)
    3. Find connected components (clusters)
    4. Keep isolated nodes (unique questions) as-is
    5. Use LLM to merge multi-node clusters into single questions
    """

    def __init__(
        self,
        similarity_threshold: float = 0.85,
        model: Optional[str] = None,
        temperature: Optional[float] = None,
        api_key: Optional[str] = None,
        embedding_api_key: Optional[str] = None,
        custom_prompt: Optional[Union[str, Path]] = None,
        **kwargs,
    ):
        super().__init__(model=model, temperature=temperature, api_key=api_key, **kwargs)
        self.similarity_threshold = similarity_threshold
        self.embedding_api_key = embedding_api_key
        # Load merge prompt template
        if custom_prompt is not None:
            if isinstance(custom_prompt, Path):
                template_str = custom_prompt.read_text(encoding="utf-8")
            else:
                template_str = custom_prompt
        else:
            template_str = load_template("generators/feedback", "merge")
        self._merge_template = PromptTemplate(template_str)

    @property
    def refiner_name(self) -> str:
        return "deduplicator"

    def refine(self, checklist: Checklist, **kwargs: Any) -> Checklist:
        """Deduplicate the checklist by merging similar questions.

        Args:
            checklist: Input checklist to deduplicate

        Returns:
            Checklist with similar questions merged
        """
        if len(checklist.items) <= 1:
            return self._create_refined_checklist(
                checklist,
                list(checklist.items),
                metadata_updates={"clusters_merged": 0},
            )

        # Get questions and compute embeddings
        questions = [item.question for item in checklist.items]
        embeddings = get_embeddings(questions, api_key=self.embedding_api_key)

        # Build similarity graph
        graph, clusters = self._build_similarity_graph(
            checklist.items, embeddings
        )

        # Process clusters
        refined_items: List[ChecklistItem] = []
        clusters_merged = 0

        for cluster_ids in clusters:
            cluster_items = [
                item for item in checklist.items if item.id in cluster_ids
            ]

            if len(cluster_items) == 1:
                # Isolated node - keep as-is
                refined_items.append(cluster_items[0])
            else:
                # Multi-node cluster - merge with LLM
                merged_item = self._merge_cluster(cluster_items)
                refined_items.append(merged_item)
                clusters_merged += 1

        return self._create_refined_checklist(
            checklist,
            refined_items,
            metadata_updates={
                "clusters_merged": clusters_merged,
                "similarity_threshold": self.similarity_threshold,
            },
        )

    def _build_similarity_graph(
        self,
        items: List[ChecklistItem],
        embeddings,
    ) -> Tuple[nx.Graph, List[Set[str]]]:
        """Build similarity graph and find connected components.

        Args:
            items: Checklist items
            embeddings: Numpy array of embeddings

        Returns:
            Tuple of (graph, list of component sets)
        """
        try:
            import networkx as nx
        except ImportError:
            raise ImportError(
                "networkx is required for checklist deduplication. "
                "Install with: pip install 'autochecklist[ml]'"
            ) from None

        G = nx.Graph()

        # Add all items as nodes
        for item in items:
            G.add_node(item.id)

        # Compute similarity matrix and add edges
        similarity_matrix = cosine_similarity(embeddings)

        for i in range(len(items)):
            for j in range(i + 1, len(items)):
                if similarity_matrix[i, j] >= self.similarity_threshold:
                    G.add_edge(items[i].id, items[j].id)

        # Find connected components
        components = list(nx.connected_components(G))

        return G, components

    def _merge_cluster(self, items: List[ChecklistItem]) -> ChecklistItem:
        """Merge a cluster of similar questions into one.

        Args:
            items: List of similar checklist items

        Returns:
            Single merged ChecklistItem
        """
        # Format questions for the prompt
        questions_text = "\n".join(f"- {item.question}" for item in items)

        # Call LLM to merge
        prompt = self._merge_template.format(questions=questions_text)

        response_format = {
            "type": "json_schema",
            "json_schema": {
                "name": "merged_question",
                "strict": True,
                "schema": {
                    "type": "object",
                    "properties": {
                        "question": {"type": "string"}
                    },
                    "required": ["question"],
                    "additionalProperties": False,
                },
            },
        }

        response = self._call_model(prompt, response_format=response_format)

        # Parse response
        try:
            result = json.loads(response)
            merged_question = result["question"]
        except (json.JSONDecodeError, KeyError):
            # Fallback: try to extract question from response
            merged_question = response.strip()
            if merged_question.startswith('"') and merged_question.endswith('"'):
                merged_question = merged_question[1:-1]

        # Create merged item
        # Use first item's ID with -merged suffix (like reference impl)
        merged_id = f"{items[0].id}-merged"

        # Average weights if items have weights
        avg_weight = sum(item.weight for item in items) / len(items)

        return ChecklistItem(
            id=merged_id,
            question=merged_question,
            weight=avg_weight,
            metadata={
                "merged_from": [item.id for item in items],
                "original_questions": [item.question for item in items],
            },
        )

refine(checklist, **kwargs)

Deduplicate the checklist by merging similar questions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| checklist | Checklist | Input checklist to deduplicate | required |

Returns:

| Type | Description |
| --- | --- |
| Checklist | Checklist with similar questions merged |

Source code in autochecklist/refiners/deduplicator.py
def refine(self, checklist: Checklist, **kwargs: Any) -> Checklist:
    """Deduplicate the checklist by merging similar questions.

    Args:
        checklist: Input checklist to deduplicate

    Returns:
        Checklist with similar questions merged
    """
    if len(checklist.items) <= 1:
        return self._create_refined_checklist(
            checklist,
            list(checklist.items),
            metadata_updates={"clusters_merged": 0},
        )

    # Get questions and compute embeddings
    questions = [item.question for item in checklist.items]
    embeddings = get_embeddings(questions, api_key=self.embedding_api_key)

    # Build similarity graph
    graph, clusters = self._build_similarity_graph(
        checklist.items, embeddings
    )

    # Process clusters
    refined_items: List[ChecklistItem] = []
    clusters_merged = 0

    for cluster_ids in clusters:
        cluster_items = [
            item for item in checklist.items if item.id in cluster_ids
        ]

        if len(cluster_items) == 1:
            # Isolated node - keep as-is
            refined_items.append(cluster_items[0])
        else:
            # Multi-node cluster - merge with LLM
            merged_item = self._merge_cluster(cluster_items)
            refined_items.append(merged_item)
            clusters_merged += 1

    return self._create_refined_checklist(
        checklist,
        refined_items,
        metadata_updates={
            "clusters_merged": clusters_merged,
            "similarity_threshold": self.similarity_threshold,
        },
    )
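
Usage sketch for the deduplicator (the checklist variable, import path, and model id are assumptions; constructor arguments mirror the signature above):

from autochecklist.refiners import Deduplicator  # assumed import path

dedup = Deduplicator(
    similarity_threshold=0.85,   # cosine threshold for drawing an edge between questions
    model="openai/gpt-4o-mini",  # example model id for the merge step
)
deduped = dedup.refine(checklist)  # `checklist` produced earlier by a generator

# Merged items keep provenance in their metadata.
for item in deduped.items:
    if item.id.endswith("-merged"):
        print(item.question, "<-", item.metadata["merged_from"])
print(deduped.metadata["clusters_merged"], "clusters merged")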

Tagger

Bases: ChecklistRefiner

Refiner that filters checklist items based on applicability and specificity.

Uses an LLM (default: gpt-5-mini) with zero-shot CoT to classify each question:

- Generally applicable: can be answered Yes/No for any input (no N/A scenarios)
- Section specific: evaluates a single aspect without cross-references

Source code in autochecklist/refiners/tagger.py
class Tagger(ChecklistRefiner):
    """Refiner that filters checklist items based on applicability and specificity.

    Uses LLM (default: gpt-5-mini) with zero-shot CoT to classify each question:
    - Generally applicable: Can be answered Yes/No for any input (no N/A scenarios)
    - Section specific: Evaluates single aspect without cross-references
    """

    def __init__(
        self,
        model: Optional[str] = None,
        temperature: Optional[float] = None,
        api_key: Optional[str] = None,
        custom_prompt: Optional[Union[str, Path]] = None,
        **kwargs,
    ):
        # Use gpt-5-mini as default (instead of o3-mini from paper)
        super().__init__(
            model=model or "openai/gpt-5-mini",
            temperature=temperature,
            api_key=api_key,
            **kwargs,
        )
        # Load tagging prompt template
        if custom_prompt is not None:
            if isinstance(custom_prompt, Path):
                template_str = custom_prompt.read_text(encoding="utf-8")
            else:
                template_str = custom_prompt
        else:
            template_str = load_template("generators/feedback", "tag")
        self._tag_template = PromptTemplate(template_str)

    @property
    def refiner_name(self) -> str:
        return "tagger"

    def refine(self, checklist: Checklist, **kwargs: Any) -> Checklist:
        """Filter checklist items based on tagging criteria.

        Args:
            checklist: Input checklist to filter

        Returns:
            Checklist with only items that pass both criteria
        """
        if len(checklist.items) == 0:
            return self._create_refined_checklist(
                checklist,
                [],
                metadata_updates={
                    "filtered_count": 0,
                    "filtered_items": [],
                },
            )

        passing_items: List[ChecklistItem] = []
        filtered_items: List[Dict[str, Any]] = []

        for item in checklist.items:
            tag_result = self._tag_question(item)

            if tag_result["generally_applicable"] and tag_result["section_specific"]:
                # Question passes - add tagging metadata
                item_with_metadata = ChecklistItem(
                    id=item.id,
                    question=item.question,
                    weight=item.weight,
                    category=item.category,
                    metadata={
                        **(item.metadata or {}),
                        "generally_applicable": True,
                        "section_specific": True,
                        "tag_reasoning": tag_result.get("reasoning", ""),
                    },
                )
                passing_items.append(item_with_metadata)
            else:
                # Question filtered out - track reason
                filtered_items.append({
                    "id": item.id,
                    "question": item.question,
                    "generally_applicable": tag_result["generally_applicable"],
                    "section_specific": tag_result["section_specific"],
                    "reasoning": tag_result.get("reasoning", ""),
                })

        return self._create_refined_checklist(
            checklist,
            passing_items,
            metadata_updates={
                "filtered_count": len(filtered_items),
                "filtered_items": filtered_items,
            },
        )

    def _tag_question(self, item: ChecklistItem) -> Dict[str, Any]:
        """Tag a single question for applicability and specificity.

        Args:
            item: Checklist item to tag

        Returns:
            Dict with keys: generally_applicable, section_specific, reasoning
        """
        prompt = self._tag_template.format(question=item.question)

        response_format = {
            "type": "json_schema",
            "json_schema": {
                "name": "tag_result",
                "strict": True,
                "schema": {
                    "type": "object",
                    "properties": {
                        "reasoning": {"type": "string"},
                        "generally_applicable": {"type": "boolean"},
                        "section_specific": {"type": "boolean"},
                    },
                    "required": ["reasoning", "generally_applicable", "section_specific"],
                    "additionalProperties": False,
                },
            },
        }

        try:
            response = self._call_model(prompt, response_format=response_format)
            result = json.loads(response)
            return {
                "generally_applicable": result.get("generally_applicable", False),
                "section_specific": result.get("section_specific", False),
                "reasoning": result.get("reasoning", ""),
            }
        except (json.JSONDecodeError, KeyError, TypeError):
            # Malformed response - default to filtering out
            return {
                "generally_applicable": False,
                "section_specific": False,
                "reasoning": "Failed to parse LLM response",
            }

refine(checklist, **kwargs)

Filter checklist items based on tagging criteria.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| checklist | Checklist | Input checklist to filter | required |

Returns:

| Type | Description |
| --- | --- |
| Checklist | Checklist with only items that pass both criteria |

Source code in autochecklist/refiners/tagger.py
def refine(self, checklist: Checklist, **kwargs: Any) -> Checklist:
    """Filter checklist items based on tagging criteria.

    Args:
        checklist: Input checklist to filter

    Returns:
        Checklist with only items that pass both criteria
    """
    if len(checklist.items) == 0:
        return self._create_refined_checklist(
            checklist,
            [],
            metadata_updates={
                "filtered_count": 0,
                "filtered_items": [],
            },
        )

    passing_items: List[ChecklistItem] = []
    filtered_items: List[Dict[str, Any]] = []

    for item in checklist.items:
        tag_result = self._tag_question(item)

        if tag_result["generally_applicable"] and tag_result["section_specific"]:
            # Question passes - add tagging metadata
            item_with_metadata = ChecklistItem(
                id=item.id,
                question=item.question,
                weight=item.weight,
                category=item.category,
                metadata={
                    **(item.metadata or {}),
                    "generally_applicable": True,
                    "section_specific": True,
                    "tag_reasoning": tag_result.get("reasoning", ""),
                },
            )
            passing_items.append(item_with_metadata)
        else:
            # Question filtered out - track reason
            filtered_items.append({
                "id": item.id,
                "question": item.question,
                "generally_applicable": tag_result["generally_applicable"],
                "section_specific": tag_result["section_specific"],
                "reasoning": tag_result.get("reasoning", ""),
            })

    return self._create_refined_checklist(
        checklist,
        passing_items,
        metadata_updates={
            "filtered_count": len(filtered_items),
            "filtered_items": filtered_items,
        },
    )
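
Usage sketch for the tagger (the checklist variable and import path are assumptions):

from autochecklist.refiners import Tagger  # assumed import path

tagger = Tagger()  # defaults to openai/gpt-5-mini per the constructor above
tagged = tagger.refine(checklist)

# Items that failed either criterion are recorded along with the model's reasoning.
for dropped in tagged.metadata["filtered_items"]:
    print(dropped["id"], dropped["generally_applicable"], dropped["section_specific"])
    print("  ", dropped["reasoning"])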

UnitTester

Bases: ChecklistRefiner

Refiner that validates questions via unit test rewrites.

Pipeline:

1. For each question, find samples that pass (answer=Yes)
2. LLM rewrites each sample to fail the criterion
3. Score rewritten samples - should get "No"
4. Enforceability rate = proportion of rewrites correctly failing
5. Filter questions below threshold

Source code in autochecklist/refiners/unit_tester.py
class UnitTester(ChecklistRefiner):
    """Refiner that validates questions via unit test rewrites.

    Pipeline:
    1. For each question, find samples that pass (answer=Yes)
    2. LLM rewrites each sample to fail the criterion
    3. Score rewritten samples - should get "No"
    4. Enforceability rate = proportion of rewrites correctly failing
    5. Filter questions below threshold
    """

    def __init__(
        self,
        enforceability_threshold: float = 0.7,
        max_samples: int = 10,
        model: Optional[str] = None,
        scorer_model: Optional[str] = None,
        temperature: Optional[float] = None,
        api_key: Optional[str] = None,
        custom_prompt: Optional[Union[str, Path]] = None,
        **kwargs,
    ):
        super().__init__(model=model, temperature=temperature, api_key=api_key, **kwargs)
        self.enforceability_threshold = enforceability_threshold
        self.max_samples = max_samples
        self.scorer_model = scorer_model or model
        # Load rewrite prompt template
        if custom_prompt is not None:
            if isinstance(custom_prompt, Path):
                template_str = custom_prompt.read_text(encoding="utf-8")
            else:
                template_str = custom_prompt
        else:
            template_str = load_template("generators/feedback", "rewrite_fail")
        self._rewrite_template = PromptTemplate(template_str)

    @property
    def refiner_name(self) -> str:
        return "unit_tester"

    def refine(
        self,
        checklist: Checklist,
        samples: Optional[List[Dict[str, Any]]] = None,
        sample_scores: Optional[Dict[str, Dict[str, str]]] = None,
        raw_samples: Optional[List[Dict[str, Any]]] = None,
        **kwargs: Any,
    ) -> Checklist:
        """Filter checklist based on LLM enforceability.

        Args:
            checklist: Input checklist to validate
            samples: List of sample dicts with 'id' and 'text' keys
            sample_scores: Dict mapping sample_id -> {question_id -> "Yes"/"No"}
            raw_samples: Samples to auto-score when sample_scores not provided.
                         Each dict must have 'id' and 'text' keys.

        Returns:
            Checklist with only enforceable questions
        """
        # Auto-score raw_samples if provided without sample_scores
        if raw_samples and not sample_scores:
            sample_scores = self._auto_score_samples(checklist, raw_samples)
            samples = raw_samples
        else:
            samples = samples or []
            sample_scores = sample_scores or {}

        if len(checklist.items) == 0 or len(samples) == 0:
            return self._create_refined_checklist(
                checklist,
                [],
                metadata_updates={
                    "filtered_count": len(checklist.items),
                    "enforceability_rates": {},
                },
            )

        passing_items: List[ChecklistItem] = []
        enforceability_rates: Dict[str, float] = {}
        filtered_count = 0

        for item in checklist.items:
            # Find samples that pass this question
            passing_samples = self._get_passing_samples(
                item.id, samples, sample_scores
            )

            if len(passing_samples) == 0:
                # No passing samples - can't test enforceability
                filtered_count += 1
                enforceability_rates[item.id] = 0.0
                continue

            # Limit samples
            if len(passing_samples) > self.max_samples:
                random.seed(0)  # Reproducibility
                passing_samples = random.sample(passing_samples, self.max_samples)

            # Test enforceability
            rate = self._compute_enforceability(item, passing_samples)
            enforceability_rates[item.id] = rate

            if rate >= self.enforceability_threshold:
                # Question passes - add enforceability metadata
                item_with_metadata = ChecklistItem(
                    id=item.id,
                    question=item.question,
                    weight=item.weight,
                    category=item.category,
                    metadata={
                        **(item.metadata or {}),
                        "enforceability_rate": rate,
                        "samples_tested": len(passing_samples),
                    },
                )
                passing_items.append(item_with_metadata)
            else:
                filtered_count += 1

        return self._create_refined_checklist(
            checklist,
            passing_items,
            metadata_updates={
                "filtered_count": filtered_count,
                "enforceability_rates": enforceability_rates,
                "enforceability_threshold": self.enforceability_threshold,
            },
        )

    def _get_passing_samples(
        self,
        question_id: str,
        samples: List[Dict[str, Any]],
        sample_scores: Dict[str, Dict[str, str]],
    ) -> List[Dict[str, Any]]:
        """Get samples that pass a specific question."""
        passing = []
        for sample in samples:
            sample_id = sample["id"]
            if sample_id in sample_scores:
                if sample_scores[sample_id].get(question_id) == "Yes":
                    passing.append(sample)
        return passing

    def _compute_enforceability(
        self,
        item: ChecklistItem,
        passing_samples: List[Dict[str, Any]],
    ) -> float:
        """Compute enforceability rate for a question.

        Args:
            item: Checklist item to test
            passing_samples: Samples that pass this question

        Returns:
            Enforceability rate (0.0 to 1.0)
        """
        if len(passing_samples) == 0:
            return 0.0

        correct_failures = 0
        for sample in passing_samples:
            # Rewrite sample to fail
            rewritten = self._rewrite_sample(item.question, sample["text"])

            # Score rewritten sample
            score = self._score_sample(item.id, item.question, rewritten)

            if score == "No":
                correct_failures += 1

        return correct_failures / len(passing_samples)

    def _rewrite_sample(self, question: str, sample_text: str) -> str:
        """Rewrite a passing sample to fail the criterion.

        Args:
            question: The checklist question
            sample_text: The sample text to rewrite

        Returns:
            Rewritten sample that should fail
        """
        prompt = self._rewrite_template.format(
            question=question,
            sample=sample_text,
        )

        response = self._call_model(prompt)
        return response.strip()

    def _score_sample(
        self,
        question_id: str,
        question: str,
        sample_text: str,
    ) -> str:
        """Score a sample against a question.

        Args:
            question_id: ID of the question
            question: The checklist question
            sample_text: The sample to score

        Returns:
            "Yes" or "No"
        """
        prompt = f"""Evaluate the following sample against the criterion.

Question: {question}

Sample:
{sample_text}

Does this sample meet the criterion? Answer only "Yes" or "No"."""

        response = self._call_model(prompt)
        response = response.strip().lower()

        # Normalize response
        if "yes" in response:
            return "Yes"
        elif "no" in response:
            return "No"
        else:
            # Ambiguous - treat as failure (conservative)
            return "No"

    def _auto_score_samples(
        self,
        checklist: Checklist,
        raw_samples: List[Dict[str, Any]],
    ) -> Dict[str, Dict[str, str]]:
        """Score each sample against every checklist question.

        Builds the sample_scores dict that ``refine()`` normally expects as
        a pre-computed input.

        Args:
            checklist: Checklist with items to score against
            raw_samples: Sample dicts with 'id' and 'text' keys

        Returns:
            Dict mapping sample_id -> {question_id -> "Yes"/"No"}
        """
        scores: Dict[str, Dict[str, str]] = {}
        for sample in raw_samples:
            sample_id = sample["id"]
            sample_text = sample["text"]
            scores[sample_id] = {}
            for item in checklist.items:
                result = self._score_sample(item.id, item.question, sample_text)
                scores[sample_id][item.id] = result
        return scores

refine(checklist, samples=None, sample_scores=None, raw_samples=None, **kwargs)

Filter checklist based on LLM enforceability.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| checklist | Checklist | Input checklist to validate | required |
| samples | Optional[List[Dict[str, Any]]] | List of sample dicts with 'id' and 'text' keys | None |
| sample_scores | Optional[Dict[str, Dict[str, str]]] | Dict mapping sample_id -> {question_id -> "Yes"/"No"} | None |
| raw_samples | Optional[List[Dict[str, Any]]] | Samples to auto-score when sample_scores is not provided; each dict must have 'id' and 'text' keys | None |

Returns:

| Type | Description |
| --- | --- |
| Checklist | Checklist with only enforceable questions |

Source code in autochecklist/refiners/unit_tester.py
def refine(
    self,
    checklist: Checklist,
    samples: Optional[List[Dict[str, Any]]] = None,
    sample_scores: Optional[Dict[str, Dict[str, str]]] = None,
    raw_samples: Optional[List[Dict[str, Any]]] = None,
    **kwargs: Any,
) -> Checklist:
    """Filter checklist based on LLM enforceability.

    Args:
        checklist: Input checklist to validate
        samples: List of sample dicts with 'id' and 'text' keys
        sample_scores: Dict mapping sample_id -> {question_id -> "Yes"/"No"}
        raw_samples: Samples to auto-score when sample_scores not provided.
                     Each dict must have 'id' and 'text' keys.

    Returns:
        Checklist with only enforceable questions
    """
    # Auto-score raw_samples if provided without sample_scores
    if raw_samples and not sample_scores:
        sample_scores = self._auto_score_samples(checklist, raw_samples)
        samples = raw_samples
    else:
        samples = samples or []
        sample_scores = sample_scores or {}

    if len(checklist.items) == 0 or len(samples) == 0:
        return self._create_refined_checklist(
            checklist,
            [],
            metadata_updates={
                "filtered_count": len(checklist.items),
                "enforceability_rates": {},
            },
        )

    passing_items: List[ChecklistItem] = []
    enforceability_rates: Dict[str, float] = {}
    filtered_count = 0

    for item in checklist.items:
        # Find samples that pass this question
        passing_samples = self._get_passing_samples(
            item.id, samples, sample_scores
        )

        if len(passing_samples) == 0:
            # No passing samples - can't test enforceability
            filtered_count += 1
            enforceability_rates[item.id] = 0.0
            continue

        # Limit samples
        if len(passing_samples) > self.max_samples:
            random.seed(0)  # Reproducibility
            passing_samples = random.sample(passing_samples, self.max_samples)

        # Test enforceability
        rate = self._compute_enforceability(item, passing_samples)
        enforceability_rates[item.id] = rate

        if rate >= self.enforceability_threshold:
            # Question passes - add enforceability metadata
            item_with_metadata = ChecklistItem(
                id=item.id,
                question=item.question,
                weight=item.weight,
                category=item.category,
                metadata={
                    **(item.metadata or {}),
                    "enforceability_rate": rate,
                    "samples_tested": len(passing_samples),
                },
            )
            passing_items.append(item_with_metadata)
        else:
            filtered_count += 1

    return self._create_refined_checklist(
        checklist,
        passing_items,
        metadata_updates={
            "filtered_count": filtered_count,
            "enforceability_rates": enforceability_rates,
            "enforceability_threshold": self.enforceability_threshold,
        },
    )
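
Usage sketch for the unit tester with auto-scoring (the checklist variable and import path are assumptions; sample texts are placeholders):

from autochecklist.refiners import UnitTester  # assumed import path

samples = [
    {"id": "s1", "text": "First candidate output to test against the checklist."},
    {"id": "s2", "text": "Second candidate output."},
]

tester = UnitTester(enforceability_threshold=0.7, max_samples=10)
# With only raw_samples supplied, each sample is first auto-scored against every question,
# then passing samples are rewritten to fail and re-scored.
validated = tester.refine(checklist, raw_samples=samples)

print(validated.metadata["enforceability_rates"])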

Selector

Bases: ChecklistRefiner

Refiner that selects an optimal, diverse subset of questions via beam search.

When no feedback observations are supplied, scoring relies on embedding diversity alone; when observations are provided, feedback coverage is mixed into the score. Beam search explores multiple candidate subsets to find the highest-scoring one.

Source code in autochecklist/refiners/selector.py
class Selector(ChecklistRefiner):
    """Refiner that selects optimal diverse subset via beam search.

    Since we lack source feedback mapping, uses embedding diversity only.
    Beam search explores multiple candidate subsets to find optimal score.
    """

    def __init__(
        self,
        max_questions: int = 20,
        beam_width: int = 5,
        length_penalty: float = 0.0005,
        model: Optional[str] = None,
        temperature: Optional[float] = None,
        api_key: Optional[str] = None,
        embedding_api_key: Optional[str] = None,
        # Coverage-aware selection
        observations: Optional[List[str]] = None,
        classifier_model: Optional[str] = None,
        alpha: float = 0.5,
        custom_prompt: Optional[Union[str, Path]] = None,
        **kwargs,
    ):
        super().__init__(model=model, temperature=temperature, api_key=api_key, **kwargs)
        self.max_questions = max_questions
        self.beam_width = beam_width
        self.length_penalty = length_penalty
        self.embedding_api_key = embedding_api_key
        # Coverage
        self.observations = observations
        self.classifier_model = classifier_model or "openai/gpt-4o-mini"
        self.alpha = alpha
        # Populated during refine() when observations are provided
        self._feedback_assignment: Dict[int, Set[int]] = {}
        self._total_feedback_count: int = 0
        # Load classify template
        if custom_prompt is not None:
            if isinstance(custom_prompt, Path):
                template_str = custom_prompt.read_text(encoding="utf-8")
            else:
                template_str = custom_prompt
        else:
            template_str = load_template("generators/feedback", "classify")
        self._classify_template = PromptTemplate(template_str)

    @property
    def refiner_name(self) -> str:
        return "selector"

    def refine(self, checklist: Checklist, **kwargs: Any) -> Checklist:
        """Select optimal diverse subset of questions.

        Args:
            checklist: Input checklist to select from

        Returns:
            Checklist with selected subset
        """
        if len(checklist.items) == 0:
            return self._create_refined_checklist(
                checklist,
                [],
                metadata_updates={
                    "diversity_score": 0.0,
                    "beam_width": self.beam_width,
                },
            )

        # Classify feedback if provided (before beam search)
        if self.observations:
            self._classify_feedback(checklist)

        # If already small enough, return as-is
        if len(checklist.items) <= self.max_questions:
            diversity = self._compute_diversity(
                list(range(len(checklist.items))),
                self._get_similarity_matrix(checklist),
            )
            return self._create_refined_checklist(
                checklist,
                list(checklist.items),
                metadata_updates={
                    "diversity_score": diversity,
                    "beam_width": self.beam_width,
                },
            )

        # Get embeddings and similarity matrix
        similarity_matrix = self._get_similarity_matrix(checklist)

        # Run beam search
        selected_indices, diversity_score = self._beam_search(
            len(checklist.items),
            similarity_matrix,
        )

        # Build selected items with metadata
        selected_items = []
        for order, idx in enumerate(selected_indices):
            item = checklist.items[idx]
            selected_items.append(
                ChecklistItem(
                    id=item.id,
                    question=item.question,
                    weight=item.weight,
                    category=item.category,
                    metadata={
                        **(item.metadata or {}),
                        "selection_order": order,
                    },
                )
            )

        return self._create_refined_checklist(
            checklist,
            selected_items,
            metadata_updates={
                "diversity_score": diversity_score,
                "beam_width": self.beam_width,
                "length_penalty": self.length_penalty,
            },
        )

    def _get_similarity_matrix(self, checklist: Checklist) -> np.ndarray:
        """Compute similarity matrix for checklist questions."""
        questions = [item.question for item in checklist.items]
        embeddings = get_embeddings(questions, api_key=self.embedding_api_key)
        return cosine_similarity(embeddings)

    def _beam_search(
        self,
        n_items: int,
        similarity_matrix: np.ndarray,
    ) -> Tuple[List[int], float]:
        """Run beam search to find optimal subset.

        Args:
            n_items: Total number of items
            similarity_matrix: Pairwise similarity matrix

        Returns:
            Tuple of (selected indices, diversity score)
        """
        # Initialize beam with empty set
        # Each candidate is (frozenset of indices, score)
        beam: List[Tuple[frozenset, float]] = [(frozenset(), 0.0)]

        for step in range(self.max_questions):
            candidates = []

            for current_set, _ in beam:
                # Try adding each available item
                available = set(range(n_items)) - current_set

                for idx in available:
                    new_set = current_set | {idx}
                    score = self._score_subset(list(new_set), similarity_matrix)
                    candidates.append((new_set, score))

            if not candidates:
                break

            # Keep top beam_width candidates
            candidates.sort(key=lambda x: x[1], reverse=True)
            beam = candidates[: self.beam_width]

        # Return best candidate
        if beam:
            best_set, best_score = max(beam, key=lambda x: x[1])
            selected = sorted(best_set)  # Sort for consistent ordering
            diversity = self._compute_diversity(selected, similarity_matrix)
            return selected, diversity
        else:
            return [], 0.0

    def _score_subset(
        self,
        indices: List[int],
        similarity_matrix: np.ndarray,
    ) -> float:
        """Score a subset based on diversity, coverage, and length penalty.

        Without observations: Score = Diversity - λ·k
        With observations:    Score = α·Coverage + (1-α)·Diversity - λ·k
        """
        if len(indices) == 0:
            return 0.0

        diversity = self._compute_diversity(indices, similarity_matrix)
        penalty = self.length_penalty * len(indices)

        if self._feedback_assignment:
            coverage = self._compute_coverage(indices)
            return (
                self.alpha * coverage
                + (1 - self.alpha) * diversity
                - penalty
            )

        return diversity - penalty

    def _compute_diversity(
        self,
        indices: List[int],
        similarity_matrix: np.ndarray,
    ) -> float:
        """Compute diversity score for a subset.

        Diversity = 1 - average pairwise similarity
        """
        if len(indices) <= 1:
            return 1.0  # Single item is maximally diverse

        total_sim = 0.0
        count = 0

        for i in range(len(indices)):
            for j in range(i + 1, len(indices)):
                total_sim += similarity_matrix[indices[i], indices[j]]
                count += 1

        avg_sim = total_sim / count if count > 0 else 0.0
        return 1.0 - avg_sim

    def _classify_feedback(self, checklist: Checklist) -> Dict[int, Set[int]]:
        """Classify each feedback item against the current checklist questions.

        For each feedback item, asks an LLM which questions cover it.
        Builds a mapping from question index to set of feedback indices.

        Args:
            checklist: Current checklist to classify against

        Returns:
            Dict mapping question index -> set of feedback indices it covers
        """
        if not self.observations:
            return {}

        # Format questions as numbered list (0-indexed)
        questions_text = "\n".join(
            f"{i}. {item.question}" for i, item in enumerate(checklist.items)
        )

        assignment: Dict[int, Set[int]] = {
            i: set() for i in range(len(checklist.items))
        }

        for fb_idx, fb_item in enumerate(self.observations):
            prompt = self._classify_template.format(
                questions=questions_text,
                feedback_item=fb_item,
            )
            response = self._call_model(prompt)

            # Parse question numbers from response
            nums = re.findall(r"\d+", response)
            for num_str in nums:
                q_idx = int(num_str)
                if 0 <= q_idx < len(checklist.items):
                    assignment[q_idx].add(fb_idx)

        self._feedback_assignment = assignment
        self._total_feedback_count = len(self.observations)
        return assignment

    def _compute_coverage(self, indices: List[int]) -> float:
        """Compute fraction of feedback items covered by selected questions.

        Args:
            indices: Indices of selected questions

        Returns:
            Coverage score between 0.0 and 1.0
        """
        if self._total_feedback_count == 0:
            return 0.0

        covered: Set[int] = set()
        for idx in indices:
            covered |= self._feedback_assignment.get(idx, set())

        return len(covered) / self._total_feedback_count

refine(checklist, **kwargs)

Select optimal diverse subset of questions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| checklist | Checklist | Input checklist to select from | required |

Returns:

| Type | Description |
| --- | --- |
| Checklist | Checklist with selected subset |

Source code in autochecklist/refiners/selector.py
def refine(self, checklist: Checklist, **kwargs: Any) -> Checklist:
    """Select optimal diverse subset of questions.

    Args:
        checklist: Input checklist to select from

    Returns:
        Checklist with selected subset
    """
    if len(checklist.items) == 0:
        return self._create_refined_checklist(
            checklist,
            [],
            metadata_updates={
                "diversity_score": 0.0,
                "beam_width": self.beam_width,
            },
        )

    # Classify feedback if provided (before beam search)
    if self.observations:
        self._classify_feedback(checklist)

    # If already small enough, return as-is
    if len(checklist.items) <= self.max_questions:
        diversity = self._compute_diversity(
            list(range(len(checklist.items))),
            self._get_similarity_matrix(checklist),
        )
        return self._create_refined_checklist(
            checklist,
            list(checklist.items),
            metadata_updates={
                "diversity_score": diversity,
                "beam_width": self.beam_width,
            },
        )

    # Get embeddings and similarity matrix
    similarity_matrix = self._get_similarity_matrix(checklist)

    # Run beam search
    selected_indices, diversity_score = self._beam_search(
        len(checklist.items),
        similarity_matrix,
    )

    # Build selected items with metadata
    selected_items = []
    for order, idx in enumerate(selected_indices):
        item = checklist.items[idx]
        selected_items.append(
            ChecklistItem(
                id=item.id,
                question=item.question,
                weight=item.weight,
                category=item.category,
                metadata={
                    **(item.metadata or {}),
                    "selection_order": order,
                },
            )
        )

    return self._create_refined_checklist(
        checklist,
        selected_items,
        metadata_updates={
            "diversity_score": diversity_score,
            "beam_width": self.beam_width,
            "length_penalty": self.length_penalty,
        },
    )
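
Usage sketch for coverage-aware selection (the checklist and feedback_texts variables and the import path are assumptions):

from autochecklist.refiners import Selector  # assumed import path

selector = Selector(
    max_questions=20,
    beam_width=5,
    observations=feedback_texts,  # optional raw feedback strings; enables coverage-aware scoring
    alpha=0.5,                    # weight on coverage vs. diversity
)
selected = selector.refine(checklist)

print(selected.metadata["diversity_score"])
for item in selected.items:
    # selection_order is only present when beam search actually ran
    # (i.e., the input had more than max_questions items).
    print(item.metadata.get("selection_order"), item.question)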