[Erp5-report] r24513 - in /erp5/trunk/products/TIDStorage: ./ bin/ repozo/ tests/ utils/
nobody at svn.erp5.org
nobody at svn.erp5.org
Thu Nov 6 17:14:26 CET 2008
Author: vincent
Date: Thu Nov 6 17:14:24 2008
New Revision: 24513
URL: http://svn.erp5.org?rev=24513&view=rev
Log:
Initial import of TIDStorage product.
Added:
erp5/trunk/products/TIDStorage/
erp5/trunk/products/TIDStorage/ExchangeProtocol.py
erp5/trunk/products/TIDStorage/README
erp5/trunk/products/TIDStorage/ZEOClientStorage.py
erp5/trunk/products/TIDStorage/__init__.py
erp5/trunk/products/TIDStorage/bin/
erp5/trunk/products/TIDStorage/bin/server.py (with props)
erp5/trunk/products/TIDStorage/repozo/
erp5/trunk/products/TIDStorage/repozo/from_z2.8.8_repozo.diff
erp5/trunk/products/TIDStorage/repozo/repozo_tidstorage.py (with props)
erp5/trunk/products/TIDStorage/repozo/restore_tidstorage.py (with props)
erp5/trunk/products/TIDStorage/repozo/sample_configuration.py
erp5/trunk/products/TIDStorage/tests/
erp5/trunk/products/TIDStorage/tests/testTIDServer.py (with props)
erp5/trunk/products/TIDStorage/transaction_transaction.py
erp5/trunk/products/TIDStorage/utils/
erp5/trunk/products/TIDStorage/utils/check_tid_log.py (with props)
erp5/trunk/products/TIDStorage/utils/tid_log_to_full.py (with props)
Added: erp5/trunk/products/TIDStorage/ExchangeProtocol.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/ExchangeProtocol.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/ExchangeProtocol.py (added)
+++ erp5/trunk/products/TIDStorage/ExchangeProtocol.py [utf8] Thu Nov 6 17:14:24 2008
@@ -1,0 +1,131 @@
+############################################################################
+#
+# Copyright (c) 2007, 2008 Nexedi SARL and Contributors. All Rights Reserved.
+# Vincent Pelletier <vincent at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+class ClientDisconnected(Exception):
+  # Raised when the peer closed the connection (socket recv returned '').
+  pass
+
+class ExchangeProtocol:
+  """
+    Handle data exchange between client and server.
+    Kinds of data which can be exchanged:
+      - str
+        send_field
+        recv_field
+      - int
+        send_int
+        recv_int
+      - list of str
+        send_list
+        recv_list
+      - list of int
+        send_int_list
+        recv_int_list
+      - dict (key: str, value: int)
+        send_dict
+        recv_dict
+    Forbidden chars:
+      Send (raise if present):
+        \\n (field separator)
+      Receive (stripped silently):
+        \\n (field separator)
+        \\r (for compatibility)
+  """
+  def __init__(self, socket):
+    # The socket is only used through send() and recv(); no buffering is
+    # done at this level (see TIDServer TODO).
+    self._socket = socket
+
+  def send_field(self, to_send):
+    # Send one string field, terminated by the protocol separator '\n'.
+    if type(to_send) is not str:
+      raise ValueError, 'Value is not of str type: %r' % (type(to_send), )
+    if '\n' in to_send:
+      raise ValueError, '\\n is a forbidden value.'
+    self._socket.send(to_send)
+    self._socket.send('\n')
+
+  def recv_field(self):
+    # Receive one field: read byte-by-byte until '\n', silently dropping
+    # any '\r'. Raises ClientDisconnected if the peer closed the socket.
+    received = None
+    result = []
+    append = result.append
+    while received != '\n':
+      received = self._socket.recv(1)
+      if len(received) == 0:
+        raise ClientDisconnected
+      if received != '\r':
+        append(received)
+    # The terminating '\n' was appended too; drop it.
+    return ''.join(result[:-1])
+
+  def send_int(self, to_send):
+    # Integers travel as their decimal string representation.
+    self.send_field(str(to_send))
+
+  def recv_int(self):
+    return int(self.recv_field())
+
+  def send_list(self, to_send, send_length=True):
+    # When send_length is true, the list is preceded by its element count.
+    assert isinstance(to_send, (tuple, list))
+    if send_length:
+      self.send_int(len(to_send))
+    for field in to_send:
+      self.send_field(field)
+
+  def send_int_list(self, to_send, *args, **kw):
+    self.send_list([str(x) for x in to_send], *args, **kw)
+
+  def recv_list(self, length=None):
+    # When length is None, the peer is expected to send the count first.
+    result = []
+    append = result.append
+    if length is None:
+      length = int(self.recv_field())
+    for field_number in xrange(length):
+      append(self.recv_field())
+    return result
+
+  def recv_int_list(self, *args, **kw):
+    return [int(x) for x in self.recv_list(*args, **kw)]
+
+  def send_dict(self, to_send):
+    """
+      Key: string
+      Value: int
+      Wire format: length-prefixed key list, then a value list of the same
+      length sent without its own length prefix.
+    """
+    assert isinstance(to_send, (dict))
+    if len(to_send) == 0:
+      key_list = value_list = []
+    else:
+      key_list, value_list = zip(*to_send.items())
+    self.send_list(key_list)
+    self.send_int_list(value_list, send_length=False)
+
+  def recv_dict(self):
+    """
+      Key: string
+      Value: int
+    """
+    key_list = self.recv_list()
+    value_list = self.recv_int_list(len(key_list))
+    result = dict(zip(key_list, value_list))
+    return result
+
Added: erp5/trunk/products/TIDStorage/README
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/README?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/README (added)
+++ erp5/trunk/products/TIDStorage/README [utf8] Thu Nov 6 17:14:24 2008
@@ -1,0 +1,54 @@
+1) Protocole:
+  Tous caractères autorisés dans les données, à l'exception de \n et \r.
+  Tout champ se termine par un \n (\r ignoré).
+  Pas d'échappement.
+  Lors de transfert de listes, la liste est précédée par le nombre de champs qu'elle contient.
+  Ex:
+    3\n
+    foo\n
+    bar\n
+    baz\n
+
+2) Commande de début de commit:
+
+BEGIN\n
+<identifiant du commit>\n
+<liste des storages concernés>
+
+  <identifiant du commit>: doit être identique à celui fourni à la fin de l'opération (que ce soit un ABORT ou un COMMIT)
+  <liste des storages concernés>: liste des identifiants des storages concernés par le commit
+  NB: la liste se termine par un \n, il n'est donc pas répété ici.
+
+Réponse: (rien)
+
+3) Commande d'annulation de la transaction:
+
+ABORT\n
+<identifiant du commit>\n
+
+  <identifiant du commit>: (cf. BEGIN)
+
+Réponse: (rien)
+
+4) Commande de finalisation de la transaction:
+
+COMMIT\n
+<identifiant du commit>\n
+<liste des storages concernés>
+<liste des TIDs commités>
+
+  <identifiant du commit>: (cf. BEGIN)
+  <liste des storages concernés>: (cf. BEGIN)
+  <liste des TIDs commités>: De même longueur que la liste des storages concernés. L'ordre doit correspondre à cette dernière.
+  NB: la liste se termine par un \n, il n'est donc pas répété ici.
+
+Réponse: (rien)
+
+5) Commande de lecture des données:
+
+DUMP\n
+
+Réponse:
+<liste des storages>
+<liste des TIDs>
+
Added: erp5/trunk/products/TIDStorage/ZEOClientStorage.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/ZEOClientStorage.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/ZEOClientStorage.py (added)
+++ erp5/trunk/products/TIDStorage/ZEOClientStorage.py [utf8] Thu Nov 6 17:14:24 2008
@@ -1,0 +1,53 @@
+############################################################################
+#
+# Copyright (c) 2007 Nexedi SARL and Contributors. All Rights Reserved.
+# Vincent Pelletier <vincent at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+from ZEO.ClientStorage import ClientStorage
+
+LAST_COMMITED_TID_PROPERTY_ID = '_last_commited_tid'
+
+# Hook tpc_finish's hook method.
+# New hook must be a local method because it must access tpc_finish's "self"
+# and original hook.
+
+original_tpc_finish = ClientStorage.tpc_finish
+def tpc_finish(self, txn, f=None):
+  # Monkey-patch replacement for ClientStorage.tpc_finish: wrap the optional
+  # callback so the committed TID is also recorded on the storage instance.
+  def saveTIDOnInstance(tid):
+    # Preserve the original callback behaviour before recording the TID.
+    if f is not None:
+      f(tid)
+    setattr(self, LAST_COMMITED_TID_PROPERTY_ID, tid)
+  return original_tpc_finish(self, txn, f=saveTIDOnInstance)
+ClientStorage.tpc_finish = tpc_finish
+
+def getLastCommitedTID(self):
+  """
+    Return last commited tid for this storage, or None if no transaction
+    was commited yet.
+  """
+  return getattr(self, LAST_COMMITED_TID_PROPERTY_ID, None)
+ClientStorage.getLastCommitedTID = getLastCommitedTID
+
Added: erp5/trunk/products/TIDStorage/__init__.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/__init__.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/__init__.py (added)
+++ erp5/trunk/products/TIDStorage/__init__.py [utf8] Thu Nov 6 17:14:24 2008
@@ -1,0 +1,32 @@
+############################################################################
+#
+# Copyright (c) 2007, 2008 Nexedi SARL and Contributors. All Rights Reserved.
+# Vincent Pelletier <vincent at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+# Load monkey patches
+import transaction_transaction
+import ZEOClientStorage
+
Added: erp5/trunk/products/TIDStorage/bin/server.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/bin/server.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/bin/server.py (added)
+++ erp5/trunk/products/TIDStorage/bin/server.py [utf8] Thu Nov 6 17:14:24 2008
@@ -1,0 +1,766 @@
+#!/usr/bin/python
+
+##############################################################################
+#
+# Copyright (c) 2007, 2008 Nexedi SARL and Contributors. All Rights Reserved.
+# Vincent Pelletier <vincent at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+# About errors in TIDStorage logs:
+# - CRITICAL: Decreasing update ignored
+# This error means that any backup started prior to this error can contain
+# incomplete transaction data.
+# This error can happen when TIDStorage did not handle received data in the
+# right order.
+# Example:
+# 3 storages (S1, S2, S3):
+# They all start at TID=1 value.
+# 2 transaction (T1, T2):
+# T1 commits TID 3 on S2, TID 2 on S3
+# T2 commits TID 2 on S1, TID 2 on S2
+# Due to those TIDs, TIDStorage *should* handle data in this order:
+# T2begin, T2commit, T1begin, T1commit
+# Or:
+# T2begin, T1begin, T2commit, T1commit
+# Or even, though it denotes a late handling of T2commit:
+# T2begin, T1begin, T1commit, T2commit
+# But, if TIDStorage handles data in the following order:
+# T1begin, T1commit, T2begin, T2commit
+# *AND* a backup dumps TIDStorage content at a point between T1commit and
+# T2commit, then the backup will contain T2's commit on S2, which has a
+# lower TID than T1's commit on that same storage.
+#
+# - Abort received, but never began
+# and
+# - Commit received, but never began
+# These errors mean that packets were lost/never received.
+# This should not happen, since network connection is TCP, and TCP
+# retransmits data.
+# But it happens frequently if TIDStorage is started when Zope is under
+# load. This is because Zope attempted to contact TIDStorage at "begin"
+# step, but could not reach it. Then, at commit (or abort) it could reach
+# TIDStorage, causing the error message.
+# This error is benign, because:
+# - Until bootstrap is complete, no TID is available for backup
+# - When bootstrap is complete, it means that every ZODB got unlocked
+# at some point (since TIDStorage commit happens after ZODB tpc_finish
+# lock release).
+# - When a transaction sends data to multiple ZODBs, there is a point in
+# time when ALL impacted ZODBs are locked.
+# The conclusion of all this is that any transaction started before
+# TIDStorage was available has necessarily finished (commit or abort) at
+# the time bootstrap finished.
+# So no backup can be affected by such message (but backup feasibility can
+# get delayed as locks would delay bootstrap end, and hence TID data
+# availability).
+
+import os
+import imp
+import sys
+import pwd
+import grp
+import sets
+import time
+import urllib
+import socket
+import signal
+import getopt
+import SocketServer
+import threading
+import traceback
+from ExchangeProtocol import ClientDisconnected, ExchangeProtocol
+
+class TransactionException(Exception):
+  # Raised on transaction protocol misuse: duplicate "begin", or
+  # "abort"/"commit" without a matching "begin".
+  pass
+
+class AlwaysIncreasingDict(dict):
+  """
+    When inserting/updating a value, check that the new one is strictly
+    greater than existing key (or integer 0 value if no value existed for
+    given key).
+    Values are converted to integers before comparison.
+    NOTE(review): no explicit int() conversion appears below; the check
+    relies on natural ordering of the stored values — confirm callers only
+    ever store integer values.
+
+    TODO:
+      - Do not descend from dict to prevent users from avoiding checks.
+  """
+  def __init__(self, strict=False, *args, **kw):
+    # strict: when true, a refused (non-increasing) update is logged as
+    # CRITICAL instead of being silently dropped.
+    dict.__init__(self, *args, **kw)
+    self._strict = strict
+
+  def __setitem__(self, key, value):
+    # Accept only strictly increasing values; absent keys compare against 0.
+    if self.get(key, 0) < value:
+      dict.__setitem__(self, key, value)
+    else:
+      if self._strict:
+        log('CRITICAL: Decreasing update ignored: key=%r %r <= %r' % \
+            (key, value, self.get(key, 0)))
+
+  def update(self, other):
+    """
+      To check for decreases.
+    """
+    # Route every item through __setitem__ so the increase check applies.
+    for key, value in other.iteritems():
+      self[key] = value
+
+class TransactionTracker:
+  """
+    Implement transaction tracking.
+    This class is not thread-safe.
+    A transaction starts with a call to "begin" and ends with a call to
+    "finish" with the same identifier.
+    "finish" returns the payload provided at begin (or True if no payload
+    was given).
+
+    Illegal cases (raise TransactionException):
+      - "begin" called twice without intermediate "finish" call
+      - "finish" called without a corresponding "begin" call (this includes
+        calling "finish" twice)
+  """
+  def __init__(self):
+    # Maps transaction identifier -> payload given at begin.
+    self._container = {}
+
+  def begin(self, identifier, payload=True):
+    if identifier in self._container:
+      raise TransactionException, 'Begin called twice in a row.'
+    self._container[identifier] = payload
+
+  def finish(self, identifier):
+    if identifier not in self._container:
+      raise TransactionException, 'Finish called without former "begin" call.'
+    return self._container.pop(identifier)
+
+class TIDServer(SocketServer.BaseRequestHandler):
+  """
+    Exchange data with connected peer.
+
+    Commands (first field of each exchange, case-insensitive): begin, abort,
+    commit, dump, quit. See ExchangeProtocol for the wire format.
+
+    TODO:
+      - Implement socket buffering.
+  """
+  def log(self, message):
+    # Prefix every log line with the peer address.
+    log('%r: %s' % (self.client_address, message))
+
+  def dump(self):
+    # Send the current stable storage_id -> TID mapping to the client.
+    tid_dict = self._tid_storage.dump()
+    self._field_exchange.send_dict(tid_dict)
+
+  def begin(self):
+    # Reads: transaction identifier, then the impacted storage id list.
+    identifier = self._field_exchange.recv_field()
+    storage_id_list = self._field_exchange.recv_list()
+    self._tid_storage.begin(identifier, storage_id_list)
+
+  def abort(self):
+    # Reads: transaction identifier.
+    identifier = self._field_exchange.recv_field()
+    self._tid_storage.abort(identifier)
+
+  def commit(self):
+    # Reads: transaction identifier, then the storage_id -> TID dict.
+    identifier = self._field_exchange.recv_field()
+    tid_dict = self._field_exchange.recv_dict()
+    self._tid_storage.commit(identifier, tid_dict)
+
+  def handle(self):
+    # Serve one client connection until "quit" or disconnection.
+    global tid_storage
+    self._tid_storage = tid_storage
+    self._field_exchange = ExchangeProtocol(socket=self.request)
+    command_mapping = {
+      'begin': self.begin,
+      'abort': self.abort,
+      'commit': self.commit,
+      'dump': self.dump
+    }
+    self.log('Connected')
+    try:
+      # Intercept ClientDisconnected exception to stop thread nicely instead
+      # of crashing.
+      # Log all others exceptions.
+      while True:
+        received = self._field_exchange.recv_field()
+        command_str = received.lower()
+        if command_str == 'quit':
+          break
+        method = command_mapping.get(command_str)
+        if method is not None:
+          # Intercept all errors to log it instead of causing disconnection.
+          # Except, of course, the ClientDisconnected exception itself.
+          try:
+            method()
+          except ClientDisconnected:
+            raise
+          except:
+            self.log('\n'.join(traceback.format_exception(*sys.exc_info())))
+    except ClientDisconnected:
+      pass
+    except:
+      self.log('\n'.join(traceback.format_exception(*sys.exc_info())))
+    self.log('Client disconnected')
+    self.request.shutdown(socket.SHUT_RDWR)
+    self.request.close()
+    return
+
+class TIDStorage:
+  """
+    Store ZODB TIDs for multiple ZODBs.
+    Designed to be a singleton.
+    Thread-safe.
+
+    Consequently, transactions are not bound to a specific connection: If a
+    connection is cut after a "begin", reconnecting and issuing "abort" or
+    "commit" is valid.
+
+    TODO:
+      - Use smaller locking areas
+      - Improve decision taking algorythm in _unregisterTransactionID (implies
+        modifying _registerTransactionIDAndStorageID).
+  """
+  # Class-level defaults; the lock is class-level by design (singleton use).
+  _storage_id_lock = threading.RLock()
+  _next_full_dump = None
+  _next_dump = None
+  _tid_file = None
+  _burst_period = None
+  _full_dump_period = None
+
+  def __init__(self, tid_file_path=None, burst_period=None, full_dump_period=None):
+    # _storage holds stable (safe-to-backup) TIDs; _transcient holds TIDs of
+    # storages still involved in other in-flight transactions.
+    self._transaction_tracker = TransactionTracker()
+    self._storage = AlwaysIncreasingDict(strict=True)
+    self._transcient = AlwaysIncreasingDict()
+    self._storage_id_to_transaction_id_list_dict = {}
+    self._transaction_id_to_storage_id_list_dict = {}
+    self._storage_id_to_storage_id_set_dict = {}
+    if tid_file_path is not None:
+      self._tid_file = LogFile(tid_file_path)
+      self._burst_period = burst_period
+      self._full_dump_period = full_dump_period
+      now = time.time()
+      if full_dump_period is not None:
+        self._next_full_dump = now
+      if burst_period is not None:
+        self._next_dump = now
+        self._since_last_burst = sets.Set()
+
+  def __repr__(self):
+    # Debug dump of internal state (used by the SIGUSR1 handler).
+    result = []
+    append = result.append
+    self._storage_id_lock.acquire()
+    try:
+      append('_storage_id_to_transaction_id_list_dict=' + \
+             repr(self._storage_id_to_transaction_id_list_dict))
+      append('_transaction_id_to_storage_id_list_dict=' + \
+             repr(self._transaction_id_to_storage_id_list_dict))
+      append('_storage_id_to_storage_id_set_dict=' + \
+             repr(self._storage_id_to_storage_id_set_dict))
+      append('_transcient=' + repr(self._transcient))
+      append('_storage=' + repr(self._storage))
+    finally:
+      self._storage_id_lock.release()
+    return '\n'.join(result)
+
+  def _registerTransactionIDAndStorageID(self, transaction_id, storage_id_list):
+    # Caller must already hold _storage_id_lock: the non-blocking reentrant
+    # acquire below asserts that.
+    assert len(storage_id_list) != 0
+    assert self._storage_id_lock.acquire(False)
+    try:
+      # Update transaction_id -> storage_id_list
+      assert transaction_id not in self._transaction_id_to_storage_id_list_dict
+      self._transaction_id_to_storage_id_list_dict[transaction_id] = storage_id_list
+      storage_id_set = sets.Set(storage_id_list)
+      storage_id_set_id_set = sets.Set()
+      for storage_id in storage_id_list:
+        # Update storage_id -> transaction_id_list
+        identifier_set = self._storage_id_to_transaction_id_list_dict.get(storage_id)
+        if identifier_set is None:
+          identifier_set = self._storage_id_to_transaction_id_list_dict[storage_id] = sets.Set()
+        assert transaction_id not in identifier_set
+        identifier_set.add(transaction_id)
+        # Prepare the update storage_id -> storage_id_set
+        existing_storage_id_set = self._storage_id_to_storage_id_set_dict.get(storage_id, None)
+        if existing_storage_id_set is not None:
+          storage_id_set.union_update(existing_storage_id_set)
+          storage_id_set_id_set.add(id(existing_storage_id_set))
+      # Update storage_id -> storage_id_set
+      # Cannot use iteritems because dict is modified in the loop.
+      for key, value in self._storage_id_to_storage_id_set_dict.items():
+        if id(value) in storage_id_set_id_set:
+          self._storage_id_to_storage_id_set_dict[key] = storage_id_set
+      for storage_id in storage_id_set:
+        self._storage_id_to_storage_id_set_dict[storage_id] = storage_id_set
+    finally:
+      self._storage_id_lock.release()
+
+  def _unregisterTransactionID(self, transaction_id):
+    """
+      Also transfers from self._transcient to self._storage.
+    """
+    # Caller must already hold _storage_id_lock.
+    assert self._storage_id_lock.acquire(False)
+    try:
+      # Update transaction_id -> storage_id_list and retrieve storage_id_list
+      # Raises if not found
+      storage_id_list = self._transaction_id_to_storage_id_list_dict.pop(transaction_id)
+      # Update storage_id -> transaction_id_list
+      for storage_id in storage_id_list:
+        identifier_set = self._storage_id_to_transaction_id_list_dict[storage_id]
+        # Raises if not found
+        identifier_set.remove(transaction_id)
+        if len(identifier_set) == 0:
+          del self._storage_id_to_transaction_id_list_dict[storage_id]
+          # Update storage_id -> storage_id_set
+          # Raises if not found
+          storage_id_set = self._storage_id_to_storage_id_set_dict[storage_id]
+          # Raises if not found
+          storage_id_set.remove(storage_id)
+      if self._tid_file is not None:
+        now = time.time()
+        can_full_dump = has_bootstraped and (self._next_full_dump is not None) and (self._next_full_dump < now)
+        can_dump = (not can_full_dump) and (self._next_dump is not None) and (self._next_dump < now)
+        record_for_dump = can_dump or (self._next_dump is not None)
+        append_to_file = has_bootstraped and (can_dump or can_full_dump)
+      else:
+        append_to_file = record_for_dump = can_dump = can_full_dump = False
+      for key, value in self._storage_id_to_storage_id_set_dict.iteritems():
+        if len(value) == 0 and key in self._transcient:
+          # No in-flight transaction involves this storage any more: its
+          # transcient TID becomes stable.
+          self._storage[key] = self._transcient.pop(key)
+          if record_for_dump:
+            self._since_last_burst.add(key)
+      if append_to_file:
+        if can_full_dump:
+          to_dump_dict = self._storage
+          dump_code = 'f'
+        else:
+          to_dump_dict = dict([(key, self._storage[key]) for key in self._since_last_burst])
+          dump_code = 'd'
+        if len(to_dump_dict):
+          self._tid_file.write('%.02f %s %r\n' % (now, dump_code, to_dump_dict))
+          if can_full_dump:
+            self._next_full_dump = now + self._full_dump_period
+          if self._next_dump is not None:
+            self._next_dump = now + self._burst_period
+            self._since_last_burst.clear()
+      if not has_bootstraped:
+        doBootstrap()
+      #if len(self._storage_id_to_transaction_id_list_dict) == 0:
+      #  self._storage.update(self._transcient)
+      #  self._transcient.clear()
+    finally:
+      self._storage_id_lock.release()
+
+  def dump(self):
+    # Snapshot of stable TIDs only; transcient values are excluded.
+    self._storage_id_lock.acquire()
+    try:
+      return self._storage.copy()
+    finally:
+      self._storage_id_lock.release()
+
+  def begin(self, transaction_id, storage_id_list):
+    self._storage_id_lock.acquire()
+    try:
+      self._transaction_tracker.begin(transaction_id, storage_id_list)
+      self._registerTransactionIDAndStorageID(transaction_id, storage_id_list)
+    finally:
+      self._storage_id_lock.release()
+
+  def abort(self, transaction_id):
+    self._storage_id_lock.acquire()
+    try:
+      try:
+        self._transaction_tracker.finish(transaction_id)
+      except TransactionException:
+        # Overwrite exception message
+        raise TransactionException, 'Abort received, but never began'
+      self._unregisterTransactionID(transaction_id)
+    finally:
+      self._storage_id_lock.release()
+
+  def commit(self, transaction_id, tid_dict):
+    self._storage_id_lock.acquire()
+    try:
+      try:
+        storage_id_list = self._transaction_tracker.finish(transaction_id)
+      except TransactionException:
+        # Overwrite exception message
+        raise TransactionException, 'Commit received, but never began'
+      # Committed TIDs must cover exactly the storages declared at begin.
+      check_dict = tid_dict.copy()
+      for storage_id in storage_id_list:
+        del check_dict[storage_id]
+      assert len(check_dict) == 0
+      self._transcient.update(tid_dict)
+      self._unregisterTransactionID(transaction_id)
+    finally:
+      self._storage_id_lock.release()
+
+class BootstrapContent(threading.Thread):
+  """
+    Thread used to bootstrap TIDStorage content.
+    This must be started at first client request, and must be run only once.
+    Global boolean "has_bootstraped" is set to true once it succeeded.
+  """
+
+  def __init__(self, *args, **kw):
+    threading.Thread.__init__(self, *args, **kw)
+    # Daemon thread: must not prevent process exit.
+    self.setDaemon(True)
+
+  def run(self):
+    """
+      Contact all zopes to serialize all their storages.
+    """
+    global has_bootstraped
+    base_url = options.base_url
+    if base_url is not None:
+      # Only storages with a configured object path can be serialized
+      # remotely (value[2] of known_tid_storage_identifier_dict entries).
+      storage_id_to_object_path_dict = dict([(key, value[2]) for key, value
+        in options.known_tid_storage_identifier_dict.iteritems()
+        if value[2] is not None])
+      target_storage_id_set = sets.ImmutableSet(storage_id_to_object_path_dict.keys())
+      known_storage_id_set = sets.ImmutableSet(tid_storage.dump().keys())
+      to_check_storage_id_set = target_storage_id_set - known_storage_id_set
+      # Loop until every target storage is known, or shutdown clears
+      # can_bootstrap.
+      while len(to_check_storage_id_set) and can_bootstrap:
+        serialize_url = None
+        for storage_id in to_check_storage_id_set:
+          if can_bootstrap and storage_id not in tid_storage.dump().keys():
+            serialize_url = base_url % (storage_id_to_object_path_dict[storage_id], )
+            try:
+              # Query a Zope, which will contact this process in return to store
+              # the new TID number, making the given storage known.
+              page = urllib.urlopen(serialize_url)
+            except Exception, message:
+              log('Exception during bootstrap (%r):\n%s' % (serialize_url, ''.join(traceback.format_exception(*sys.exc_info()))))
+            else:
+              log('Opened %r: %r' % (serialize_url, page.read()))
+            # Let some time for zope to contact TIDStorage back and fill the gaps.
+            time.sleep(5)
+        known_storage_id_set = sets.ImmutableSet(tid_storage.dump().keys())
+        to_check_storage_id_set = target_storage_id_set - known_storage_id_set
+        if len(to_check_storage_id_set):
+          log('Bootstrap in progress... Mising storages: %r' % (to_check_storage_id_set, ))
+          # Retry a bit later
+          time.sleep(60)
+      if len(to_check_storage_id_set) == 0:
+        log('Bootstrap done (%i storages).' % (len(target_storage_id_set), ))
+        has_bootstraped = True
+    else:
+      # No base_url configured: nothing to bootstrap, consider it done.
+      has_bootstraped = True
+
+# Bootstrap machinery state: a single BootstrapContent thread, started
+# lazily by doBootstrap once the first transaction completes.
+bootstrap_content = BootstrapContent()
+has_bootstraped = False
+can_bootstrap = True
+bootstrap_lock = threading.RLock()
+
+def doBootstrap():
+  # Start the bootstrap thread at most once; the non-blocking acquire makes
+  # concurrent callers give up instead of waiting.
+  # NOTE(review): if the thread already ran to completion, isAlive() is
+  # False and start() would raise RuntimeError — presumably unreachable
+  # because callers check has_bootstraped first; confirm.
+  acquired = bootstrap_lock.acquire(False)
+  if acquired:
+    try:
+      if not bootstrap_content.isAlive():
+        bootstrap_content.start()
+    finally:
+      bootstrap_lock.release()
+
+def log(message):
+  # Timestamped logging to stdout (which may be a LogFile when forked).
+  print >> sys.stdout, '%s: %s' % (time.asctime(), message)
+
+class PoliteThreadingTCPServer(SocketServer.ThreadingTCPServer):
+  def server_close(self):
+    # Make the port reusable for listening as soon as the socket closes.
+    # NOTE(review): SO_REUSEADDR is usually set before bind() to take
+    # effect for the next listener — confirm this late call is sufficient.
+    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
+    self.socket.shutdown(socket.SHUT_RDWR)
+    self.socket.close()
+
+def main(address, port):
+  """
+    Listen on (address, port) and serve TIDServer requests until
+    KeyboardInterrupt (also raised by the SIGTERM handler).
+  """
+  server = PoliteThreadingTCPServer((address, port), TIDServer)
+  try:
+    try:
+      log('Server listening.')
+      server.serve_forever()
+    except KeyboardInterrupt:
+      log('Shuting down (received KeyboardInterrupt).')
+  finally:
+    # Stop the bootstrap thread's retry loop before closing the socket.
+    global can_bootstrap
+    can_bootstrap = False
+    log('Waiting for clients to disconnect...')
+    server.server_close()
+
+# Global registry of open LogFile instances, iterated by HUPHandler to
+# rotate every log file on SIGHUP.
+log_file_set = sets.Set()
+
+class LogFile:
+ """
+ Loggin-to-file class.
+ Allows rotating file.
+ Can be used as stdout/stderr file: no write from any thread is lost during
+ rotation. There is an unavoidable (?) race condition if anything gets
+ raised by the rotating thread between "self._to_buffer = False" completion
+ and following log flush.
+ """
+ _file = None
+
+ def __init__(self, file_name):
+ self._lock = threading.RLock()
+ self._file_name = file_name
+ self._to_buffer = False
+ self._buffer = []
+ self._open()
+ log_file_set.add(self)
+
+ def _open(self):
+ self._file = open(self._file_name, 'a', 0)
+
+ def write(self, value):
+ self._lock.acquire()
+ try:
+ if self._to_buffer:
+ self._buffer.append(value)
+ else:
+ self._file.write(value)
+ finally:
+ self._lock.release()
+
+ def close():
+ self._lock.acquire()
+ try:
+ log_file_set.remove(self)
+ self._file.close()
+ self._file = None
+ finally:
+ self._lock.release()
+
+ def rotate(self):
+ self._lock.acquire()
+ try:
+ self._to_buffer = True
+ self._file.close()
+ self._open()
+ # XXX: race condition below if rotating stderr: Any exception thrown
+ # here will be out-of-order in resulting log.
+ self._to_buffer = False
+ self.write(''.join(self._buffer))
+ # End of race condition.
+ finally:
+ self._lock.release()
+
+def HUPHandler(signal_number, stack):
+  # SIGHUP: rotate every registered LogFile.
+  rotate_count = 0
+  log('Rotating logfiles...')
+  for log_file in log_file_set:
+    rotate_count += 1
+    log_file.rotate()
+  log('Logfiles rotated (%i).' % (rotate_count, ))
+
+def USR1Handler(signal_number, stack):
+  # SIGUSR1: log TIDStorage internal state (see TIDStorage.__repr__).
+  log(repr(tid_storage))
+
+def TERMHandler(signal_number, stack):
+  # SIGTERM: convert to KeyboardInterrupt so main() shuts down cleanly.
+  log('Received SIGTERM, exiting.')
+  raise KeyboardInterrupt, 'Killed by SIGTERM'
+
+def usage():
+ print """
+Usage: %(arg0)s [-h] [-n|--nofork|--fg] [-l|--log] [-p|--port] [-a|--address]
+ [--pidfile] [--user] [--group] [-s|--status-file] [-b|--burst-period]
+ [-F|--full-dump-period]
+
+ -h
+ Display this help.
+
+ -n
+ --nofork
+ --fg
+ Do not fork in background.
+
+ -l filename
+ --log filename
+ Log to given filename, instead of default %(logfile_name)s.
+
+ -p number
+ --port number
+ Listen to given port number, intead of default %(port)i.
+
+ -a address
+ --address address
+ Listen to interface runing given address, instead of default %(address)s.
+
+ --pidfile file_path
+ If forking, this file will contain the pid of forked process.
+ If this argument is not provided, pid is written to %(pidfile_name)s.
+
+ --user user_name
+ Run as specified user.
+ Also, retrieve user's group and run as this group.
+
+ --group group_name
+ Run as specified group.
+ If both --user and --group are specified, --group must come last.
+
+ -s file_name
+ --status-file file_name
+ Append stored TIDs to file.
+ See also "burst-period" and "full-dump-period".
+ If not provided, no dump ever happens.
+
+ -b seconds
+ --burst-period seconds
+ Defines the age of last write after which an incremental write can happen.
+ Such write only contain what changed since last write.
+ If not provided, no incremental write is done.
+
+ -F seconds
+ --full-dump-period seconds
+ How many seconds must separate complete dumps to status file.
+ Those writes contain the complete current state.
+ If both a full dump and an incremental write can happen, full dump takes
+ precedence.
+ If not provided, no full dump is done.
+
+ -c file_name
+ --config file_name
+ Use given file as options file.
+ It must be a python file. See sample_options.py for possible values.
+ If provided and if configuration file defines base_url and
+ known_tid_storage_identifier_dict variables, this program will cause
+ generation of all tids before first write to status file.
+""" % {'arg0': sys.argv[0],
+ 'logfile_name': Options.logfile_name,
+ 'pidfile_name': Options.pidfile_name,
+ 'port': Options.port,
+ 'address': Options.address}
+
class Options:
  """Runtime option defaults for the TIDStorage server.

  Command-line options are written to the instance (shadowing these class
  defaults); the configuration file only fills attributes not already set
  on the instance, so the command line always wins.
  """
  port = 9001                            # TCP port to listen on
  address = '0.0.0.0'                    # interface to bind
  logfile_name = 'tidstorage.log'        # default log destination
  pidfile_name = 'tidstorage.pid'        # default pid file (when forking)
  fork = True                            # daemonize by default
  setuid = None                          # numeric uid to drop to (--user)
  setgid = None                          # numeric gid to drop to (--user/--group)
  status_file = None                     # TID dump target; no dumps when None
  burst_period = None                    # seconds between incremental dumps
  full_dump_period = None                # seconds between full dumps
  known_tid_storage_identifier_dict = {} # filled from the config file
  base_url = None                        # Zope base url used to pre-generate TIDs
+
# Command-line parsing: options set here take precedence over the config file.
config_file_name = None

options = Options()

try:
  # NOTE(review): the short option 'f' is accepted by getopt but never
  # handled below - presumably meant as a synonym for --fg; confirm.
  opts, args = getopt.getopt(sys.argv[1:],
                             'hnfl:p:a:s:b:F:c:',
                             ['help', 'nofork', 'fg', 'log=', 'port=',
                              'address=', 'pidfile=', 'user=', 'group=',
                              'status-file=', 'burst-period=',
                              'full-dump-period=', 'config='])
except:
  usage()
  raise

for opt, arg in opts:
  if opt in ('-h', '--help'):
    usage()
    sys.exit()
  elif opt in ('-n', '--fg', '--nofork'):
    options.fork = False
  elif opt in ('-l', '--log'):
    options.logfile_name = arg
  elif opt in ('-p', '--port'):
    options.port = int(arg)
  elif opt in ('-a', '--address'):
    options.address = arg
  elif opt == '--pidfile':
    options.pidfile_name = arg
  elif opt == '--user':
    # --user implies the user's primary group; an explicit --group given
    # afterwards overrides it (hence "must come last" in the help text).
    pw = pwd.getpwnam(arg)
    options.setuid = pw.pw_uid
    options.setgid = pw.pw_gid
  elif opt == '--group':
    options.setgid = grp.getgrnam(arg).gr_gid
  elif opt in ('-s', '--status-file'):
    options.status_file = arg
  elif opt in ('-b', '--burst-period'):
    options.burst_period = int(arg)
  elif opt in ('-F', '--full-dump-period'):
    options.full_dump_period = int(arg)
  elif opt in ('-c', '--config'):
    config_file_name = arg
+
if config_file_name is not None:
  # Import the configuration as a python module: search next to the given
  # path, or on sys.path when no directory component was given.
  config_file = os.path.splitext(os.path.basename(config_file_name))[0]
  config_path = os.path.dirname(config_file_name)
  if len(config_path):
    config_path = [config_path]
  else:
    config_path = sys.path
  file, path, description = imp.find_module(config_file, config_path)
  module = imp.load_module(config_file, file, path, description)
  file.close()
  # Copy public Options attributes from the module, but never override a
  # value already set from the command line (those live in options.__dict__;
  # untouched defaults only exist on the class).
  for option_id in [x for x in dir(Options) if x[:1] != '_']:
    if option_id not in options.__dict__ and hasattr(module, option_id):
      setattr(options, option_id, getattr(module, option_id))

# Make paths absolute: the daemon chdirs to '/' after forking.
if options.logfile_name is not None:
  options.logfile_name = os.path.abspath(options.logfile_name)
if options.status_file is not None:
  options.status_file = os.path.abspath(options.status_file)
+
# Drop privileges before creating files / binding the socket.  Group first:
# setgid would fail after setuid to an unprivileged user.
if options.setgid is not None:
  os.setgid(options.setgid)

if options.setuid is not None:
  os.setuid(options.setuid)

tid_storage = TIDStorage(tid_file_path=options.status_file,
                         burst_period=options.burst_period,
                         full_dump_period=options.full_dump_period)

signal.signal(signal.SIGHUP, HUPHandler)    # logfile rotation
signal.signal(signal.SIGUSR1, USR1Handler)  # dump in-memory state to log
signal.signal(signal.SIGTERM, TERMHandler)  # clean shutdown

if options.fork:
  # Classic double-fork daemonization: the intermediate child calls setsid()
  # to detach from the controlling terminal, then forks the real daemon.
  pid = os.fork()
  if pid == 0:
    os.umask(027)
    os.setsid()
    pid = os.fork()
    if pid == 0:
      # Grandchild (the daemon): detach stdio.  After closing fds 0-2,
      # open() returns the lowest free descriptor (0), which is then
      # duplicated onto stdout (1) and stderr (2).
      os.close(0)
      os.close(1)
      os.close(2)
      os.open('/dev/null', os.O_RDWR)
      os.dup2(0, 1)
      os.dup2(0, 2)
      # Python-level stdout/stderr go to the rotatable logfile instead.
      sys.stdout = sys.stderr = LogFile(options.logfile_name)
      os.chdir('/')
      try:
        main(options.address, options.port)
      except:
        # Log exception before returning.
        log('Exception caught outside of "main". Previous log entries might ' \
            'be out of order because of this exception.\n%s' % (
            ''.join(traceback.format_exception(*sys.exc_info())), ))
      else:
        log('Exited normaly.')
    else:
      # Intermediate child: record the daemon's pid, then exit.
      pidfile = open(options.pidfile_name, 'w')
      pidfile.write(str(pid))
      pidfile.close()
      os._exit(0)
  else:
    # TODO: monitor child startup to make it easier to use.
    os._exit(0)
else:
  main(options.address, options.port)
+
Propchange: erp5/trunk/products/TIDStorage/bin/server.py
------------------------------------------------------------------------------
svn:executable = *
Added: erp5/trunk/products/TIDStorage/repozo/from_z2.8.8_repozo.diff
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/repozo/from_z2.8.8_repozo.diff?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/repozo/from_z2.8.8_repozo.diff (added)
+++ erp5/trunk/products/TIDStorage/repozo/from_z2.8.8_repozo.diff [utf8] Thu Nov 6 17:14:24 2008
@@ -1,0 +1,112 @@
+--- /home/vincent/bin/zope2.8/bin/repozo.py 2007-02-09 13:52:35.000000000 +0100
++++ repozo.py 2007-10-26 15:30:43.311046075 +0200
+@@ -50,6 +50,12 @@
+ Compress with gzip the backup files. Uses the default zlib
+ compression level. By default, gzip compression is not used.
+
++ -m / --max-tid
++ Stop at given TID when saving the Data.fs.
++
++ -M / --print-max-tid
++ Print the last saved transaction's tid.
++
+ Options for -R/--recover:
+ -D str
+ --date=str
+@@ -70,6 +76,7 @@
+ import time
+ import errno
+ import getopt
++import base64
+
+ from ZODB.FileStorage import FileStorage
+
+@@ -104,10 +111,11 @@
+ def parseargs():
+ global VERBOSE
+ try:
+- opts, args = getopt.getopt(sys.argv[1:], 'BRvhf:r:FD:o:Qz',
++ opts, args = getopt.getopt(sys.argv[1:], 'BRvhf:r:FD:o:Qzm:M',
+ ['backup', 'recover', 'verbose', 'help',
+ 'file=', 'repository=', 'full', 'date=',
+- 'output=', 'quick', 'gzip'])
++ 'output=', 'quick', 'gzip', 'max-tid=',
++ 'print-max-tid'])
+ except getopt.error, msg:
+ usage(1, msg)
+
+@@ -120,6 +128,8 @@
+ output = None # where to write recovered data; None = stdout
+ quick = False # -Q flag state
+ gzip = False # -z flag state
++ print_tid = False # -M flag state
++ max_tid = None # -m argument, if any
+
+ options = Options()
+
+@@ -150,6 +160,10 @@
+ options.output = arg
+ elif opt in ('-z', '--gzip'):
+ options.gzip = True
++ elif opt in ('-M', '--print-max-tid'):
++ options.print_tid = True
++ elif opt in ('-m', '--max-tid'):
++ options.max_tid = base64.decodestring(arg)
+ else:
+ assert False, (opt, arg)
+
+@@ -174,6 +188,12 @@
+ if options.file is not None:
+ log('--file option is ignored in recover mode')
+ options.file = None
++ if options.print_tid:
++ log('--print-max-tid is ignored in recover mode')
++ options.print_tid = False
++ if options.max_tid is not None:
++ log('--max-tid is ignored in recover mode')
++ options.max_tid = None
+ return options
+
+
+@@ -349,13 +369,19 @@
+
+ def do_full_backup(options):
+ # Find the file position of the last completed transaction.
+- fs = FileStorage(options.file, read_only=True)
++ fs = FileStorage(options.file, read_only=True, stop=options.max_tid)
+ # Note that the FileStorage ctor calls read_index() which scans the file
+ # and returns "the position just after the last valid transaction record".
+ # getSize() then returns this position, which is exactly what we want,
+ # because we only want to copy stuff from the beginning of the file to the
+ # last valid transaction record.
+ pos = fs.getSize()
++ if options.print_tid:
++ undo_log = fs.undoLog(last=-1)
++ if len(undo_log):
++ print >> sys.stdout, 'Last TID: %s' % (undo_log[0]['id'], )
++ else:
++ print >> sys.stderr, 'Cannot get latest TID'
+ fs.close()
+ options.full = True
+ dest = os.path.join(options.repository, gen_filename(options))
+@@ -375,13 +401,19 @@
+
+ def do_incremental_backup(options, reposz, repofiles):
+ # Find the file position of the last completed transaction.
+- fs = FileStorage(options.file, read_only=True)
++ fs = FileStorage(options.file, read_only=True, stop=options.max_tid)
+ # Note that the FileStorage ctor calls read_index() which scans the file
+ # and returns "the position just after the last valid transaction record".
+ # getSize() then returns this position, which is exactly what we want,
+ # because we only want to copy stuff from the beginning of the file to the
+ # last valid transaction record.
+ pos = fs.getSize()
++ if options.print_tid:
++ undo_log = fs.undoLog(last=-1)
++ if len(undo_log):
++ print >> sys.stdout, 'Last TID: %s' % (undo_log[0]['id'], )
++ else:
++ print >> sys.stderr, 'Cannot get latest TID'
+ fs.close()
+ options.full = False
+ dest = os.path.join(options.repository, gen_filename(options))
Added: erp5/trunk/products/TIDStorage/repozo/repozo_tidstorage.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/repozo/repozo_tidstorage.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/repozo/repozo_tidstorage.py (added)
+++ erp5/trunk/products/TIDStorage/repozo/repozo_tidstorage.py [utf8] Thu Nov 6 17:14:24 2008
@@ -1,0 +1,353 @@
+#!/usr/bin/python
+
+##############################################################################
+#
+# Copyright (c) 2007 Nexedi SARL. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+# Parts of this file are borrowed from Zope 2.8.8 repozo.py script.
+# Essentialy "usage" and "parseargs" methods.
+# So it's released under the ZPL v2.0, as is Zope 2.8.8 .
+
+""" repozo wrapper to backup for multiple Data.fs files in a consistent way.
+
+Usage: %(program)s [-h|--help] [-c|--config configuration_file]
+ [--repozo repozo_command] [-R|--recover|--recover_check]
+ [-H|--host address] [-p|--port port_number] [-u|--url formated_url]
+ [...]
+
+ -h
+ --help
+ Display this help and exit.
+
+ -c configuration_file
+ --config configuration_file
+ Use given file as configuration file.
+ It must be a python file. See sample_configuration.py for required values.
+ Recquired if neither -h nor --help are given.
+
+ --repozo repozo_command
+ Use given executable as repozo command.
+ Default: repozo.py
+
+ -R
+ --recover
+ Instead of saving existing Data.fs, perform an automated recovery from
+ backups + timestamp file.
+
+ --recover_check
+ Similar to above, except that it restores file to temp folder and compares
+ with existing file.
+ Files restored this way are automaticaly deleted after check.
+
+ -H address
+ --host address
+ TIDStorage server host address.
+ Overrides setting found in configuration_file.
+ Not required if recovering (see above).
+
+ -p port_number
+ --port port_number
+ TIDStorage port nuber.
+ Overrides setting found in configuration_file.
+ Not required if recovering (see above).
+
+ -u formated_url
+ --url formated_url
+ Zope base url, optionnaly with credentials.
+ Overrides setting found in configuration_file.
+ Not required if recovering (see above).
+
+ All others parameters are transmitted to repozo but are partly processed by
+ getopt. To transmit unprocessed parameters to repozo, pass them as an
+ argument.
+"""
+
+from ExchangeProtocol import ExchangeProtocol
+import socket
+import base64
+import imp
+import getopt
+import sys
+import os
+# urllib2 does not support (?) urls containing credentials
+# (http://login:password@...) but it's fine with urllib.
+from urllib import urlopen
+import traceback
+import md5
+import time
+import tempfile
+from struct import pack, unpack
+
+program = sys.argv[0]
+
def log(message):
  # Minimal logger: this script is run from a shell/cron, stdout is enough.
  print message
+
class TIDClient:
  """Minimal client for the TIDStorage server's 'dump' command."""
  def __init__(self, address):
    # TODO: handle disconnections nicely
    self._address = address
    self._socket = socket.socket()
    self._socket.connect(address)
    self._protocol_handler = ExchangeProtocol(socket=self._socket)

  def __call__(self):
    """
    Return dict currently stored on the server.
    """
    self._protocol_handler.send_field('dump')
    return self._protocol_handler.recv_dict()
+
def backup(address, known_tid_storage_identifier_dict, repozo_formated_command, zope_formated_url=None):
  """Force a TID for every storage, then run repozo on each Data.fs.

  address: (host, port) of the TIDStorage server.
  known_tid_storage_identifier_dict: storage id -> (Data.fs path,
    repozo repository path, Zope object path used to force a commit).
  repozo_formated_command: repozo command with three %s slots:
    (repository path, Data.fs path, max tid).
  zope_formated_url: base url with one %s slot for the object path, or
    None to skip forcing commits.

  Returns the number of storages which could NOT be saved (0 on success).
  """
  connection = TIDClient(address)
  to_load = known_tid_storage_identifier_dict.keys()
  load_count = 2
  while len(to_load):
    if load_count < 1:
      # Fixed: the original was missing the '%' before (to_load, ), which
      # raised a TypeError instead of this ValueError.
      raise ValueError('It was impossible to retrieve all required TIDs. Missing: %s' % (to_load, ))
    to_load = []
    load_count -= 1
    stored_tid_dict = connection()
    #log(stored_tid_dict)
    for key, (file_path, storage_path, object_path) in known_tid_storage_identifier_dict.iteritems():
      if key not in stored_tid_dict and zope_formated_url is not None:
        to_load.append(key)
      # Fixed: also require zope_formated_url, otherwise 'None % ...' below
      # raised a TypeError when no base url was configured.
      if object_path is not None and zope_formated_url is not None:
        serialize_url = zope_formated_url % (object_path, )
        log(serialize_url)
        try:
          urlopen(serialize_url)
        except Exception:
          # Prevent exceptions from interrupting the backup.
          # We don't care about how well the web server is working, the only
          # important thing is to get all TIDs in TIDStorage, and it's checked
          # later.
          log(''.join(traceback.format_exception(*sys.exc_info())))

  backup_count = 0
  total_count = len(known_tid_storage_identifier_dict)
  for key, (file_path, storage_path, object_path) in known_tid_storage_identifier_dict.iteritems():
    # +1 then base64-encode: repozo's -m stops at the given TID.
    tid_as_int = stored_tid_dict[key] + 1
    tid = base64.encodestring(pack('>Q', tid_as_int)).rstrip()
    repozo_command = repozo_formated_command % (storage_path, file_path, tid)
    if not os.access(storage_path, os.R_OK):
      os.makedirs(storage_path)
    log('Running %r...' % (repozo_command, ))
    status = os.system(repozo_command)
    status = os.WEXITSTATUS(status)
    if status == 0:
      backup_count += 1
    else:
      log('Error occured while saving %s: exit status=%i' % (file_path, status))
  log('Saved %i FileStorages out of %i.' % (backup_count, total_count))
  return total_count - backup_count
+
def get_md5_diggest(file_instance, length):
  """Return the hex md5 digest of the first `length` bytes of file_instance.

  Reads from the start of the file in BLOCK_SIZE chunks.  If the file ends
  before `length` bytes, a warning is logged and hashing stops at EOF.
  """
  BLOCK_SIZE = 512
  file_instance.seek(0)
  md5sum = md5.new()
  read = file_instance.read
  update = md5sum.update
  while length > 0:
    to_read = min(BLOCK_SIZE, length)
    # Renamed from 'buffer' to avoid shadowing the builtin.
    chunk = read(to_read)
    if len(chunk) != to_read:
      # Short read: file ended early; stop instead of looping forever.
      # Fixed log message typo: "requiested" -> "requested".
      log('Warning: read %i instead of requested %i, stopping read' % (len(chunk), to_read))
      length = 0
    else:
      length -= to_read
    update(chunk)
  return md5sum.hexdigest()
+
def recover(known_tid_storage_identifier_dict, repozo_formated_command, check=False):
  """Run repozo -R for every known storage; optionally verify against originals.

  When check is True, files are restored to the temp directory, compared
  (length, then md5 over the common prefix) with the live Data.fs, and
  deleted afterwards.

  Returns the number of storages which could NOT be restored (0 on success).
  """
  recovered_count = 0
  total_count = len(known_tid_storage_identifier_dict)
  for key, (file_path, storage_path, object_path) in known_tid_storage_identifier_dict.iteritems():
    if not os.access(storage_path, os.R_OK):
      log('Warning: unable to recover %s because %s is missing/unreadable.' % (file_path, storage_path))
      continue
    if check:
      # Restore into the temp dir so the live file is left untouched.
      original_file_path = file_path
      file_path = os.path.join(tempfile.gettempdir(), os.path.basename(file_path))
    repozo_command = repozo_formated_command % (storage_path, file_path)
    status = os.system(repozo_command)
    status = os.WEXITSTATUS(status)
    if status == 0:
      recovered_count += 1
    else:
      log('Error occured while recovering %s: exit status=%i' % (file_path, status))
    if check:
      log('Info: Comparing restored %s with original %s' % (file_path, original_file_path))
      recovered_file = open(file_path, 'r')
      original_file = open(original_file_path, 'r')
      try:
        # Seek to the end of both files to get their lengths.
        recovered_file.seek(0, 2)
        original_file.seek(0, 2)
        recovered_file_length = recovered_file.tell()
        original_file_length = original_file.tell()
        checked_length = recovered_file_length
        if recovered_file_length < original_file_length:
          # Shorter is expected: the backup stops at the recorded TID.
          log('Info: Shorter than original: -%i bytes (-%.02f%%)' % \
              (original_file_length - recovered_file_length,
               1 - (float(recovered_file_length) / original_file_length)))
        elif recovered_file_length > original_file_length:
          log('ERROR: Longer than original: +%i bytes (+%.02f%%). Was original packed since backup ?' % \
              (recovered_file_length - original_file_length,
               float(recovered_file_length) / original_file_length))
          checked_length = None
        if checked_length is not None:
          # Compare md5 over the common prefix only.
          recovered_file_diggest = get_md5_diggest(recovered_file, checked_length)
          original_file_diggest = get_md5_diggest(original_file, checked_length)
          if recovered_file_diggest != original_file_diggest:
            log('ERROR: Recovered md5 does not match original: %s != %s.' % \
                (recovered_file_diggest, original_file_diggest))
      finally:
        recovered_file.close()
        original_file.close()
        os.unlink(file_path)

  log('Restored %i FileStorages out of %i.' % (recovered_count, total_count))
  return total_count - recovered_count
+
def usage(code, msg=''):
  """Print the module docstring (help text) and exit with `code`.

  Help goes to stdout when code is 0, to stderr otherwise; an optional
  message is appended.
  """
  outfp = sys.stderr
  if code == 0:
    outfp = sys.stdout

  print >> outfp, __doc__ % globals()
  if msg:
    print >> outfp, msg

  sys.exit(code)
+
+def parseargs():
+ try:
+ opts, args = getopt.getopt(sys.argv[1:], 'vQr:FhzMRc:H:p:u:',
+ ['help', 'verbose', 'quick', 'full',
+ 'gzip', 'print-max-tid', 'repository',
+ 'repozo=', 'config=', 'host=', 'port=',
+ 'url=', 'recover', 'recover_check'])
+ except getopt.error, msg:
+ usage(1, msg)
+
+ class Options:
+ timestamp_file_path = None
+ repozo_file_name = 'repozo.py'
+ configuration_file_name = None
+ repozo_opts = ['-B']
+ host = None
+ port = None
+ base_url = None
+ known_tid_storage_identifier_dict = {}
+ recover = False
+ dry_run = False
+
+ options = Options()
+
+ if args:
+ options.repozo_opts.extend(args)
+
+ for opt, arg in opts:
+ if opt in ('-h', '--help'):
+ usage(0)
+ elif opt in ('-c', '--config'):
+ options.configuration_file_name = arg
+ elif opt == '--repozo':
+ options.repozo_file_name = arg
+ elif opt in ('-R', '--recover', '--recover_check'):
+ options.repozo_opts[0] = '-R'
+ options.recover = True
+ if opt == '--recover_check':
+ options.dry_run = True
+ elif opt in ('-H', '--host'):
+ options.host = arg
+ elif opt in ('-p', '--port'):
+ try:
+ options.port = int(port)
+ except ValueError, msg:
+ usage(1, msg)
+ elif opt in ('-u', '--url'):
+ options.url = arg
+ elif opt in ('-r', '--repository'):
+ options.repozo_opts.append('%s %s' % (opt, arg))
+ else:
+ options.repozo_opts.append(opt)
+
+ if options.configuration_file_name is None:
+ usage(1, 'Either -c or --config is required.')
+
+ configuration_filename, ext = os.path.splitext(os.path.basename(options.configuration_file_name))
+ configuration_path = os.path.dirname(options.configuration_file_name)
+ if len(configuration_path):
+ configuration_path = [configuration_path]
+ else:
+ configuration_path = sys.path
+ file, path, description = imp.find_module(configuration_filename, configuration_path)
+ module = imp.load_module(configuration_filename, file, path, description)
+ file.close()
+ try:
+ options.known_tid_storage_identifier_dict = module.known_tid_storage_identifier_dict
+ options.timestamp_file_path = module.timestamp_file_path
+ except AttributeError, msg:
+ usage(1, msg)
+ for option_id in ('port', 'host', 'base_url'):
+ if getattr(options, option_id) is None:
+ setattr(options, option_id, getattr(module, option_id, None))
+ # XXX: we do not check any option this way, it's too dangerous.
+ #options.repozo_opts.extend(getattr(module, 'repozo_opts', []))
+ if options.port is None:
+ options.port = 9001
+
+ if options.host is None:
+ usage(1, 'Either -H or --host is required (or host value should be set in configuration file).')
+
+ return options
+
# Script entry point: backup mode by default, recovery with -R/--recover.
options = parseargs()
address = (options.host, options.port)
zope_formated_url = options.base_url
if options.base_url is not None and '%s' not in zope_formated_url:
  raise ValueError, 'Given base url (%r) is not properly formated, it must contain one \'%%s\'.' % (zope_formated_url, )
# Base command; '-r' slot is the repozo repository path.
repozo_formated_command = '%s %s -r "%%s"' % (options.repozo_file_name, ' '.join(options.repozo_opts))
if options.recover:
  # Keep the last (most recent) line of the timestamp file.
  timestamp_file = open(options.timestamp_file_path, 'r')
  timestamp = ''
  read_line = ' '
  while len(read_line):
    timestamp = read_line
    read_line = timestamp_file.readline()
  timestamp = timestamp.strip('\r\n \t')
  # NOTE(review): timestamp can never be None here (it starts as ''), so
  # this guard also accepts an empty timestamp file - confirm intended.
  if timestamp is not None:
    repozo_formated_command += ' -o "%%s" -D %s' % (timestamp, )
  result = recover(
    known_tid_storage_identifier_dict=options.known_tid_storage_identifier_dict,
    repozo_formated_command=repozo_formated_command,
    check=options.dry_run)
else:
  # Backup mode: add slots for the Data.fs path (-f) and max TID (-m).
  repozo_formated_command += ' -f "%s" -m "%s"'
  result = backup(
    address=address,
    known_tid_storage_identifier_dict=options.known_tid_storage_identifier_dict,
    zope_formated_url=zope_formated_url,
    repozo_formated_command=repozo_formated_command)
  if result == 0:
    # Paranoid mode:
    # Issue a system-wide "sync" command to make sure all files which were saved
    # are really present on disk.
    os.system('sync')
    timestamp_file = open(options.timestamp_file_path, 'a', 0)
    try:
      # Borrowed from repozo.
      timestamp_file.write('\n%04d-%02d-%02d-%02d-%02d-%02d' % time.gmtime()[:6])
    finally:
      timestamp_file.close()

sys.exit(result)
Propchange: erp5/trunk/products/TIDStorage/repozo/repozo_tidstorage.py
------------------------------------------------------------------------------
svn:executable = *
Added: erp5/trunk/products/TIDStorage/repozo/restore_tidstorage.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/repozo/restore_tidstorage.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/repozo/restore_tidstorage.py (added)
+++ erp5/trunk/products/TIDStorage/repozo/restore_tidstorage.py [utf8] Thu Nov 6 17:14:24 2008
@@ -1,0 +1,188 @@
+#!/usr/bin/python
+
+##############################################################################
+#
+# Copyright (c) 2007 Nexedi SARL. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+# Parts of this file are borrowed from Zope 2.8.8 repozo.py script.
+# Essentialy "usage", "parseargs" and parts of "restore" methods.
+# So it's released under the ZPL v2.0, as is Zope 2.8.8 .
+
+"""
+Usage: %(program)s [-h|--help] [-c|--config configuration_file]
+
+ -h
+ --help
+ Display this help and exit.
+
+ -c configuration_file
+ --config configuration_file
+ Use given file as configuration file.
+ It must be a python file.
+ Recquired if neither -h nor --help are given.
+"""
+
+import imp
+import getopt
+import sys
+import os
+# urllib2 does not support (?) urls containing credentials
+# (http://login:password@...) but it's fine with urllib.
+from struct import pack
+import shutil
+from ZODB.FileStorage import FileStorage
+
+program = sys.argv[0]
+
def log(message):
  # Minimal logger: this restore script runs interactively, stdout is enough.
  print message
+
def parse(status_file):
  """Parse a TIDStorage status file and return its final TID state.

  Each line has the form "<timestamp> <type> <dict-repr>", where type is
  'd' (incremental dump: merged into the current state) or 'f' (full dump:
  must cover the current state, then replaces it).  Timestamps must be
  strictly increasing and TIDs must never go backwards.

  Returns a dict mapping storage identifier to last known TID.
  """
  tid_log = open(status_file)
  try:
    content = {}
    last_timestamp = None
    for line in tid_log:
      split_line = line.split(' ', 2)
      assert len(split_line) == 3, repr(split_line)
      line_timestamp, line_type, line_dict = split_line
      line_timestamp = float(line_timestamp)
      assert line_type in ('f', 'd'), repr(line_type)
      if last_timestamp is not None:
        # Fixed: the original never updated last_timestamp after the first
        # line, so every line was only compared against the FIRST timestamp
        # instead of checking strict monotonic increase.
        assert last_timestamp < line_timestamp, '%r < %r' % (last_timestamp, line_timestamp)
      last_timestamp = line_timestamp
      # NOTE(review): eval of file content - acceptable only because the
      # status file is produced locally by the TIDStorage server itself.
      line_dict = eval(line_dict, None)
      assert isinstance(line_dict, dict), type(line_dict)
      assert len(line_dict), repr(line_dict)
      if line_type == 'd':
        # Incremental dump: merge, checking TIDs only move forward.
        for key, value in line_dict.items():
          if key in content:
            assert content[key] < value, '%r < %r' % (content[key], value)
          content[key] = value
      else:
        # Full dump: must contain every known key with a TID at least as
        # recent, then replaces the whole state.
        for key, value in content.items():
          assert key in line_dict, repr(key)
          assert value <= line_dict[key], '%r <= %r' % (value, line_dict[key])
        content = line_dict
  finally:
    # Fixed: the original leaked the file handle.
    tid_log.close()
  return content
+
+READCHUNK = 10 * 1024 * 1024
+
+def recover(data_fs_backup_path_dict, status_file):
+ last_tid_dict = parse(status_file)
+ for storage_id, (file_path, backup_path) in data_fs_backup_path_dict.iteritems():
+ # Derived from repozo (function=do_full_backup)
+ # TODO: optimise to read backup only once.
+ can_restore = False
+ if os.path.exists(backup_path):
+ if os.path.exists(file_path):
+ print 'Both original and backup files exist for %r. If previous restoration was successful, you should delete the backup for this restoration to take place. Original: %r Backup: %r' % (storage_id, file_path, backup_path)
+ else:
+ print 'Only backup file is available for %r: %r. Assuming it\'s ok and restoring to %r' % (storage_id, backup_path, file_path)
+ can_restore = True
+ else:
+ if os.path.exists(file_path):
+ sys.stdout.write('Copying %r to %r... ' % (file_path, backup_path))
+ shutil.copy(file_path, backup_path)
+ initial_size = stat(file_path).st_size
+ final_size = stat(backup_path).st_size
+ if initial_size == final_size:
+ can_restore = True
+ print 'Done.'
+ else:
+ print 'Backup size %i differs from original size %i. Is the original file (%r) still in use ? Is there enough free disk space at destination (%r) ?' % (final_size, initial_size, file_path, backup_path)
+ else:
+ print 'Cannot find any file for %r: %r and %r do not exist.' % (storage_id, file_path, backup_path)
+ if can_restore:
+ last_tid = last_tid_dict[storage_id] + 1
+ tid = pack('>Q', last_tid)
+ # Find the file position of the last completed transaction.
+ fs = FileStorage(backup_path, read_only=True, stop=tid)
+ # Note that the FileStorage ctor calls read_index() which scans the file
+ # and returns "the position just after the last valid transaction record".
+ # getSize() then returns this position, which is exactly what we want,
+ # because we only want to copy stuff from the beginning of the file to the
+ # last valid transaction record.
+ pos = fs.getSize()
+ fs.close()
+ print 'Restoring backup: %s bytes (transaction %r) from %s to %s' % (pos, tid, backup_path, file_path)
+ source_file = open(backup_path, 'rb')
+ destination_file = open(file_path, 'wb')
+ while pos:
+ todo = min(READCHUNK, pos)
+ data = source_file.read(todo)
+ if not data:
+ print 'Unexpected end of data stream (should contain %i more bytes)' % (pos, )
+ break
+ destination_file.write(data)
+ pos -= len(data)
+ destination_file.close()
+ source_file.close()
+ else:
+ print 'Skipping restoration of %r (%r).' % (file_path, storage_id)
+
def usage(code, msg=''):
  """Print the module docstring (help text) and exit with `code`.

  Help goes to stdout when code is 0, to stderr otherwise; an optional
  message is appended.
  """
  outfp = sys.stderr
  if code == 0:
    outfp = sys.stdout

  print >> outfp, __doc__ % globals()
  if msg:
    print >> outfp, msg

  sys.exit(code)
+
def parseargs():
  """Parse the command line, load the configuration module, return Options.

  The configuration module must define data_fs_backup_path_dict and
  status_file; exits through usage() on any error.
  """
  try:
    opts, args = getopt.getopt(sys.argv[1:], 'hc:',
                               ['help', 'config='])
  except getopt.error, msg:
    usage(1, msg)

  class Options:
    configuration_file_name = None
    status_file = None

  options = Options()

  for opt, arg in opts:
    if opt in ('-h', '--help'):
      usage(0)
    elif opt in ('-c', '--config'):
      options.configuration_file_name = arg

  if options.configuration_file_name is None:
    usage(1, 'Either -c or --config is required.')

  # Import the configuration file as a python module: search next to the
  # given path, or on sys.path when no directory component was given.
  configuration_filename, ext = os.path.splitext(os.path.basename(options.configuration_file_name))
  configuration_path = os.path.dirname(options.configuration_file_name)
  if len(configuration_path):
    configuration_path = [configuration_path]
  else:
    configuration_path = sys.path
  file, path, description = imp.find_module(configuration_filename, configuration_path)
  module = imp.load_module(configuration_filename, file, path, description)
  file.close()
  try:
    options.data_fs_backup_path_dict = module.data_fs_backup_path_dict
    options.status_file = module.status_file
  except AttributeError, msg:
    usage(1, msg)
  return options
+
# Script entry point: load configuration, then restore every listed Data.fs
# from its backup up to the TIDs recorded in the status file.
options = parseargs()
recover(
  data_fs_backup_path_dict=options.data_fs_backup_path_dict,
  status_file=options.status_file)
+
Propchange: erp5/trunk/products/TIDStorage/repozo/restore_tidstorage.py
------------------------------------------------------------------------------
svn:executable = *
Added: erp5/trunk/products/TIDStorage/repozo/sample_configuration.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/repozo/sample_configuration.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/repozo/sample_configuration.py (added)
+++ erp5/trunk/products/TIDStorage/repozo/sample_configuration.py [utf8] Thu Nov 6 17:14:24 2008
@@ -1,0 +1,35 @@
# COMMON
# This part is used both by server_v2.py and repozo_tidstorage_v2.py
# Maps a ZEO storage identifier (as reported by the client) to a tuple:
#   (Data.fs path, repozo repository directory, Zope object id used to
#    force a commit when pre-generating TIDs).
known_tid_storage_identifier_dict = {
  "((('localhost', 8200),), '2')":
    ('/home/vincent/zeo2/var2/Data.fs',
     '/home/vincent/tmp/repozo/z22',
     'foo_test'),
  "((('localhost', 8200),), '1')":
    ('/home/vincent/zeo2/var/Data.fs',
     '/home/vincent/tmp/repozo/z21',
     'bar_test'),
  "((('localhost', 8100),), '1')":
    ('/home/vincent/zeo1/var/Data.fs',
     '/home/vincent/tmp/repozo/z11',
     'baz_test'),
}
# Zope base url with one %s slot for the object id above.
base_url = 'http://localhost:5080/erp5/%s/modifyContext'
port = 9001
host = '127.0.0.1'

# SERVER
# This part is only used by server_v2.py
#logfile_name = 'tidstorage.log'
#pidfile_name = 'tidstorage.pid'
#fork = False
#setuid = None
#setgid = None
status_file = 'tidstorage.tid'
burst_period = 30
full_dump_period = 300

# REPOZO_TIDSTORAGE
# This part is only used by repozo_tidstorage_v2.py
timestamp_file_path = 'repozo_tidstorage_timestamp.log'
+
Added: erp5/trunk/products/TIDStorage/tests/testTIDServer.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/tests/testTIDServer.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/tests/testTIDServer.py (added)
+++ erp5/trunk/products/TIDStorage/tests/testTIDServer.py [utf8] Thu Nov 6 17:14:24 2008
@@ -1,0 +1,256 @@
+#!/usr/bin/python
+# Functional test driver for a running TIDStorage server: connects to it and
+# replays begin/commit/abort scenarios, checking 'dump' output at each step.
+
+from ExchangeProtocol import ExchangeProtocol
+import traceback
+import socket
+import sys
+
+# Usage: testTIDServer.py <address> <port> of an already-running server.
+assert len(sys.argv) == 3, 'Requires exactly 2 arguments: <address> <port>'
+
+address = sys.argv[1]
+port = int(sys.argv[2])
+
+class TIDClient:
+ def __init__(self, address):
+ self._to_server = socket.socket()
+ self._to_server.connect(address)
+ self._exchange_protocol = ExchangeProtocol(self._to_server)
+
+ def _dump(self, test_id=None):
+ self._exchange_protocol.send_field('dump')
+ received_dict = self._exchange_protocol.recv_dict()
+ if test_id is None:
+ result = received_dict
+ else:
+ id_len = len(test_id) + 1 # Add 1 to strip underscore.
+ result = dict([(key[id_len:], value) \
+ for key, value in received_dict.iteritems() \
+ if key.startswith(test_id)])
+ return dict([(key, int(value)) for key, value in result.iteritems()])
+
+ def dump(self, test_id):
+ return self._dump(test_id=test_id)
+
+ def dump_all(self):
+ return self._dump()
+
+ def begin(self, test_id, transaction_id, storage_id_list):
+ self._exchange_protocol.send_field('begin')
+ self._exchange_protocol.send_field(transaction_id)
+ internal_storage_id_list = ['%s_%s' % (test_id, x) \
+ for x in storage_id_list]
+ self._exchange_protocol.send_list(internal_storage_id_list)
+
+ def abort(self, test_id, transaction_id):
+ self._exchange_protocol.send_field('abort')
+ self._exchange_protocol.send_field(transaction_id)
+
+ def commit(self, test_id, transaction_id, storage_tid_dict):
+ self._exchange_protocol.send_field('commit')
+ self._exchange_protocol.send_field(transaction_id)
+ internal_storage_tid_dict = {}
+ for key, value in storage_tid_dict.iteritems():
+ internal_storage_tid_dict['%s_%s' % (test_id, key)] = value
+ self._exchange_protocol.send_dict(internal_storage_tid_dict)
+
+class TestTIDServerV2:
+  """
+  Functional test scenarios for the TIDStorage server.
+
+  Each test* method receives its own method name as test_id; TIDClient uses
+  it to namespace storage ids, keeping scenarios independent even though
+  they all run against the same server instance.
+  """
+  def __init__(self, address, port):
+    self._client = TIDClient((address, port))
+
+  def assertEqual(self, value, target):
+    # Minimal stand-in for unittest's assertEqual.
+    assert value == target, 'value %r does not match target %r' % (value, target)
+
+  def testInitialValue(self, test_id):
+    """
+    Check that the storage is empty.
+    """
+    self.assertEqual(self._client.dump_all(), {})
+
+  def testScenario1(self, test_id):
+    """
+    Simple begin - commit case.
+    """
+    storage_tid_dict = {'s1': 1}
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't1', storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't1', storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), storage_tid_dict)
+
+  def testScenario2(self, test_id):
+    """
+    Simple begin - abort case.
+    """
+    storage_tid_dict = {'s1': 1}
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't1', storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.abort(test_id, 't1')
+    self.assertEqual(self._client.dump(test_id), {})
+
+  def testScenario3(self, test_id):
+    """
+    2 concurrent transactions impacting a common storage.
+    Second transaction begins after first does, and commits before
+    first does.
+    """
+    t1_storage_tid_dict = {'s1': 1, 's2': 1}
+    t2_storage_tid_dict = {'s1': 2, 's3': 1}
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    # t2's commit must stay invisible until t1 (which shares s1) ends.
+    self._client.commit(test_id, 't2', t2_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't1', t1_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 1, 's3': 1})
+
+  def testScenario4(self, test_id):
+    """
+    3 concurrent transactions.
+    Transactions 1 and 2 impact same storage s1.
+    Transaction 3 impacts storage s3 after transaction 2 committed.
+    Still, as storage 3 was part of a non-committable-yet transaction,
+    it must not be committed until all blockable (here, t1) transactions have
+    ended.
+    """
+    t1_storage_tid_dict = {'s1': 1, 's2': 2}
+    t2_storage_tid_dict = {'s1': 2, 's3': 1}
+    t3_storage_tid_dict = {'s3': 1}
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't2', t2_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't3', t3_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't3', t3_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't1', t1_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 2, 's3': 1})
+
+  def testScenario4bis(self, test_id):
+    """
+    3 concurrent transactions.
+    Transactions 1 and 2 impact same storage s1.
+    Transaction 3 impacts storage s3 after transaction 2 committed.
+    Still, as storage 3 was part of a non-committable-yet transaction,
+    it must not be committed until all blockable (here, t1) transactions have
+    ended.
+    In this version, t1 aborts: for example, tpc_vote failed. As the data
+    was already sent to storage, it might be already present on disk (and
+    anyway, tid is not to be used anymore), so it's valid for t2 to commit
+    with tid 2 even if t1 aborted tid 1.
+    """
+    t1_storage_tid_dict = {'s1': 1, 's2': 2}
+    t2_storage_tid_dict = {'s1': 2, 's3': 1}
+    t3_storage_tid_dict = {'s3': 1}
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't2', t2_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't3', t3_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't3', t3_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {})
+    # Note: s2 was only part of the aborted t1, so it never appears.
+    self._client.abort(test_id, 't1')
+    self.assertEqual(self._client.dump(test_id), {'s1': 2, 's3': 1})
+
+  def testScenario5(self, test_id):
+    """
+    2 concurrent transactions impacting a common storage.
+    Second transaction begins after first does, and commits after
+    first does.
+    """
+    t1_storage_tid_dict = {'s1': 2}
+    t2_storage_tid_dict = {'s1': 1, 's2': 1}
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't1', t1_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't2', t2_storage_tid_dict)
+    # t1's higher tid for s1 wins over t2's older value.
+    self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 1})
+
+  def testScenario6(self, test_id):
+    """
+    2 concurrent transactions impacting separate sets of storages.
+    Check that the first commit impacts dump data immediately.
+    """
+    t1_storage_tid_dict = {'s1': 1}
+    t2_storage_tid_dict = {'s2': 1}
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't1', t1_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {'s1': 1})
+    self._client.commit(test_id, 't2', t2_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {'s1': 1, 's2': 1})
+
+  def testScenario7(self, test_id):
+    """
+    3 concurrent transactions.
+    t1 and t2 impact a set of different storages.
+    t3 impacts a set of storage containing the ones from t1 and the ones
+    from t2.
+    Check that nothing impacts dump data until everything is committed.
+    """
+    t1_storage_tid_dict = {'s1': 1}
+    t2_storage_tid_dict = {'s2': 2}
+    t3_storage_tid_dict = {'s1': 2, 's2': 2}
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't3', t3_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't1', t1_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't2', t2_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't3', t3_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 2})
+
+  def testScenario8(self, test_id):
+    """
+    Simple increase case: successive commits on the same storage must
+    each become visible in dump output.
+    """
+    self.assertEqual(self._client.dump(test_id), {})
+    t1_storage_tid_dict = {}
+    for s1_value in (1, 2):
+      previous_t1_storage_tid_dict = t1_storage_tid_dict
+      t1_storage_tid_dict = {'s1': s1_value}
+      self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
+      # While t1 is pending, dump must still show the previous value.
+      self.assertEqual(self._client.dump(test_id), previous_t1_storage_tid_dict)
+      self._client.commit(test_id, 't1', t1_storage_tid_dict)
+      self.assertEqual(self._client.dump(test_id), t1_storage_tid_dict)
+
+  def run(self):
+    # Discover test methods by name; each receives its own name as test_id
+    # so its storage keys are isolated from other scenarios.
+    for test_method_id in [x for x in dir(self) if x.startswith('test')]:
+      self.log("Runing %s..." % (test_method_id, ))
+      try:
+        try:
+          getattr(self, test_method_id)(test_id=test_method_id)
+        except AssertionError:
+          # Report failure and its traceback, then continue with next test.
+          self.log('F\n')
+          self.log('\n'.join(traceback.format_exception(*sys.exc_info())))
+      finally:
+        self.log('\n')
+
+  def log(self, message):
+    sys.stdout.write(message)
+
+# Run every scenario against the server given on the command line.
+test = TestTIDServerV2(address, port)
+test.run()
+
Propchange: erp5/trunk/products/TIDStorage/tests/testTIDServer.py
------------------------------------------------------------------------------
svn:executable = *
Added: erp5/trunk/products/TIDStorage/transaction_transaction.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/transaction_transaction.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/transaction_transaction.py (added)
+++ erp5/trunk/products/TIDStorage/transaction_transaction.py [utf8] Thu Nov 6 17:14:24 2008
@@ -1,0 +1,254 @@
+############################################################################
+#
+# Copyright (c) 2007, 2008 Nexedi SARL and Contributors. All Rights Reserved.
+# Vincent Pelletier <vincent at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+from ExchangeProtocol import ExchangeProtocol
+from transaction._transaction import Transaction
+from zLOG import LOG, WARNING
+import socket
+import thread
+import struct
+import sys
+
+# Name of the method looked up on ZEO ClientStorage instances to fetch the
+# last committed TID (presumably added by ZEOClientStorage.py - confirm).
+GET_LAST_COMMITED_TID_METHOD_ID = 'getLastCommitedTID'
+# Address of the TIDStorage server this Zope node reports to.
+TID_STORAGE_ADDRESS = ('127.0.0.1', 9001)
+
+# Module-level singletons: lazily-created TIDClient and the cached
+# "ip:port" identifier of this Zope node (see getZopeId()).
+tid_storage = None
+zope_identifier = None
+
+# Borrowed from CMFActivity.ActivityTool.getCurrentNode
+def getZopeId():
+ """ Return current node in form ip:port """
+ global zope_identifier
+ if zope_identifier is None:
+ port = ''
+ from asyncore import socket_map
+ for k, v in socket_map.items():
+ if hasattr(v, 'port'):
+ # see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
+ type = str(getattr(v, '__class__', 'unknown'))
+ if type == 'ZServer.HTTPServer.zhttp_server':
+ port = v.port
+ break
+ assert port != '', 'zhttp_server not started yet'
+ ip = socket.gethostbyname(socket.gethostname())
+ if TID_STORAGE_ADDRESS[0] != '127.0.0.1':
+ assert ip != '127.0.0.1', 'self address must not be 127.0.0.1 if TIDStorage is remote'
+ zope_identifier = '%s:%s' %(ip, port)
+ return zope_identifier
+
+def getFilestorageList(resource_list):
+ return getFilestorageToTIDMapping(resource_list).keys()
+
+def getFilestorageToTIDMapping(resource_list):
+ datafs_tid_update_dict = {}
+ for resource in resource_list:
+ storage = getattr(resource, '_storage', None)
+ if storage is not None:
+ getLastCommitedTID = getattr(storage, GET_LAST_COMMITED_TID_METHOD_ID,
+ None)
+ if getLastCommitedTID is not None:
+ tid = getLastCommitedTID()
+ _addr = tuple([tuple(x) for x in getattr(storage, '_addr', [])])
+ _storage = getattr(storage, '_storage', '')
+ datafs_id = repr((_addr, _storage))
+ assert datafs_id not in datafs_tid_update_dict
+ if tid is None:
+ datafs_tid_update_dict[datafs_id] = None
+ else:
+ # unpack stolen from ZODB/utils.py:u64
+ datafs_tid_update_dict[datafs_id] = struct.unpack(">Q", tid)[0]
+ return datafs_tid_update_dict
+
+class BufferedSocket:
+  """
+  Write-only thread-safe buffered socket.
+  Attempts to reconnect at most once per flush.
+
+  Each thread gets its own send buffer (keyed by thread id); actual sending
+  is serialized by a lock. Data buffered when the connection is down is
+  silently dropped at flush time.
+  """
+
+  # NOTE(review): these are class attributes, shared by every instance.
+  # Fine for the single module-level TIDClient, but a second instance would
+  # share the same lock and connection flag - confirm if that ever matters.
+  _socket_lock = thread.allocate_lock()
+  _connected = False
+
+  def __init__(self, address):
+    self._socket = socket.socket()
+    self._address = address
+    # thread ident -> list of pending byte strings.
+    self._send_buffer_dict = {}
+
+  def _connect(self):
+    # Best-effort connect; failure leaves _connected False.
+    try:
+      self._socket.connect(self._address)
+      self._notifyConnected()
+    except socket.error, message:
+      # We don't want to have an error line per failed connection attempt, to
+      # avoid flooding the logfile.
+      pass
+
+  def _getSendBuffer(self, ident):
+    # Lazily create the per-thread buffer list.
+    send_buffer = self._send_buffer_dict.get(ident)
+    if send_buffer is None:
+      send_buffer = self._send_buffer_dict[ident] = []
+    return send_buffer
+
+  def _notifyDisconnected(self, message):
+    # Log only on the transition to the disconnected state.
+    if self._connected:
+      self._connected = False
+      LOG('TIDStorage', WARNING, 'Disconnected: %s' % (message, ))
+
+  def _notifyConnected(self):
+    if not self._connected:
+      self._connected = True
+      # Display a log message at WARNING level, so that reconnection message
+      # are visible when disconnection messages are visible, even if it is
+      # not a warning, properly speaking.
+      LOG('TIDStorage', WARNING, 'Connected')
+
+  def send(self, to_send):
+    # Buffer only; nothing hits the wire until flush().
+    send_buffer = self._getSendBuffer(thread.get_ident())
+    send_buffer.append(to_send)
+
+  def flush(self):
+    """
+    Flush send buffer and actually send data, with extra checks to behave
+    nicely if connection is broken.
+    Do not retry to send if something goes wrong (data is then lost !).
+    Here, most important thing is speed, not data.
+    Serialize usage.
+    """
+    ident = thread.get_ident()
+    self._socket_lock.acquire()
+    try:
+      if not self._connected:
+        self._connect()
+      if self._connected:
+        try:
+          self._socket.sendall(''.join(self._getSendBuffer(ident)))
+        except socket.error, message:
+          self._notifyDisconnected(message)
+          # NOTE(review): close() is only called when shutdown() itself
+          # raises; when shutdown succeeds the old socket object is merely
+          # dropped and closed by garbage collection - confirm intentional.
+          try:
+            self._socket.shutdown(socket.SHUT_RDWR)
+          except socket.error:
+            self._socket.close()
+          self._socket = socket.socket()
+    finally:
+      self._socket_lock.release()
+    # Buffer is discarded whether or not the send succeeded (best effort).
+    self._send_buffer_dict[ident] = []
+
+class TIDClient:
+ def __init__(self, address):
+ self._buffered_socket = BufferedSocket(address)
+ self._field_exchange = ExchangeProtocol(socket=self._buffered_socket)
+
+ def commit(self, tid_update_dict):
+ """
+ Send given dict to TIDStorage server.
+ """
+ self._send_command('commit')
+ self._field_exchange.send_dict(tid_update_dict)
+ self._buffered_socket.flush()
+
+ def begin(self, storage_id_list):
+ """
+ Inform TIDStorage connection tracking that commit was initiated.
+ """
+ self._send_command('begin')
+ self._field_exchange.send_list(storage_id_list)
+ self._buffered_socket.flush()
+
+ def abort(self):
+ """
+ Inform TIDStorage connection tracking that commit was aborted.
+ """
+ self._send_command('abort')
+ self._buffered_socket.flush()
+
+ def _send_command(self, command):
+ """
+ Every command must be followed by an identifier.
+ This identifier is used to track transactions, so the same identifier
+ must not be used twice at the same time, but can be reused later.
+ """
+ self._field_exchange.send_field(command)
+ self._field_exchange.send_field('%s_%x' % (getZopeId(), thread.get_ident()))
+
+# Keep a reference to the original so the hook can delegate to it.
+original__commitResources = Transaction._commitResources
+def _commitResources(self, *args, **kw):
+  """
+  Hook Transaction's _commitResources.
+
+  Before:
+   - Initialise TIDClient if needed
+   - Check if there is any storage we are interested in in current commit
+   - If so, issue a begin
+
+  After (2 cases):
+   - original__commitResources raised:
+     - Issue an abort
+   - otherwise:
+     - Issue a commit
+
+  Note to editors: Prevent your code from raising anything ! This method
+  MUST NOT raise any exception, except that it MUST NOT hide any exception
+  raised by original__commitResources. The bare excepts below are therefore
+  deliberate: TID monitoring must never break the commit itself.
+  """
+  has_storages = False
+  try:
+    global tid_storage
+    if tid_storage is None:
+      # Lazy singleton: first commit on this node creates the client.
+      tid_storage = TIDClient(TID_STORAGE_ADDRESS)
+    filestorage_list = getFilestorageList(self._resources)
+    if len(filestorage_list):
+      has_storages = True
+      tid_storage.begin(filestorage_list)
+  except:
+    LOG('TIDStorage _commitResources', WARNING, 'Exception in begin phase', error=sys.exc_info())
+  try:
+    result = original__commitResources(self, *args, **kw)
+  except:
+    if has_storages:
+      exception = sys.exc_info()
+      try:
+        tid_storage.abort()
+      except:
+        LOG('TIDStorage _commitResources', WARNING, 'Exception in abort phase', error=sys.exc_info())
+      # Re-raise original exception, in case tid_storage.abort() tainted
+      # the last exception value.
+      raise exception[0], exception[1], exception[2]
+    else:
+      raise
+  else:
+    if has_storages:
+      # Now that everything has been committed, all exceptions relative to
+      # added code must be swallowed (but still reported) to avoid confusing
+      # the transaction system.
+      try:
+        tid_storage.commit(getFilestorageToTIDMapping(self._resources))
+      except:
+        LOG('TIDStorage _commitResources', WARNING, 'Exception in commit phase', error=sys.exc_info())
+  return result
+
+# Monkey-patch the hook in place of the original implementation.
+Transaction._commitResources = _commitResources
+
Added: erp5/trunk/products/TIDStorage/utils/check_tid_log.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/utils/check_tid_log.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/utils/check_tid_log.py (added)
+++ erp5/trunk/products/TIDStorage/utils/check_tid_log.py [utf8] Thu Nov 6 17:14:24 2008
@@ -1,0 +1,71 @@
+#!/usr/bin/python
+##############################################################################
+#
+# Copyright (c) 2008 Nexedi SARL and Contributors. All Rights Reserved.
+# Vincent Pelletier <vincent at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+# Checks the sanity of a tidstorage TID log provided via stdin.
+# Exit status:
+# 0 Success
+# 1 Failure
+# Error is displayed on stderr.
+# On success, final tid values for each storage is displayed on stdout.
+#
+# Log line format: "<timestamp> <f|d> <repr(dict)>" where 'f' is a full
+# dump of all known TIDs and 'd' an incremental (delta) update.
+
+import sys
+content = {}
+last_timestamp = None
+
+line = sys.stdin.readline()
+while line != '':
+  split_line = line.split(' ', 2)
+  assert len(split_line) == 3, repr(split_line)
+  line_timestamp, line_type, line_dict = split_line
+  line_timestamp = float(line_timestamp)
+  assert line_type in ('f', 'd'), repr(line_type)
+  # Timestamps must be strictly increasing over the whole file.
+  if last_timestamp is None:
+    last_timestamp = line_timestamp
+  else:
+    assert last_timestamp < line_timestamp, '%r < %r' % (last_timestamp, line_timestamp)
+  # SECURITY: eval() on external input - only run this on trusted log files
+  # (ast.literal_eval would be the safe equivalent on Python >= 2.6).
+  line_dict = eval(line_dict, None)
+  assert isinstance(line_dict, dict), type(line_dict)
+  assert len(line_dict), repr(line_dict)
+  if line_type == 'd':
+    # Delta: every updated TID must strictly increase.
+    for key, value in line_dict.iteritems():
+      if key in content:
+        assert content[key] < value, '%r < %r' % (content[key], value)
+      content[key] = value
+  elif line_type == 'f':
+    # Full dump: must cover every known storage with TIDs >= current ones.
+    for key, value in content.iteritems():
+      assert key in line_dict, repr(key)
+      assert value <= line_dict[key], '%r <= %r' % (value, line_dict[key])
+    content = line_dict
+  line = sys.stdin.readline()
+
+# Print the final TID per storage, sorted by storage id.
+key_list = content.keys()
+key_list.sort()
+for key in key_list:
+  print '%r %r' % (key, content[key])
+
Propchange: erp5/trunk/products/TIDStorage/utils/check_tid_log.py
------------------------------------------------------------------------------
svn:executable = *
Added: erp5/trunk/products/TIDStorage/utils/tid_log_to_full.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/utils/tid_log_to_full.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/utils/tid_log_to_full.py (added)
+++ erp5/trunk/products/TIDStorage/utils/tid_log_to_full.py [utf8] Thu Nov 6 17:14:24 2008
@@ -1,0 +1,69 @@
+#!/usr/bin/python
+##############################################################################
+#
+# Copyright (c) 2008 Nexedi SARL and Contributors. All Rights Reserved.
+# Vincent Pelletier <vincent at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+# Transforms a TIDStorage TID log provided on stdin, which might contain
+# full statuses and/or incremental changes, and provides a version on stdout
+# containing only full statuses.
+# Also, does sanity checks on the given file.
+# Exit status:
+# 0 Success
+# 1 Failure
+#
+# Log line format: "<timestamp> <f|d> <repr(dict)>" where 'f' is a full
+# dump of all known TIDs and 'd' an incremental (delta) update.
+
+import sys
+content = {}
+last_timestamp = None
+
+line = sys.stdin.readline()
+while line != '':
+  split_line = line.split(' ', 2)
+  assert len(split_line) == 3, repr(split_line)
+  line_timestamp, line_type, line_dict = split_line
+  line_timestamp = float(line_timestamp)
+  assert line_type in ('f', 'd'), repr(line_type)
+  # Timestamps must be strictly increasing over the whole file.
+  if last_timestamp is None:
+    last_timestamp = line_timestamp
+  else:
+    assert last_timestamp < line_timestamp, '%r < %r' % (last_timestamp, line_timestamp)
+  # SECURITY: eval() on external input - only run this on trusted log files
+  # (ast.literal_eval would be the safe equivalent on Python >= 2.6).
+  line_dict = eval(line_dict, None)
+  assert isinstance(line_dict, dict), type(line_dict)
+  assert len(line_dict), repr(line_dict)
+  if line_type == 'd':
+    # Delta: merge into the accumulated state, then emit it as a full line.
+    for key, value in line_dict.iteritems():
+      if key in content:
+        assert content[key] < value, '%r < %r' % (content[key], value)
+      content[key] = value
+    print '%r f %r' % (line_timestamp, content)
+  elif line_type == 'f':
+    # Full dump: check consistency with accumulated state, pass through.
+    for key, value in content.iteritems():
+      assert key in line_dict, repr(key)
+      assert value <= line_dict[key], '%r <= %r' % (value, line_dict[key])
+    content = line_dict
+    print line.strip()
+  line = sys.stdin.readline()
+
Propchange: erp5/trunk/products/TIDStorage/utils/tid_log_to_full.py
------------------------------------------------------------------------------
svn:executable = *
More information about the Erp5-report
mailing list