Skip to content

API Reference

Complete reference for all public APIs.


DocumentProcessor

DocumentProcessor(provider: Union[str, Provider] = 'gemini', model_name: str = 'gemini-3-flash-preview', api_key: Optional[str] = None, security: Optional[SecurityPlugin] = None, cache: Optional[Any] = None, on_pre_process: Optional[PreProcessCallback] = None, on_post_process: Optional[PostProcessCallback] = None, on_error: Optional[ErrorCallback] = None, validators: Optional[List[Validator]] = None)

Facade for document processing, providing backwards compatibility.

This class delegates to specialized processor implementations:

- SimpleProcessor: For single-call extraction.
- VerifiedProcessor: For extraction with verification.
- RagProcessor: For retrieval-augmented generation.
- BatchProcessor: For parallel processing.

Initialize the document processor facade.

Source code in strutex/processor.py
def __init__(
    self,
    provider: Union[str, Provider] = "gemini",
    model_name: str = "gemini-3-flash-preview",
    api_key: Optional[str] = None,
    security: Optional[SecurityPlugin] = None,
    cache: Optional[Any] = None,
    on_pre_process: Optional[PreProcessCallback] = None,
    on_post_process: Optional[PostProcessCallback] = None,
    on_error: Optional[ErrorCallback] = None,
    validators: Optional[List[Validator]] = None,
):
    """Store the shared settings and reset every processor slot.

    No processor is constructed here. The arguments are collected into a
    single configuration mapping, and each ``_<name>`` slot starts as
    ``None``.
    """
    # Settings kept in one place so they can be handed to internal processors.
    self._config = dict(
        provider=provider,
        model_name=model_name,
        api_key=api_key,
        security=security,
        cache=cache,
        validators=validators,
        on_pre_process=on_pre_process,
        on_post_process=on_post_process,
        on_error=on_error,
    )

    # Processor slots; all empty until something assigns an instance.
    self._simple: Optional[SimpleProcessor] = None
    self._verified: Optional[VerifiedProcessor] = None
    self._rag: Optional[RagProcessor] = None
    self._batch: Optional[BatchProcessor] = None

    self._fallback: Optional[FallbackProcessor] = None
    self._router: Optional[RouterProcessor] = None
    self._ensemble: Optional[EnsembleProcessor] = None
    self._sequential: Optional[SequentialProcessor] = None

    self._privacy: Optional[PrivacyProcessor] = None
    self._active: Optional[ActiveLearningProcessor] = None
    self._agentic: Optional[AgenticProcessor] = None

agentic: AgenticProcessor property

Get the agentic processor instance.

batch: BatchProcessor property

Get the batch processor instance.

rag: RagProcessor property

Get the RAG processor instance.

simple: SimpleProcessor property

Get the simple processor instance.

verified: VerifiedProcessor property

Get the verified processor instance.

aprocess(file_path: str, prompt: str, schema: Optional[Schema] = None, model: Optional[Type] = None, security: Optional[Union[SecurityPlugin, bool]] = None, verify: bool = False, **kwargs) -> Any async

Async process a document.

Source code in strutex/processor.py
async def aprocess(
    self,
    file_path: str,
    prompt: str,
    schema: Optional[Schema] = None,
    model: Optional[Type] = None,
    security: Optional[Union[SecurityPlugin, bool]] = None,
    verify: bool = False,
    **kwargs
) -> Any:
    """Asynchronously process a document through one delegate processor.

    ``verify`` selects the delegate: the verified processor when true, the
    simple processor otherwise. Every other argument is forwarded unchanged
    to that processor's ``aprocess`` and its awaited result is returned.
    """
    # Resolve the delegate first so only the chosen processor is accessed.
    delegate = self.verified if verify else self.simple
    return await delegate.aprocess(
        file_path, prompt, schema, model, security=security, **kwargs
    )

aprocess_batch(file_paths: List[str], prompt: str, schema: Optional[Schema] = None, model: Optional[Type] = None, max_concurrency: int = 4, **kwargs) -> BatchContext async

Async process documents in batch.

Source code in strutex/processor.py
async def aprocess_batch(
    self,
    file_paths: List[str],
    prompt: str,
    schema: Optional[Schema] = None,
    model: Optional[Type] = None,
    max_concurrency: int = 4,
    **kwargs
) -> BatchContext:
    """Asynchronously process several documents via the batch processor.

    ``max_concurrency`` is written to the batch processor's ``max_workers``
    attribute before the call. The remaining arguments are forwarded to its
    ``aprocess_batch`` and the awaited result is returned.
    """
    batch_processor = self.batch
    batch_processor.max_workers = max_concurrency
    return await batch_processor.aprocess_batch(
        file_paths, prompt, schema, model, **kwargs
    )

create_active(**kwargs) -> ActiveLearningProcessor

Create a custom active learning processor.

Source code in strutex/processor.py
def create_active(self, **kwargs) -> ActiveLearningProcessor:
    """Create a custom active learning processor.

    The processor is built from the facade's shared configuration. A keyword
    argument whose name matches a configuration key (for example
    ``model_name``) replaces that entry for the new processor only; the
    facade's stored configuration is not modified.

    Args:
        **kwargs: Extra or overriding constructor options.

    Returns:
        A new ``ActiveLearningProcessor``.
    """
    # Merge before the call: unpacking both mappings directly raises
    # TypeError when they share a key, which made overrides impossible.
    options = {**self._config, **kwargs}
    return ActiveLearningProcessor(**options)

create_ensemble(providers: List[Processor], **kwargs) -> EnsembleProcessor

Create a custom ensemble processor.

Source code in strutex/processor.py
def create_ensemble(self, providers: List[Processor], **kwargs) -> EnsembleProcessor:
    """Create a custom ensemble processor.

    The processor receives ``providers`` plus the facade's shared
    configuration. A keyword argument whose name matches a configuration key
    replaces that entry for the new processor only; the facade's stored
    configuration is not modified.

    Args:
        providers: Processors passed to the ensemble as ``providers``.
        **kwargs: Extra or overriding constructor options.

    Returns:
        A new ``EnsembleProcessor``.
    """
    # Merge before the call: unpacking both mappings directly raises
    # TypeError when they share a key, which made overrides impossible.
    options = {**self._config, **kwargs}
    return EnsembleProcessor(providers=providers, **options)

create_fallback(configs: List[Dict[str, Any]]) -> FallbackProcessor

Create a custom fallback processor.

Source code in strutex/processor.py
def create_fallback(self, configs: List[Dict[str, Any]]) -> FallbackProcessor:
    """Build a fallback processor from ``configs`` and the shared settings.

    ``configs`` is passed as the ``configs`` keyword; the facade's stored
    configuration supplies the remaining keyword arguments.
    """
    shared_settings = self._config
    return FallbackProcessor(configs=configs, **shared_settings)

create_privacy(**kwargs) -> PrivacyProcessor

Create a custom privacy processor.

Source code in strutex/processor.py
def create_privacy(self, **kwargs) -> PrivacyProcessor:
    """Create a custom privacy processor.

    The processor is built from the facade's shared configuration. A keyword
    argument whose name matches a configuration key replaces that entry for
    the new processor only; the facade's stored configuration is not
    modified.

    Args:
        **kwargs: Extra or overriding constructor options.

    Returns:
        A new ``PrivacyProcessor``.
    """
    # Merge before the call: unpacking both mappings directly raises
    # TypeError when they share a key, which made overrides impossible.
    options = {**self._config, **kwargs}
    return PrivacyProcessor(**options)

create_router(routes: Dict[str, Processor], **kwargs) -> RouterProcessor

Create a custom router processor.

Source code in strutex/processor.py
def create_router(self, routes: Dict[str, Processor], **kwargs) -> RouterProcessor:
    """Create a custom router processor.

    The processor receives ``routes`` plus the facade's shared configuration.
    A keyword argument whose name matches a configuration key replaces that
    entry for the new processor only; the facade's stored configuration is
    not modified.

    Args:
        routes: Mapping passed to the router as ``routes``.
        **kwargs: Extra or overriding constructor options.

    Returns:
        A new ``RouterProcessor``.
    """
    # Merge before the call: unpacking both mappings directly raises
    # TypeError when they share a key, which made overrides impossible.
    options = {**self._config, **kwargs}
    return RouterProcessor(routes=routes, **options)

create_sequential(**kwargs) -> SequentialProcessor

Create a custom sequential processor.

Source code in strutex/processor.py
def create_sequential(self, **kwargs) -> SequentialProcessor:
    """Create a custom sequential processor.

    The processor is built from the facade's shared configuration. A keyword
    argument whose name matches a configuration key replaces that entry for
    the new processor only; the facade's stored configuration is not
    modified.

    Args:
        **kwargs: Extra or overriding constructor options.

    Returns:
        A new ``SequentialProcessor``.
    """
    # Merge before the call: unpacking both mappings directly raises
    # TypeError when they share a key, which made overrides impossible.
    options = {**self._config, **kwargs}
    return SequentialProcessor(**options)

on_error(func: ErrorCallback) -> ErrorCallback

Register error hook.

Source code in strutex/processor.py
def on_error(self, func: ErrorCallback) -> ErrorCallback:
    """Register an error hook.

    The hook is stored in the shared configuration and is also passed to the
    simple, verified, RAG and batch processors that have already been
    created.

    Args:
        func: The error callback to register.

    Returns:
        ``func`` unchanged, which allows decorator-style use.
    """
    self._config["on_error"] = func
    # Compare with None instead of relying on truthiness, so a processor
    # object that happens to evaluate as falsy still receives the hook.
    for processor in (self._simple, self._verified, self._rag, self._batch):
        if processor is not None:
            processor.on_error(func)
    return func

on_post_process(func: PostProcessCallback) -> PostProcessCallback

Register post-process hook.

Source code in strutex/processor.py
def on_post_process(self, func: PostProcessCallback) -> PostProcessCallback:
    """Register a post-process hook.

    The hook is stored in the shared configuration and is also passed to the
    simple, verified, RAG and batch processors that have already been
    created.

    Args:
        func: The post-process callback to register.

    Returns:
        ``func`` unchanged, which allows decorator-style use.
    """
    self._config["on_post_process"] = func
    # Compare with None instead of relying on truthiness, so a processor
    # object that happens to evaluate as falsy still receives the hook.
    for processor in (self._simple, self._verified, self._rag, self._batch):
        if processor is not None:
            processor.on_post_process(func)
    return func

on_pre_process(func: PreProcessCallback) -> PreProcessCallback

Register pre-process hook.

Source code in strutex/processor.py
def on_pre_process(self, func: PreProcessCallback) -> PreProcessCallback:
    """Register a pre-process hook.

    The hook is stored in the shared configuration and is also passed to the
    simple, verified, RAG and batch processors that have already been
    created.

    Args:
        func: The pre-process callback to register.

    Returns:
        ``func`` unchanged, which allows decorator-style use.
    """
    self._config["on_pre_process"] = func
    # Compare with None instead of relying on truthiness, so a processor
    # object that happens to evaluate as falsy still receives the hook.
    for processor in (self._simple, self._verified, self._rag, self._batch):
        if processor is not None:
            processor.on_pre_process(func)
    return func

process(file_path: str, prompt: str, schema: Optional[Schema] = None, model: Optional[Type] = None, security: Optional[Union[SecurityPlugin, bool]] = None, verify: bool = False, **kwargs) -> Any

Process a document (delegates to Simple or Verified processor).

Source code in strutex/processor.py
def process(
    self,
    file_path: str,
    prompt: str,
    schema: Optional[Schema] = None,
    model: Optional[Type] = None,
    security: Optional[Union[SecurityPlugin, bool]] = None,
    verify: bool = False,
    **kwargs
) -> Any:
    """Process a document through one delegate processor.

    ``verify`` selects the delegate: the verified processor when true, the
    simple processor otherwise. Every other argument is forwarded unchanged
    to that processor's ``process`` and its result is returned.
    """
    # Resolve the delegate first so only the chosen processor is accessed.
    delegate = self.verified if verify else self.simple
    return delegate.process(
        file_path, prompt, schema, model, security=security, **kwargs
    )

process_batch(file_paths: List[str], prompt: str, schema: Optional[Schema] = None, model: Optional[Type] = None, max_workers: int = 4, **kwargs) -> BatchContext

Process documents in batch.

Source code in strutex/processor.py
def process_batch(
    self,
    file_paths: List[str],
    prompt: str,
    schema: Optional[Schema] = None,
    model: Optional[Type] = None,
    max_workers: int = 4,
    **kwargs
) -> BatchContext:
    """Process several documents via the batch processor.

    ``max_workers`` is always written to the batch processor before the
    call. The remaining arguments are forwarded to its ``process_batch``
    and the result is returned.
    """
    batch_processor = self.batch
    batch_processor.max_workers = max_workers
    return batch_processor.process_batch(
        file_paths, prompt, schema, model, **kwargs
    )

rag_ingest(file_path: str, collection_name: Optional[str] = None)

Ingest document for RAG.

Source code in strutex/processor.py
def rag_ingest(self, file_path: str, collection_name: Optional[str] = None):
    """Ingest a document through the RAG processor.

    ``collection_name`` is handed to the processor's ``ingest`` method as
    its ``collection`` keyword; the method's result is returned as-is.
    """
    rag_processor = self.rag
    return rag_processor.ingest(file_path, collection=collection_name)

rag_query(query: str, collection_name: Optional[str] = None, schema: Optional[Schema] = None, model: Optional[Type] = None) -> Any

Perform RAG query.

Source code in strutex/processor.py
def rag_query(
    self,
    query: str,
    collection_name: Optional[str] = None,
    schema: Optional[Schema] = None,
    model: Optional[Type] = None
) -> Any:
    """Run a query through the RAG processor.

    ``collection_name`` is passed to the processor's ``query`` method as its
    ``collection`` keyword; ``schema`` and ``model`` are passed through under
    their own names. The method's result is returned as-is.
    """
    rag_processor = self.rag
    return rag_processor.query(
        query,
        collection=collection_name,
        schema=schema,
        model=model,
    )

verify(file_path: str, result: Any, schema: Optional[Schema] = None, model: Optional[Type] = None, verify_prompt: Optional[str] = None, **kwargs) -> Any

Verify an existing result.

Source code in strutex/processor.py
def verify(
    self,
    file_path: str,
    result: Any,
    schema: Optional[Schema] = None,
    model: Optional[Type] = None,
    verify_prompt: Optional[str] = None,
    **kwargs
) -> Any:
    """Verify a previously obtained result against its document.

    The facade's verified processor is used unless ``verify_prompt`` is
    given, in which case a separate ``VerifiedProcessor`` is built from the
    shared configuration plus that prompt. When ``schema`` is falsy, the
    first element returned by the processor's ``_convert_pydantic(model)``
    is used as the schema instead.
    """
    proc = self.verified
    if verify_prompt:
        # Copy the shared settings so the facade's own config is untouched.
        prompt_config = dict(self._config)
        prompt_config["verify_prompt"] = verify_prompt
        proc = VerifiedProcessor(**prompt_config)

    effective_schema = schema or proc._convert_pydantic(model)[0]
    mime_type = get_mime_type(file_path)
    return proc._verify(file_path, result, effective_schema, mime_type, **kwargs)

options: show_root_heading: true members: - init - process


Schema Types

String(description: Optional[str] = None, nullable: bool = False, format: Optional[str] = None)

Bases: Schema

Source code in strutex/types.py
def __init__(self, description: Optional[str] = None, nullable: bool = False, format: Optional[str] = None):
    """Initialize a schema node of type ``Type.STRING``.

    ``description``, ``nullable`` and ``format`` are handed unchanged to the
    base-class initializer.
    """
    super().__init__(
        Type.STRING,
        description=description,
        nullable=nullable,
        format=format,
    )

options: show_root_heading: true

Number(description: Optional[str] = None, nullable: bool = False)

Bases: Schema

Source code in strutex/types.py
def __init__(self, description: Optional[str] = None, nullable: bool = False):
    """Initialize a schema node of type ``Type.NUMBER``.

    ``description`` and ``nullable`` are handed unchanged to the base-class
    initializer.
    """
    super().__init__(
        Type.NUMBER,
        description=description,
        nullable=nullable,
    )

options: show_root_heading: true

Integer(description: Optional[str] = None, nullable: bool = False)

Bases: Schema

Source code in strutex/types.py
def __init__(self, description: Optional[str] = None, nullable: bool = False):
    """Initialize a schema node of type ``Type.INTEGER``.

    ``description`` and ``nullable`` are handed unchanged to the base-class
    initializer.
    """
    super().__init__(
        Type.INTEGER,
        description=description,
        nullable=nullable,
    )

options: show_root_heading: true

Boolean(description: Optional[str] = None, nullable: bool = False)

Bases: Schema

Source code in strutex/types.py
def __init__(self, description: Optional[str] = None, nullable: bool = False):
    """Initialize a schema node of type ``Type.BOOLEAN``.

    ``description`` and ``nullable`` are handed unchanged to the base-class
    initializer.
    """
    super().__init__(
        Type.BOOLEAN,
        description=description,
        nullable=nullable,
    )

options: show_root_heading: true

Array(items: Union[Schema, PyType[Schema]], description: Optional[str] = None, nullable: bool = False)

Bases: Schema

Represents a list of items. :param items: The Schema definition for the items inside the array. Can be an instance (String()) or a class (String).

Source code in strutex/types.py
def __init__(self, items: Union[Schema, PyType[Schema]], description: Optional[str] = None, nullable: bool = False):
    """
    Initialize a schema node describing a list of items.

    :param items: Schema for the elements of the array, given either as an
                  instance (String()) or as a class (String).
    :param description: Passed through to the base-class initializer.
    :param nullable: Passed through to the base-class initializer.
    """
    # 'items' is forwarded untouched; per the original note, the Schema
    # base class takes care of instantiating it when a class was given.
    super().__init__(
        Type.ARRAY,
        items=items,
        description=description,
        nullable=nullable,
    )

options: show_root_heading: true

Object(properties: Dict[str, Union[Schema, PyType[Schema]]], description: Optional[str] = None, required: Optional[List[str]] = None, nullable: bool = False)

Bases: Schema

Represents a nested object (dictionary).

:param properties: Dictionary mapping field names to Schema objects (or classes).

:param required: List of keys that are mandatory. If None, ALL properties are assumed required. Pass [] explicitly if no fields are required.

Source code in strutex/types.py
def __init__(
        self,
        properties: Dict[str, Union[Schema, PyType[Schema]]],
        description: Optional[str] = None,
        required: Optional[List[str]] = None,
        nullable: bool = False
):
    """
    Initialize a schema node describing a nested object (dictionary).

    :param properties: Dictionary mapping field names to Schema objects (or classes).
    :param description: Passed through to the base-class initializer.
    :param required: List of keys that are mandatory.
                     If None, every key of ``properties`` is treated as required.
                     Pass [] explicitly if no fields are required.
    :param nullable: Passed through to the base-class initializer.
    """
    # Strict by default: with no explicit list, every property is required.
    # The keys are read from the caller's dict up front; per the original
    # note, the base class may instantiate the values but keeps the keys.
    calculated_required = list(properties.keys()) if required is None else required

    super().__init__(
        Type.OBJECT,
        properties=properties,
        description=description,
        required=calculated_required,
        nullable=nullable,
    )

options: show_root_heading: true


Plugin System

PluginRegistry

Central registry for all plugin types with lazy loading.

Plugins are stored as EntryPoint objects and only loaded when first accessed via get(). This improves startup time and avoids importing unused dependencies.

Usage

Get a plugin (loads on first access)

cls = PluginRegistry.get("provider", "gemini")

List all plugins (does not load them)

all_providers = PluginRegistry.list("provider")

Force discovery from entry points

count = PluginRegistry.discover()

clear(plugin_type: Optional[str] = None) -> None classmethod

Clear registered plugins.

PARAMETER DESCRIPTION
plugin_type

If provided, only clear this type. Otherwise clear all.

TYPE: Optional[str] DEFAULT: None

Source code in strutex/plugins/registry.py
@classmethod
def clear(cls, plugin_type: Optional[str] = None) -> None:
    """
    Clear registered plugins.

    Clearing everything also resets the discovery flag. Clearing a single
    type leaves the flag unchanged.

    Args:
        plugin_type: If provided, only clear this type. Otherwise clear all.
    """
    stores = (cls._entry_points, cls._loaded, cls._manual)

    # Test against None explicitly: a falsy name such as "" is still a
    # request to clear one type, not to wipe the whole registry.
    if plugin_type is not None:
        for store in stores:
            store.pop(plugin_type, None)
        return

    for store in stores:
        store.clear()
    cls._discovered = False

discover(group_prefix: str = 'strutex', force: bool = False) -> int classmethod

Discover and register plugins from entry points.

Scans for entry points matching the pattern:

- strutex.providers
- strutex.validators
- strutex.postprocessors
- strutex.security
- etc.

Entry points are stored for lazy loading - they are not imported until first use via get().

PARAMETER DESCRIPTION
group_prefix

Entry point group prefix (default: "strutex")

TYPE: str DEFAULT: 'strutex'

force

Force re-discovery even if already discovered

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
int

Number of entry points discovered

Example pyproject.toml: [project.entry-points."strutex.providers"] my_provider = "my_package:MyProvider"

Source code in strutex/plugins/registry.py
@classmethod
def discover(cls, group_prefix: str = "strutex", force: bool = False) -> int:
    """
    Discover and register plugins from entry points.

    Scans for entry points matching the pattern:
    - strutex.providers
    - strutex.validators
    - strutex.postprocessors
    - strutex.security
    - etc.

    Entry points are stored for lazy loading - they are not imported
    until first use via get().

    Args:
        group_prefix: Entry point group prefix (default: "strutex")
        force: Force re-discovery even if already discovered

    Returns:
        Number of entry points discovered

    Example pyproject.toml:
        [project.entry-points."strutex.providers"]
        my_provider = "my_package:MyProvider"
    """
    if cls._discovered and not force:
        return sum(len(eps) for eps in cls._entry_points.values())

    discovered = 0

    # Get entry_points function
    if sys.version_info >= (3, 10):
        from importlib.metadata import entry_points
    else:
        try:
            from importlib_metadata import entry_points
        except ImportError:
            cls._discovered = True
            return 0

    # Get all entry points
    try:
        all_eps = entry_points()

        # Strategy: collect all matching EntryPoint objects first
        matching_eps: List["EntryPoint"] = []

        # Check for dict-like interface (Python < 3.10 stdlib or SelectableGroups in 3.10/3.11)
        if hasattr(all_eps, 'items'):
            for group, eps in all_eps.items():
                if group.startswith(f"{group_prefix}."):
                    # eps can be a single EntryPoint or list, depending on impl
                    # Standard is list
                    if isinstance(eps, list):
                        matching_eps.extend(eps)
                    else:
                        # Some implementations might return single object? Unlikely but safe.
                        try:
                            matching_eps.extend(eps)
                        except TypeError:
                            matching_eps.append(eps)
        else:
            # Sequence-like interface (Python 3.12+ EntryPoints, or importlib_metadata)
            for ep in all_eps:
                if ep.group.startswith(f"{group_prefix}."):
                    matching_eps.append(ep)

        # Now register them
        for ep in matching_eps:
            # Extract plugin type from group name (e.g. "strutex.providers" -> "provider")
            plugin_type = ep.group.replace(f"{group_prefix}.", "").rstrip("s")

            if plugin_type not in cls._entry_points:
                cls._entry_points[plugin_type] = {}

            cls._entry_points[plugin_type][ep.name.lower()] = ep
            discovered += 1

    except Exception:
        pass

    cls._discovered = True
    return discovered

get(plugin_type: str, name: str) -> Optional[Type] classmethod

Get a registered plugin class by type and name.

If the plugin is registered via entry point and not yet loaded, it will be loaded on first access (lazy loading).

PARAMETER DESCRIPTION
plugin_type

Type of plugin

TYPE: str

name

Name of the plugin

TYPE: str

RETURNS DESCRIPTION
Optional[Type]

The plugin class, or None if not found

Source code in strutex/plugins/registry.py
@classmethod
def get(cls, plugin_type: str, name: str) -> Optional[Type]:
    """
    Look up a plugin class by type and name.

    Names are matched in lowercase. Already-loaded classes are checked
    first, then manual registrations; otherwise a stored entry point with
    that name is loaded on demand.

    Args:
        plugin_type: Type of plugin
        name: Name of the plugin

    Returns:
        The plugin class, or None if not found
    """
    key = name.lower()

    # Discovery must have run before any lookup.
    if not cls._discovered:
        cls.discover()

    # Loaded cache takes precedence over manual registrations.
    for registry in (cls._loaded, cls._manual):
        bucket = registry.get(plugin_type, {})
        if key in bucket:
            return bucket[key]

    # Last resort: load lazily from a stored entry point.
    entry_point = cls._entry_points.get(plugin_type, {}).get(key)
    if entry_point is None:
        return None
    return cls._load_entry_point(entry_point, plugin_type, key)

get_plugin_info(plugin_type: str, name: str) -> Optional[Dict[str, Any]] classmethod

Get metadata about a plugin without necessarily loading it.

PARAMETER DESCRIPTION
plugin_type

Type of plugin

TYPE: str

name

Name of the plugin

TYPE: str

RETURNS DESCRIPTION
Optional[Dict[str, Any]]

Dict with plugin info, or None if not found

Source code in strutex/plugins/registry.py
@classmethod
def get_plugin_info(cls, plugin_type: str, name: str) -> Optional[Dict[str, Any]]:
    """
    Describe a plugin, loading nothing that is not already loaded.

    For a loaded plugin the result carries its version, priority, cost,
    capabilities and a health flag. For a plugin known only through an
    entry point it carries the entry point reference and an unknown
    (None) health flag.

    Args:
        plugin_type: Type of plugin
        name: Name of the plugin

    Returns:
        Dict with plugin info, or None if not found
    """
    key = name.lower()

    if not cls._discovered:
        cls.discover()

    loaded_plugins = cls._loaded.get(plugin_type, {})
    if key in loaded_plugins:
        plugin_cls = loaded_plugins[key]
        # Missing class attributes fall back to the defaults given here.
        return dict(
            name=key,
            version=getattr(plugin_cls, "strutex_plugin_version", "unknown"),
            priority=getattr(plugin_cls, "priority", 50),
            cost=getattr(plugin_cls, "cost", 1.0),
            capabilities=getattr(plugin_cls, "capabilities", []),
            loaded=True,
            healthy=cls._check_health(plugin_cls),
        )

    entry_point = cls._entry_points.get(plugin_type, {}).get(key)
    if entry_point is None:
        return None

    # Health cannot be determined before the plugin is loaded.
    return dict(
        name=key,
        entry_point=f"{entry_point.group}:{entry_point.name}",
        loaded=False,
        healthy=None,
    )

get_sorted(plugin_type: str, reverse: bool = True) -> List[Tuple[str, Type]] classmethod

Get all plugins of a type sorted by priority.

Useful for waterfall selection where you want to try higher-priority plugins first.

PARAMETER DESCRIPTION
plugin_type

Type of plugin

TYPE: str

reverse

If True (default), higher priority first

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
List[Tuple[str, Type]]

List of (name, class) tuples sorted by priority

Source code in strutex/plugins/registry.py
@classmethod
def get_sorted(cls, plugin_type: str, reverse: bool = True) -> List[Tuple[str, Type]]:
    """
    Return every plugin of a type ordered by its ``priority`` attribute.

    Plugins without a ``priority`` attribute are ranked as 50. Handy for
    waterfall selection, where higher-priority plugins are tried first.

    Args:
        plugin_type: Type of plugin
        reverse: If True (default), higher priority first

    Returns:
        List of (name, class) tuples sorted by priority
    """
    def priority_of(entry):
        _name, plugin_cls = entry
        return getattr(plugin_cls, "priority", 50)

    name_class_pairs = cls.list(plugin_type).items()
    return sorted(name_class_pairs, key=priority_of, reverse=reverse)

list(plugin_type: str) -> Dict[str, Type] classmethod

List all plugins of a given type.

Note: This loads all plugins of the type. Use list_names() for a lightweight listing without loading.

PARAMETER DESCRIPTION
plugin_type

Type of plugin

TYPE: str

RETURNS DESCRIPTION
Dict[str, Type]

Dictionary mapping names to plugin classes

Source code in strutex/plugins/registry.py
@classmethod
def list(cls, plugin_type: str) -> Dict[str, Type]:
    """
    List all plugins of a given type.

    Note: This loads all plugins of the type. Use list_names()
    for a lightweight listing without loading.

    Names are resolved in sorted order, so the returned dictionary has a
    deterministic key order. Names for which get() returns None are left
    out.

    Args:
        plugin_type: Type of plugin

    Returns:
        Dictionary mapping names to plugin classes
    """
    if not cls._discovered:
        cls.discover()

    # Union of every name known for this type, whatever its origin.
    all_names: Set[str] = set()
    for registry in (cls._entry_points, cls._manual, cls._loaded):
        all_names.update(registry.get(plugin_type, {}))

    result: Dict[str, Type] = {}
    # Iterate in sorted order: plain set iteration order is arbitrary and
    # made the result order vary between runs.
    for name in sorted(all_names):
        plugin_cls = cls.get(plugin_type, name)
        if plugin_cls is not None:
            result[name] = plugin_cls

    return result

list_names(plugin_type: str) -> List[str] classmethod

List names of all plugins of a given type without loading them.

PARAMETER DESCRIPTION
plugin_type

Type of plugin

TYPE: str

RETURNS DESCRIPTION
List[str]

List of plugin names

Source code in strutex/plugins/registry.py
@classmethod
def list_names(cls, plugin_type: str) -> List[str]:
    """
    Return the sorted names of all plugins of a type without loading them.

    Names are gathered from stored entry points, manual registrations and
    the loaded cache.

    Args:
        plugin_type: Type of plugin

    Returns:
        List of plugin names
    """
    if not cls._discovered:
        cls.discover()

    collected: Set[str] = set()
    for registry in (cls._entry_points, cls._manual, cls._loaded):
        # Iterating a mapping yields its keys, i.e. the plugin names.
        collected.update(registry.get(plugin_type, {}))

    return sorted(collected)

list_types() -> List[str] classmethod

List all registered plugin types.

Source code in strutex/plugins/registry.py
@classmethod
def list_types(cls) -> List[str]:
    """Return the sorted names of every plugin type known to the registry."""
    if not cls._discovered:
        cls.discover()

    # A type counts as known if it appears in any of the three stores.
    known_types: Set[str] = {*cls._entry_points, *cls._manual, *cls._loaded}
    return sorted(known_types)

register(plugin_type: str, name: str, plugin_cls: Type) -> None classmethod

Register a plugin class manually.

This is used by the @register decorator for backwards compatibility. Prefer using entry points in pyproject.toml for new plugins.

PARAMETER DESCRIPTION
plugin_type

Type of plugin (e.g., "provider", "security", "validator")

TYPE: str

name

Unique name for this plugin

TYPE: str

plugin_cls

The plugin class to register

TYPE: Type

Source code in strutex/plugins/registry.py
@classmethod
def register(cls, plugin_type: str, name: str, plugin_cls: Type) -> None:
    """
    Manually register a plugin class under a lowercase name.

    The class is recorded both as a manual registration and in the loaded
    cache. Used by the @register decorator for backwards compatibility;
    prefer entry points in pyproject.toml for new plugins.

    Args:
        plugin_type: Type of plugin (e.g., "provider", "security", "validator")
        name: Unique name for this plugin
        plugin_cls: The plugin class to register
    """
    key = name.lower()

    # setdefault creates the per-type mapping the first time it is needed.
    cls._manual.setdefault(plugin_type, {})[key] = plugin_cls

    # Mirror into the loaded cache as well.
    cls._loaded.setdefault(plugin_type, {})[key] = plugin_cls

options: show_root_heading: true members: - register - get - list - discover

register(plugin_type: str, name: Optional[str] = None) -> Callable[[Type], Type]

Decorator to register a plugin class at runtime.

Use this decorator for:

- Runtime/dynamic registration based on config
- Prototyping plugins without packaging
- Plugins in the same codebase (not installed separately)
- Conditional loading based on environment or feature flags

For distributable third-party plugin packages, use entry points in pyproject.toml instead.

PARAMETER DESCRIPTION
plugin_type

Type of plugin (e.g., "provider", "security", "validator")

TYPE: str

name

Optional name. If not provided, uses lowercase class name.

TYPE: Optional[str] DEFAULT: None

Usage

@register("provider")
class MyProvider(Provider):
    ...

@register("provider", name="custom_name")
class AnotherProvider(Provider):
    ...

See Also

Entry points in pyproject.toml for distributable packages:

[project.entry-points."strutex.providers"]
my_provider = "my_package:MyProvider"
Source code in strutex/plugins/registry.py
def register(
    plugin_type: str,
    name: Optional[str] = None,
) -> Callable[[Type], Type]:
    """
    Build a class decorator that registers a plugin at runtime.

    Suitable for:
    - Runtime/dynamic registration based on config
    - Prototyping plugins without packaging
    - Plugins in the same codebase (not installed separately)
    - Conditional loading based on environment or feature flags

    Distributable third-party plugin packages should use entry points
    in pyproject.toml instead.

    Args:
        plugin_type: Type of plugin (e.g., "provider", "security", "validator")
        name: Optional name. If not provided, uses lowercase class name.

    Returns:
        A decorator that registers the class with PluginRegistry and hands
        the class back unchanged.

    Usage:
        @register("provider")
        class MyProvider(Provider):
            ...

        @register("provider", name="custom_name")
        class AnotherProvider(Provider):
            ...

    See Also:
        Entry points in pyproject.toml for distributable packages:

            [project.entry-points."strutex.providers"]
            my_provider = "my_package:MyProvider"
    """
    def decorator(cls: Type) -> Type:
        # A missing or empty name falls back to the lowercased class name.
        PluginRegistry.register(plugin_type, name or cls.__name__.lower(), cls)
        return cls

    return decorator

options: show_root_heading: true


Base Classes

Provider

Bases: ABC

Base class for LLM providers.

All providers must implement the process method to handle document extraction via their specific LLM API.

Subclassing auto-registers the plugin. Use class arguments to customize:

class MyProvider(Provider, name="custom", priority=90):
    ...
ATTRIBUTE DESCRIPTION
strutex_plugin_version

API version for compatibility checks

TYPE: str

priority

Ordering priority (0-100, higher = preferred)

TYPE: int

cost

Cost hint for optimization (lower = cheaper)

TYPE: float

capabilities

List of supported features

TYPE: List[str]

aprocess(file_path: str, prompt: str, schema: Schema, mime_type: str, **kwargs: Any) -> Any async

Async version of process.

Runs the sync process() method in a thread pool to avoid blocking the event loop. Override this method for true native async support using async SDKs (e.g., AsyncOpenAI, AsyncAnthropic).

PARAMETER DESCRIPTION
file_path

Path to the document file

TYPE: str

prompt

Extraction prompt/instructions

TYPE: str

schema

Expected output schema

TYPE: Schema

mime_type

MIME type of the file

TYPE: str

**kwargs

Provider-specific options

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Any

Extracted data matching the schema

Source code in strutex/plugins/base.py
async def aprocess(
    self,
    file_path: str,
    prompt: str,
    schema: Schema,
    mime_type: str,
    **kwargs: Any
) -> Any:
    """
    Awaitable counterpart of process().

    The synchronous process() call is handed to asyncio.to_thread so the
    event loop is not blocked while it runs. Providers with a native async
    SDK (e.g., AsyncOpenAI, AsyncAnthropic) can override this method.

    Args:
        file_path: Path to the document file
        prompt: Extraction prompt/instructions
        schema: Expected output schema
        mime_type: MIME type of the file
        **kwargs: Provider-specific options

    Returns:
        Extracted data matching the schema
    """
    import asyncio

    positional = (file_path, prompt, schema, mime_type)
    return await asyncio.to_thread(self.process, *positional, **kwargs)

has_capability(capability: str) -> bool

Check if this provider has a specific capability.

Source code in strutex/plugins/base.py
def has_capability(self, capability: str) -> bool:
    """Return True when ``capability`` appears in ``self.capabilities``, ignoring case."""
    wanted = capability.lower()
    known = [name.lower() for name in self.capabilities]
    return wanted in known

health_check() -> bool classmethod

Check if this provider is healthy and ready to use.

Override in subclasses for custom health checks (e.g., API connectivity).

RETURNS DESCRIPTION
bool

True if healthy, False otherwise

Source code in strutex/plugins/base.py
@classmethod
def health_check(cls) -> bool:
    """
    Report whether this provider is ready to be used.

    The base implementation always reports healthy. Subclasses can override
    it with a real probe (e.g., API connectivity).

    Returns:
        True if healthy, False otherwise
    """
    return True

process(file_path: str, prompt: str, schema: Schema, mime_type: str, **kwargs: Any) -> Any abstractmethod

Process a document and extract structured data.

PARAMETER DESCRIPTION
file_path

Path to the document file

TYPE: str

prompt

Extraction prompt/instructions

TYPE: str

schema

Expected output schema

TYPE: Schema

mime_type

MIME type of the file

TYPE: str

**kwargs

Provider-specific options

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Any

Extracted data matching the schema

Source code in strutex/plugins/base.py
@abstractmethod
def process(
    self,
    file_path: str,
    prompt: str,
    schema: Schema,
    mime_type: str,
    **kwargs: Any
) -> Any:
    """
    Extract structured data from a document.

    Abstract: the base body does nothing, so concrete providers must
    supply the implementation.

    Args:
        file_path: Path to the document file
        prompt: Extraction prompt/instructions
        schema: Expected output schema
        mime_type: MIME type of the file
        **kwargs: Provider-specific options

    Returns:
        Extracted data matching the schema
    """
    return None

options: show_root_heading: true

Validator

Bases: ABC

Base class for output validators.

Validators check extracted data for correctness and can optionally fix issues.

Subclassing auto-registers the plugin.

ATTRIBUTE DESCRIPTION
strutex_plugin_version

API version for compatibility checks

TYPE: str

priority

Ordering priority in validation chain

TYPE: int

health_check() -> bool classmethod

Check if this validator is healthy and ready.

Source code in strutex/plugins/base.py
@classmethod
def health_check(cls) -> bool:
    """Report whether this validator is ready; the base implementation always returns True."""
    return True

validate(data: Dict[str, Any], schema: Optional[Schema] = None, source_text: Optional[str] = None) -> ValidationResult abstractmethod

Validate extracted data.

PARAMETER DESCRIPTION
data

The extracted data to validate

TYPE: Dict[str, Any]

schema

Optional schema to validate against

TYPE: Optional[Schema] DEFAULT: None

source_text

Optional source text for provenance checks

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
ValidationResult

ValidationResult with status and any issues

Source code in strutex/plugins/base.py
@abstractmethod
def validate(
    self,
    data: Dict[str, Any],
    schema: Optional[Schema] = None,
    source_text: Optional[str] = None
) -> "ValidationResult":
    """
    Check extracted data and report the outcome.

    Abstract: the base body does nothing, so concrete validators must
    supply the implementation.

    Args:
        data: The extracted data to validate
        schema: Optional schema to validate against
        source_text: Optional source text for provenance checks

    Returns:
        ValidationResult with status and any issues
    """
    return None

options: show_root_heading: true

Postprocessor

Bases: ABC

Base class for data postprocessors.

Postprocessors transform extracted data (e.g., normalize dates, convert currencies, standardize units).

Subclassing auto-registers the plugin.

ATTRIBUTE DESCRIPTION
strutex_plugin_version

API version for compatibility checks

TYPE: str

priority

Ordering priority in postprocessing pipeline

TYPE: int

health_check() -> bool classmethod

Check if this postprocessor is healthy and ready.

Source code in strutex/plugins/base.py
@classmethod
def health_check(cls) -> bool:
    """Report whether this postprocessor is ready; the base implementation always returns True."""
    return True

process(data: Dict[str, Any]) -> Dict[str, Any] abstractmethod

Process/transform the extracted data.

PARAMETER DESCRIPTION
data

The data to transform

TYPE: Dict[str, Any]

RETURNS DESCRIPTION
Dict[str, Any]

Transformed data

Source code in strutex/plugins/base.py
@abstractmethod
def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Transform the extracted data.

    Abstract: the base body does nothing, so concrete postprocessors must
    supply the implementation.

    Args:
        data: The data to transform

    Returns:
        Transformed data
    """
    return None

options: show_root_heading: true

SecurityPlugin

Bases: ABC

Base class for security plugins.

Security plugins can validate/sanitize input before sending to the LLM and validate output before returning to the user.

Subclassing auto-registers the plugin.

ATTRIBUTE DESCRIPTION
strutex_plugin_version

API version for compatibility checks

TYPE: str

priority

Ordering priority in security chain

TYPE: int

health_check() -> bool classmethod

Check if this security plugin is healthy and ready.

Source code in strutex/plugins/base.py
@classmethod
def health_check(cls) -> bool:
    """Report whether this security plugin is ready; the base implementation always returns True."""
    return True

validate_input(text: str) -> SecurityResult

Validate/sanitize input text before sending to LLM.

PARAMETER DESCRIPTION
text

The input text (prompt + document content)

TYPE: str

RETURNS DESCRIPTION
SecurityResult

SecurityResult with sanitized text or rejection

Source code in strutex/plugins/base.py
def validate_input(self, text: str) -> "SecurityResult":
    """
    Check (and optionally sanitize) input text before it reaches the LLM.

    The base implementation is a pass-through: it accepts the text and
    hands it back unchanged.

    Args:
        text: The input text (prompt + document content)

    Returns:
        SecurityResult with sanitized text or rejection
    """
    accepted = SecurityResult(valid=True, text=text)
    return accepted

validate_output(data: Dict[str, Any]) -> SecurityResult

Validate output data before returning to user.

PARAMETER DESCRIPTION
data

The extracted data

TYPE: Dict[str, Any]

RETURNS DESCRIPTION
SecurityResult

SecurityResult with clean data or rejection

Source code in strutex/plugins/base.py
def validate_output(self, data: Dict[str, Any]) -> "SecurityResult":
    """
    Check output data before it is returned to the user.

    The base implementation is a pass-through: it accepts the data and
    hands it back unchanged.

    Args:
        data: The extracted data

    Returns:
        SecurityResult with clean data or rejection
    """
    accepted = SecurityResult(valid=True, data=data)
    return accepted

options: show_root_heading: true


Security

SecurityChain(plugins: List[SecurityPlugin])

Bases: SecurityPlugin

Chains multiple security plugins together.

Runs each plugin in sequence. If any plugin rejects, the chain stops.

Usage

chain = SecurityChain([
    InputSanitizer(collapse_whitespace=True),
    PromptInjectionDetector(),
])
result = chain.validate_input(text)

PARAMETER DESCRIPTION
plugins

List of security plugins to run in order

TYPE: List[SecurityPlugin]

Source code in strutex/security/chain.py
def __init__(self, plugins: List[SecurityPlugin]):
    """
    Build a chain from an ordered collection of security plugins.

    Args:
        plugins: List of security plugins to run in order
    """
    # Keep a private copy: the chain appends to this list when plugins are
    # added later, and that must not mutate the list the caller passed in.
    self.plugins = list(plugins)

add(plugin: SecurityPlugin) -> SecurityChain

Add a plugin to the chain. Returns self for chaining.

Source code in strutex/security/chain.py
def add(self, plugin: SecurityPlugin) -> "SecurityChain":
    """
    Append ``plugin`` to the end of the chain.

    Returns:
        This chain, so calls can be chained fluently.
    """
    chain = self
    chain.plugins.append(plugin)
    return chain

validate_input(text: str) -> SecurityResult

Run all plugins' input validation in sequence.

Source code in strutex/security/chain.py
def validate_input(self, text: str) -> SecurityResult:
    """
    Pass ``text`` through every plugin's input validation, in order.

    The first plugin that rejects ends the run and its result is returned
    as-is. Otherwise each plugin receives the text produced by the previous
    one, and the final text is returned in a valid result.
    """
    text_so_far = text

    for stage in self.plugins:
        outcome = stage.validate_input(text_so_far)
        if not outcome.valid:
            return outcome
        if outcome.text is None:
            # Plugin accepted without supplying replacement text.
            continue
        text_so_far = outcome.text

    return SecurityResult(valid=True, text=text_so_far)

validate_output(data: Dict[str, Any]) -> SecurityResult

Run all plugins' output validation in sequence.

Source code in strutex/security/chain.py
def validate_output(self, data: Dict[str, Any]) -> SecurityResult:
    """
    Pass ``data`` through every plugin's output validation, in order.

    The first plugin that rejects ends the run and its result is returned
    as-is. Otherwise each plugin receives the data produced by the previous
    one, and the final data is returned in a valid result.
    """
    data_so_far = data

    for stage in self.plugins:
        outcome = stage.validate_output(data_so_far)
        if not outcome.valid:
            return outcome
        if outcome.data is None:
            # Plugin accepted without supplying replacement data.
            continue
        data_so_far = outcome.data

    return SecurityResult(valid=True, data=data_so_far)

options: show_root_heading: true

InputSanitizer(collapse_whitespace: bool = True, normalize_unicode: bool = True, remove_invisible: bool = True, max_length: Optional[int] = None)

Bases: SecurityPlugin

Sanitizes input text to prevent various attacks.

Features: - Collapse excessive whitespace - Normalize Unicode characters - Remove invisible characters - Limit input length

Usage

sanitizer = InputSanitizer(collapse_whitespace=True, max_length=50000) result = sanitizer.validate_input(text)

Source code in strutex/security/sanitizer.py
def __init__(
    self,
    collapse_whitespace: bool = True,
    normalize_unicode: bool = True,
    remove_invisible: bool = True,
    max_length: Optional[int] = None
):
    """
    Store the sanitizer's configuration switches.

    Args:
        collapse_whitespace: Enable the whitespace-collapsing step.
        normalize_unicode: Enable the Unicode normalization step.
        remove_invisible: Enable the invisible-character removal step.
        max_length: Optional limit on the input length; None disables it.
    """
    # Length limit
    self.max_length = max_length
    # Per-step toggles
    self.remove_invisible = remove_invisible
    self.normalize_unicode = normalize_unicode
    self.collapse_whitespace = collapse_whitespace

validate_input(text: str) -> SecurityResult

Sanitize the input text.

Source code in strutex/security/sanitizer.py
def validate_input(self, text: str) -> SecurityResult:
    """
    Sanitize the input text and enforce the optional length limit.

    The enabled steps run in this order: Unicode normalization (NFC),
    removal of zero-width/invisible characters, whitespace collapsing, and
    finally the length check against the sanitized text.

    Args:
        text: The raw input text.

    Returns:
        A valid SecurityResult carrying the sanitized text, or an invalid
        one (with ``text=None`` and a reason) when the sanitized text is
        longer than ``max_length``.
    """
    sanitized = text

    # Normalize Unicode (NFC form)
    if self.normalize_unicode:
        sanitized = unicodedata.normalize("NFC", sanitized)

    # Remove invisible characters (zero-width, etc.)
    if self.remove_invisible:
        invisible_pattern = r'[\u200b\u200c\u200d\u2060\u2061\u2062\u2063\u2064\ufeff]'
        sanitized = re.sub(invisible_pattern, '', sanitized)

    # Collapse whitespace (multiple spaces/newlines -> single)
    if self.collapse_whitespace:
        # Collapse multiple spaces to single
        sanitized = re.sub(r' {2,}', ' ', sanitized)
        # Remove trailing whitespace per line. This must happen BEFORE the
        # newline collapse: lines made only of spaces otherwise hide runs of
        # blank lines from the next pattern and leave 3+ newlines behind.
        sanitized = re.sub(r' +$', '', sanitized, flags=re.MULTILINE)
        # Collapse multiple newlines to double (preserve paragraphs)
        sanitized = re.sub(r'\n{3,}', '\n\n', sanitized)

    # Enforce max length (a falsy max_length means "no limit")
    if self.max_length and len(sanitized) > self.max_length:
        return SecurityResult(
            valid=False,
            text=None,
            reason=f"Input exceeds maximum length of {self.max_length} characters"
        )

    return SecurityResult(valid=True, text=sanitized)

options: show_root_heading: true

PromptInjectionDetector(block_on_detection: bool = True, additional_patterns: Optional[List[Tuple[str, str]]] = None)

Bases: SecurityPlugin

Detects common prompt injection patterns.

Checks for: - Direct instruction overrides ("ignore previous instructions") - Role manipulation ("you are now", "pretend to be") - Delimiter attacks (markdown, XML-style tags) - Encoding attacks (base64 instructions)

Usage

detector = PromptInjectionDetector(block_on_detection=True)
result = detector.validate_input(text)

PARAMETER DESCRIPTION
block_on_detection

Whether to raise SecurityError on detection.

TYPE: bool DEFAULT: True

additional_patterns

List of (pattern, description) tuples to add.

TYPE: Optional[List[Tuple[str, str]]] DEFAULT: None

Source code in strutex/security/injection.py
def __init__(
    self,
    block_on_detection: bool = True,
    additional_patterns: Optional[List[Tuple[str, str]]] = None
):
    """
    Configure the detector and assemble its pattern list.

    Args:
        block_on_detection: Whether to raise SecurityError on detection.
        additional_patterns: List of (pattern, description) tuples to add.
    """
    self.block_on_detection = block_on_detection

    # Start from a copy of the defaults so they are never modified, then
    # append any caller-supplied patterns after them.
    merged: List[Tuple[str, str]] = self.DEFAULT_PATTERNS.copy()
    merged.extend(additional_patterns or [])
    self.patterns = merged

get_detections(text: str) -> List[dict]

Get detailed detection information without blocking.

Source code in strutex/security/injection.py
def get_detections(self, text: str) -> List[dict]:
    """
    Report which configured patterns match ``text``, without blocking.

    Returns:
        One dict per matching pattern with its ``category``, the
        ``pattern`` itself and up to five ``matches``.
    """
    found = []
    for regex, label in self.patterns:
        hits = re.findall(regex, text, re.IGNORECASE)
        if not hits:
            continue
        entry = {
            "category": label,
            "pattern": regex,
            "matches": hits[:5]  # Limit for safety
        }
        found.append(entry)
    return found

process(file_path: str, prompt: str, schema: Any, mime_type: str, context: Dict[str, Any]) -> SecurityResult

Check for prompt injection attempts (adapter for Processor).

Source code in strutex/security/injection.py
def process(
    self,
    file_path: str,
    prompt: str,
    schema: Any,
    mime_type: str,
    context: Dict[str, Any]
) -> SecurityResult:
    """
    Adapter for Processor: check the prompt for injection attempts.

    Only ``prompt`` is inspected; the other arguments are accepted for
    signature compatibility and are not used.
    """
    verdict = self.validate_input(prompt)
    return verdict

validate_input(text: str) -> SecurityResult

Validate input text.

Source code in strutex/security/injection.py
def validate_input(self, text: str) -> SecurityResult:
    """
    Check ``text`` for injection issues.

    Returns:
        A valid SecurityResult when no issue is found, or an invalid one
        (with the joined issue messages as reason) when issues are found
        and ``block_on_detection`` is off.

    Raises:
        SecurityError: When issues are found and ``block_on_detection``
            is on.
    """
    issues = self._check_injection(text)

    if not issues:
        return SecurityResult(valid=True, text=text)

    message = "; ".join(issues)

    if not self.block_on_detection:
        return SecurityResult(
            valid=False,
            text=text,
            reason=message
        )

    from ..exceptions import SecurityError
    raise SecurityError(
        f"Security violation: {message}",
        details={"issues": issues}
    )

options: show_root_heading: true

OutputValidator(check_secrets: bool = True, check_prompt_leaks: bool = True, secret_patterns: Optional[List[tuple]] = None, block_on_detection: bool = True)

Bases: SecurityPlugin

Validates LLM output for security issues.

Checks for: - Leaked API keys/secrets - Leaked system prompts - Suspicious executable patterns - PII exposure

Usage

validator = OutputValidator() result = validator.validate_output(data)

Source code in strutex/security/output.py
def __init__(
    self,
    check_secrets: bool = True,
    check_prompt_leaks: bool = True,
    secret_patterns: Optional[List[tuple]] = None,
    block_on_detection: bool = True
):
    """
    Store the validator's switches and pre-compile its regex patterns.

    Args:
        check_secrets: Enable the secret-pattern check.
        check_prompt_leaks: Enable the prompt-leak check.
        secret_patterns: Optional (pattern, name) pairs; when falsy the
            class-level ``SECRET_PATTERNS`` are used.
        block_on_detection: Stored for use when issues are detected.
    """
    self.check_secrets = check_secrets
    self.check_prompt_leaks = check_prompt_leaks
    self.block_on_detection = block_on_detection

    # Compile everything once, case-insensitively.
    chosen = secret_patterns or self.SECRET_PATTERNS
    compiled_secrets = []
    for expression, label in chosen:
        compiled_secrets.append((re.compile(expression, re.IGNORECASE), label))
    self._secret_patterns = compiled_secrets

    compiled_leaks = []
    for expression in self.PROMPT_LEAK_PATTERNS:
        compiled_leaks.append(re.compile(expression, re.IGNORECASE))
    self._leak_patterns = compiled_leaks

validate_output(data: Dict[str, Any]) -> SecurityResult

Validate output data for security issues.

Source code in strutex/security/output.py
def validate_output(self, data: Dict[str, Any]) -> SecurityResult:
    """
    Scan output data for secrets and prompt leaks.

    Returns:
        A valid result carrying ``data`` when nothing is found. When issues
        are found: an invalid result (``data=None``) if
        ``block_on_detection`` is on, otherwise a valid result whose reason
        is prefixed with "Warning: ".
    """
    findings = []

    # Pattern matching works on a flattened text form of the data
    flat = self._flatten_to_text(data)

    if self.check_secrets:
        findings.extend(
            f"Potential {secret_type} detected in output"
            for regex, secret_type in self._secret_patterns
            if regex.search(flat)
        )

    # A single leak finding is enough, however many leak patterns match
    if self.check_prompt_leaks and any(regex.search(flat) for regex in self._leak_patterns):
        findings.append("Potential system prompt leak detected")

    if not findings:
        return SecurityResult(valid=True, data=data)

    if self.block_on_detection:
        return SecurityResult(
            valid=False,
            data=None,
            reason="; ".join(findings)
        )

    return SecurityResult(
        valid=True,
        data=data,
        reason=f"Warning: {'; '.join(findings)}"
    )

options: show_root_heading: true


Prompts

StructuredPrompt(persona: str = 'You are a highly accurate AI Data Extraction Assistant.')

Builder for organizing complex extraction prompts.

Provides a fluent API for constructing well-structured prompts with general rules, field-specific rules, and output guidelines.

Usage

prompt = StructuredPrompt("You are an expert...")

# Variadic arguments allow adding multiple rules at once
prompt.add_general_rule("No guessing", "Use ISO dates")
prompt.add_field_rule("total", "Exclude tax", "Must be numeric", critical=True)
final_string = prompt.compile()

Example

prompt = (
    StructuredPrompt()
    .add_general_rule(
        "Strict data fidelity: do not invent values.",
        "Dates must be in DD.MM.YYYY format."
    )
    .add_field_rule(
        "artikelnummer",
        "Must be 8 digits.",
        "Ignore supplier codes.",
        critical=True
    )
    .add_output_guideline("Return valid JSON.")
    .compile()
)

Initialize the prompt builder.

PARAMETER DESCRIPTION
persona

The system persona/role description.

TYPE: str DEFAULT: 'You are a highly accurate AI Data Extraction Assistant.'

Source code in strutex/prompts/builder.py
def __init__(self, persona: str = "You are a highly accurate AI Data Extraction Assistant."):
    """
    Set up an empty prompt builder.

    Args:
        persona: The system persona/role description; surrounding
            whitespace is stripped.
    """
    # Rule containers start empty and are filled by the add_* methods.
    self.output_guidelines: List[str] = []
    self.field_rules: Dict[str, List[str]] = {}
    self.general_rules: List[str] = []
    self.persona = persona.strip()

__str__() -> str

Allow using the prompt directly as a string.

Source code in strutex/prompts/builder.py
def __str__(self) -> str:
    """Return the compiled prompt, so the builder can be used wherever a string is expected."""
    compiled = self.compile()
    return compiled

add_field_rule(field_name: str, *rules: str, critical: bool = False) -> StructuredPrompt

Adds one or more rules specific to a single field.

PARAMETER DESCRIPTION
field_name

The name of the field these rules apply to.

TYPE: str

*rules

Variable number of rule strings.

TYPE: str DEFAULT: ()

critical

If True, prefixes rules with CRITICAL.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
StructuredPrompt

Self for method chaining.

Example

.add_field_rule("invoice_id", "Must be numeric", "8 digits", critical=True)

Source code in strutex/prompts/builder.py
def add_field_rule(self, field_name: str, *rules: str, critical: bool = False) -> "StructuredPrompt":
    """
    Register one or more rules for a single field.

    The field gets an entry even when no rules are passed.

    Args:
        field_name: The name of the field these rules apply to.
        *rules: Variable number of rule strings.
        critical: If True, prefixes rules with **CRITICAL**.

    Returns:
        Self for method chaining.

    Example:
        .add_field_rule("invoice_id", "Must be numeric", "8 digits", critical=True)
    """
    bucket = self.field_rules.setdefault(field_name, [])

    if critical:
        tag = "**CRITICAL**: "
    else:
        tag = ""

    bucket.extend([f"{tag}{rule}" for rule in rules])
    return self

add_general_rule(*rules: str) -> StructuredPrompt

Adds one or more high-level rules.

PARAMETER DESCRIPTION
*rules

Variable number of rule strings.

TYPE: str DEFAULT: ()

RETURNS DESCRIPTION
StructuredPrompt

Self for method chaining.

Example

.add_general_rule("Rule 1", "Rule 2", "Rule 3")

Source code in strutex/prompts/builder.py
def add_general_rule(self, *rules: str) -> "StructuredPrompt":
    """
    Append one or more high-level rules, keeping the order given.

    Args:
        *rules: Variable number of rule strings.

    Returns:
        Self for method chaining.

    Example:
        .add_general_rule("Rule 1", "Rule 2", "Rule 3")
    """
    for rule in rules:
        self.general_rules.append(rule)
    return self

add_output_guideline(*guidelines: str) -> StructuredPrompt

Adds formatting instructions for the output.

PARAMETER DESCRIPTION
*guidelines

Variable number of guideline strings.

TYPE: str DEFAULT: ()

RETURNS DESCRIPTION
StructuredPrompt

Self for method chaining.

Example

.add_output_guideline("JSON only", "No markdown", "No comments")

Source code in strutex/prompts/builder.py
def add_output_guideline(self, *guidelines: str) -> "StructuredPrompt":
    """
    Append one or more output-formatting instructions, keeping the order given.

    Args:
        *guidelines: Variable number of guideline strings.

    Returns:
        Self for method chaining.

    Example:
        .add_output_guideline("JSON only", "No markdown", "No comments")
    """
    for guideline in guidelines:
        self.output_guidelines.append(guideline)
    return self

compile() -> str

Builds the final prompt string.

RETURNS DESCRIPTION
str

The complete formatted prompt ready for LLM consumption.

Source code in strutex/prompts/builder.py
def compile(self) -> str:
    """
    Render the persona and all collected rules into one prompt string.

    Sections appear in a fixed order: general principles, field rules and
    output format. The first two are omitted when empty; the output section
    is always present and falls back to a default JSON-only instruction.

    Returns:
        The complete formatted prompt ready for LLM consumption.
    """
    def as_bullets(items):
        return [f"- {item}" for item in items]

    lines = [self.persona, ""]

    if self.general_rules:
        lines += ["### 1. General Principles", *as_bullets(self.general_rules), ""]

    if self.field_rules:
        lines.append("### 2. Field Rules")
        for name, rules in self.field_rules.items():
            lines += [f"\n**{name}**:", *as_bullets(rules)]
        lines.append("")

    lines.append("### 3. Output Format")
    if not self.output_guidelines:
        lines.append("- Output valid JSON only. No markdown.")
    else:
        lines += as_bullets(self.output_guidelines)

    return "\n".join(lines)

from_schema(schema: Any, persona: Optional[str] = None) -> StructuredPrompt classmethod

Create a StructuredPrompt with field rules auto-generated from a Pydantic schema.

PARAMETER DESCRIPTION
schema

A Pydantic BaseModel class with Field descriptions.

TYPE: Any

persona

Optional custom persona string.

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
StructuredPrompt

A StructuredPrompt with field rules for each described field.

Example

from pydantic import BaseModel, Field

class Invoice(BaseModel):
    invoice_number: str = Field(description="Unique invoice ID")
    total: float = Field(description="Final amount due")

prompt = StructuredPrompt.from_schema(Invoice)
prompt.add_general_rule("Use ISO dates")
print(prompt.compile())

Source code in strutex/prompts/builder.py
@classmethod
def from_schema(cls, schema: Any, persona: Optional[str] = None) -> "StructuredPrompt":
    """
    Build a StructuredPrompt whose field rules come from a Pydantic schema.

    Every field that has a description gets that description as its rule;
    required fields are marked critical. Fields without a description are
    skipped. Objects exposing neither ``model_fields`` nor ``__fields__``
    yield a prompt with no field rules.

    Args:
        schema: A Pydantic BaseModel class with Field descriptions.
        persona: Optional custom persona string.

    Returns:
        A StructuredPrompt with field rules for each described field.

    Example:
        >>> from pydantic import BaseModel, Field
        >>> class Invoice(BaseModel):
        ...     invoice_number: str = Field(description="Unique invoice ID")
        ...     total: float = Field(description="Final amount due")
        >>>
        >>> prompt = StructuredPrompt.from_schema(Invoice)
        >>> prompt.add_general_rule("Use ISO dates")
        >>> print(prompt.compile())
    """
    builder = cls(persona=persona) if persona else cls()

    if hasattr(schema, "model_fields"):
        # Pydantic v2: description and required-ness live on the field info
        for name, info in schema.model_fields.items():
            text = info.description
            if not text:
                continue
            builder.add_field_rule(name, text, critical=info.is_required())
    elif hasattr(schema, "__fields__"):
        # Pydantic v1 fallback: description sits one level deeper
        for name, model_field in schema.__fields__.items():
            text = model_field.field_info.description
            if not text:
                continue
            builder.add_field_rule(name, text, critical=model_field.required)

    return builder

options: show_root_heading: true members: - init - add_general_rule - add_field_rule - add_output_guideline - compile


Pydantic Support

pydantic_to_schema(model: Type) -> Schema

Convert a Pydantic BaseModel to a strutex Schema.

PARAMETER DESCRIPTION
model

A Pydantic BaseModel class

TYPE: Type

RETURNS DESCRIPTION
Schema

Equivalent strutex Schema (Object)

Example

from pydantic import BaseModel

class Invoice(BaseModel): invoice_number: str total: float items: list[LineItem]

schema = pydantic_to_schema(Invoice)

Source code in strutex/pydantic_support.py
def pydantic_to_schema(model: Type) -> Schema:
    """
    Convert a Pydantic BaseModel to a strutex Schema.

    Each model field becomes a property of the resulting Object. Fields
    that Pydantic reports as required are listed as required; all others
    are converted as nullable.

    Args:
        model: A Pydantic BaseModel class

    Returns:
        Equivalent strutex Schema (Object)

    Raises:
        ImportError: If Pydantic is not installed.
        TypeError: If ``model`` is not a Pydantic BaseModel class.

    Example:
        from pydantic import BaseModel

        class Invoice(BaseModel):
            invoice_number: str
            total: float
            items: list[LineItem]

        schema = pydantic_to_schema(Invoice)
    """
    try:
        from pydantic import BaseModel
    except ImportError as exc:
        # Chain explicitly so the original import failure stays attached.
        raise ImportError(
            "Pydantic is required for pydantic_to_schema. Install with: pip install pydantic"
        ) from exc

    if not (inspect.isclass(model) and issubclass(model, BaseModel)):
        raise TypeError(f"Expected Pydantic BaseModel, got {type(model)}")

    properties = {}
    required_fields = []

    for field_name, field_info in model.model_fields.items():
        # Ask once; the answer drives both the required list and nullability.
        is_required = field_info.is_required()
        if is_required:
            required_fields.append(field_name)

        properties[field_name] = _python_type_to_schema(
            field_info.annotation,
            description=field_info.description,
            nullable=not is_required
        )

    return Object(
        properties=properties,
        description=model.__doc__,
        # An empty required list is passed as None.
        required=required_fields if required_fields else None
    )

options: show_root_heading: true

validate_with_pydantic(data: Dict[str, Any], model: Type) -> Any

Validate extracted data against a Pydantic model.

PARAMETER DESCRIPTION
data

Extracted dictionary data

TYPE: Dict[str, Any]

model

Pydantic BaseModel class to validate against

TYPE: Type

RETURNS DESCRIPTION
Any

Validated Pydantic model instance

RAISES DESCRIPTION
ValidationError

If validation fails

Source code in strutex/pydantic_support.py
def validate_with_pydantic(data: Dict[str, Any], model: Type) -> Any:
    """
    Validate extracted data against a Pydantic model.

    Args:
        data: Extracted dictionary data
        model: Pydantic BaseModel class to validate against

    Returns:
        Validated Pydantic model instance

    Raises:
        ImportError: If Pydantic is not installed.
        TypeError: If ``model`` is not a Pydantic BaseModel class.
        pydantic.ValidationError: If validation fails
    """
    try:
        from pydantic import BaseModel
    except ImportError as exc:
        # Chain explicitly so the original import failure stays attached.
        raise ImportError("Pydantic is required. Install with: pip install pydantic") from exc

    if not (inspect.isclass(model) and issubclass(model, BaseModel)):
        raise TypeError(f"Expected Pydantic BaseModel, got {type(model)}")

    return model.model_validate(data)

options: show_root_heading: true


Exceptions

SecurityError

Bases: Exception

Raised when security validation fails.

This exception is raised when either input validation (e.g., prompt injection detected) or output validation (e.g., leaked secrets detected) fails.

ATTRIBUTE DESCRIPTION
message

Description of the security failure.

Example
from strutex.processor import SecurityError

try:
    result = processor.process(file, prompt, schema, security=True)
except SecurityError as e:
    print(f"Security check failed: {e}")

options: show_root_heading: true