[Erp5-report] r14157 - /erp5/trunk/utils/oood/dispatcher.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Sun Apr 22 01:20:17 CEST 2007
Author: bartek
Date: Sun Apr 22 01:20:15 2007
New Revision: 14157
URL: http://svn.erp5.org?rev=14157&view=rev
Log:
alpha version of Dispatcher
Modified:
erp5/trunk/utils/oood/dispatcher.py
Modified: erp5/trunk/utils/oood/dispatcher.py
URL: http://svn.erp5.org/erp5/trunk/utils/oood/dispatcher.py?rev=14157&r1=14156&r2=14157&view=diff
==============================================================================
--- erp5/trunk/utils/oood/dispatcher.py (original)
+++ erp5/trunk/utils/oood/dispatcher.py Sun Apr 22 01:20:15 2007
@@ -71,34 +71,34 @@
class Dispatcher(object):
"""
- This will eventually replace serw.Procesor class
+ This is a replacement for serw.Procesor class
+ Request handling procedure:
+ # check if server is oood_has_blocked, return error if it is (means we fit into a very narrow time span)
+ # check if pool size > 0, otherwise reinitialize
+ # create local object threading.local for all flags and references
+ # start semaphore_waiting thread
+ # if the semaphore_waiting thread times out,
+ # mark all active threads as oood_has_blocked
+ # mark server as oood_has_blocked
+ # release all semaphores so that they can return
+ # reinitialize everything
+ # acquire semaphore
+ # if current thread is oood_has_blocked: return error message
+ # get worker or trigger a reinitialization and return error message
+ # start process_waiting thread, cross-referenced with current thread
+ # if process_waiting thread times out, mark current thread as "timed_out", restart worker, release semaphore
+ # before returning anything from any public function, check if current thread is not timed_out, if it is then return error message
+ # try to do what is requested
+ # produce and return the result
+ # if there was any exception, then:
+ # cancel process_waiting thread, start another
+ # restart worker
+ # try again
+ # if there was any exception, then:
+ # cancel process_waiting thread
+ # restart worker
+ # release semaphore
"""
- # request handling procedure:
- # check if server is oood_has_blocked, return error if it is (means we fit into a very narrow time span)
- # check if pool size > 0, otherwise reinitialize
- # create local object threading.local for all flags and references
- # start semaphore_waiting thread
- # it the semaphore_waiting thread times out,
- # mark all active threads as oood_has_blocked
- # mark server as oood_has_blocked
- # release all semaphores so that they can return
- # reinitialize everything
- # acquire semaphore
- # if current thread is oood_has_blocked: return error message
- # get worker or trigger a reinitialization and return error message
- # start process_waiting thread, cross-referenced with current thread
- # if process_waiting thread times out, mark current thread as "timed_out", restart worker, release semaphore
- # before returning anything from any public function, check if current thread is not timed_out, if it is then return error message
- # try do to what is requested
- # produce and return the result
- # if there was any exception, then:
- # cancel process_waiting thread, start another
- # restart worker
- # try again
- # if there was any exception, then:
- # cancel process_waiting thread
- # restart worker
- # release semaphore
def __init__(self):
# create semaphore
@@ -112,6 +112,7 @@
def initialize(self):
# cancel all waiting timers
+ Log.info('initializing...')
for thr in threading.enumerate():
try:
thr.cancel()
@@ -120,9 +121,12 @@
# acquire all semaphores (in non-blocking mode), so that new requests wait till we're done
for i in range(config.pool_size):
self.semaphore.acquire(False)
- # restart instances XXX structure the code cleanly
# recreate pool and workers
pool.initializePool()
+ # tell threads we are not blocked (if we are re-initializing)
+ # XXX is the sequence below correct?
+ for thr in threading.enumerate():
+ thr.oood_has_blocked = False
# reset flags
self.oood_has_blocked = False
# release all semaphores - now they can go ahead
@@ -132,23 +136,23 @@
self.emergency_restart_lock.acquire(False)
self.emergency_restart_lock.release()
- def _dispatch(self, funcname, *a, **kw):
+ def _dispatch(self, funcname, a):
"""
General request handler
+ XMLRPC protocol does not support named arguments
+ FUTURE: we could use messenger object - objects are marshalled by xmlrpclib into dictionaries, here we could
+ marshal the messenger back into an instance of a class.
"""
if funcname == 'trace':
pdb.set_trace()
if funcname == 'threadingStatus':
return self.threadingStatus()
- # first check if we have the func (we will do some compatibility here)
- func = getattr(self, funcname, None)
- if func is None and funcname != 'trace':
- return self.error(NOFUNC)
if self.oood_has_blocked: # means we are in the process of an emergency restart
return self.error(BLOCKED)
# if pool has no workers, trigger reinitialization and return error
if pool.getSize() == 0:
emergency_thread = threading.Timer(1, self.emergencyRestart)
+ emergency_thread.start()
return self.error(POOL_EMPTY)
this = threading.currentThread()
# init thread flags
@@ -156,6 +160,8 @@
this.timed_out = False
local = threading.local()
# start controlling thread to restart if we can't acquire semaphore
+ # XXX this can cause problems - if the server gets too many requests, it will get emergency restart only because
+ # it is overloaded - we will time out in waiting for semaphore, while the server is processing previous requests
max_allowed_time = config.instance_timeout + config.instance_load_time + 5
local.semaphore_waiting = threading.Timer(max_allowed_time, self.emergencyRestart)
Log.debug('%s is waiting for semaphore', local.semaphore_waiting.getName())
@@ -181,7 +187,10 @@
# try to do what was requested
try:
try:
- data = func(worker, *a, **kw)
+ try:
+ data = self._process(worker, funcname, *a)
+ except NotImplementedError:
+ return self.error(NOFUNC)
except Exception, e:
Log.logException(e)
# restart worker
@@ -190,7 +199,7 @@
# restart timing
local.process_waiting = threading.Timer(config.instance_timeout, self.instanceRestart, kwargs={'target':this, 'worker':worker})
try:
- data = func(worker, *a, **kw)
+ data = self._process(worker, funcname, *a)
except Exception, e:
# rationale: if the same happened twice, it was not the worker's fault, apparently the doc is lame
Log.logException(e)
@@ -198,7 +207,7 @@
return self.error(e)
finally:
# release semaphore only if we were able to release worker
- if pool.releaseWorker(worker):
+ if pool.releaseWorkerObject(worker):
self.semaphore.release()
local.process_waiting.cancel()
if this.timed_out: # means it took far too long and we have already triggered an instance restart
@@ -206,14 +215,19 @@
return data # with some post-processing
def instanceRestart(self, target, worker):
+ """
+ Restarts one worker instance, and makes it available
+ To be triggered by timer threads in case there was a timeout
+ """
Log.debug('instanceRestart: target %s, worker %s' % (str(target), str(worker)))
# mark processing thread as timed_out in case it goes on
target.timed_out = True
# kill and restart worker
worker = pool.getWorkerObject(worker)
# if successful:
- if worker:
- self.semaphore.release()
+ if worker: # free worker and if ok then release semaphore
+ if pool.releaseWorkerObject(worker):
+ self.semaphore.release()
else:
Log.error('can not produce a new worker')
# else: worker can not be restarted, pool becomes smaller, if it goes down to zero then next request
@@ -240,6 +254,7 @@
self.initialize()
def error(self, code):
+ Log.info(code)
return code
def test(self, worker, *a, **kw):
@@ -265,6 +280,72 @@
s += line
return s
+ def _process(self, worker, funcname, *a):
+ """
+ processing function - builds an appropriate function name, calls it on the worker
+ then processes return value
+ kw - in practice there should never be any, unless used in another way (not by _dispatch)
+ """
+ if not funcname.startswith('run_'): # worker's public function naming
+ funcname = 'run_' + funcname
+ method = getattr(worker, funcname, None)
+ if method is None:
+ raise NotImplementedError('no function named ' + funcname)
+ argtpl=('filename', 'data', 'meta', 'extension')
+ kw = dict(zip(argtpl, a)) # build a dict from positional arguments
+ # if data given, store it in a temp file
+ if kw.get('data'):
+ if not kw.get('filename'):
+ kw['filename'] = str(random.random()).split('.')[1]
+ # we have to store in a file for OOo to open
+ filename = '%d_%s' % (worker.idx, lib.asciify(kw['filename'])) # prepend worker index in case we have two files of the same name
+ filename = self._mkName(filename)
+ f = open(filename, 'w')
+ try:
+ f.write(base64.decodestring(kw['data']))
+ finally:
+ f.close()
+ kw['filename'] = filename
+ kw['data'] = '' # not needed now, we have decoded data in the file
+ # DO THE JOB
+ res = method(kw) # res is not used
+ if kw.get('newfilename'): # worker produced data and saved it in a new file
+ # XXX here we will also zip html files
+ try: # we can do it only if the argument is a string
+ tozip = kw.get('extension').startswith('html') # if html, we pack the output files
+ except AttributeError:
+ tozip = False
+ if tozip:
+ self._zipResult(kw)
+ else:
+ kw['data'] = base64.encodestring(open(self._mkName(kw['newfilename'])).read())
+ return kw
+
+ def _zipResult(self, kw):
+ #first delete the original
+ fname = kw['filename']
+ fullname = self._mkName(fname)
+ os.remove(fullname)
+ arch = cStringIO.StringIO()
+ pack = zipfile.ZipFile(arch, 'a')
+ r = re.compile(kw['extension'] + '$')
+ for f in glob.glob(fullname + '*'):
+ name = os.path.basename(f)
+ # fix html extension
+ name = r.sub('html', name)
+ pack.write(f, name)
+ pack.close()
+ arch.seek(0)
+ kw['data'] = base64.encodestring(arch.read())
+ arch.close()
+ kw['mime'] = 'application/zip' # generation returns mime type
+
+ def _mkName(self, fname):
+ """
+ make a complete file name with path
+ """
+ return os.path.join(config.oood_home, 'tmp', fname)
+
if __name__=='__main__':
ser = MySerw((config.server_host, config.server_port), allow_none = True)
disp = Dispatcher()
More information about the Erp5-report
mailing list