mirror of
https://github.com/mudler/LocalAI.git
synced 2026-01-01 07:01:09 -06:00
feat(diffusers): implement dynamic pipeline loader to remove per-pipeline conditionals (#7365)
* Initial plan Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Add dynamic loader for diffusers pipelines and refactor backend.py Co-authored-by: mudler <2420543+mudler@users.noreply.github.com> Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Fix pipeline discovery error handling and test mock issue Co-authored-by: mudler <2420543+mudler@users.noreply.github.com> Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Address code review feedback: direct imports, better error handling, improved tests Co-authored-by: mudler <2420543+mudler@users.noreply.github.com> Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Address remaining code review feedback: specific exceptions, registry access, test imports Co-authored-by: mudler <2420543+mudler@users.noreply.github.com> Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Add defensive fallback for DiffusionPipeline registry access Co-authored-by: mudler <2420543+mudler@users.noreply.github.com> Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Actually use dynamic pipeline loading for all pipelines in backend Co-authored-by: mudler <2420543+mudler@users.noreply.github.com> Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Use dynamic loader consistently for all pipelines including AutoPipelineForText2Image Co-authored-by: mudler <2420543+mudler@users.noreply.github.com> Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Move dynamic loader tests into test.py for CI compatibility Co-authored-by: mudler <2420543+mudler@users.noreply.github.com> Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Extend dynamic loader to discover any diffusers class type, not just DiffusionPipeline Co-authored-by: mudler <2420543+mudler@users.noreply.github.com> Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Add AutoPipeline classes to pipeline registry for default model loading Co-authored-by: mudler <2420543+mudler@users.noreply.github.com> Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix(python): set pyvenv python home Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * do pyenv update during start Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Minor changes Signed-off-by: Ettore Di Giacinto <mudler@localai.io> --------- Signed-off-by: Ettore Di Giacinto <mudler@localai.io> Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: mudler <2420543+mudler@users.noreply.github.com> Co-authored-by: Ettore Di Giacinto <mudler@localai.io> Co-authored-by: Ettore Di Giacinto <mudler@users.noreply.github.com>
This commit is contained in:
538
backend/python/diffusers/diffusers_dynamic_loader.py
Normal file
538
backend/python/diffusers/diffusers_dynamic_loader.py
Normal file
@@ -0,0 +1,538 @@
|
||||
"""
|
||||
Dynamic Diffusers Pipeline Loader
|
||||
|
||||
This module provides dynamic discovery and loading of diffusers pipelines at runtime,
|
||||
eliminating the need for per-pipeline conditional statements. New pipelines added to
|
||||
diffusers become available automatically without code changes.
|
||||
|
||||
The module also supports discovering other diffusers classes like schedulers, models,
|
||||
and other components, making it a generic solution for dynamic class loading.
|
||||
|
||||
Usage:
|
||||
from diffusers_dynamic_loader import load_diffusers_pipeline, get_available_pipelines
|
||||
|
||||
# Load by class name
|
||||
pipe = load_diffusers_pipeline(class_name="StableDiffusionPipeline", model_id="...", torch_dtype=torch.float16)
|
||||
|
||||
# Load by task alias
|
||||
pipe = load_diffusers_pipeline(task="text-to-image", model_id="...", torch_dtype=torch.float16)
|
||||
|
||||
# Load using model_id (infers from HuggingFace Hub if possible)
|
||||
pipe = load_diffusers_pipeline(model_id="runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
|
||||
|
||||
# Get list of available pipelines
|
||||
available = get_available_pipelines()
|
||||
|
||||
# Discover other diffusers classes (schedulers, models, etc.)
|
||||
schedulers = discover_diffusers_classes("SchedulerMixin")
|
||||
models = discover_diffusers_classes("ModelMixin")
|
||||
"""
|
||||
|
||||
import importlib
|
||||
import re
|
||||
import sys
|
||||
from typing import Any, Dict, List, Optional, Tuple, Type
|
||||
|
||||
|
||||
# Global cache for discovered pipelines - computed once per process
|
||||
_pipeline_registry: Optional[Dict[str, Type]] = None
|
||||
_task_aliases: Optional[Dict[str, List[str]]] = None
|
||||
|
||||
# Global cache for other discovered class types
|
||||
_class_registries: Dict[str, Dict[str, Type]] = {}
|
||||
|
||||
|
||||
def _camel_to_kebab(name: str) -> str:
|
||||
"""
|
||||
Convert CamelCase to kebab-case.
|
||||
|
||||
Examples:
|
||||
StableDiffusionPipeline -> stable-diffusion-pipeline
|
||||
StableDiffusionXLImg2ImgPipeline -> stable-diffusion-xl-img-2-img-pipeline
|
||||
"""
|
||||
# Insert hyphen before uppercase letters (but not at the start)
|
||||
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1-\2', name)
|
||||
# Insert hyphen before uppercase letters following lowercase letters or numbers
|
||||
s2 = re.sub('([a-z0-9])([A-Z])', r'\1-\2', s1)
|
||||
return s2.lower()
|
||||
|
||||
|
||||
def _extract_task_keywords(class_name: str) -> List[str]:
|
||||
"""
|
||||
Extract task-related keywords from a pipeline class name.
|
||||
|
||||
This function derives useful task aliases from the class name without
|
||||
hardcoding per-pipeline branches.
|
||||
|
||||
Returns a list of potential task aliases for this pipeline.
|
||||
"""
|
||||
aliases = []
|
||||
name_lower = class_name.lower()
|
||||
|
||||
# Direct task mappings based on common patterns in class names
|
||||
task_patterns = {
|
||||
'text2image': ['text-to-image', 'txt2img', 'text2image'],
|
||||
'texttoimage': ['text-to-image', 'txt2img', 'text2image'],
|
||||
'txt2img': ['text-to-image', 'txt2img', 'text2image'],
|
||||
'img2img': ['image-to-image', 'img2img', 'image2image'],
|
||||
'image2image': ['image-to-image', 'img2img', 'image2image'],
|
||||
'imagetoimage': ['image-to-image', 'img2img', 'image2image'],
|
||||
'img2video': ['image-to-video', 'img2vid', 'img2video'],
|
||||
'imagetovideo': ['image-to-video', 'img2vid', 'img2video'],
|
||||
'text2video': ['text-to-video', 'txt2vid', 'text2video'],
|
||||
'texttovideo': ['text-to-video', 'txt2vid', 'text2video'],
|
||||
'inpaint': ['inpainting', 'inpaint'],
|
||||
'depth2img': ['depth-to-image', 'depth2img'],
|
||||
'depthtoimage': ['depth-to-image', 'depth2img'],
|
||||
'controlnet': ['controlnet', 'control-net'],
|
||||
'upscale': ['upscaling', 'upscale', 'super-resolution'],
|
||||
'superresolution': ['upscaling', 'upscale', 'super-resolution'],
|
||||
}
|
||||
|
||||
# Check for each pattern in the class name
|
||||
for pattern, task_aliases in task_patterns.items():
|
||||
if pattern in name_lower:
|
||||
aliases.extend(task_aliases)
|
||||
|
||||
# Also detect general pipeline types from the class name structure
|
||||
# E.g., StableDiffusionPipeline -> stable-diffusion, flux -> flux
|
||||
# Remove "Pipeline" suffix and convert to kebab case
|
||||
if class_name.endswith('Pipeline'):
|
||||
base_name = class_name[:-8] # Remove "Pipeline"
|
||||
kebab_name = _camel_to_kebab(base_name)
|
||||
aliases.append(kebab_name)
|
||||
|
||||
# Extract model family name (e.g., "stable-diffusion" from "stable-diffusion-xl-img-2-img")
|
||||
parts = kebab_name.split('-')
|
||||
if len(parts) >= 2:
|
||||
# Try the first two words as a family name
|
||||
family = '-'.join(parts[:2])
|
||||
if family not in aliases:
|
||||
aliases.append(family)
|
||||
|
||||
# If no specific task pattern matched but class contains "Pipeline", add "text-to-image" as default
|
||||
# since most diffusion pipelines support text-to-image generation
|
||||
if 'text-to-image' not in aliases and 'image-to-image' not in aliases:
|
||||
# Only add for pipelines that seem to be generation pipelines (not schedulers, etc.)
|
||||
if 'pipeline' in name_lower and not any(x in name_lower for x in ['scheduler', 'processor', 'encoder']):
|
||||
# Don't automatically add - let it be explicit
|
||||
pass
|
||||
|
||||
return list(set(aliases)) # Remove duplicates
|
||||
|
||||
|
||||
def discover_diffusers_classes(
|
||||
base_class_name: str,
|
||||
include_base: bool = True
|
||||
) -> Dict[str, Type]:
|
||||
"""
|
||||
Discover all subclasses of a given base class from diffusers.
|
||||
|
||||
This function provides a generic way to discover any type of diffusers class,
|
||||
not just pipelines. It can be used to discover schedulers, models, processors,
|
||||
and other components.
|
||||
|
||||
Args:
|
||||
base_class_name: Name of the base class to search for subclasses
|
||||
(e.g., "DiffusionPipeline", "SchedulerMixin", "ModelMixin")
|
||||
include_base: Whether to include the base class itself in results
|
||||
|
||||
Returns:
|
||||
Dict mapping class names to class objects
|
||||
|
||||
Examples:
|
||||
# Discover all pipeline classes
|
||||
pipelines = discover_diffusers_classes("DiffusionPipeline")
|
||||
|
||||
# Discover all scheduler classes
|
||||
schedulers = discover_diffusers_classes("SchedulerMixin")
|
||||
|
||||
# Discover all model classes
|
||||
models = discover_diffusers_classes("ModelMixin")
|
||||
|
||||
# Discover AutoPipeline classes
|
||||
auto_pipelines = discover_diffusers_classes("AutoPipelineForText2Image")
|
||||
"""
|
||||
global _class_registries
|
||||
|
||||
# Check cache first
|
||||
if base_class_name in _class_registries:
|
||||
return _class_registries[base_class_name]
|
||||
|
||||
import diffusers
|
||||
|
||||
# Try to get the base class from diffusers
|
||||
base_class = None
|
||||
try:
|
||||
base_class = getattr(diffusers, base_class_name)
|
||||
except AttributeError:
|
||||
# Try to find in submodules
|
||||
for submodule in ['schedulers', 'models', 'pipelines']:
|
||||
try:
|
||||
module = importlib.import_module(f'diffusers.{submodule}')
|
||||
if hasattr(module, base_class_name):
|
||||
base_class = getattr(module, base_class_name)
|
||||
break
|
||||
except (ImportError, ModuleNotFoundError):
|
||||
continue
|
||||
|
||||
if base_class is None:
|
||||
raise ValueError(f"Could not find base class '{base_class_name}' in diffusers")
|
||||
|
||||
registry: Dict[str, Type] = {}
|
||||
|
||||
# Include base class if requested
|
||||
if include_base:
|
||||
registry[base_class_name] = base_class
|
||||
|
||||
# Scan diffusers module for subclasses
|
||||
for attr_name in dir(diffusers):
|
||||
try:
|
||||
attr = getattr(diffusers, attr_name)
|
||||
if (isinstance(attr, type) and
|
||||
issubclass(attr, base_class) and
|
||||
(include_base or attr is not base_class)):
|
||||
registry[attr_name] = attr
|
||||
except (ImportError, AttributeError, TypeError, RuntimeError, ModuleNotFoundError):
|
||||
continue
|
||||
|
||||
# Cache the results
|
||||
_class_registries[base_class_name] = registry
|
||||
return registry
|
||||
|
||||
|
||||
def get_available_classes(base_class_name: str) -> List[str]:
|
||||
"""
|
||||
Get a sorted list of all discovered class names for a given base class.
|
||||
|
||||
Args:
|
||||
base_class_name: Name of the base class (e.g., "SchedulerMixin")
|
||||
|
||||
Returns:
|
||||
Sorted list of discovered class names
|
||||
"""
|
||||
return sorted(discover_diffusers_classes(base_class_name).keys())
|
||||
|
||||
|
||||
def _discover_pipelines() -> Tuple[Dict[str, Type], Dict[str, List[str]]]:
|
||||
"""
|
||||
Discover all subclasses of DiffusionPipeline from diffusers.
|
||||
|
||||
This function uses the generic discover_diffusers_classes() internally
|
||||
and adds pipeline-specific task alias generation. It also includes
|
||||
AutoPipeline classes which are special utility classes for automatic
|
||||
pipeline selection.
|
||||
|
||||
Returns:
|
||||
A tuple of (pipeline_registry, task_aliases) where:
|
||||
- pipeline_registry: Dict mapping class names to class objects
|
||||
- task_aliases: Dict mapping task aliases to lists of class names
|
||||
"""
|
||||
# Use the generic discovery function
|
||||
pipeline_registry = discover_diffusers_classes("DiffusionPipeline", include_base=True)
|
||||
|
||||
# Also add AutoPipeline classes - these are special utility classes that are
|
||||
# NOT subclasses of DiffusionPipeline but are commonly used
|
||||
import diffusers
|
||||
auto_pipeline_classes = [
|
||||
"AutoPipelineForText2Image",
|
||||
"AutoPipelineForImage2Image",
|
||||
"AutoPipelineForInpainting",
|
||||
]
|
||||
for cls_name in auto_pipeline_classes:
|
||||
try:
|
||||
cls = getattr(diffusers, cls_name)
|
||||
if cls is not None:
|
||||
pipeline_registry[cls_name] = cls
|
||||
except AttributeError:
|
||||
# Class not available in this version of diffusers
|
||||
pass
|
||||
|
||||
# Generate task aliases for pipelines
|
||||
task_aliases: Dict[str, List[str]] = {}
|
||||
for attr_name in pipeline_registry:
|
||||
if attr_name == "DiffusionPipeline":
|
||||
continue # Skip base class for alias generation
|
||||
|
||||
aliases = _extract_task_keywords(attr_name)
|
||||
for alias in aliases:
|
||||
if alias not in task_aliases:
|
||||
task_aliases[alias] = []
|
||||
if attr_name not in task_aliases[alias]:
|
||||
task_aliases[alias].append(attr_name)
|
||||
|
||||
return pipeline_registry, task_aliases
|
||||
|
||||
|
||||
def get_pipeline_registry() -> Dict[str, Type]:
|
||||
"""
|
||||
Get the cached pipeline registry.
|
||||
|
||||
Returns a dictionary mapping pipeline class names to their class objects.
|
||||
The registry is built on first access and cached for subsequent calls.
|
||||
"""
|
||||
global _pipeline_registry, _task_aliases
|
||||
if _pipeline_registry is None:
|
||||
_pipeline_registry, _task_aliases = _discover_pipelines()
|
||||
return _pipeline_registry
|
||||
|
||||
|
||||
def get_task_aliases() -> Dict[str, List[str]]:
|
||||
"""
|
||||
Get the cached task aliases dictionary.
|
||||
|
||||
Returns a dictionary mapping task aliases (e.g., "text-to-image") to
|
||||
lists of pipeline class names that support that task.
|
||||
"""
|
||||
global _pipeline_registry, _task_aliases
|
||||
if _task_aliases is None:
|
||||
_pipeline_registry, _task_aliases = _discover_pipelines()
|
||||
return _task_aliases
|
||||
|
||||
|
||||
def get_available_pipelines() -> List[str]:
|
||||
"""
|
||||
Get a sorted list of all discovered pipeline class names.
|
||||
|
||||
Returns:
|
||||
List of pipeline class names available for loading.
|
||||
"""
|
||||
return sorted(get_pipeline_registry().keys())
|
||||
|
||||
|
||||
def get_available_tasks() -> List[str]:
|
||||
"""
|
||||
Get a sorted list of all available task aliases.
|
||||
|
||||
Returns:
|
||||
List of task aliases (e.g., ["text-to-image", "image-to-image", ...])
|
||||
"""
|
||||
return sorted(get_task_aliases().keys())
|
||||
|
||||
|
||||
def resolve_pipeline_class(
|
||||
class_name: Optional[str] = None,
|
||||
task: Optional[str] = None,
|
||||
model_id: Optional[str] = None
|
||||
) -> Type:
|
||||
"""
|
||||
Resolve a pipeline class from class_name, task, or model_id.
|
||||
|
||||
Priority:
|
||||
1. If class_name is provided, look it up directly
|
||||
2. If task is provided, resolve through task aliases
|
||||
3. If model_id is provided, try to infer from HuggingFace Hub
|
||||
|
||||
Args:
|
||||
class_name: Exact pipeline class name (e.g., "StableDiffusionPipeline")
|
||||
task: Task alias (e.g., "text-to-image", "img2img")
|
||||
model_id: HuggingFace model ID (e.g., "runwayml/stable-diffusion-v1-5")
|
||||
|
||||
Returns:
|
||||
The resolved pipeline class.
|
||||
|
||||
Raises:
|
||||
ValueError: If no pipeline could be resolved.
|
||||
"""
|
||||
registry = get_pipeline_registry()
|
||||
aliases = get_task_aliases()
|
||||
|
||||
# 1. Direct class name lookup
|
||||
if class_name:
|
||||
if class_name in registry:
|
||||
return registry[class_name]
|
||||
# Try case-insensitive match
|
||||
for name, cls in registry.items():
|
||||
if name.lower() == class_name.lower():
|
||||
return cls
|
||||
raise ValueError(
|
||||
f"Unknown pipeline class '{class_name}'. "
|
||||
f"Available pipelines: {', '.join(sorted(registry.keys())[:20])}..."
|
||||
)
|
||||
|
||||
# 2. Task alias lookup
|
||||
if task:
|
||||
task_lower = task.lower().replace('_', '-')
|
||||
if task_lower in aliases:
|
||||
# Return the first matching pipeline for this task
|
||||
matching_classes = aliases[task_lower]
|
||||
if matching_classes:
|
||||
return registry[matching_classes[0]]
|
||||
|
||||
# Try partial matching
|
||||
for alias, classes in aliases.items():
|
||||
if task_lower in alias or alias in task_lower:
|
||||
if classes:
|
||||
return registry[classes[0]]
|
||||
|
||||
raise ValueError(
|
||||
f"Unknown task '{task}'. "
|
||||
f"Available tasks: {', '.join(sorted(aliases.keys())[:20])}..."
|
||||
)
|
||||
|
||||
# 3. Try to infer from HuggingFace Hub
|
||||
if model_id:
|
||||
try:
|
||||
from huggingface_hub import model_info
|
||||
info = model_info(model_id)
|
||||
|
||||
# Check pipeline_tag
|
||||
if hasattr(info, 'pipeline_tag') and info.pipeline_tag:
|
||||
tag = info.pipeline_tag.lower().replace('_', '-')
|
||||
if tag in aliases:
|
||||
matching_classes = aliases[tag]
|
||||
if matching_classes:
|
||||
return registry[matching_classes[0]]
|
||||
|
||||
# Check model card for hints
|
||||
if hasattr(info, 'cardData') and info.cardData:
|
||||
card = info.cardData
|
||||
if 'pipeline_tag' in card:
|
||||
tag = card['pipeline_tag'].lower().replace('_', '-')
|
||||
if tag in aliases:
|
||||
matching_classes = aliases[tag]
|
||||
if matching_classes:
|
||||
return registry[matching_classes[0]]
|
||||
|
||||
except ImportError:
|
||||
# huggingface_hub not available
|
||||
pass
|
||||
except (KeyError, AttributeError, ValueError, OSError):
|
||||
# Model info lookup failed - common cases:
|
||||
# - KeyError: Missing keys in model card
|
||||
# - AttributeError: Missing attributes on model info
|
||||
# - ValueError: Invalid model data
|
||||
# - OSError: Network or file access issues
|
||||
pass
|
||||
|
||||
# Fallback: use DiffusionPipeline.from_pretrained which auto-detects
|
||||
# DiffusionPipeline is always added to registry in _discover_pipelines (line 132)
|
||||
# but use .get() with import fallback for extra safety
|
||||
from diffusers import DiffusionPipeline
|
||||
return registry.get('DiffusionPipeline', DiffusionPipeline)
|
||||
|
||||
raise ValueError(
|
||||
"Must provide at least one of: class_name, task, or model_id. "
|
||||
f"Available pipelines: {', '.join(sorted(registry.keys())[:20])}... "
|
||||
f"Available tasks: {', '.join(sorted(aliases.keys())[:20])}..."
|
||||
)
|
||||
|
||||
|
||||
def load_diffusers_pipeline(
|
||||
class_name: Optional[str] = None,
|
||||
task: Optional[str] = None,
|
||||
model_id: Optional[str] = None,
|
||||
from_single_file: bool = False,
|
||||
**kwargs
|
||||
) -> Any:
|
||||
"""
|
||||
Load a diffusers pipeline dynamically.
|
||||
|
||||
This function resolves the appropriate pipeline class based on the provided
|
||||
parameters and instantiates it with the given kwargs.
|
||||
|
||||
Args:
|
||||
class_name: Exact pipeline class name (e.g., "StableDiffusionPipeline")
|
||||
task: Task alias (e.g., "text-to-image", "img2img")
|
||||
model_id: HuggingFace model ID or local path
|
||||
from_single_file: If True, use from_single_file() instead of from_pretrained()
|
||||
**kwargs: Additional arguments passed to from_pretrained() or from_single_file()
|
||||
|
||||
Returns:
|
||||
An instantiated pipeline object.
|
||||
|
||||
Raises:
|
||||
ValueError: If no pipeline could be resolved.
|
||||
Exception: If pipeline loading fails.
|
||||
|
||||
Examples:
|
||||
# Load by class name
|
||||
pipe = load_diffusers_pipeline(
|
||||
class_name="StableDiffusionPipeline",
|
||||
model_id="runwayml/stable-diffusion-v1-5",
|
||||
torch_dtype=torch.float16
|
||||
)
|
||||
|
||||
# Load by task
|
||||
pipe = load_diffusers_pipeline(
|
||||
task="text-to-image",
|
||||
model_id="runwayml/stable-diffusion-v1-5",
|
||||
torch_dtype=torch.float16
|
||||
)
|
||||
|
||||
# Load from single file
|
||||
pipe = load_diffusers_pipeline(
|
||||
class_name="StableDiffusionPipeline",
|
||||
model_id="/path/to/model.safetensors",
|
||||
from_single_file=True,
|
||||
torch_dtype=torch.float16
|
||||
)
|
||||
"""
|
||||
# Resolve the pipeline class
|
||||
pipeline_class = resolve_pipeline_class(
|
||||
class_name=class_name,
|
||||
task=task,
|
||||
model_id=model_id
|
||||
)
|
||||
|
||||
# If no model_id provided but we have a class, we can't load
|
||||
if model_id is None:
|
||||
raise ValueError("model_id is required to load a pipeline")
|
||||
|
||||
# Load the pipeline
|
||||
try:
|
||||
if from_single_file:
|
||||
# Check if the class has from_single_file method
|
||||
if hasattr(pipeline_class, 'from_single_file'):
|
||||
return pipeline_class.from_single_file(model_id, **kwargs)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Pipeline class {pipeline_class.__name__} does not support from_single_file(). "
|
||||
f"Use from_pretrained() instead."
|
||||
)
|
||||
else:
|
||||
return pipeline_class.from_pretrained(model_id, **kwargs)
|
||||
|
||||
except Exception as e:
|
||||
# Provide helpful error message
|
||||
available = get_available_pipelines()
|
||||
raise RuntimeError(
|
||||
f"Failed to load pipeline '{pipeline_class.__name__}' from '{model_id}': {e}\n"
|
||||
f"Available pipelines: {', '.join(available[:20])}..."
|
||||
) from e
|
||||
|
||||
|
||||
def get_pipeline_info(class_name: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Get information about a specific pipeline class.
|
||||
|
||||
Args:
|
||||
class_name: The pipeline class name
|
||||
|
||||
Returns:
|
||||
Dictionary with pipeline information including:
|
||||
- name: Class name
|
||||
- aliases: List of task aliases
|
||||
- supports_single_file: Whether from_single_file() is available
|
||||
- docstring: Class docstring (if available)
|
||||
"""
|
||||
registry = get_pipeline_registry()
|
||||
aliases = get_task_aliases()
|
||||
|
||||
if class_name not in registry:
|
||||
raise ValueError(f"Unknown pipeline: {class_name}")
|
||||
|
||||
cls = registry[class_name]
|
||||
|
||||
# Find all aliases for this pipeline
|
||||
pipeline_aliases = []
|
||||
for alias, classes in aliases.items():
|
||||
if class_name in classes:
|
||||
pipeline_aliases.append(alias)
|
||||
|
||||
return {
|
||||
'name': class_name,
|
||||
'aliases': pipeline_aliases,
|
||||
'supports_single_file': hasattr(cls, 'from_single_file'),
|
||||
'docstring': cls.__doc__[:200] if cls.__doc__ else None
|
||||
}
|
||||
Reference in New Issue
Block a user