Server : nginx/1.18.0 System : Linux localhost 6.14.3-x86_64-linode168 #1 SMP PREEMPT_DYNAMIC Mon Apr 21 19:47:55 EDT 2025 x86_64 User : www-data ( 33) PHP Version : 8.0.16 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, Directory : /lib/python3/dist-packages/twisted/application/ |
# -*- test-case-name: twisted.test.test_application -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Plugin-based system for enumerating available reactors and installing one of
them.
"""
from __future__ import absolute_import, division
from zope.interface import Interface, Attribute, implementer
from twisted.plugin import IPlugin, getPlugins
from twisted.python.reflect import namedAny
class IReactorInstaller(Interface):
    """
    Definition of a reactor which can probably be installed.

    A provider exposes two user-facing strings, C{shortName} and
    C{description}, plus an C{install} method that performs the actual
    installation.
    """

    # Declared with zope.interface's Attribute so that the description text
    # travels with the interface definition.
    shortName = Attribute("""
    A brief string giving the user-facing name of this reactor.
    """)

    description = Attribute("""
    A longer string giving a user-facing description of this reactor.
    """)

    # Interface methods are declared without a C{self} parameter; this only
    # describes the signature that providers must offer.
    def install():
        """
        Install this reactor.
        """

    # TODO - A method which provides a best-guess as to whether this reactor
    # can actually be used in the execution environment.
class NoSuchReactor(KeyError):
    """
    Signals that a reactor was requested for installation but no reactor
    with the requested name could be located.

    Because this derives from L{KeyError}, handlers written for L{KeyError}
    will also catch it.
    """
@implementer(IPlugin, IReactorInstaller)
class Reactor(object):
    """
    A plugin object describing a single installable reactor.

    @ivar shortName: The brief, user-facing name of this reactor.

    @ivar moduleName: The fully-qualified Python name of the module of which
        the install callable is an attribute.

    @ivar description: The longer, user-facing description of this reactor.
    """

    def __init__(self, shortName, moduleName, description):
        """
        Record the three descriptive values on the instance unchanged.
        """
        self.shortName, self.moduleName, self.description = (
            shortName,
            moduleName,
            description,
        )

    def install(self):
        """
        Resolve C{moduleName} with L{namedAny} and call C{install} on the
        object it names.
        """
        reactorModule = namedAny(self.moduleName)
        reactorModule.install()
def getReactorTypes():
    """
    Enumerate the available reactor installers.

    @return: Whatever L{getPlugins} produces for L{IReactorInstaller}: an
        iterator of plugins providing that interface.
    """
    installers = getPlugins(IReactorInstaller)
    return installers
def installReactor(shortName):
    """
    Find the reactor plugin whose C{shortName} attribute equals the given
    name, install it, and hand back the installed reactor.

    @param shortName: The C{shortName} of the reactor to install.

    @return: The C{twisted.internet.reactor} object, imported after the
        chosen installer has run.

    @raise NoSuchReactor: If no reactor is found with a matching C{shortName}.
    @raise: anything that the specified reactor can raise when installed.
    """
    # Stop at the first plugin with a matching name; the C{else} clause runs
    # only when the search finishes without finding one.
    for candidate in getReactorTypes():
        if candidate.shortName == shortName:
            break
    else:
        raise NoSuchReactor(shortName)

    candidate.install()
    # Deliberately imported only after install() has run.
    # NOTE(review): presumably an earlier import of twisted.internet.reactor
    # would install the default reactor instead - confirm.
    from twisted.internet import reactor
    return reactor