[Erp5-report] r14055 - /erp5/trunk/utils/oood/dispatcher.py

nobody at svn.erp5.org nobody at svn.erp5.org
Wed Apr 11 18:46:28 CEST 2007


Author: bartek
Date: Wed Apr 11 18:46:26 2007
New Revision: 14055

URL: http://svn.erp5.org?rev=14055&view=rev
Log:
A very first draft of a new Dispatcher class, meant to handle requests, timeouts and exceptions in a clean and orderly way; written in almost-pseudo-code.

Added:
    erp5/trunk/utils/oood/dispatcher.py   (with props)

Added: erp5/trunk/utils/oood/dispatcher.py
URL: http://svn.erp5.org/erp5/trunk/utils/oood/dispatcher.py?rev=14055&view=auto
==============================================================================
--- erp5/trunk/utils/oood/dispatcher.py (added)
+++ erp5/trunk/utils/oood/dispatcher.py Wed Apr 11 18:46:26 2007
@@ -1,0 +1,217 @@
+#!/usr/bin/python
+##############################################################################
+#
+# Copyright (c) 2002, 2006 Nexedi SARL and Contributors. All Rights Reserved.
+#                    Jean-Paul Smets-Solanes <jp at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+#
+##############################################################################
+
+
+import base64
+import cStringIO
+import glob
+import os
+import pdb
+import random
+import re
+import socket
+import string
+import sys
+import threading
+import time
+import traceback
+import zipfile
+
+from SimpleXMLRPCServer import *
+from SocketServer import ThreadingMixIn
+
+import config
+import lib
+from logger import Log
+from pool import pool
+
+from com.sun.star.uno import RuntimeException as unoRuntimeException
+
+
+class MySerw(ThreadingMixIn, SimpleXMLRPCServer):
+
+  def server_bind(self):
+    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+    self.socket.bind(self.server_address)
+
+# XXX shouldn't we make all threads daemonic?
+
+class Dispatcher(object):
+  """
+    This will eventually replace serw.Procesor class
+  """
+  # request handling procedure:
+  # check if server is oood_has_blocked, return error if it is (means we fit into a very narrow time span)
+  # check if pool size > 0, otherwise reinitialize
+  # create local object threading.local for all flags and references
+  # start semaphore_waiting thread
+  # it the semaphore_waiting thread times out,
+  #     mark all active threads as oood_has_blocked
+  #     mark server as oood_has_blocked
+  #     release all semaphores so that they can return
+  #     reinitialize everything
+  # acquire semaphore
+  # if current thread is oood_has_blocked: return error message
+  # get worker or trigger a reinitialization and return error message
+  # start process_waiting thread, cross-referenced with current thread
+  # if process_waiting thread times out, mark current thread as "timed_out", restart worker, release semaphore
+  # before returning anything from any public function, check if current thread is not timed_out, if it is then return error message
+  # try do to what is requested
+  # produce and return the result
+  # if there was any exception, then:
+  # cancel process_waiting thread, start another
+  # restart worker
+  # try again
+  # if there was any exception, then:
+  # cancel process_waiting thread
+  # restart worker
+  # release semaphore
+
+  def __init__(self):
+    # create semaphore
+    self.semaphore = threading.Semaphore(config.total)
+    # init flags
+    self.oood_has_blocked = False
+    # init locks
+    self.emergency_restart_lock = threading.Lock()
+    # initialize
+    self.initialize()
+
+  def initialize(self):
+    # cancel all waiting timers
+    for thr in threading.enumerate():
+      try:
+        thr.cancel()
+      except AttributeError:
+        pass
+    # acquire all semaphores (in non-blocking mode), so that new requests wait till we're done
+    for i in range(self.total):
+      self.semaphore.acquire(False)
+    # restart instances XXX structure the code cleanly
+    # recreate pool and workers
+    pool.initializePool()
+    # reset flags
+    server.oood_has_blocked = False
+    # release all semaphores - now they can go ahead
+    for i in range(self.total):
+      self.semaphore.release()
+    # initialization cancelled all timers, so now we can safely release
+    self.emergency_restart_lock.acquire(False)
+    self.emergency_restart_lock.release()
+
+  def processRequest(self, func, *a, **kw):
+    """
+      General request handler, decorator for every public function
+    """
+    if self.oood_has_blocked: # means we are in the process of an emergency restart
+      return self.error(BLOCKED)
+    # if pool has no workers, trigger reinitialization and return error
+    if pool.getSize() == 0:
+      emergency_thread = threading.Timer(1, self.emergencyRestart)
+      return self.error(POOL_EMPTY)
+    this = threading.currentThread()
+    # init thread flags
+    this.oood_has_blocked = False
+    this.timed_out = False
+    local = threading.local
+    # start controlling thread to restart if we can't acquire semaphore
+    max_allowed_time = config.instance_timeout + config.instance_load_time + 5
+    local.semaphore_waiting = threading.Timer(max_allowed_time, self.emergencyRestart)
+    local.semaphore_waiting.start()
+    self.semaphore.acquire() # blocking mode
+    # ok, we have it
+    local.semaphore_waiting.cancel()
+    # if we blocked out in the meantime, return error
+    if this.oood_has_blocked:
+      return self.error(BLOCKED)
+    # get worker
+    worker = pool.getWorkerObject()
+    # if none available, we are out of sync so we have to return error and restart
+    if not worker:
+      emergency_thread = threading.Timer(1, self.emergencyRestart)
+      return self.error(OUT_OF_SYNC)
+    # start thread to timeout document processing
+    local.process_waiting = threading.Timer(config.instance_timeout, self.instanceRestart, kwargs={'target':this, 'worker':worker})
+    local.process_waiting.start()
+    # try do do what was requested
+    try:
+      data = func(worker, *a, **kw)
+    except Exception, e:
+      Log.logException(e)
+      # restart worker
+      local.process_waiting.cancel()
+      worker = pool.getWorkerObject(worker) # XXX shouldn't we time and control this thing here?
+      # restart timing
+      local.process_waiting = threading.Timer(config.instance_timeout, self.instanceRestart, kwargs={'target':this, 'worker':worker})
+      try:
+        data = func(worker, *a, **kw)
+      except Exception, e:
+        # rationale: if the same happened twice, it was not the worker's fault, apparently the doc is lame
+        Log.logException(e)
+        local.process_waiting.cancel()
+        return self.error(e)
+    if this.timed_out: # means it took far too long and we already triggered an instance restart
+      return self.error(TIMEOUT) # XXX why not return data, after all? To inform the end-user that something is wrong.
+    # release semaphore only if we were able to release worker
+    if pool.releaseWorker(worker):
+      self.semaphore.release()
+    return data # with some post-processing
+
+  def instanceRestart(self, target, worker):
+    # mark processing thread as timed_out in case it goes on
+    target.timed_out = True
+    # kill and restart worker
+    worker = pool.getWorkerObject(worker)
+    # if successful:
+    if worker:
+      self.semaphore.release()
+    else:
+      Log.error('can not produce a new worker')
+    # else: worker can not be restarted, pool becomes smaller, if it goes down to zero then next request
+    # triggers reinitialization
+
+  def emergencyRestart(self):
+    """
+      Restart of the whole machinery if for some reason
+      the semaphore can not be acquired (because it is fully booked
+      and there is nobody to release it)
+    """
+    # we want to do it only once, subsequent threads can return immediately
+    if not self.emergency_restart_lock.acquire(False):
+      return
+    # tell threads we are blocked
+    for thr in threading.enumerate():
+      thr.oood_has_blocked = True
+    self.oood_has_blocked = True
+    # release all semaphores so that pending threads can return
+    for i in range(config.total):
+      self.semaphore.release()
+    self.initialize()
+  
+

Propchange: erp5/trunk/utils/oood/dispatcher.py
------------------------------------------------------------------------------
    svn:executable = *




More information about the Erp5-report mailing list