Исходный код kombu.utils.imports
"""Import related utilities."""
from __future__ import annotations
import importlib
import sys
from kombu.exceptions import reraise
[документация]def symbol_by_name(name, aliases=None, imp=None, package=None,
sep='.', default=None, **kwargs):
"""Get symbol by qualified name.
The name should be the full dot-separated path to the class::
modulename.ClassName
Example::
celery.concurrency.processes.TaskPool
^- class name
or using ':' to separate module and symbol::
celery.concurrency.processes:TaskPool
If `aliases` is provided, a dict containing short name/long name
mappings, the name is looked up in the aliases first.
Examples
--------
>>> symbol_by_name('celery.concurrency.processes.TaskPool')
<class 'celery.concurrency.processes.TaskPool'>
>>> symbol_by_name('default', {
... 'default': 'celery.concurrency.processes.TaskPool'})
<class 'celery.concurrency.processes.TaskPool'>
# Does not try to look up non-string names.
>>> from celery.concurrency.processes import TaskPool
>>> symbol_by_name(TaskPool) is TaskPool
True
"""
aliases = {} if not aliases else aliases
if imp is None:
imp = importlib.import_module
if not isinstance(name, str):
return name # already a class
name = aliases.get(name) or name
sep = ':' if ':' in name else sep
module_name, _, cls_name = name.rpartition(sep)
if not module_name:
cls_name, module_name = None, package if package else cls_name
try:
try:
module = imp(module_name, package=package, **kwargs)
except ValueError as exc:
reraise(ValueError,
ValueError(f"Couldn't import {name!r}: {exc}"),
sys.exc_info()[2])
return getattr(module, cls_name) if cls_name else module
except (ImportError, AttributeError):
if default is None:
raise
return default