Skip to content

API Reference

Complete reference for all public APIs.


DocumentProcessor

DocumentProcessor(provider: Union[str, Provider] = 'gemini', model_name: str = 'gemini-2.5-flash', api_key: Optional[str] = None, security: Optional[SecurityPlugin] = None, on_pre_process: Optional[PreProcessCallback] = None, on_post_process: Optional[PostProcessCallback] = None, on_error: Optional[ErrorCallback] = None)

Main document processing class for extracting structured data from documents.

The DocumentProcessor orchestrates document extraction using pluggable providers, with optional security layer and Pydantic model support. It automatically detects file types, applies security checks, and validates output against schemas.

ATTRIBUTE DESCRIPTION
security

Optional security plugin/chain for input/output validation.

Example

Basic usage with schema:

from strutex import DocumentProcessor, Object, String, Number

schema = Object(properties={
    "invoice_number": String(),
    "total": Number()
})

processor = DocumentProcessor(provider="gemini")
result = processor.process("invoice.pdf", "Extract data", schema)
print(result["invoice_number"])

With callbacks:

processor = DocumentProcessor(
    provider="gemini",
    on_post_process=lambda result, ctx: {**result, "processed": True}
)

With decorator:

processor = DocumentProcessor()

@processor.on_post_process
def add_timestamp(result, context):
    result["timestamp"] = datetime.now().isoformat()
    return result

Initialize the document processor.

PARAMETER DESCRIPTION
provider

Provider name (e.g., "gemini", "openai") or a Provider instance.

TYPE: Union[str, Provider] DEFAULT: 'gemini'

model_name

LLM model name to use (only when provider is a string).

TYPE: str DEFAULT: 'gemini-2.5-flash'

api_key

API key for the provider. Falls back to environment variables (e.g., GOOGLE_API_KEY for Gemini).

TYPE: Optional[str] DEFAULT: None

security

Optional SecurityPlugin or SecurityChain for input/output validation. Security is opt-in.

TYPE: Optional[SecurityPlugin] DEFAULT: None

on_pre_process

Callback called before processing. Receives (file_path, prompt, schema, mime_type, context) and can return a dict with modified values.

TYPE: Optional[PreProcessCallback] DEFAULT: None

on_post_process

Callback called after processing. Receives (result, context) and can return a modified result dict.

TYPE: Optional[PostProcessCallback] DEFAULT: None

on_error

Callback called on error. Receives (error, file_path, context) and can return a fallback result or None to propagate the error.

TYPE: Optional[ErrorCallback] DEFAULT: None

RAISES DESCRIPTION
ValueError

If the specified provider is not found in the registry.

Example
# Using callbacks
processor = DocumentProcessor(
    provider="gemini",
    on_post_process=lambda result, ctx: normalize_dates(result)
)
Source code in strutex/processor.py
def __init__(
    self,
    provider: Union[str, Provider] = "gemini",
    model_name: str = "gemini-2.5-flash",
    api_key: Optional[str] = None,
    security: Optional[SecurityPlugin] = None,
    on_pre_process: Optional[PreProcessCallback] = None,
    on_post_process: Optional[PostProcessCallback] = None,
    on_error: Optional[ErrorCallback] = None,
):
    """
    Initialize the document processor.

    Args:
        provider: Provider name (e.g., "gemini", "openai") or a
            [`Provider`][strutex.plugins.base.Provider] instance.
        model_name: LLM model name to use (only when provider is a string).
        api_key: API key for the provider. Falls back to environment variables
            (e.g., `GOOGLE_API_KEY` for Gemini).
        security: Optional [`SecurityPlugin`][strutex.plugins.base.SecurityPlugin]
            or [`SecurityChain`][strutex.security.chain.SecurityChain] for
            input/output validation. Security is opt-in.
        on_pre_process: Callback called before processing. Receives
            (file_path, prompt, schema, mime_type, context) and can return
            a dict with modified values.
        on_post_process: Callback called after processing. Receives
            (result, context) and can return a modified result dict.
        on_error: Callback called on error. Receives (error, file_path, context)
            and can return a fallback result or None to propagate the error.

    Raises:
        ValueError: If the specified provider is not found in the registry.

    Example:
        ```python
        # Using callbacks
        processor = DocumentProcessor(
            provider="gemini",
            on_post_process=lambda result, ctx: normalize_dates(result)
        )
        ```
    """
    self.security = security

    # Hook storage: callbacks first, then decorated hooks
    self._pre_process_hooks: List[PreProcessCallback] = []
    self._post_process_hooks: List[PostProcessCallback] = []
    self._error_hooks: List[ErrorCallback] = []

    # Pluggy integration
    self._hook_plugin: Optional[_CallbackHookPlugin] = None
    self._hook_plugin_registered = False

    # Add initial callbacks if provided
    if on_pre_process:
        self._pre_process_hooks.append(on_pre_process)
    if on_post_process:
        self._post_process_hooks.append(on_post_process)
    if on_error:
        self._error_hooks.append(on_error)

    # Resolve provider
    if isinstance(provider, str):
        provider_name = provider.lower()

        # Try to get from registry
        provider_cls = PluginRegistry.get("provider", provider_name)

        if provider_cls:
            self._provider = provider_cls(api_key=api_key, model=model_name)
        else:
            # Fallback for backward compatibility
            if provider_name in ("google", "gemini"):
                from .providers.gemini import GeminiProvider
                self._provider = GeminiProvider(api_key=api_key, model=model_name)
            else:
                raise ValueError(f"Unknown provider: {provider}. Available: {list(PluginRegistry.list('provider').keys())}")
    else:
        # Provider instance passed directly
        self._provider = provider

__del__()

Unregister hooks when processor is garbage collected.

Source code in strutex/processor.py
def __del__(self):
    """Unregister hooks when processor is garbage collected."""
    if self._hook_plugin_registered and self._hook_plugin:
        try:
            from .plugins.hooks import get_plugin_manager
            pm = get_plugin_manager()
            if pm:
                pm.unregister(self._hook_plugin)
        except Exception:
            pass  # Ignore errors during cleanup

on_error(func: ErrorCallback) -> ErrorCallback

Decorator to register an error hook.

The hook receives (error, file_path, context) and can return a fallback result dict. Return None to propagate the original error.

Example
@processor.on_error
def handle_rate_limit(error, file_path, context):
    if "rate limit" in str(error).lower():
        return {"error": "Rate limited, please retry"}
    return None  # Propagate other errors
Source code in strutex/processor.py
def on_error(self, func: ErrorCallback) -> ErrorCallback:
    """
    Decorator to register an error hook.

    The hook receives (error, file_path, context) and can return a fallback
    result dict. Return None to propagate the original error.

    Example:
        ```python
        @processor.on_error
        def handle_rate_limit(error, file_path, context):
            if "rate limit" in str(error).lower():
                return {"error": "Rate limited, please retry"}
            return None  # Propagate other errors
        ```
    """
    self._error_hooks.append(func)
    self._hook_plugin_registered = False  # Force re-registration
    return func

on_post_process(func: PostProcessCallback) -> PostProcessCallback

Decorator to register a post-process hook.

The hook receives (result, context) and can return a modified result dict.

Example
@processor.on_post_process
def normalize_dates(result, context):
    result["date"] = parse_date(result.get("date"))
    return result
Source code in strutex/processor.py
def on_post_process(self, func: PostProcessCallback) -> PostProcessCallback:
    """
    Decorator to register a post-process hook.

    The hook receives (result, context) and can return a modified result dict.

    Example:
        ```python
        @processor.on_post_process
        def normalize_dates(result, context):
            result["date"] = parse_date(result.get("date"))
            return result
        ```
    """
    self._post_process_hooks.append(func)
    self._hook_plugin_registered = False  # Force re-registration
    return func

on_pre_process(func: PreProcessCallback) -> PreProcessCallback

Decorator to register a pre-process hook.

The hook receives (file_path, prompt, schema, mime_type, context) and can return a dict with modified values for 'prompt' or other parameters.

Example
@processor.on_pre_process
def add_instructions(file_path, prompt, schema, mime_type, context):
    return {"prompt": prompt + "\nBe precise."}
Source code in strutex/processor.py
def on_pre_process(self, func: PreProcessCallback) -> PreProcessCallback:
    """
    Decorator to register a pre-process hook.

    The hook receives (file_path, prompt, schema, mime_type, context) and
    can return a dict with modified values for 'prompt' or other parameters.

    Example:
        ```python
        @processor.on_pre_process
        def add_instructions(file_path, prompt, schema, mime_type, context):
            return {"prompt": prompt + "\\nBe precise."}
        ```
    """
    self._pre_process_hooks.append(func)
    self._hook_plugin_registered = False  # Force re-registration
    return func

process(file_path: str, prompt: str, schema: Optional[Schema] = None, model: Optional[Type] = None, security: Optional[Union[SecurityPlugin, bool]] = None, **kwargs) -> Any

Process a document and extract structured data.

This method automatically detects the file type, applies security validation (if enabled), sends the document to the LLM provider, and validates the output.

PARAMETER DESCRIPTION
file_path

Absolute path to the source file (PDF, Excel, or Image).

TYPE: str

prompt

Natural language instruction for extraction.

TYPE: str

schema

A [Schema][strutex.types.Schema] definition. Mutually exclusive with model.

TYPE: Optional[Schema] DEFAULT: None

model

A Pydantic BaseModel class. Mutually exclusive with schema. If provided, returns a validated Pydantic instance.

TYPE: Optional[Type] DEFAULT: None

security

Override security setting for this request. - True: Use default security chain - False: Disable security - SecurityPlugin: Use specific plugin - None: Use processor default

TYPE: Optional[Union[SecurityPlugin, bool]] DEFAULT: None

**kwargs

Additional provider-specific options.

DEFAULT: {}

RETURNS DESCRIPTION
Any

Extracted data as a dictionary, or a Pydantic model instance if model

Any

was provided.

RAISES DESCRIPTION
FileNotFoundError

If file_path does not exist.

ValueError

If neither schema nor model is provided.

SecurityError

If security validation fails (input or output rejected).

Example
result = processor.process(
    file_path="invoice.pdf",
    prompt="Extract invoice number and total amount",
    schema=invoice_schema
)
print(result["total"])
Source code in strutex/processor.py
def process(
    self,
    file_path: str,
    prompt: str,
    schema: Optional[Schema] = None,
    model: Optional[Type] = None,
    security: Optional[Union[SecurityPlugin, bool]] = None,
    **kwargs
) -> Any:
    """
    Process a document and extract structured data.

    This method automatically detects the file type, applies security validation
    (if enabled), sends the document to the LLM provider, and validates the output.

    Args:
        file_path: Absolute path to the source file (PDF, Excel, or Image).
        prompt: Natural language instruction for extraction.
        schema: A [`Schema`][strutex.types.Schema] definition. Mutually exclusive
            with `model`.
        model: A Pydantic `BaseModel` class. Mutually exclusive with `schema`.
            If provided, returns a validated Pydantic instance.
        security: Override security setting for this request.
            - `True`: Use default security chain
            - `False`: Disable security
            - `SecurityPlugin`: Use specific plugin
            - `None`: Use processor default
        **kwargs: Additional provider-specific options.

    Returns:
        Extracted data as a dictionary, or a Pydantic model instance if `model`
        was provided.

    Raises:
        FileNotFoundError: If `file_path` does not exist.
        ValueError: If neither `schema` nor `model` is provided.
        SecurityError: If security validation fails (input or output rejected).

    Example:
        ```python
        result = processor.process(
            file_path="invoice.pdf",
            prompt="Extract invoice number and total amount",
            schema=invoice_schema
        )
        print(result["total"])
        ```
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    # Ensure hooks are registered with pluggy
    self._ensure_hooks_registered()

    # Handle Pydantic model
    pydantic_model = None
    if model is not None:
        from .pydantic_support import pydantic_to_schema
        schema = pydantic_to_schema(model)
        pydantic_model = model

    if schema is None:
        raise ValueError("Either 'schema' or 'model' must be provided")

    # Detect MIME type
    mime_type = get_mime_type(file_path)

    # Create context for hooks
    context: Dict[str, Any] = {
        "file_path": file_path,
        "mime_type": mime_type,
        "kwargs": kwargs,
    }

    # Run pre-process hooks via pluggy
    from .plugins.hooks import call_hook
    pre_results = call_hook(
        "pre_process",
        file_path=file_path,
        prompt=prompt,
        schema=schema,
        mime_type=mime_type,
        context=context
    )
    # Apply any prompt modifications from hooks
    for hook_result in pre_results:
        if hook_result and isinstance(hook_result, dict) and "prompt" in hook_result:
            prompt = hook_result["prompt"]

    # Handle security
    effective_security = self._resolve_security(security)

    # Apply input security if enabled
    if effective_security:
        input_result = effective_security.validate_input(prompt)
        if not input_result.valid:
            raise SecurityError(f"Input rejected: {input_result.reason}")
        prompt = input_result.text or prompt

    # Process with provider (with error handling)
    try:
        result = self._provider.process(
            file_path=file_path,
            prompt=prompt,
            schema=schema,
            mime_type=mime_type,
            **kwargs
        )
    except Exception as e:
        # Run error hooks via pluggy
        error_results = call_hook(
            "on_error",
            error=e,
            file_path=file_path,
            context=context
        )
        # Use first non-None fallback
        fallback = None
        for hook_result in error_results:
            if hook_result is not None:
                fallback = hook_result
                break

        if fallback is not None:
            result = fallback
        else:
            raise  # Re-raise if no hook handled it

    # Apply output security if enabled
    if effective_security and isinstance(result, dict):
        output_result = effective_security.validate_output(result)
        if not output_result.valid:
            raise SecurityError(f"Output rejected: {output_result.reason}")
        result = output_result.data or result

    # Run post-process hooks via pluggy
    if isinstance(result, dict):
        post_results = call_hook(
            "post_process",
            result=result,
            context=context
        )
        # Apply modifications from hooks
        for hook_result in post_results:
            if hook_result is not None and isinstance(hook_result, dict):
                result = hook_result

    # Validate with Pydantic if model was provided
    if pydantic_model is not None:
        from .pydantic_support import validate_with_pydantic
        result = validate_with_pydantic(result, pydantic_model)

    return result

options: show_root_heading: true members: - init - process


Schema Types

String(description: str = None, nullable: bool = False)

Bases: Schema

Source code in strutex/types.py
def __init__(self, description: str = None, nullable: bool = False):
    super().__init__(Type.STRING, description=description, nullable=nullable)

options: show_root_heading: true

Number(description: str = None, nullable: bool = False)

Bases: Schema

Source code in strutex/types.py
def __init__(self, description: str = None, nullable: bool = False):
    super().__init__(Type.NUMBER, description=description, nullable=nullable)

options: show_root_heading: true

Integer(description: str = None, nullable: bool = False)

Bases: Schema

Source code in strutex/types.py
def __init__(self, description: str = None, nullable: bool = False):
    super().__init__(Type.INTEGER, description=description, nullable=nullable)

options: show_root_heading: true

Boolean(description: str = None, nullable: bool = False)

Bases: Schema

Source code in strutex/types.py
def __init__(self, description: str = None, nullable: bool = False):
    super().__init__(Type.BOOLEAN, description=description, nullable=nullable)

options: show_root_heading: true

Array(items: Schema, description: str = None, nullable: bool = False)

Bases: Schema

Represents a list of items. :param items: The Schema definition for the items inside the array.

Source code in strutex/types.py
def __init__(self, items: Schema, description: str = None, nullable: bool = False):
    """
    Represents a list of items.
    :param items: The Schema definition for the items inside the array.
    """
    super().__init__(Type.ARRAY, items=items, description=description, nullable=nullable)

options: show_root_heading: true

Object(properties: Dict[str, Schema], description: str = None, required: Optional[List[str]] = None, nullable: bool = False)

Bases: Schema

Represents a nested object (dictionary).

:param properties: Dictionary mapping field names to Schema objects. :param required: List of keys that are mandatory. If None, ALL properties are assumed required. Pass [] explicitly if no fields are required.

Source code in strutex/types.py
def __init__(
        self,
        properties: Dict[str, Schema],
        description: str = None,
        required: Optional[List[str]] = None,
        nullable: bool = False
):
    """
    Represents a nested object (dictionary).

    :param properties: Dictionary mapping field names to Schema objects.
    :param required: List of keys that are mandatory.
                     If None, ALL properties are assumed required.
                     Pass [] explicitly if no fields are required.
    """
    # Smart Default: If 'required' is missing, assume strict mode (all fields required)
    if required is None:
        calculated_required = list(properties.keys())
    else:
        calculated_required = required

    super().__init__(
        Type.OBJECT,
        properties=properties,
        description=description,
        required=calculated_required,
        nullable=nullable
    )

options: show_root_heading: true


Plugin System

PluginRegistry

Central registry for all plugin types with lazy loading.

Plugins are stored as EntryPoint objects and only loaded when first accessed via get(). This improves startup time and avoids importing unused dependencies.

Usage

Get a plugin (loads on first access)

cls = PluginRegistry.get("provider", "gemini")

List all plugins (does not load them)

all_providers = PluginRegistry.list("provider")

Force discovery from entry points

count = PluginRegistry.discover()

clear(plugin_type: Optional[str] = None) -> None classmethod

Clear registered plugins.

PARAMETER DESCRIPTION
plugin_type

If provided, only clear this type. Otherwise clear all.

TYPE: Optional[str] DEFAULT: None

Source code in strutex/plugins/registry.py
@classmethod
def clear(cls, plugin_type: Optional[str] = None) -> None:
    """
    Clear registered plugins.

    Args:
        plugin_type: If provided, only clear this type. Otherwise clear all.
    """
    if plugin_type:
        cls._entry_points.pop(plugin_type, None)
        cls._loaded.pop(plugin_type, None)
        cls._manual.pop(plugin_type, None)
    else:
        cls._entry_points.clear()
        cls._loaded.clear()
        cls._manual.clear()
        cls._discovered = False

discover(group_prefix: str = 'strutex', force: bool = False) -> int classmethod

Discover and register plugins from entry points.

Scans for entry points matching the pattern: - strutex.providers - strutex.validators - strutex.postprocessors - strutex.security - etc.

Entry points are stored for lazy loading - they are not imported until first use via get().

PARAMETER DESCRIPTION
group_prefix

Entry point group prefix (default: "strutex")

TYPE: str DEFAULT: 'strutex'

force

Force re-discovery even if already discovered

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
int

Number of entry points discovered

Example pyproject.toml: [project.entry-points."strutex.providers"] my_provider = "my_package:MyProvider"

Source code in strutex/plugins/registry.py
@classmethod
def discover(cls, group_prefix: str = "strutex", force: bool = False) -> int:
    """
    Discover and register plugins from entry points.

    Scans for entry points matching the pattern:
    - strutex.providers
    - strutex.validators
    - strutex.postprocessors
    - strutex.security
    - etc.

    Entry points are stored for lazy loading - they are not imported
    until first use via get().

    Args:
        group_prefix: Entry point group prefix (default: "strutex")
        force: Force re-discovery even if already discovered

    Returns:
        Number of entry points discovered

    Example pyproject.toml:
        [project.entry-points."strutex.providers"]
        my_provider = "my_package:MyProvider"
    """
    if cls._discovered and not force:
        return sum(len(eps) for eps in cls._entry_points.values())

    discovered = 0

    # Get entry_points function
    if sys.version_info >= (3, 10):
        from importlib.metadata import entry_points
    else:
        try:
            from importlib_metadata import entry_points
        except ImportError:
            cls._discovered = True
            return 0

    # Get all entry point groups
    try:
        all_eps = entry_points()

        # Get group names that match our prefix
        if hasattr(all_eps, 'groups'):
            # Python 3.12+ style
            groups = [g for g in all_eps.groups if g.startswith(f"{group_prefix}.")]
        elif hasattr(all_eps, 'keys'):
            # Python 3.9-3.11 style (dict-like)
            groups = [g for g in all_eps.keys() if g.startswith(f"{group_prefix}.")]
        else:
            groups = []
    except Exception:
        cls._discovered = True
        return 0

    for group in groups:
        # Extract plugin type from group name
        # e.g., "strutex.providers" -> "provider"
        plugin_type = group.replace(f"{group_prefix}.", "").rstrip("s")

        if plugin_type not in cls._entry_points:
            cls._entry_points[plugin_type] = {}

        try:
            # Get entry points for this group
            if hasattr(all_eps, 'select'):
                eps = all_eps.select(group=group)
            else:
                eps = all_eps.get(group, [])

            for ep in eps:
                # Store entry point for lazy loading
                cls._entry_points[plugin_type][ep.name.lower()] = ep
                discovered += 1

        except Exception:
            pass

    cls._discovered = True
    return discovered

get(plugin_type: str, name: str) -> Optional[Type] classmethod

Get a registered plugin class by type and name.

If the plugin is registered via entry point and not yet loaded, it will be loaded on first access (lazy loading).

PARAMETER DESCRIPTION
plugin_type

Type of plugin

TYPE: str

name

Name of the plugin

TYPE: str

RETURNS DESCRIPTION
Optional[Type]

The plugin class, or None if not found

Source code in strutex/plugins/registry.py
@classmethod
def get(cls, plugin_type: str, name: str) -> Optional[Type]:
    """
    Get a registered plugin class by type and name.

    If the plugin is registered via entry point and not yet loaded,
    it will be loaded on first access (lazy loading).

    Args:
        plugin_type: Type of plugin
        name: Name of the plugin

    Returns:
        The plugin class, or None if not found
    """
    name_lower = name.lower()

    # Ensure discovery has run
    if not cls._discovered:
        cls.discover()

    # Check loaded cache first
    if name_lower in cls._loaded.get(plugin_type, {}):
        return cls._loaded[plugin_type][name_lower]

    # Check manual registrations
    if name_lower in cls._manual.get(plugin_type, {}):
        return cls._manual[plugin_type][name_lower]

    # Try to lazy load from entry point
    ep = cls._entry_points.get(plugin_type, {}).get(name_lower)
    if ep is not None:
        plugin_cls = cls._load_entry_point(ep, plugin_type, name_lower)
        if plugin_cls is not None:
            return plugin_cls

    return None

get_plugin_info(plugin_type: str, name: str) -> Optional[Dict[str, Any]] classmethod

Get metadata about a plugin without necessarily loading it.

PARAMETER DESCRIPTION
plugin_type

Type of plugin

TYPE: str

name

Name of the plugin

TYPE: str

RETURNS DESCRIPTION
Optional[Dict[str, Any]]

Dict with plugin info, or None if not found

Source code in strutex/plugins/registry.py
@classmethod
def get_plugin_info(cls, plugin_type: str, name: str) -> Optional[Dict[str, Any]]:
    """
    Get metadata about a plugin without necessarily loading it.

    Args:
        plugin_type: Type of plugin
        name: Name of the plugin

    Returns:
        Dict with plugin info, or None if not found
    """
    name_lower = name.lower()

    if not cls._discovered:
        cls.discover()

    # Check if loaded
    if name_lower in cls._loaded.get(plugin_type, {}):
        plugin_cls = cls._loaded[plugin_type][name_lower]
        return {
            "name": name_lower,
            "version": getattr(plugin_cls, "strutex_plugin_version", "unknown"),
            "priority": getattr(plugin_cls, "priority", 50),
            "cost": getattr(plugin_cls, "cost", 1.0),
            "capabilities": getattr(plugin_cls, "capabilities", []),
            "loaded": True,
            "healthy": cls._check_health(plugin_cls),
        }

    # Check entry point
    ep = cls._entry_points.get(plugin_type, {}).get(name_lower)
    if ep is not None:
        return {
            "name": name_lower,
            "entry_point": f"{ep.group}:{ep.name}",
            "loaded": False,
            "healthy": None,  # Unknown until loaded
        }

    return None

get_sorted(plugin_type: str, reverse: bool = True) -> List[Tuple[str, Type]] classmethod

Get all plugins of a type sorted by priority.

Useful for waterfall selection where you want to try higher-priority plugins first.

PARAMETER DESCRIPTION
plugin_type

Type of plugin

TYPE: str

reverse

If True (default), higher priority first

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
List[Tuple[str, Type]]

List of (name, class) tuples sorted by priority

Source code in strutex/plugins/registry.py
@classmethod
def get_sorted(cls, plugin_type: str, reverse: bool = True) -> List[Tuple[str, Type]]:
    """
    Get all plugins of a type sorted by priority.

    Useful for waterfall selection where you want to try
    higher-priority plugins first.

    Args:
        plugin_type: Type of plugin
        reverse: If True (default), higher priority first

    Returns:
        List of (name, class) tuples sorted by priority
    """
    plugins = cls.list(plugin_type)
    return sorted(
        plugins.items(),
        key=lambda x: getattr(x[1], 'priority', 50),
        reverse=reverse
    )

list(plugin_type: str) -> Dict[str, Type] classmethod

List all plugins of a given type.

Note: This loads all plugins of the type. Use list_names() for a lightweight listing without loading.

PARAMETER DESCRIPTION
plugin_type

Type of plugin

TYPE: str

RETURNS DESCRIPTION
Dict[str, Type]

Dictionary mapping names to plugin classes

Source code in strutex/plugins/registry.py
@classmethod
def list(cls, plugin_type: str) -> Dict[str, Type]:
    """
    List all plugins of a given type.

    Note: This loads all plugins of the type. Use list_names()
    for a lightweight listing without loading.

    Args:
        plugin_type: Type of plugin

    Returns:
        Dictionary mapping names to plugin classes
    """
    if not cls._discovered:
        cls.discover()

    result = {}

    # Get all names from entry points and manual registrations
    all_names = set()
    all_names.update(cls._entry_points.get(plugin_type, {}).keys())
    all_names.update(cls._manual.get(plugin_type, {}).keys())
    all_names.update(cls._loaded.get(plugin_type, {}).keys())

    # Load each plugin
    for name in all_names:
        plugin_cls = cls.get(plugin_type, name)
        if plugin_cls is not None:
            result[name] = plugin_cls

    return result

list_names(plugin_type: str) -> List[str] classmethod

List names of all plugins of a given type without loading them.

PARAMETER DESCRIPTION
plugin_type

Type of plugin

TYPE: str

RETURNS DESCRIPTION
List[str]

List of plugin names

Source code in strutex/plugins/registry.py
@classmethod
def list_names(cls, plugin_type: str) -> List[str]:
    """
    List names of all plugins of a given type without loading them.

    Args:
        plugin_type: Type of plugin

    Returns:
        List of plugin names
    """
    if not cls._discovered:
        cls.discover()

    names = set()
    names.update(cls._entry_points.get(plugin_type, {}).keys())
    names.update(cls._manual.get(plugin_type, {}).keys())
    names.update(cls._loaded.get(plugin_type, {}).keys())

    return sorted(names)

list_types() -> List[str] classmethod

List all registered plugin types.

Source code in strutex/plugins/registry.py
@classmethod
def list_types(cls) -> List[str]:
    """List all registered plugin types."""
    if not cls._discovered:
        cls.discover()

    types = set()
    types.update(cls._entry_points.keys())
    types.update(cls._manual.keys())
    types.update(cls._loaded.keys())

    return sorted(types)

register(plugin_type: str, name: str, plugin_cls: Type) -> None classmethod

Register a plugin class manually.

This is used by the @register decorator for backwards compatibility. Prefer using entry points in pyproject.toml for new plugins.

PARAMETER DESCRIPTION
plugin_type

Type of plugin (e.g., "provider", "security", "validator")

TYPE: str

name

Unique name for this plugin

TYPE: str

plugin_cls

The plugin class to register

TYPE: Type

Source code in strutex/plugins/registry.py
@classmethod
def register(cls, plugin_type: str, name: str, plugin_cls: Type) -> None:
    """
    Register a plugin class manually.

    This is used by the @register decorator for backwards compatibility.
    Prefer using entry points in pyproject.toml for new plugins.

    Args:
        plugin_type: Type of plugin (e.g., "provider", "security", "validator")
        name: Unique name for this plugin
        plugin_cls: The plugin class to register
    """
    if plugin_type not in cls._manual:
        cls._manual[plugin_type] = {}

    cls._manual[plugin_type][name.lower()] = plugin_cls

    # Also add to loaded cache
    if plugin_type not in cls._loaded:
        cls._loaded[plugin_type] = {}
    cls._loaded[plugin_type][name.lower()] = plugin_cls

options: show_root_heading: true members: - register - get - list - discover

register(plugin_type: str, name: Optional[str] = None) -> Callable[[Type], Type]

Decorator to register a plugin class at runtime.

Use this decorator for: - Runtime/dynamic registration based on config - Prototyping plugins without packaging - Plugins in the same codebase (not installed separately) - Conditional loading based on environment or feature flags

For distributable third-party plugin packages, use entry points in pyproject.toml instead.

PARAMETER DESCRIPTION
plugin_type

Type of plugin (e.g., "provider", "security", "validator")

TYPE: str

name

Optional name. If not provided, uses lowercase class name.

TYPE: Optional[str] DEFAULT: None

Usage

@register("provider") class MyProvider(Provider): ...

@register("provider", name="custom_name") class AnotherProvider(Provider): ...

See Also

Entry points in pyproject.toml for distributable packages:

[project.entry-points."strutex.providers"]
my_provider = "my_package:MyProvider"
Source code in strutex/plugins/registry.py
def register(
    plugin_type: str,
    name: Optional[str] = None,
) -> Callable[[Type], Type]:
    """
    Decorator to register a plugin class at runtime.

    Use this decorator for:
    - Runtime/dynamic registration based on config
    - Prototyping plugins without packaging
    - Plugins in the same codebase (not installed separately)
    - Conditional loading based on environment or feature flags

    For distributable third-party plugin packages, use entry points
    in pyproject.toml instead.

    Args:
        plugin_type: Type of plugin (e.g., "provider", "security", "validator")
        name: Optional name. If not provided, uses lowercase class name.

    Usage:
        @register("provider")
        class MyProvider(Provider):
            ...

        @register("provider", name="custom_name")
        class AnotherProvider(Provider):
            ...

    See Also:
        Entry points in pyproject.toml for distributable packages:

            [project.entry-points."strutex.providers"]
            my_provider = "my_package:MyProvider"
    """
    def decorator(cls: Type) -> Type:
        plugin_name = name if name else cls.__name__.lower()
        PluginRegistry.register(plugin_type, plugin_name, cls)
        return cls

    return decorator

options: show_root_heading: true


Base Classes

Provider

Bases: ABC

Base class for LLM providers.

All providers must implement the process method to handle document extraction via their specific LLM API.

Subclassing auto-registers the plugin. Use class arguments to customize:

class MyProvider(Provider, name="custom", priority=90):
    ...
ATTRIBUTE DESCRIPTION
strutex_plugin_version

API version for compatibility checks

TYPE: str

priority

Ordering priority (0-100, higher = preferred)

TYPE: int

cost

Cost hint for optimization (lower = cheaper)

TYPE: float

capabilities

List of supported features

TYPE: List[str]

aprocess(file_path: str, prompt: str, schema: Schema, mime_type: str, **kwargs) -> Any async

Async version of process. Override for true async support. Default implementation calls sync version.

Source code in strutex/plugins/base.py
async def aprocess(
    self,
    file_path: str,
    prompt: str,
    schema: Schema,
    mime_type: str,
    **kwargs
) -> Any:
    """
    Async version of process. Override for true async support.
    Default implementation calls sync version.
    """
    return self.process(file_path, prompt, schema, mime_type, **kwargs)

has_capability(capability: str) -> bool

Check if this provider has a specific capability.

Source code in strutex/plugins/base.py
def has_capability(self, capability: str) -> bool:
    """Check if this provider has a specific capability."""
    return capability.lower() in [c.lower() for c in self.capabilities]

health_check() -> bool classmethod

Check if this provider is healthy and ready to use.

Override in subclasses for custom health checks (e.g., API connectivity).

RETURNS DESCRIPTION
bool

True if healthy, False otherwise

Source code in strutex/plugins/base.py
@classmethod
def health_check(cls) -> bool:
    """
    Check if this provider is healthy and ready to use.

    Override in subclasses for custom health checks (e.g., API connectivity).

    Returns:
        True if healthy, False otherwise
    """
    return True

process(file_path: str, prompt: str, schema: Schema, mime_type: str, **kwargs) -> Any abstractmethod

Process a document and extract structured data.

PARAMETER DESCRIPTION
file_path

Path to the document file

TYPE: str

prompt

Extraction prompt/instructions

TYPE: str

schema

Expected output schema

TYPE: Schema

mime_type

MIME type of the file

TYPE: str

**kwargs

Provider-specific options

DEFAULT: {}

RETURNS DESCRIPTION
Any

Extracted data matching the schema

Source code in strutex/plugins/base.py
@abstractmethod
def process(
    self,
    file_path: str,
    prompt: str,
    schema: Schema,
    mime_type: str,
    **kwargs
) -> Any:
    """
    Process a document and extract structured data.

    Args:
        file_path: Path to the document file
        prompt: Extraction prompt/instructions
        schema: Expected output schema
        mime_type: MIME type of the file
        **kwargs: Provider-specific options

    Returns:
        Extracted data matching the schema
    """
    pass

options: show_root_heading: true

Validator

Bases: ABC

Base class for output validators.

Validators check extracted data for correctness and can optionally fix issues.

Subclassing auto-registers the plugin.

ATTRIBUTE DESCRIPTION
strutex_plugin_version

API version for compatibility checks

TYPE: str

priority

Ordering priority in validation chain

TYPE: int

health_check() -> bool classmethod

Check if this validator is healthy and ready.

Source code in strutex/plugins/base.py
@classmethod
def health_check(cls) -> bool:
    """Check if this validator is healthy and ready."""
    return True

validate(data: Dict[str, Any], schema: Optional[Schema] = None) -> ValidationResult abstractmethod

Validate extracted data.

PARAMETER DESCRIPTION
data

The extracted data to validate

TYPE: Dict[str, Any]

schema

Optional schema to validate against

TYPE: Optional[Schema] DEFAULT: None

RETURNS DESCRIPTION
ValidationResult

ValidationResult with status and any issues

Source code in strutex/plugins/base.py
@abstractmethod
def validate(self, data: Dict[str, Any], schema: Optional[Schema] = None) -> "ValidationResult":
    """
    Validate extracted data.

    Args:
        data: The extracted data to validate
        schema: Optional schema to validate against

    Returns:
        ValidationResult with status and any issues
    """
    pass

options: show_root_heading: true

Postprocessor

Bases: ABC

Base class for data postprocessors.

Postprocessors transform extracted data (e.g., normalize dates, convert currencies, standardize units).

Subclassing auto-registers the plugin.

ATTRIBUTE DESCRIPTION
strutex_plugin_version

API version for compatibility checks

TYPE: str

priority

Ordering priority in postprocessing pipeline

TYPE: int

health_check() -> bool classmethod

Check if this postprocessor is healthy and ready.

Source code in strutex/plugins/base.py
@classmethod
def health_check(cls) -> bool:
    """Check if this postprocessor is healthy and ready."""
    return True

process(data: Dict[str, Any]) -> Dict[str, Any] abstractmethod

Process/transform the extracted data.

PARAMETER DESCRIPTION
data

The data to transform

TYPE: Dict[str, Any]

RETURNS DESCRIPTION
Dict[str, Any]

Transformed data

Source code in strutex/plugins/base.py
@abstractmethod
def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Process/transform the extracted data.

    Args:
        data: The data to transform

    Returns:
        Transformed data
    """
    pass

options: show_root_heading: true

SecurityPlugin

Bases: ABC

Base class for security plugins.

Security plugins can validate/sanitize input before sending to the LLM and validate output before returning to the user.

Subclassing auto-registers the plugin.

ATTRIBUTE DESCRIPTION
strutex_plugin_version

API version for compatibility checks

TYPE: str

priority

Ordering priority in security chain

TYPE: int

health_check() -> bool classmethod

Check if this security plugin is healthy and ready.

Source code in strutex/plugins/base.py
@classmethod
def health_check(cls) -> bool:
    """Check if this security plugin is healthy and ready."""
    return True

validate_input(text: str) -> SecurityResult

Validate/sanitize input text before sending to LLM.

PARAMETER DESCRIPTION
text

The input text (prompt + document content)

TYPE: str

RETURNS DESCRIPTION
SecurityResult

SecurityResult with sanitized text or rejection

Source code in strutex/plugins/base.py
def validate_input(self, text: str) -> "SecurityResult":
    """
    Validate/sanitize input text before sending to LLM.

    Args:
        text: The input text (prompt + document content)

    Returns:
        SecurityResult with sanitized text or rejection
    """
    return SecurityResult(valid=True, text=text)

validate_output(data: Dict[str, Any]) -> SecurityResult

Validate output data before returning to user.

PARAMETER DESCRIPTION
data

The extracted data

TYPE: Dict[str, Any]

RETURNS DESCRIPTION
SecurityResult

SecurityResult with clean data or rejection

Source code in strutex/plugins/base.py
def validate_output(self, data: Dict[str, Any]) -> "SecurityResult":
    """
    Validate output data before returning to user.

    Args:
        data: The extracted data

    Returns:
        SecurityResult with clean data or rejection
    """
    return SecurityResult(valid=True, data=data)

options: show_root_heading: true


Security

SecurityChain(plugins: List[SecurityPlugin])

Bases: SecurityPlugin

Chains multiple security plugins together.

Runs each plugin in sequence. If any plugin rejects, the chain stops.

Usage

chain = SecurityChain([ InputSanitizer(collapse_whitespace=True), PromptInjectionDetector(), ]) result = chain.validate_input(text)

PARAMETER DESCRIPTION
plugins

List of security plugins to run in order

TYPE: List[SecurityPlugin]

Source code in strutex/security/chain.py
def __init__(self, plugins: List[SecurityPlugin]):
    """
    Args:
        plugins: List of security plugins to run in order
    """
    self.plugins = plugins

add(plugin: SecurityPlugin) -> SecurityChain

Add a plugin to the chain. Returns self for chaining.

Source code in strutex/security/chain.py
def add(self, plugin: SecurityPlugin) -> "SecurityChain":
    """Add a plugin to the chain. Returns self for chaining."""
    self.plugins.append(plugin)
    return self

validate_input(text: str) -> SecurityResult

Run all plugins' input validation in sequence.

Source code in strutex/security/chain.py
def validate_input(self, text: str) -> SecurityResult:
    """Run all plugins' input validation in sequence."""
    current_text = text

    for plugin in self.plugins:
        result = plugin.validate_input(current_text)
        if not result.valid:
            return result
        # Use possibly-sanitized text for next plugin
        if result.text is not None:
            current_text = result.text

    return SecurityResult(valid=True, text=current_text)

validate_output(data: Dict[str, Any]) -> SecurityResult

Run all plugins' output validation in sequence.

Source code in strutex/security/chain.py
def validate_output(self, data: Dict[str, Any]) -> SecurityResult:
    """Run all plugins' output validation in sequence."""
    current_data = data

    for plugin in self.plugins:
        result = plugin.validate_output(current_data)
        if not result.valid:
            return result
        # Use possibly-modified data for next plugin
        if result.data is not None:
            current_data = result.data

    return SecurityResult(valid=True, data=current_data)

options: show_root_heading: true

InputSanitizer(collapse_whitespace: bool = True, normalize_unicode: bool = True, remove_invisible: bool = True, max_length: Optional[int] = None)

Bases: SecurityPlugin

Sanitizes input text to prevent various attacks.

Features: - Collapse excessive whitespace - Normalize Unicode characters - Remove invisible characters - Limit input length

Usage

sanitizer = InputSanitizer(collapse_whitespace=True, max_length=50000) result = sanitizer.validate_input(text)

Source code in strutex/security/sanitizer.py
def __init__(
    self,
    collapse_whitespace: bool = True,
    normalize_unicode: bool = True,
    remove_invisible: bool = True,
    max_length: Optional[int] = None
):
    self.collapse_whitespace = collapse_whitespace
    self.normalize_unicode = normalize_unicode
    self.remove_invisible = remove_invisible
    self.max_length = max_length

validate_input(text: str) -> SecurityResult

Sanitize the input text.

Source code in strutex/security/sanitizer.py
def validate_input(self, text: str) -> SecurityResult:
    """Sanitize the input text."""
    sanitized = text

    # Normalize Unicode (NFC form)
    if self.normalize_unicode:
        sanitized = unicodedata.normalize("NFC", sanitized)

    # Remove invisible characters (zero-width, etc.)
    if self.remove_invisible:
        # Remove zero-width characters and other invisibles
        invisible_pattern = r'[\u200b\u200c\u200d\u2060\u2061\u2062\u2063\u2064\ufeff]'
        sanitized = re.sub(invisible_pattern, '', sanitized)

    # Collapse whitespace (multiple spaces/newlines → single)
    if self.collapse_whitespace:
        # Collapse multiple spaces to single
        sanitized = re.sub(r' {2,}', ' ', sanitized)
        # Collapse multiple newlines to double (preserve paragraphs)
        sanitized = re.sub(r'\n{3,}', '\n\n', sanitized)
        # Remove trailing whitespace per line
        sanitized = re.sub(r' +$', '', sanitized, flags=re.MULTILINE)

    # Enforce max length
    if self.max_length and len(sanitized) > self.max_length:
        return SecurityResult(
            valid=False,
            text=None,
            reason=f"Input exceeds maximum length of {self.max_length} characters"
        )

    return SecurityResult(valid=True, text=sanitized)

options: show_root_heading: true

PromptInjectionDetector(strict: bool = False, additional_patterns: List[Tuple[str, str]] = None, block_on_detection: bool = True)

Bases: SecurityPlugin

Detects common prompt injection patterns.

Checks for: - Direct instruction overrides ("ignore previous instructions") - Role manipulation ("you are now", "pretend to be") - Delimiter attacks (markdown, XML-style tags) - Encoding attacks (base64 instructions)

Usage

detector = PromptInjectionDetector(strict=True) result = detector.validate_input(text)

PARAMETER DESCRIPTION
strict

If True, use stricter matching

TYPE: bool DEFAULT: False

additional_patterns

Extra (pattern, category) tuples to check

TYPE: List[Tuple[str, str]] DEFAULT: None

block_on_detection

If True, reject input on detection. If False, just warn.

TYPE: bool DEFAULT: True

Source code in strutex/security/injection.py
def __init__(
    self,
    strict: bool = False,
    additional_patterns: List[Tuple[str, str]] = None,
    block_on_detection: bool = True
):
    """
    Args:
        strict: If True, use stricter matching
        additional_patterns: Extra (pattern, category) tuples to check
        block_on_detection: If True, reject input on detection. If False, just warn.
    """
    self.strict = strict
    self.patterns = list(self.DEFAULT_PATTERNS)
    if additional_patterns:
        self.patterns.extend(additional_patterns)
    self.block_on_detection = block_on_detection

    # Compile patterns
    flags = re.IGNORECASE
    self._compiled = [(re.compile(p, flags), cat) for p, cat in self.patterns]

get_detections(text: str) -> List[dict]

Get detailed detection information without blocking.

Source code in strutex/security/injection.py
def get_detections(self, text: str) -> List[dict]:
    """Get detailed detection information without blocking."""
    detections = []
    for pattern, category in self._compiled:
        matches = pattern.findall(text)
        if matches:
            detections.append({
                "category": category,
                "pattern": pattern.pattern,
                "matches": matches[:5]  # Limit for safety
            })
    return detections

validate_input(text: str) -> SecurityResult

Check for prompt injection patterns.

Source code in strutex/security/injection.py
def validate_input(self, text: str) -> SecurityResult:
    """Check for prompt injection patterns."""
    detections = []

    for pattern, category in self._compiled:
        matches = pattern.findall(text)
        if matches:
            detections.append({
                "category": category,
                "pattern": pattern.pattern,
                "count": len(matches)
            })

    if detections:
        if self.block_on_detection:
            categories = list(set(d["category"] for d in detections))
            return SecurityResult(
                valid=False,
                text=None,
                reason=f"Potential prompt injection detected: {', '.join(categories)}"
            )
        else:
            # Allow but flag
            return SecurityResult(
                valid=True,
                text=text,
                reason=f"Warning: potential injection patterns found"
            )

    return SecurityResult(valid=True, text=text)

options: show_root_heading: true

OutputValidator(check_secrets: bool = True, check_prompt_leaks: bool = True, secret_patterns: Optional[List[tuple]] = None, block_on_detection: bool = True)

Bases: SecurityPlugin

Validates LLM output for security issues.

Checks for: - Leaked API keys/secrets - Leaked system prompts - Suspicious executable patterns - PII exposure

Usage

validator = OutputValidator() result = validator.validate_output(data)

Source code in strutex/security/output.py
def __init__(
    self,
    check_secrets: bool = True,
    check_prompt_leaks: bool = True,
    secret_patterns: Optional[List[tuple]] = None,
    block_on_detection: bool = True
):
    self.check_secrets = check_secrets
    self.check_prompt_leaks = check_prompt_leaks
    self.block_on_detection = block_on_detection

    # Compile patterns
    patterns = secret_patterns or self.SECRET_PATTERNS
    self._secret_patterns = [(re.compile(p, re.IGNORECASE), name) for p, name in patterns]
    self._leak_patterns = [re.compile(p, re.IGNORECASE) for p in self.PROMPT_LEAK_PATTERNS]

validate_output(data: Dict[str, Any]) -> SecurityResult

Validate output data for security issues.

Source code in strutex/security/output.py
def validate_output(self, data: Dict[str, Any]) -> SecurityResult:
    """Validate output data for security issues."""
    issues = []

    # Convert to string for pattern matching
    text = self._flatten_to_text(data)

    # Check for secrets
    if self.check_secrets:
        for pattern, secret_type in self._secret_patterns:
            if pattern.search(text):
                issues.append(f"Potential {secret_type} detected in output")

    # Check for prompt leaks
    if self.check_prompt_leaks:
        for pattern in self._leak_patterns:
            if pattern.search(text):
                issues.append("Potential system prompt leak detected")
                break

    if issues:
        if self.block_on_detection:
            return SecurityResult(
                valid=False,
                data=None,
                reason="; ".join(issues)
            )
        else:
            return SecurityResult(
                valid=True,
                data=data,
                reason=f"Warning: {'; '.join(issues)}"
            )

    return SecurityResult(valid=True, data=data)

options: show_root_heading: true


Prompts

StructuredPrompt(persona: str = 'You are a highly accurate AI Data Extraction Assistant.')

Builder for organizing complex extraction prompts.

Provides a fluent API for constructing well-structured prompts with general rules, field-specific rules, and output guidelines.

Usage

prompt = StructuredPrompt("You are an expert...")

Variadic arguments allow adding multiple rules at once

prompt.add_general_rule("No guessing", "Use ISO dates") prompt.add_field_rule("total", "Exclude tax", "Must be numeric", critical=True) final_string = prompt.compile()

Example

prompt = ( ... StructuredPrompt() ... .add_general_rule( ... "Strict data fidelity: do not invent values.", ... "Dates must be in DD.MM.YYYY format." ... ) ... .add_field_rule( ... "artikelnummer", ... "Must be 8 digits.", ... "Ignore supplier codes.", ... critical=True ... ) ... .add_output_guideline("Return valid JSON.") ... .compile() ... )

Initialize the prompt builder.

PARAMETER DESCRIPTION
persona

The system persona/role description.

TYPE: str DEFAULT: 'You are a highly accurate AI Data Extraction Assistant.'

Source code in strutex/prompts/builder.py
def __init__(self, persona: str = "You are a highly accurate AI Data Extraction Assistant."):
    """
    Initialize the prompt builder.

    Args:
        persona: The system persona/role description.
    """
    self.persona = persona.strip()
    self.general_rules: List[str] = []
    self.field_rules: Dict[str, List[str]] = {}
    self.output_guidelines: List[str] = []

__str__() -> str

Allow using the prompt directly as a string.

Source code in strutex/prompts/builder.py
def __str__(self) -> str:
    """Allow using the prompt directly as a string."""
    return self.compile()

add_field_rule(field_name: str, *rules: str, critical: bool = False) -> StructuredPrompt

Adds one or more rules specific to a single field.

PARAMETER DESCRIPTION
field_name

The name of the field these rules apply to.

TYPE: str

*rules

Variable number of rule strings.

TYPE: str DEFAULT: ()

critical

If True, prefixes rules with CRITICAL.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
StructuredPrompt

Self for method chaining.

Example

.add_field_rule("invoice_id", "Must be numeric", "8 digits", critical=True)

Source code in strutex/prompts/builder.py
def add_field_rule(self, field_name: str, *rules: str, critical: bool = False) -> "StructuredPrompt":
    """
    Adds one or more rules specific to a single field.

    Args:
        field_name: The name of the field these rules apply to.
        *rules: Variable number of rule strings.
        critical: If True, prefixes rules with **CRITICAL**.

    Returns:
        Self for method chaining.

    Example:
        .add_field_rule("invoice_id", "Must be numeric", "8 digits", critical=True)
    """
    if field_name not in self.field_rules:
        self.field_rules[field_name] = []

    prefix = "**CRITICAL**: " if critical else ""
    for rule in rules:
        self.field_rules[field_name].append(f"{prefix}{rule}")
    return self

add_general_rule(*rules: str) -> StructuredPrompt

Adds one or more high-level rules.

PARAMETER DESCRIPTION
*rules

Variable number of rule strings.

TYPE: str DEFAULT: ()

RETURNS DESCRIPTION
StructuredPrompt

Self for method chaining.

Example

.add_general_rule("Rule 1", "Rule 2", "Rule 3")

Source code in strutex/prompts/builder.py
def add_general_rule(self, *rules: str) -> "StructuredPrompt":
    """
    Adds one or more high-level rules.

    Args:
        *rules: Variable number of rule strings.

    Returns:
        Self for method chaining.

    Example:
        .add_general_rule("Rule 1", "Rule 2", "Rule 3")
    """
    self.general_rules.extend(rules)
    return self

add_output_guideline(*guidelines: str) -> StructuredPrompt

Adds formatting instructions for the output.

PARAMETER DESCRIPTION
*guidelines

Variable number of guideline strings.

TYPE: str DEFAULT: ()

RETURNS DESCRIPTION
StructuredPrompt

Self for method chaining.

Example

.add_output_guideline("JSON only", "No markdown", "No comments")

Source code in strutex/prompts/builder.py
def add_output_guideline(self, *guidelines: str) -> "StructuredPrompt":
    """
    Adds formatting instructions for the output.

    Args:
        *guidelines: Variable number of guideline strings.

    Returns:
        Self for method chaining.

    Example:
        .add_output_guideline("JSON only", "No markdown", "No comments")
    """
    self.output_guidelines.extend(guidelines)
    return self

compile() -> str

Builds the final prompt string.

RETURNS DESCRIPTION
str

The complete formatted prompt ready for LLM consumption.

Source code in strutex/prompts/builder.py
def compile(self) -> str:
    """
    Builds the final prompt string.

    Returns:
        The complete formatted prompt ready for LLM consumption.
    """
    parts = [self.persona, ""]

    if self.general_rules:
        parts.append("### 1. General Principles")
        parts.extend([f"- {r}" for r in self.general_rules])
        parts.append("")

    if self.field_rules:
        parts.append("### 2. Field Rules")
        for field, rules in self.field_rules.items():
            parts.append(f"\n**{field}**:")
            parts.extend([f"- {r}" for r in rules])
        parts.append("")

    parts.append("### 3. Output Format")
    if self.output_guidelines:
        parts.extend([f"- {r}" for r in self.output_guidelines])
    else:
        parts.append("- Output valid JSON only. No markdown.")

    return "\n".join(parts)

options: show_root_heading: true members: - init - add_general_rule - add_field_rule - add_output_guideline - compile


Pydantic Support

pydantic_to_schema(model: Type) -> Schema

Convert a Pydantic BaseModel to a strutex Schema.

PARAMETER DESCRIPTION
model

A Pydantic BaseModel class

TYPE: Type

RETURNS DESCRIPTION
Schema

Equivalent strutex Schema (Object)

Example

from pydantic import BaseModel

class Invoice(BaseModel): invoice_number: str total: float items: list[LineItem]

schema = pydantic_to_schema(Invoice)

Source code in strutex/pydantic_support.py
def pydantic_to_schema(model: Type) -> Schema:
    """
    Convert a Pydantic BaseModel to a strutex Schema.

    Args:
        model: A Pydantic BaseModel class

    Returns:
        Equivalent strutex Schema (Object)

    Example:
        from pydantic import BaseModel

        class Invoice(BaseModel):
            invoice_number: str
            total: float
            items: list[LineItem]

        schema = pydantic_to_schema(Invoice)
    """
    try:
        from pydantic import BaseModel
        from pydantic.fields import FieldInfo
    except ImportError:
        raise ImportError("Pydantic is required for pydantic_to_schema. Install with: pip install pydantic")

    if not (inspect.isclass(model) and issubclass(model, BaseModel)):
        raise TypeError(f"Expected Pydantic BaseModel, got {type(model)}")

    properties = {}
    required_fields = []

    # Get model fields
    for field_name, field_info in model.model_fields.items():
        field_type = field_info.annotation
        description = field_info.description

        # Check if required
        if field_info.is_required():
            required_fields.append(field_name)

        # Convert type to schema
        properties[field_name] = _python_type_to_schema(
            field_type, 
            description=description,
            nullable=not field_info.is_required()
        )

    return Object(
        properties=properties,
        description=model.__doc__,
        required=required_fields if required_fields else None
    )

options: show_root_heading: true

validate_with_pydantic(data: Dict[str, Any], model: Type) -> Any

Validate extracted data against a Pydantic model.

PARAMETER DESCRIPTION
data

Extracted dictionary data

TYPE: Dict[str, Any]

model

Pydantic BaseModel class to validate against

TYPE: Type

RETURNS DESCRIPTION
Any

Validated Pydantic model instance

RAISES DESCRIPTION
ValidationError

If validation fails

Source code in strutex/pydantic_support.py
def validate_with_pydantic(data: Dict[str, Any], model: Type) -> Any:
    """
    Validate extracted data against a Pydantic model.

    Args:
        data: Extracted dictionary data
        model: Pydantic BaseModel class to validate against

    Returns:
        Validated Pydantic model instance

    Raises:
        pydantic.ValidationError: If validation fails
    """
    try:
        from pydantic import BaseModel
    except ImportError:
        raise ImportError("Pydantic is required. Install with: pip install pydantic")

    if not (inspect.isclass(model) and issubclass(model, BaseModel)):
        raise TypeError(f"Expected Pydantic BaseModel, got {type(model)}")

    return model.model_validate(data)

options: show_root_heading: true


Exceptions

SecurityError

Bases: Exception

Raised when security validation fails.

This exception is raised when either input validation (e.g., prompt injection detected) or output validation (e.g., leaked secrets detected) fails.

ATTRIBUTE DESCRIPTION
message

Description of the security failure.

Example
from strutex.processor import SecurityError

try:
    result = processor.process(file, prompt, schema, security=True)
except SecurityError as e:
    print(f"Security check failed: {e}")

options: show_root_heading: true