[Erp5-report] r14055 - /erp5/trunk/utils/oood/dispatcher.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Wed Apr 11 18:46:28 CEST 2007
Author: bartek
Date: Wed Apr 11 18:46:26 2007
New Revision: 14055
URL: http://svn.erp5.org?rev=14055&view=rev
Log:
a very first draft of the new Dispatcher class, to handle requests, timeouts and exceptions in a clean and orderly way; written in almost-pseudo-code.
Added:
erp5/trunk/utils/oood/dispatcher.py (with props)
Added: erp5/trunk/utils/oood/dispatcher.py
URL: http://svn.erp5.org/erp5/trunk/utils/oood/dispatcher.py?rev=14055&view=auto
==============================================================================
--- erp5/trunk/utils/oood/dispatcher.py (added)
+++ erp5/trunk/utils/oood/dispatcher.py Wed Apr 11 18:46:26 2007
@@ -1,0 +1,217 @@
+#!/usr/bin/python
+##############################################################################
+#
+# Copyright (c) 2002, 2006 Nexedi SARL and Contributors. All Rights Reserved.
+# Jean-Paul Smets-Solanes <jp at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+
+import base64
+import cStringIO
+import glob
+import os
+import pdb
+import random
+import re
+import socket
+import string
+import sys
+import threading
+import time
+import traceback
+import zipfile
+
+from SimpleXMLRPCServer import *
+from SocketServer import ThreadingMixIn
+
+import config
+import lib
+from logger import Log
+from pool import pool
+
+from com.sun.star.uno import RuntimeException as unoRuntimeException
+
+
class MySerw(ThreadingMixIn, SimpleXMLRPCServer):
    """
    Threaded XML-RPC server whose listening socket is bound with
    SO_REUSEADDR, so the daemon can be restarted on the same port at once.
    """
    # The inherited TCPServer.server_bind() sets SO_REUSEADDR when this flag is
    # true and then binds. Unlike the former hand-written override, it also
    # refreshes server_address from the bound socket (matters when port is 0).
    allow_reuse_address = True
+
+# XXX shouldn't we make all threads daemonic?
+
class Dispatcher(object):
    """
    This will eventually replace serw.Procesor class.

    Public functions are meant to be routed through processRequest(), which
    runs them on a worker from the pool while holding one slot of a
    semaphore created with config.total slots. threading.Timer watchdogs
    recover from a request that runs too long (instanceRestart) and from a
    semaphore that can not be acquired in time (emergencyRestart).
    """
    # request handling procedure:
    # check if server is oood_has_blocked, return error if it is (means we fit into a very narrow time span)
    # check if pool size > 0, otherwise reinitialize
    # keep per-request timers in local variables; per-thread flags live on the thread object
    # start semaphore_waiting thread
    #   if the semaphore_waiting thread times out,
    #     mark all active threads as oood_has_blocked
    #     mark server as oood_has_blocked
    #     release all semaphores so that they can return
    #     reinitialize everything
    # acquire semaphore
    # if current thread is oood_has_blocked: return error message
    # get worker or trigger a reinitialization and return error message
    # start process_waiting thread, cross-referenced with current thread
    #   if process_waiting thread times out, mark current thread as "timed_out", restart worker, release semaphore
    # before returning anything from any public function, check if current thread is not timed_out, if it is then return error message
    # try to do what is requested
    #   produce and return the result
    # if there was any exception, then:
    #   cancel process_waiting thread, start another
    #   restart worker
    #   try again
    #   if there was any exception, then:
    #     cancel process_waiting thread
    #     restart worker
    #     release semaphore

    # Reasons handed to error() when a request can not be served.
    # XXX placeholders: these names were referenced but never defined; plain
    # strings so they can be marshalled over XML-RPC.
    BLOCKED = 'blocked'
    POOL_EMPTY = 'pool_empty'
    OUT_OF_SYNC = 'out_of_sync'
    TIMEOUT = 'timeout'

    def __init__(self):
        # create semaphore with config.total slots (presumably one per worker - confirm)
        self.semaphore = threading.Semaphore(config.total)
        # init flags
        self.oood_has_blocked = False
        # init locks; held while an emergency restart is in progress
        self.emergency_restart_lock = threading.Lock()
        # initialize
        self.initialize()

    def initialize(self):
        """
        (Re)create the worker pool and reset the dispatcher state.

        Called from __init__ and at the end of emergencyRestart().
        """
        # cancel all waiting timers (only threading.Timer threads have cancel())
        for thr in threading.enumerate():
            cancel = getattr(thr, 'cancel', None)
            if cancel is not None:
                cancel()
        # acquire all semaphores (in non-blocking mode), so that new requests wait till we're done
        for _ in range(config.total):
            self.semaphore.acquire(False)
        # restart instances XXX structure the code cleanly
        # recreate pool and workers
        pool.initializePool()
        # reset flags
        self.oood_has_blocked = False
        # release all semaphores - now they can go ahead
        # XXX config.total slots are released even if fewer were acquired
        # above, so the count may end up above config.total
        for _ in range(config.total):
            self.semaphore.release()
        # initialization cancelled all timers, so now we can safely release the
        # emergency lock; acquiring first makes release() valid whether or not
        # it was held
        self.emergency_restart_lock.acquire(False)
        self.emergency_restart_lock.release()

    def error(self, reason):
        """
        Build the response returned for a request that could not be served.

        reason -- one of the class-level reason strings, or an exception
        Returns {'error': str(reason)}.

        XXX placeholder: the response format clients expect is not defined in
        this module yet; align it with serw.Procesor.
        """
        return {'error': str(reason)}

    def processRequest(self, func, *a, **kw):
        """
        General request handler, decorator for every public function.

        Runs func(worker, *a, **kw) on a worker from the pool while holding
        one semaphore slot. If func raises, the worker is restarted and func
        is tried once more. Returns func's result, or self.error(...) when
        the request could not be served.
        """
        if self.oood_has_blocked:  # means we are in the process of an emergency restart
            return self.error(self.BLOCKED)
        # if pool has no workers, trigger reinitialization and return error
        if pool.getSize() == 0:
            self._scheduleEmergencyRestart()
            return self.error(self.POOL_EMPTY)
        this = threading.current_thread()
        # init thread flags; emergencyRestart() and instanceRestart() set them
        # from timer threads
        this.oood_has_blocked = False
        this.timed_out = False
        # start controlling thread to restart if we can't acquire semaphore
        max_allowed_time = config.instance_timeout + config.instance_load_time + 5
        semaphore_waiting = self._startTimer(max_allowed_time, self.emergencyRestart)
        self.semaphore.acquire()  # blocking mode
        # ok, we have it
        semaphore_waiting.cancel()
        # if we blocked out in the meantime, return error; the emergency
        # restart resets the semaphore, so our slot is not released here
        if this.oood_has_blocked:
            return self.error(self.BLOCKED)
        worker = pool.getWorkerObject()
        # if none available, we are out of sync so we have to return error and restart
        if not worker:
            self._scheduleEmergencyRestart()
            return self.error(self.OUT_OF_SYNC)
        # start thread to timeout document processing
        process_waiting = self._startTimer(config.instance_timeout, self.instanceRestart,
                                           target=this, worker=worker)
        try:
            data = func(worker, *a, **kw)
        except Exception as first_error:
            Log.logException(first_error)
            process_waiting.cancel()
            if this.timed_out:  # instanceRestart already took over this worker
                return self.error(self.TIMEOUT)
            # restart worker and try again
            worker = self._restartWorker(worker)  # XXX shouldn't we time and control this thing here?
            if not worker:
                # pool becomes smaller; as in instanceRestart, the slot is not released
                return self.error(first_error)
            process_waiting = self._startTimer(config.instance_timeout, self.instanceRestart,
                                               target=this, worker=worker)
            try:
                data = func(worker, *a, **kw)
            except Exception as second_error:
                # rationale: if the same happened twice, it was not the worker's fault, apparently the doc is lame
                Log.logException(second_error)
                process_waiting.cancel()
                if this.timed_out:
                    return self.error(self.TIMEOUT)
                # restart worker and hand its slot back, so repeated bad
                # documents do not leak workers and semaphore slots
                self._releaseWorker(self._restartWorker(worker))
                return self.error(second_error)
        # stop the watchdog, otherwise it would later restart a worker that
        # has already been handed back to the pool
        process_waiting.cancel()
        # NOTE: cancel() does not stop a timer that has already fired, so
        # instanceRestart may still be setting timed_out concurrently
        if this.timed_out:  # means it took far too long and we already triggered an instance restart
            return self.error(self.TIMEOUT)  # XXX why not return data, after all? To inform the end-user that something is wrong.
        # release semaphore only if we were able to release worker
        self._releaseWorker(worker)
        return data  # XXX with some post-processing

    def _startTimer(self, interval, function, **kwargs):
        """Start and return a threading.Timer calling function(**kwargs) after interval seconds."""
        timer = threading.Timer(interval, function, kwargs=kwargs)
        timer.start()
        return timer

    def _scheduleEmergencyRestart(self):
        """Run emergencyRestart() in a timer thread after one second, without blocking the caller."""
        return self._startTimer(1, self.emergencyRestart)

    def _restartWorker(self, worker):
        """Restart worker through the pool; return the new worker, or a false value on failure."""
        new_worker = pool.getWorkerObject(worker)
        if not new_worker:
            Log.error('can not produce a new worker')
        return new_worker

    def _releaseWorker(self, worker):
        """Hand worker back to the pool and free our semaphore slot only if the pool took it."""
        if worker and pool.releaseWorker(worker):
            self.semaphore.release()

    def instanceRestart(self, target, worker):
        """
        Watchdog callback for a request that exceeded config.instance_timeout.

        target -- the thread processing the request; it is flagged timed_out
        worker -- the worker that request was using; it gets restarted
        """
        # mark processing thread as timed_out in case it goes on
        target.timed_out = True
        # kill and restart worker
        worker = pool.getWorkerObject(worker)
        if worker:
            # XXX processRequest returns restarted workers via pool.releaseWorker();
            # here only the slot is released - confirm which one pool expects
            self.semaphore.release()
        else:
            # worker can not be restarted, pool becomes smaller, if it goes down to zero then next request
            # triggers reinitialization
            Log.error('can not produce a new worker')

    def emergencyRestart(self):
        """
        Restart of the whole machinery if for some reason
        the semaphore can not be acquired (because it is fully booked
        and there is nobody to release it)
        """
        # we want to do it only once, subsequent threads can return immediately;
        # initialize() releases the lock again when it is done
        if not self.emergency_restart_lock.acquire(False):
            return
        # tell threads we are blocked
        for thr in threading.enumerate():
            thr.oood_has_blocked = True
        self.oood_has_blocked = True
        # release all semaphores so that pending threads can return
        for _ in range(config.total):
            self.semaphore.release()
        self.initialize()
+
+
Propchange: erp5/trunk/utils/oood/dispatcher.py
------------------------------------------------------------------------------
svn:executable = *
More information about the Erp5-report
mailing list