[Erp5-report] r13850 - in /erp5/trunk/products/ZMySQLDDA: DA.py db.py

nobody at svn.erp5.org nobody at svn.erp5.org
Mon Apr 2 14:00:04 CEST 2007


Author: vincent
Date: Mon Apr  2 14:00:03 2007
New Revision: 13850

URL: http://svn.erp5.org?rev=13850&view=rev
Log:
Fix same thread concurency mistake as in ZMySQLDDA/db.py (apply similar changes as r13845:13847).

Modified:
    erp5/trunk/products/ZMySQLDDA/DA.py
    erp5/trunk/products/ZMySQLDDA/db.py

Modified: erp5/trunk/products/ZMySQLDDA/DA.py
URL: http://svn.erp5.org/erp5/trunk/products/ZMySQLDDA/DA.py?rev=13850&r1=13849&r2=13850&view=diff
==============================================================================
--- erp5/trunk/products/ZMySQLDDA/DA.py (original)
+++ erp5/trunk/products/ZMySQLDDA/DA.py Mon Apr  2 14:00:03 2007
@@ -86,7 +86,7 @@
 database_type='MySQL'
 
 import os
-from db import DeferredDB
+from db import ThreadedDeferredDB
 import Shared.DC.ZRDB.Connection, sys, DABase
 from App.Dialogs import MessageDialog
 from Globals import HTMLFile
@@ -120,7 +120,7 @@
 
     manage_properties=HTMLFile('connectionEdit', globals())
 
-    def factory(self): return DeferredDB
+    def factory(self): return ThreadedDeferredDB
 
     def connect(self, s):
       try:
@@ -133,8 +133,8 @@
         else:
           if connection is not None:
             connection.closeConnection()
-          DB = self.factory()
-          database_connection_pool[pool_key] = DeferredDB(s)
+          ThreadedDeferredDB = self.factory()
+          database_connection_pool[pool_key] = ThreadedDeferredDB(s)
           self._v_database_connection = database_connection_pool[pool_key]
         # XXX If date is used as such, it can be wrong because an existing
         # connection may be reused. But this is suposedly only used as a

Modified: erp5/trunk/products/ZMySQLDDA/db.py
URL: http://svn.erp5.org/erp5/trunk/products/ZMySQLDDA/db.py?rev=13850&r1=13849&r2=13850&view=diff
==============================================================================
--- erp5/trunk/products/ZMySQLDDA/db.py (original)
+++ erp5/trunk/products/ZMySQLDDA/db.py Mon Apr  2 14:00:03 2007
@@ -104,7 +104,7 @@
 import string, sys
 from string import strip, split, find, upper, rfind
 from time import time
-from thread import get_ident
+from thread import get_ident, allocate_lock
 
 hosed_connection = (
     CR.SERVER_GONE_ERROR,
@@ -154,27 +154,12 @@
     try: return int(s)
     except: return long(s)
 
-class DeferredDB(TM):
+class ThreadedDeferredDB:
     """
         An experimental MySQL DA which implements deferred execution
         of SQL code in order to reduce locks and provide better behaviour
         with MyISAM non transactional tables
     """
-
-    Database_Connection=_mysql.connect
-    Database_Error=_mysql.Error
-
-    def Database_Connection(self, *args, **kwargs):
-      return MySQLdb.connect(*args, **kwargs)
-
-    defs={
-        FIELD_TYPE.CHAR: "i", FIELD_TYPE.DATE: "d",
-        FIELD_TYPE.DATETIME: "d", FIELD_TYPE.DECIMAL: "n",
-        FIELD_TYPE.DOUBLE: "n", FIELD_TYPE.FLOAT: "n", FIELD_TYPE.INT24: "i",
-        FIELD_TYPE.LONG: "i", FIELD_TYPE.LONGLONG: "l",
-        FIELD_TYPE.SHORT: "i", FIELD_TYPE.TIMESTAMP: "d",
-        FIELD_TYPE.TINY: "i", FIELD_TYPE.YEAR: "i",
-        }
 
     conv=conversions.copy()
     conv[FIELD_TYPE.LONG] = int_or_long
@@ -183,15 +168,19 @@
     conv[FIELD_TYPE.DECIMAL] = float
     del conv[FIELD_TYPE.TIME]
 
-    _p_oid=_p_changed=_registered=None
-
     def __init__(self,connection):
-        self.connection=connection
-        self.kwargs = self._parse_connection_string(connection)
-        self.db = {}
-        self._finished_or_aborted = {}
-        db = self._getConnection()
-        transactional = db.server_capabilities & CLIENT.TRANSACTIONS
+        """
+          Parse the connection string.
+          Initiate a trial connection with the database to check
+          transactionality once instead of once per DeferredDB instance.
+        """
+        self._connection = connection
+        self._kw_args = self._parse_connection_string(connection)
+        self._db_pool = {}
+        self._db_lock = allocate_lock()
+        connection = MySQLdb.connect(**self._kw_args)
+        transactional = connection.server_capabilities & CLIENT.TRANSACTIONS
+        connection.close()
         if self._try_transactions == '-':
             transactional = 0
         elif not transactional and self._try_transactions == '+':
@@ -199,48 +188,6 @@
         self._use_TM = self._transactions = transactional
         if self._mysql_lock:
             self._use_TM = 1
-        self._sql_string_list_dict = {}
-
-    def __del__(self):
-      self._cleanupConnections()
-
-    def _getFinishedOrAborted(self):
-      return self._finished_or_aborted[get_ident()]
-
-    def _setFinishedOrAborted(self, value):
-      self._finished_or_aborted[get_ident()] = value
-
-    def _cleanupConnections(self):
-      for db in self.db.itervalues():
-        db.close()
-
-    def _forceReconnection(self):
-      db = apply(self.Database_Connection, (), self.kwargs)
-      self.db[get_ident()] = db
-      return db
-
-    def _getConnection(self):
-      ident = get_ident()
-      db = self.db.get(ident)
-      if db is None:
-        db = self._forceReconnection()
-      return db
-
-    def _closeConnection(self):
-      ident = get_ident()
-      db = self.db.get(ident)
-      if db is not None:
-        db.close()
-        del self.db[ident]
-
-    def _emptySQLStringList(self):
-        self._sql_string_list_dict[get_ident()] = []
-
-    def _appendToSQLStringList(self, value):
-        self._sql_string_list_dict[get_ident()].append(value)
-
-    def _getSQLStringList(self):
-        return self._sql_string_list_dict[get_ident()]
 
     def _parse_connection_string(self, connection):
         kwargs = {'conv': self.conv}
@@ -278,6 +225,81 @@
         if not items: return kwargs
         kwargs['unix_socket'], items = items[0], items[1:]
         return kwargs
+
+    def _pool_set(self, key, value):
+      self._db_lock.acquire()
+      try:
+        self._db_pool[key] = value
+      finally:
+        self._db_lock.release()
+
+    def _pool_get(self, key):
+      self._db_lock.acquire()
+      try:
+        return self._db_pool.get(key)
+      finally:
+        self._db_lock.release()
+
+    def _access_db(self, method_id, args, kw):
+        """
+          Generic method to call pooled objects' methods.
+          When the current thread had never issued any call, create a DeferredDB
+          instance.
+        """
+        ident = get_ident()
+        db = self._pool_get(ident)
+        if db is None:
+            db = DeferredDB(kw_args=self._kw_args, use_TM=self._use_TM,
+                            mysql_lock=self._mysql_lock,
+                            transactions=self._transactions)
+            self._pool_set(ident, db)
+        return getattr(db, method_id)(*args, **kw)
+
+    def tables(self, *args, **kw):
+        return self._access_db(method_id='tables', args=args, kw=kw)
+
+    def columns(self, *args, **kw):
+        return self._access_db(method_id='columns', args=args, kw=kw)
+
+    def query(self, *args, **kw):
+        return self._access_db(method_id='query', args=args, kw=kw)
+
+    def string_literal(self, *args, **kw):
+        return self._access_db(method_id='string_literal', args=args, kw=kw)
+
+
+class DeferredDB(TM):
+    """
+        An experimental MySQL DA which implements deferred execution
+        of SQL code in order to reduce locks and provide better behaviour
+        with MyISAM non transactional tables
+    """
+
+    defs={
+        FIELD_TYPE.CHAR: "i", FIELD_TYPE.DATE: "d",
+        FIELD_TYPE.DATETIME: "d", FIELD_TYPE.DECIMAL: "n",
+        FIELD_TYPE.DOUBLE: "n", FIELD_TYPE.FLOAT: "n", FIELD_TYPE.INT24: "i",
+        FIELD_TYPE.LONG: "i", FIELD_TYPE.LONGLONG: "l",
+        FIELD_TYPE.SHORT: "i", FIELD_TYPE.TIMESTAMP: "d",
+        FIELD_TYPE.TINY: "i", FIELD_TYPE.YEAR: "i",
+        }
+
+    _p_oid=_p_changed=_registered=None
+
+    def __init__(self, kw_args, use_TM, mysql_lock, transactions):
+        self._kw_args = kw_args
+        self._mysql_lock = mysql_lock
+        self._use_TM = use_TM
+        self._transactions = transactions
+        self._forceReconnection()
+        self._sql_string_list = []
+
+    def __del__(self):
+        self.db.close()
+
+    def _forceReconnection(self):
+        db = MySQLdb.connect(**self._kw_args)
+        self.db = db
 
     def tables(self, rdb=0,
                _care=('TABLE', 'VIEW')):
@@ -344,9 +366,8 @@
            because they are bound to the connection. This check can be
            overridden by passing force_reconnect with True value.
       """
-      db = self._getConnection()
       try:
-        db.query(query)
+        self.db.query(query)
       except OperationalError, m:
         if ((not force_reconnect) and \
             (self._mysql_lock or self._transactions)) or \
@@ -354,8 +375,8 @@
           raise
         # Hm. maybe the db is hosed.  Let's restart it.
         self._forceReconnection()
-        db.query(query)
-      return db.store_result()
+        self.db.query(query)
+      return self.db.store_result()
 
     def query(self,query_string, max_rows=1000):
         self._use_TM and self._register()
@@ -363,31 +384,31 @@
             qtype = upper(split(qs, None, 1)[0])
             if qtype == "SELECT":
                 raise NotSupportedError, "can not SELECT in deferred connections"
-            self._appendToSQLStringList(qs)
+            self._sql_string_list.append(qs)
 
         return (),()
 
     def string_literal(self, s):
-      return self._getConnection().string_literal(s)
+      return self.db.string_literal(s)
 
     def _begin(self, *ignored):
         # The Deferred DB instance is sometimes used for several
         # transactions, so it is required to clear the sql_string_list
         # each time a transaction starts
-        self._emptySQLStringList()
-        self._setFinishedOrAborted(False)
+        self._sql_string_list = []
+        self._transaction_begun = True
 
     def _finish(self, *ignored):
-        if self._getFinishedOrAborted():
+        if not self._transaction_begun:
             return
-        self._setFinishedOrAborted(True)
+        self._transaction_begun = False
         # Ping the database to reconnect if connection was lost.
         self._query("SELECT 1", force_reconnect=True)
         if self._transactions:
             self._query("BEGIN")
         if self._mysql_lock:
             self._query("SELECT GET_LOCK('%s',0)" % self._mysql_lock)
-        for qs in self._getSQLStringList():
+        for qs in self._sql_string_list:
             self._query(qs)
         if self._mysql_lock:
             self._query("SELECT RELEASE_LOCK('%s')" % self._mysql_lock)
@@ -395,5 +416,5 @@
             self._query("COMMIT")
 
     def _abort(self, *ignored):
-        self._setFinishedOrAborted(True)
-
+        self._transaction_begun = False
+




More information about the Erp5-report mailing list