[Erp5-report] r13846 - in /erp5/trunk/products/ZMySQLDA: DA.py db.py

nobody at svn.erp5.org nobody at svn.erp5.org
Mon Apr 2 11:23:07 CEST 2007


Author: vincent
Date: Mon Apr  2 11:23:06 2007
New Revision: 13846

URL: http://svn.erp5.org?rev=13846&view=rev
Log:
Fix a major thread concurrency mistake in ZMySQLDA/db.py: the object registrable with the transaction manager was the one shared by all threads, preventing multiple threads from effectively registering with the transaction manager. This caused MySQL deadlocks, since commits were never issued in any of the concurrent threads but one. This patch creates a new intermediate object between DB and DA which just handles pooling, and factors out connection string parsing and server capabilities probing.
Update DA so it uses the intermediate class.

Modified:
    erp5/trunk/products/ZMySQLDA/DA.py
    erp5/trunk/products/ZMySQLDA/db.py

Modified: erp5/trunk/products/ZMySQLDA/DA.py
URL: http://svn.erp5.org/erp5/trunk/products/ZMySQLDA/DA.py?rev=13846&r1=13845&r2=13846&view=diff
==============================================================================
--- erp5/trunk/products/ZMySQLDA/DA.py (original)
+++ erp5/trunk/products/ZMySQLDA/DA.py Mon Apr  2 11:23:06 2007
@@ -89,7 +89,7 @@
 __version__='$Revision: 1.4 $'[11:-2]
 
 import os
-from db import DB
+from db import ThreadedDB
 import Shared.DC.ZRDB.Connection, sys, DABase
 from App.Dialogs import MessageDialog
 from Globals import HTMLFile
@@ -120,7 +120,7 @@
 
     manage_properties=HTMLFile('connectionEdit', globals())
 
-    def factory(self): return DB
+    def factory(self): return ThreadedDB
 
     def connect(self, s):
       try:
@@ -128,13 +128,13 @@
         self._v_connected = ''
         pool_key = self.getPhysicalPath()
         connection = database_connection_pool.get(pool_key)
-        if connection is not None and connection.connection == s:
+        if connection is not None and connection._connection == s:
           self._v_database_connection = connection
         else:
           if connection is not None:
             connection.closeConnection()
-          DB = self.factory()
-          database_connection_pool[pool_key] = DB(s)
+          ThreadedDB = self.factory()
+          database_connection_pool[pool_key] = ThreadedDB(s)
           self._v_database_connection = database_connection_pool[pool_key]
         # XXX If date is used as such, it can be wrong because an existing
         # connection may be reused. But this is suposedly only used as a

Modified: erp5/trunk/products/ZMySQLDA/db.py
URL: http://svn.erp5.org/erp5/trunk/products/ZMySQLDA/db.py?rev=13846&r1=13845&r2=13846&view=diff
==============================================================================
--- erp5/trunk/products/ZMySQLDA/db.py (original)
+++ erp5/trunk/products/ZMySQLDA/db.py Mon Apr  2 11:23:06 2007
@@ -106,11 +106,19 @@
 import string, sys
 from string import strip, split, find, upper, rfind
 from time import time
-from thread import get_ident
-
+from thread import get_ident, allocate_lock
+
+# Connection is unusable, reconnect if possible (if so, don't re-raise the
+# exception), otherwise re-raise the exception.
 hosed_connection = (
     CR.SERVER_GONE_ERROR,
     CR.SERVER_LOST
+    )
+
+# Connection is unusable and some earlier query caused the trouble.
+# Reconnect *and* re-raise the exception.
+dead_connection = (
+    CR.COMMANDS_OUT_OF_SYNC,
     )
 
 key_types = {
@@ -156,22 +164,12 @@
     try: return int(s)
     except: return long(s)
 
-class DB(TM):
-
-    Database_Connection=_mysql.connect
-    Database_Error=_mysql.Error
-
-    def Database_Connection(self, *args, **kwargs):
-      return MySQLdb.connect(*args, **kwargs)
-
-    defs={
-        FIELD_TYPE.CHAR: "i", FIELD_TYPE.DATE: "d",
-        FIELD_TYPE.DATETIME: "d", FIELD_TYPE.DECIMAL: "n",
-        FIELD_TYPE.DOUBLE: "n", FIELD_TYPE.FLOAT: "n", FIELD_TYPE.INT24: "i",
-        FIELD_TYPE.LONG: "i", FIELD_TYPE.LONGLONG: "l",
-        FIELD_TYPE.SHORT: "i", FIELD_TYPE.TIMESTAMP: "d",
-        FIELD_TYPE.TINY: "i", FIELD_TYPE.YEAR: "i",
-        }
+class ThreadedDB:
+    """
+      This class is an interface to DB.
+      Its caracteristic is that an instance of this class interfaces multiple
+      instanes of DB class, each one being bound to a specific thread.
+    """
 
     conv=conversions.copy()
     conv[FIELD_TYPE.LONG] = int_or_long
@@ -180,15 +178,19 @@
     conv[FIELD_TYPE.DECIMAL] = float
     del conv[FIELD_TYPE.TIME]
 
-    _p_oid=_p_changed=_registered=None
-
     def __init__(self,connection):
-        self.connection=connection
-        self.kwargs = self._parse_connection_string(connection)
-        self.db = {}
-        self._finished_or_aborted = {}
-        db = self._getConnection()
-        transactional = db.server_capabilities & CLIENT.TRANSACTIONS
+        """
+          Parse the connection string.
+          Initiate a trial connection with the database to check
+          transactionality once instead of once per DB instance.
+        """
+        self._connection = connection
+        self._kw_args = self._parse_connection_string(connection)
+        self._db_pool = {}
+        self._db_lock = allocate_lock()
+        connection = MySQLdb.connect(**self._kw_args)
+        transactional = connection.server_capabilities & CLIENT.TRANSACTIONS
+        connection.close()
         if self._try_transactions == '-':
             transactional = 0
         elif not transactional and self._try_transactions == '+':
@@ -196,38 +198,6 @@
         self._use_TM = self._transactions = transactional
         if self._mysql_lock:
             self._use_TM = 1
-
-    def __del__(self):
-      self._cleanupConnections()
-
-    def _getFinishedOrAborted(self):
-      return self._finished_or_aborted[get_ident()]
-    
-    def _setFinishedOrAborted(self, value):
-      self._finished_or_aborted[get_ident()] = value
-
-    def _cleanupConnections(self):
-      for db in self.db.itervalues():
-        db.close()
-
-    def _forceReconnection(self):
-      db = apply(self.Database_Connection, (), self.kwargs)
-      self.db[get_ident()] = db
-      return db
-
-    def _getConnection(self):
-      ident = get_ident()
-      db = self.db.get(ident)
-      if db is None:
-        db = self._forceReconnection()
-      return db
-
-    def _closeConnection(self):
-      ident = get_ident()
-      db = self.db.get(ident)
-      if db is not None:
-        db.close()
-        del self.db[ident]
 
     def _parse_connection_string(self, connection):
         kwargs = {'conv': self.conv}
@@ -265,7 +235,75 @@
         if not items: return kwargs
         kwargs['unix_socket'], items = items[0], items[1:]
         return kwargs
-        
+
+    def _pool_set(self, key, value):
+      self._db_lock.acquire()
+      try:
+        self._db_pool[key] = value
+      finally:
+        self._db_lock.release()
+
+    def _pool_get(self, key):
+      self._db_lock.acquire()
+      try:
+        return self._db_pool.get(key)
+      finally:
+        self._db_lock.release()
+
+    def _access_db(self, method_id, args, kw):
+        """
+          Generic method to call pooled objects' methods.
+          When the current thread had never issued any call, create a DB
+          instance.
+        """
+        ident = get_ident()
+        db = self._pool_get(ident)
+        if db is None:
+            db = DB(kw_args=self._kw_args, use_TM=self._use_TM,
+                    mysql_lock=self._mysql_lock,
+                    transactions=self._transactions)
+            self._pool_set(ident, db)
+        return getattr(db, method_id)(*args, **kw)
+
+    def tables(self, *args, **kw):
+        return self._access_db(method_id='tables', args=args, kw=kw)
+
+    def columns(self, *args, **kw):
+        return self._access_db(method_id='columns', args=args, kw=kw)
+
+    def query(self, *args, **kw):
+        return self._access_db(method_id='query', args=args, kw=kw)
+
+    def string_literal(self, *args, **kw):
+        return self._access_db(method_id='string_literal', args=args, kw=kw)
+
+class DB(TM):
+
+    defs={
+        FIELD_TYPE.CHAR: "i", FIELD_TYPE.DATE: "d",
+        FIELD_TYPE.DATETIME: "d", FIELD_TYPE.DECIMAL: "n",
+        FIELD_TYPE.DOUBLE: "n", FIELD_TYPE.FLOAT: "n", FIELD_TYPE.INT24: "i",
+        FIELD_TYPE.LONG: "i", FIELD_TYPE.LONGLONG: "l",
+        FIELD_TYPE.SHORT: "i", FIELD_TYPE.TIMESTAMP: "d",
+        FIELD_TYPE.TINY: "i", FIELD_TYPE.YEAR: "i",
+        }
+
+    _p_oid=_p_changed=_registered=None
+
+    def __init__(self, kw_args, use_TM, mysql_lock, transactions):
+        self._kw_args = kw_args
+        self._mysql_lock = mysql_lock
+        self._use_TM = use_TM
+        self._transactions = transactions
+        self._forceReconnection()
+
+    def __del__(self):
+      self.db.close()
+
+    def _forceReconnection(self):
+      db = MySQLdb.connect(**self._kw_args)
+      self.db = db
+
     def tables(self, rdb=0,
                _care=('TABLE', 'VIEW')):
         r=[]
@@ -321,28 +359,29 @@
         return r
 
     def _query(self, query, force_reconnect=False):
-      """
-        Send a to MySQL server.
-        It reconnects automaticaly if needed and the following conditions are
-        met:
-         - It has not just tried to reconnect (ie, this function will not
-           attemp to connect twice per call).
-         - This conection is not transactionnal and has set not MySQL locks,
-           because they are bound to the connection. This check can be
-           overridden by passing force_reconnect with True value.
-      """
-      db = self._getConnection()
-      try:
-        db.query(query)
-      except OperationalError, m:
-        if ((not force_reconnect) and \
-            (self._mysql_lock or self._transactions)) or \
-           m[0] not in hosed_connection:
-          raise
-        # Hm. maybe the db is hosed.  Let's restart it.
-        self._forceReconnection()
-        db.query(query)
-      return db.store_result()
+        """
+          Send a to MySQL server.
+          It reconnects automaticaly if needed and the following conditions are
+          met:
+           - It has not just tried to reconnect (ie, this function will not
+             attemp to connect twice per call).
+           - This conection is not transactionnal and has set not MySQL locks,
+             because they are bound to the connection. This check can be
+             overridden by passing force_reconnect with True value.
+        """
+        try:
+            self.db.query(query)
+        except OperationalError, m:
+            if ((not force_reconnect) and \
+                (self._mysql_lock or self._transactions)) or \
+               m[0] not in (hosed_connection + dead_connection):
+                raise
+            # Hm. maybe the db is hosed.  Let's restart it.
+            self._forceReconnection()
+            if m[0] in dead_connection:
+                raise
+            self.db.query(query)
+        return self.db.store_result()
 
     def query(self,query_string, max_rows=1000):
         self._use_TM and self._register()
@@ -380,10 +419,11 @@
         return items, result
 
     def string_literal(self, s):
-      return self._getConnection().string_literal(s)
+        return self.db.string_literal(s)
 
     def _begin(self, *ignored):
         try:
+            self._transaction_begun = True
             # Ping the database to reconnect if connection was closed.
             self._query("SELECT 1", force_reconnect=True)
             if self._transactions:
@@ -394,21 +434,20 @@
             LOG('ZMySQLDA', ERROR, "exception during _begin",
                 error=sys.exc_info())
             raise
-        self._setFinishedOrAborted(False)
-        
+
     def _finish(self, *ignored):
-        if self._getFinishedOrAborted():
+        if not self._transaction_begun:
             return
-        self._setFinishedOrAborted(True)
+        self._transaction_begun = False
         if self._mysql_lock:
             self._query("SELECT RELEASE_LOCK('%s')" % self._mysql_lock)
         if self._transactions:
             self._query("COMMIT")
 
     def _abort(self, *ignored):
-        if self._getFinishedOrAborted():
+        if not self._transaction_begun:
             return
-        self._setFinishedOrAborted(True)
+        self._transaction_begun = False
         if self._mysql_lock:
             self._query("SELECT RELEASE_LOCK('%s')" % self._mysql_lock)
         if self._transactions:




More information about the Erp5-report mailing list