[Erp5-report] r13850 - in /erp5/trunk/products/ZMySQLDDA: DA.py db.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Mon Apr 2 14:00:04 CEST 2007
Author: vincent
Date: Mon Apr 2 14:00:03 2007
New Revision: 13850
URL: http://svn.erp5.org?rev=13850&view=rev
Log:
Fix the same thread concurrency mistake as in ZMySQLDDA/db.py (apply changes similar to r13845:13847).
Modified:
erp5/trunk/products/ZMySQLDDA/DA.py
erp5/trunk/products/ZMySQLDDA/db.py
Modified: erp5/trunk/products/ZMySQLDDA/DA.py
URL: http://svn.erp5.org/erp5/trunk/products/ZMySQLDDA/DA.py?rev=13850&r1=13849&r2=13850&view=diff
==============================================================================
--- erp5/trunk/products/ZMySQLDDA/DA.py (original)
+++ erp5/trunk/products/ZMySQLDDA/DA.py Mon Apr 2 14:00:03 2007
@@ -86,7 +86,7 @@
database_type='MySQL'
import os
-from db import DeferredDB
+from db import ThreadedDeferredDB
import Shared.DC.ZRDB.Connection, sys, DABase
from App.Dialogs import MessageDialog
from Globals import HTMLFile
@@ -120,7 +120,7 @@
manage_properties=HTMLFile('connectionEdit', globals())
- def factory(self): return DeferredDB
+ def factory(self): return ThreadedDeferredDB
def connect(self, s):
try:
@@ -133,8 +133,8 @@
else:
if connection is not None:
connection.closeConnection()
- DB = self.factory()
- database_connection_pool[pool_key] = DeferredDB(s)
+ ThreadedDeferredDB = self.factory()
+ database_connection_pool[pool_key] = ThreadedDeferredDB(s)
self._v_database_connection = database_connection_pool[pool_key]
# XXX If date is used as such, it can be wrong because an existing
# connection may be reused. But this is suposedly only used as a
Modified: erp5/trunk/products/ZMySQLDDA/db.py
URL: http://svn.erp5.org/erp5/trunk/products/ZMySQLDDA/db.py?rev=13850&r1=13849&r2=13850&view=diff
==============================================================================
--- erp5/trunk/products/ZMySQLDDA/db.py (original)
+++ erp5/trunk/products/ZMySQLDDA/db.py Mon Apr 2 14:00:03 2007
@@ -104,7 +104,7 @@
import string, sys
from string import strip, split, find, upper, rfind
from time import time
-from thread import get_ident
+from thread import get_ident, allocate_lock
hosed_connection = (
CR.SERVER_GONE_ERROR,
@@ -154,27 +154,12 @@
try: return int(s)
except: return long(s)
-class DeferredDB(TM):
+class ThreadedDeferredDB:
"""
An experimental MySQL DA which implements deferred execution
of SQL code in order to reduce locks and provide better behaviour
with MyISAM non transactional tables
"""
-
- Database_Connection=_mysql.connect
- Database_Error=_mysql.Error
-
- def Database_Connection(self, *args, **kwargs):
- return MySQLdb.connect(*args, **kwargs)
-
- defs={
- FIELD_TYPE.CHAR: "i", FIELD_TYPE.DATE: "d",
- FIELD_TYPE.DATETIME: "d", FIELD_TYPE.DECIMAL: "n",
- FIELD_TYPE.DOUBLE: "n", FIELD_TYPE.FLOAT: "n", FIELD_TYPE.INT24: "i",
- FIELD_TYPE.LONG: "i", FIELD_TYPE.LONGLONG: "l",
- FIELD_TYPE.SHORT: "i", FIELD_TYPE.TIMESTAMP: "d",
- FIELD_TYPE.TINY: "i", FIELD_TYPE.YEAR: "i",
- }
conv=conversions.copy()
conv[FIELD_TYPE.LONG] = int_or_long
@@ -183,15 +168,19 @@
conv[FIELD_TYPE.DECIMAL] = float
del conv[FIELD_TYPE.TIME]
- _p_oid=_p_changed=_registered=None
-
def __init__(self,connection):
- self.connection=connection
- self.kwargs = self._parse_connection_string(connection)
- self.db = {}
- self._finished_or_aborted = {}
- db = self._getConnection()
- transactional = db.server_capabilities & CLIENT.TRANSACTIONS
+ """
+ Parse the connection string.
+ Initiate a trial connection with the database to check
+ transactionality once instead of once per DeferredDB instance.
+ """
+ self._connection = connection
+ self._kw_args = self._parse_connection_string(connection)
+ self._db_pool = {}
+ self._db_lock = allocate_lock()
+ connection = MySQLdb.connect(**self._kw_args)
+ transactional = connection.server_capabilities & CLIENT.TRANSACTIONS
+ connection.close()
if self._try_transactions == '-':
transactional = 0
elif not transactional and self._try_transactions == '+':
@@ -199,48 +188,6 @@
self._use_TM = self._transactions = transactional
if self._mysql_lock:
self._use_TM = 1
- self._sql_string_list_dict = {}
-
- def __del__(self):
- self._cleanupConnections()
-
- def _getFinishedOrAborted(self):
- return self._finished_or_aborted[get_ident()]
-
- def _setFinishedOrAborted(self, value):
- self._finished_or_aborted[get_ident()] = value
-
- def _cleanupConnections(self):
- for db in self.db.itervalues():
- db.close()
-
- def _forceReconnection(self):
- db = apply(self.Database_Connection, (), self.kwargs)
- self.db[get_ident()] = db
- return db
-
- def _getConnection(self):
- ident = get_ident()
- db = self.db.get(ident)
- if db is None:
- db = self._forceReconnection()
- return db
-
- def _closeConnection(self):
- ident = get_ident()
- db = self.db.get(ident)
- if db is not None:
- db.close()
- del self.db[ident]
-
- def _emptySQLStringList(self):
- self._sql_string_list_dict[get_ident()] = []
-
- def _appendToSQLStringList(self, value):
- self._sql_string_list_dict[get_ident()].append(value)
-
- def _getSQLStringList(self):
- return self._sql_string_list_dict[get_ident()]
def _parse_connection_string(self, connection):
kwargs = {'conv': self.conv}
@@ -278,6 +225,81 @@
if not items: return kwargs
kwargs['unix_socket'], items = items[0], items[1:]
return kwargs
+
+ def _pool_set(self, key, value):
+ self._db_lock.acquire()
+ try:
+ self._db_pool[key] = value
+ finally:
+ self._db_lock.release()
+
+ def _pool_get(self, key):
+ self._db_lock.acquire()
+ try:
+ return self._db_pool.get(key)
+ finally:
+ self._db_lock.release()
+
+ def _access_db(self, method_id, args, kw):
+ """
+ Generic method to call pooled objects' methods.
+ When the current thread had never issued any call, create a DeferredDB
+ instance.
+ """
+ ident = get_ident()
+ db = self._pool_get(ident)
+ if db is None:
+ db = DeferredDB(kw_args=self._kw_args, use_TM=self._use_TM,
+ mysql_lock=self._mysql_lock,
+ transactions=self._transactions)
+ self._pool_set(ident, db)
+ return getattr(db, method_id)(*args, **kw)
+
+ def tables(self, *args, **kw):
+ return self._access_db(method_id='tables', args=args, kw=kw)
+
+ def columns(self, *args, **kw):
+ return self._access_db(method_id='columns', args=args, kw=kw)
+
+ def query(self, *args, **kw):
+ return self._access_db(method_id='query', args=args, kw=kw)
+
+ def string_literal(self, *args, **kw):
+ return self._access_db(method_id='string_literal', args=args, kw=kw)
+
+
+class DeferredDB(TM):
+ """
+ An experimental MySQL DA which implements deferred execution
+ of SQL code in order to reduce locks and provide better behaviour
+ with MyISAM non transactional tables
+ """
+
+ defs={
+ FIELD_TYPE.CHAR: "i", FIELD_TYPE.DATE: "d",
+ FIELD_TYPE.DATETIME: "d", FIELD_TYPE.DECIMAL: "n",
+ FIELD_TYPE.DOUBLE: "n", FIELD_TYPE.FLOAT: "n", FIELD_TYPE.INT24: "i",
+ FIELD_TYPE.LONG: "i", FIELD_TYPE.LONGLONG: "l",
+ FIELD_TYPE.SHORT: "i", FIELD_TYPE.TIMESTAMP: "d",
+ FIELD_TYPE.TINY: "i", FIELD_TYPE.YEAR: "i",
+ }
+
+ _p_oid=_p_changed=_registered=None
+
+ def __init__(self, kw_args, use_TM, mysql_lock, transactions):
+ self._kw_args = kw_args
+ self._mysql_lock = mysql_lock
+ self._use_TM = use_TM
+ self._transactions = transactions
+ self._forceReconnection()
+ self._sql_string_list = []
+
+ def __del__(self):
+ self.db.close()
+
+ def _forceReconnection(self):
+ db = MySQLdb.connect(**self._kw_args)
+ self.db = db
def tables(self, rdb=0,
_care=('TABLE', 'VIEW')):
@@ -344,9 +366,8 @@
because they are bound to the connection. This check can be
overridden by passing force_reconnect with True value.
"""
- db = self._getConnection()
try:
- db.query(query)
+ self.db.query(query)
except OperationalError, m:
if ((not force_reconnect) and \
(self._mysql_lock or self._transactions)) or \
@@ -354,8 +375,8 @@
raise
# Hm. maybe the db is hosed. Let's restart it.
self._forceReconnection()
- db.query(query)
- return db.store_result()
+ self.db.query(query)
+ return self.db.store_result()
def query(self,query_string, max_rows=1000):
self._use_TM and self._register()
@@ -363,31 +384,31 @@
qtype = upper(split(qs, None, 1)[0])
if qtype == "SELECT":
raise NotSupportedError, "can not SELECT in deferred connections"
- self._appendToSQLStringList(qs)
+ self._sql_string_list.append(qs)
return (),()
def string_literal(self, s):
- return self._getConnection().string_literal(s)
+ return self.db.string_literal(s)
def _begin(self, *ignored):
# The Deferred DB instance is sometimes used for several
# transactions, so it is required to clear the sql_string_list
# each time a transaction starts
- self._emptySQLStringList()
- self._setFinishedOrAborted(False)
+ self._sql_string_list = []
+ self._transaction_begun = True
def _finish(self, *ignored):
- if self._getFinishedOrAborted():
+ if not self._transaction_begun:
return
- self._setFinishedOrAborted(True)
+ self._transaction_begun = False
# Ping the database to reconnect if connection was lost.
self._query("SELECT 1", force_reconnect=True)
if self._transactions:
self._query("BEGIN")
if self._mysql_lock:
self._query("SELECT GET_LOCK('%s',0)" % self._mysql_lock)
- for qs in self._getSQLStringList():
+ for qs in self._sql_string_list:
self._query(qs)
if self._mysql_lock:
self._query("SELECT RELEASE_LOCK('%s')" % self._mysql_lock)
@@ -395,5 +416,5 @@
self._query("COMMIT")
def _abort(self, *ignored):
- self._setFinishedOrAborted(True)
-
+ self._transaction_begun = False
+
More information about the Erp5-report
mailing list