Server : nginx/1.18.0 System : Linux localhost 6.14.3-x86_64-linode168 #1 SMP PREEMPT_DYNAMIC Mon Apr 21 19:47:55 EDT 2025 x86_64 User : www-data ( 33) PHP Version : 8.0.16 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, Directory : /lib/python3/dist-packages/supervisor/ |
import sys
import time
from supervisor.compat import xmlrpclib
from supervisor.compat import long
from supervisor.compat import as_string
from supervisor.xmlrpc import SupervisorTransport
from supervisor.events import ProcessCommunicationEvent
from supervisor.dispatchers import PEventListenerDispatcher
def getRPCTransport(env):
u = env.get('SUPERVISOR_USERNAME', '')
p = env.get('SUPERVISOR_PASSWORD', '')
return SupervisorTransport(u, p, env['SUPERVISOR_SERVER_URL'])
def getRPCInterface(env):
# dumbass ServerProxy won't allow us to pass in a non-HTTP url,
# so we fake the url we pass into it and always use the transport's
# 'serverurl' to figure out what to attach to
return xmlrpclib.ServerProxy('http://127.0.0.1', getRPCTransport(env))
def get_headers(line):
return dict([ x.split(':') for x in line.split() ])
def eventdata(payload):
headerinfo, data = payload.split('\n', 1)
headers = get_headers(headerinfo)
return headers, data
def get_asctime(now=None):
if now is None: # for testing
now = time.time() # pragma: no cover
msecs = (now - long(now)) * 1000
part1 = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now))
asctime = '%s,%03d' % (part1, msecs)
return asctime
class ProcessCommunicationsProtocol:
def send(self, msg, fp=sys.stdout):
fp.write(ProcessCommunicationEvent.BEGIN_TOKEN)
fp.write(msg)
fp.write(ProcessCommunicationEvent.END_TOKEN)
fp.flush()
def stdout(self, msg):
return self.send(msg, sys.stdout)
def stderr(self, msg):
return self.send(msg, sys.stderr)
pcomm = ProcessCommunicationsProtocol()
class EventListenerProtocol:
def wait(self, stdin=sys.stdin, stdout=sys.stdout):
self.ready(stdout)
line = stdin.readline()
headers = get_headers(line)
payload = stdin.read(int(headers['len']))
return headers, payload
def ready(self, stdout=sys.stdout):
stdout.write(as_string(PEventListenerDispatcher.READY_FOR_EVENTS_TOKEN))
stdout.flush()
def ok(self, stdout=sys.stdout):
self.send('OK', stdout)
def fail(self, stdout=sys.stdout):
self.send('FAIL', stdout)
def send(self, data, stdout=sys.stdout):
resultlen = len(data)
result = '%s%s\n%s' % (as_string(PEventListenerDispatcher.RESULT_TOKEN_START),
str(resultlen),
data)
stdout.write(result)
stdout.flush()
listener = EventListenerProtocol()