[Erp5-report] r14180 - /erp5/trunk/utils/oood/dispatcher.py

nobody at svn.erp5.org nobody at svn.erp5.org
Tue Apr 24 02:23:13 CEST 2007


Author: bartek
Date: Tue Apr 24 02:23:11 2007
New Revision: 14180

URL: http://svn.erp5.org?rev=14180&view=rev
Log:
poolStatus display method
increased max_allowed_time (because makeWorkerObject can now take much longer)
if processing failed twice, return error code (not exception), because this is normal
use normalized interface if kw has 'data', otherwise just pass args and return the result (this allows for using getAllowedTargetItemList without hardcoding its name)

Modified:
    erp5/trunk/utils/oood/dispatcher.py

Modified: erp5/trunk/utils/oood/dispatcher.py
URL: http://svn.erp5.org/erp5/trunk/utils/oood/dispatcher.py?rev=14180&r1=14179&r2=14180&view=diff
==============================================================================
--- erp5/trunk/utils/oood/dispatcher.py (original)
+++ erp5/trunk/utils/oood/dispatcher.py Tue Apr 24 02:23:11 2007
@@ -59,6 +59,7 @@
 POOL_EMPTY = 52
 OUT_OF_SYNC = 53
 TIMEOUT = 54
+BAD_DOCUMENT = 55
 
 
 class MySerw(ThreadingMixIn, SimpleXMLRPCServer):
@@ -147,6 +148,8 @@
       pdb.set_trace()
     if funcname == 'threadingStatus':
       return self.threadingStatus()
+    if funcname == 'poolStatus':
+      return self.poolStatus()
     if self.oood_has_blocked: # means we are in the process of an emergency restart
       return self.error(BLOCKED)
     # if pool has no workers, trigger reinitialization and return error
@@ -162,7 +165,8 @@
     # start controlling thread to restart if we can't acquire semaphore
     # XXX this can cause problems - if the serwer gets too many requests, it will get emergency restart only because
     # it is overloaded - we will time out in waiting for semaphore, while the server is processing previous requests
-    max_allowed_time = config.instance_timeout + config.instance_load_time + 5
+    # the more so that we allow lots of waiting in pool.makeWorkerObject
+    max_allowed_time = (config.instance_timeout + config.instance_load_time) * 3
     local.semaphore_waiting = threading.Timer(max_allowed_time, self.emergencyRestart)
     Log.debug('%s is waiting for semaphore', local.semaphore_waiting.getName())
     local.semaphore_waiting.start()
@@ -204,7 +208,7 @@
           # rationale: if the same happened twice, it was not the worker's fault, apparently the doc is lame
           Log.logException(e)
           local.process_waiting.cancel()
-          return self.error(e)
+          return self.error(BAD_DOCUMENT)
     finally:
       # release semaphore only if we were able to release worker
       if pool.releaseWorkerObject(worker):
@@ -280,20 +284,40 @@
       s += line
     return s
 
+  def poolStatus(self):
+    s = ''
+    line = '%-5s|%-10s|%-30s|\n' % ('idx', 'busy', 'file')
+    s += line
+    line = '%s+%s+%s\n' % ('-'*5, '-'*10, '-'*30)
+    s += line
+    worker_index_list = pool.inst.keys()
+    worker_index_list.sort()
+    for idx in worker_index_list:
+      worker = pool.inst[idx]
+      if worker is None:
+        busy = 'xxxx'
+        file = 'xxxx'
+      else:
+        busy = worker.busy and 'busy' or '-'
+        file = getattr(worker, 'fileUrl', None) or '-'
+      line = '%-5s|%-10s|%-30s|\n' % (idx, busy, file)
+      s += line
+    return s
+
   def _process(self, worker, funcname, *a):
     """
       processing function - builds an appropriate function name, calls it on the worker
       then processes return value
       kw - in practice there should never be any, unless used in other way (not by _dispatcher)
     """
-    if not funcname.startswith('run_'): # worker's public function naming
-      funcname = 'run_' + funcname
-    method = getattr(worker, funcname, None)
+    method = getattr(worker, 'run_'+funcname, None)
     if method is None:
-      raise NotImplementedError('no function named ' + funcname)
+      method = getattr(worker, funcname, None) # method may have no prefix (we'll stop it some day anyway)
+      if method is None:
+        raise NotImplementedError('no function named ' + funcname)
     argtpl=('filename', 'data', 'meta', 'extension')
     kw = dict(zip(argtpl, a)) # build a dict from positional arguments
-    # if data given, store it in a temp file
+    # if data given, store it in a temp file and proceed using std interface
     if kw.get('data'):
       if not kw.get('filename'):
         kw['filename'] = str(random.random()).split('.')[1]
@@ -307,19 +331,21 @@
         f.close()
       kw['filename'] = filename
       kw['data'] = '' # not needed now, we have decoded data in the file
-    # DO THE JOB
-    res = method(kw) # res is not used
-    if kw.get('newfilename'): # worker produced data and saved it in a new file
-      # XXX here we will also zip html files
-      try: # we can do it only if the argument is a string
-        tozip = kw.get('extension').startswith('html') # if html, we pack the output files
-      except AttributeError:
-        tozip = False
-      if tozip:
-        self._zipResult(kw)
-      else:
-        kw['data'] = base64.encodestring(open(self._mkName(kw['newfilename'])).read())
-    return kw
+      # DO THE JOB
+      res = method(kw) # res is not used
+      if kw.get('newfilename'): # worker produced data and saved it in a new file
+        # XXX here we will also zip html files
+        try: # we can do it only if the argument is a string
+          tozip = kw.get('extension').startswith('html') # if html, we pack the output files
+        except AttributeError:
+          tozip = False
+        if tozip:
+          self._zipResult(kw)
+        else:
+          kw['data'] = base64.encodestring(open(self._mkName(kw['newfilename'])).read())
+      return kw
+    else:
+      return method(*a)
 
   def _zipResult(self, kw):
     #first delete the original




More information about the Erp5-report mailing list