[Erp5-report] r14157 - /erp5/trunk/utils/oood/dispatcher.py

nobody at svn.erp5.org nobody at svn.erp5.org
Sun Apr 22 01:20:17 CEST 2007


Author: bartek
Date: Sun Apr 22 01:20:15 2007
New Revision: 14157

URL: http://svn.erp5.org?rev=14157&view=rev
Log:
alpha version of Dispatcher

Modified:
    erp5/trunk/utils/oood/dispatcher.py

Modified: erp5/trunk/utils/oood/dispatcher.py
URL: http://svn.erp5.org/erp5/trunk/utils/oood/dispatcher.py?rev=14157&r1=14156&r2=14157&view=diff
==============================================================================
--- erp5/trunk/utils/oood/dispatcher.py (original)
+++ erp5/trunk/utils/oood/dispatcher.py Sun Apr 22 01:20:15 2007
@@ -71,34 +71,34 @@
 
 class Dispatcher(object):
   """
-    This will eventually replace serw.Procesor class
+    This is a replacement for serw.Procesor class
+    Request handling procedure:
+    # check if server is oood_has_blocked, return error if it is (means the request arrived in the narrow window while an emergency restart is in progress)
+    # check if pool size > 0, otherwise reinitialize
+    # create local object threading.local for all flags and references
+    # start semaphore_waiting thread
+    # if the semaphore_waiting thread times out,
+    #     mark all active threads as oood_has_blocked
+    #     mark server as oood_has_blocked
+    #     release all semaphores so that they can return
+    #     reinitialize everything
+    # acquire semaphore
+    # if current thread is oood_has_blocked: return error message
+    # get worker or trigger a reinitialization and return error message
+    # start process_waiting thread, cross-referenced with current thread
+    # if process_waiting thread times out, mark current thread as "timed_out", restart worker, release semaphore
+    # before returning anything from any public function, check if current thread is not timed_out, if it is then return error message
+    # try to do what is requested
+    # produce and return the result
+    # if there was any exception, then:
+    # cancel process_waiting thread, start another
+    # restart worker
+    # try again
+    # if there was any exception, then:
+    # cancel process_waiting thread
+    # restart worker
+    # release semaphore
   """
-  # request handling procedure:
-  # check if server is oood_has_blocked, return error if it is (means we fit into a very narrow time span)
-  # check if pool size > 0, otherwise reinitialize
-  # create local object threading.local for all flags and references
-  # start semaphore_waiting thread
-  # it the semaphore_waiting thread times out,
-  #     mark all active threads as oood_has_blocked
-  #     mark server as oood_has_blocked
-  #     release all semaphores so that they can return
-  #     reinitialize everything
-  # acquire semaphore
-  # if current thread is oood_has_blocked: return error message
-  # get worker or trigger a reinitialization and return error message
-  # start process_waiting thread, cross-referenced with current thread
-  # if process_waiting thread times out, mark current thread as "timed_out", restart worker, release semaphore
-  # before returning anything from any public function, check if current thread is not timed_out, if it is then return error message
-  # try do to what is requested
-  # produce and return the result
-  # if there was any exception, then:
-  # cancel process_waiting thread, start another
-  # restart worker
-  # try again
-  # if there was any exception, then:
-  # cancel process_waiting thread
-  # restart worker
-  # release semaphore
 
   def __init__(self):
     # create semaphore
@@ -112,6 +112,7 @@
 
   def initialize(self):
     # cancel all waiting timers
+    Log.info('initializing...')
     for thr in threading.enumerate():
       try:
         thr.cancel()
@@ -120,9 +121,12 @@
     # acquire all semaphores (in non-blocking mode), so that new requests wait till we're done
     for i in range(config.pool_size):
       self.semaphore.acquire(False)
-    # restart instances XXX structure the code cleanly
     # recreate pool and workers
     pool.initializePool()
+    # tell threads we are not blocked (if we are re-initializing)
+    # XXX is the sequence below correct?
+    for thr in threading.enumerate():
+      thr.oood_has_blocked = False
     # reset flags
     self.oood_has_blocked = False
     # release all semaphores - now they can go ahead
@@ -132,23 +136,23 @@
     self.emergency_restart_lock.acquire(False)
     self.emergency_restart_lock.release()
 
-  def _dispatch(self, funcname, *a, **kw):
+  def _dispatch(self, funcname, a):
     """
       General request handler
+      XMLRPC protocol does not support named arguments
+      FUTURE: we could use a messenger object - xmlrpclib marshals objects into dictionaries, so here we could
+      rebuild the messenger from that dictionary as an instance of a class.
     """
     if funcname == 'trace':
       pdb.set_trace()
     if funcname == 'threadingStatus':
       return self.threadingStatus()
-    # first check if we have the func (we will do some compatibility here)
-    func = getattr(self, funcname, None)
-    if func is None and funcname != 'trace':
-      return self.error(NOFUNC)
     if self.oood_has_blocked: # means we are in the process of an emergency restart
       return self.error(BLOCKED)
     # if pool has no workers, trigger reinitialization and return error
     if pool.getSize() == 0:
       emergency_thread = threading.Timer(1, self.emergencyRestart)
+      emergency_thread.start()
       return self.error(POOL_EMPTY)
     this = threading.currentThread()
     # init thread flags
@@ -156,6 +160,8 @@
     this.timed_out = False
     local = threading.local()
     # start controlling thread to restart if we can't acquire semaphore
+    # XXX this can cause problems - if the server gets too many requests, it will get an emergency restart only because
+    # it is overloaded - we will time out waiting for the semaphore while the server is still processing previous requests
     max_allowed_time = config.instance_timeout + config.instance_load_time + 5
     local.semaphore_waiting = threading.Timer(max_allowed_time, self.emergencyRestart)
     Log.debug('%s is waiting for semaphore', local.semaphore_waiting.getName())
@@ -181,7 +187,10 @@
     # try do do what was requested
     try:
       try:
-        data = func(worker, *a, **kw)
+        try:
+          data = self._process(worker, funcname, *a)
+        except NotImplementedError:
+          return self.error(NOFUNC)
       except Exception, e:
         Log.logException(e)
         # restart worker
@@ -190,7 +199,7 @@
         # restart timing
         local.process_waiting = threading.Timer(config.instance_timeout, self.instanceRestart, kwargs={'target':this, 'worker':worker})
         try:
-          data = func(worker, *a, **kw)
+          data = self._process(worker, funcname, *a)
         except Exception, e:
           # rationale: if the same happened twice, it was not the worker's fault, apparently the doc is lame
           Log.logException(e)
@@ -198,7 +207,7 @@
           return self.error(e)
     finally:
       # release semaphore only if we were able to release worker
-      if pool.releaseWorker(worker):
+      if pool.releaseWorkerObject(worker):
         self.semaphore.release()
     local.process_waiting.cancel()
     if this.timed_out: # means it took far too long and we have already triggered an instance restart
@@ -206,14 +215,19 @@
     return data # with some post-processing
 
   def instanceRestart(self, target, worker):
+    """
+      Restarts one worker instance, and makes it available
+      To be triggered by timer threads in case there was a timeout
+    """
     Log.debug('instanceRestart: target %s, worker %s' % (str(target), str(worker)))
     # mark processing thread as timed_out in case it goes on
     target.timed_out = True
     # kill and restart worker
     worker = pool.getWorkerObject(worker)
     # if successful:
-    if worker:
-      self.semaphore.release()
+    if worker: # free worker and if ok then release semaphore
+      if pool.releaseWorkerObject(worker):
+        self.semaphore.release()
     else:
       Log.error('can not produce a new worker')
     # else: worker can not be restarted, pool becomes smaller, if it goes down to zero then next request
@@ -240,6 +254,7 @@
     self.initialize()
 
   def error(self, code):
+    Log.info(code)
     return code
 
   def test(self, worker, *a, **kw):
@@ -265,6 +280,72 @@
       s += line
     return s
 
+  def _process(self, worker, funcname, *a):
+    """
+      processing function - builds an appropriate function name, calls it on the worker
+      then processes return value
+      kw - not accepted as keyword arguments; the dict is built here from the positional arguments (filename, data, meta, extension) passed by _dispatch
+    """
+    if not funcname.startswith('run_'): # worker's public function naming
+      funcname = 'run_' + funcname
+    method = getattr(worker, funcname, None)
+    if method is None:
+      raise NotImplementedError('no function named ' + funcname)
+    argtpl=('filename', 'data', 'meta', 'extension')
+    kw = dict(zip(argtpl, a)) # build a dict from positional arguments
+    # if data given, store it in a temp file
+    if kw.get('data'):
+      if not kw.get('filename'):
+        kw['filename'] = str(random.random()).split('.')[1]
+      # we have to store in a file for OOo to open
+      filename = '%d_%s' % (worker.idx, lib.asciify(kw['filename'])) # prepend worker index in case we have two files of the same name
+      filename = self._mkName(filename)
+      f = open(filename, 'w')
+      try:
+        f.write(base64.decodestring(kw['data']))
+      finally:
+        f.close()
+      kw['filename'] = filename
+      kw['data'] = '' # not needed now, we have decoded data in the file
+    # DO THE JOB
+    res = method(kw) # res is not used
+    if kw.get('newfilename'): # worker produced data and saved it in a new file
+      # XXX here we will also zip html files
+      try: # we can do it only if the argument is a string
+        tozip = kw.get('extension').startswith('html') # if html, we pack the output files
+      except AttributeError:
+        tozip = False
+      if tozip:
+        self._zipResult(kw)
+      else:
+        kw['data'] = base64.encodestring(open(self._mkName(kw['newfilename'])).read())
+    return kw
+
+  def _zipResult(self, kw):
+    #first delete the original
+    fname = kw['filename']
+    fullname = self._mkName(fname)
+    os.remove(fullname)
+    arch = cStringIO.StringIO()
+    pack = zipfile.ZipFile(arch, 'a')
+    r = re.compile(kw['extension'] + '$')
+    for f in glob.glob(fullname + '*'):
+      name = os.path.basename(f)
+      # fix html extension
+      name = r.sub('html', name)
+      pack.write(f, name)
+    pack.close()
+    arch.seek(0)
+    kw['data'] = base64.encodestring(arch.read())
+    arch.close()
+    kw['mime'] = 'application/zip' # generation returns mime type
+
+  def _mkName(self, fname):
+    """
+      make a complete file name with path
+    """
+    return os.path.join(config.oood_home, 'tmp', fname)
+
 if __name__=='__main__':
     ser = MySerw((config.server_host, config.server_port), allow_none = True)
     disp = Dispatcher()




More information about the Erp5-report mailing list