[Erp5-report] r24513 - in /erp5/trunk/products/TIDStorage: ./ bin/ repozo/ tests/ utils/

nobody at svn.erp5.org nobody at svn.erp5.org
Thu Nov 6 17:14:26 CET 2008


Author: vincent
Date: Thu Nov  6 17:14:24 2008
New Revision: 24513

URL: http://svn.erp5.org?rev=24513&view=rev
Log:
Initial import of TIDStorage product.

Added:
    erp5/trunk/products/TIDStorage/
    erp5/trunk/products/TIDStorage/ExchangeProtocol.py
    erp5/trunk/products/TIDStorage/README
    erp5/trunk/products/TIDStorage/ZEOClientStorage.py
    erp5/trunk/products/TIDStorage/__init__.py
    erp5/trunk/products/TIDStorage/bin/
    erp5/trunk/products/TIDStorage/bin/server.py   (with props)
    erp5/trunk/products/TIDStorage/repozo/
    erp5/trunk/products/TIDStorage/repozo/from_z2.8.8_repozo.diff
    erp5/trunk/products/TIDStorage/repozo/repozo_tidstorage.py   (with props)
    erp5/trunk/products/TIDStorage/repozo/restore_tidstorage.py   (with props)
    erp5/trunk/products/TIDStorage/repozo/sample_configuration.py
    erp5/trunk/products/TIDStorage/tests/
    erp5/trunk/products/TIDStorage/tests/testTIDServer.py   (with props)
    erp5/trunk/products/TIDStorage/transaction_transaction.py
    erp5/trunk/products/TIDStorage/utils/
    erp5/trunk/products/TIDStorage/utils/check_tid_log.py   (with props)
    erp5/trunk/products/TIDStorage/utils/tid_log_to_full.py   (with props)

Added: erp5/trunk/products/TIDStorage/ExchangeProtocol.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/ExchangeProtocol.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/ExchangeProtocol.py (added)
+++ erp5/trunk/products/TIDStorage/ExchangeProtocol.py [utf8] Thu Nov  6 17:14:24 2008
@@ -1,0 +1,131 @@
+############################################################################
+#
+# Copyright (c) 2007, 2008 Nexedi SARL and Contributors. All Rights Reserved.
+#                    Vincent Pelletier <vincent at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+#
+##############################################################################
+
+class ClientDisconnected(Exception):
+  """Raised when the remote peer closed the connection (recv returned '')."""
+  pass
+
+class ExchangeProtocol:
+  """
+    Handle data exchange between client and server.
+    Kinds of data which can be exchanged:
+     - str
+       send_field
+       recv_field
+     - int
+       send_int
+       recv_int
+     - list of str
+       send_list
+       recv_list
+     - list of int
+       send_int_list
+       recv_int_list
+     - dict (key: str, value: int)
+       send_dict
+       recv_dict
+    Forbidden chars:
+      Send (raise if present):
+        \\n (field separator)
+      Receive (stripped silently):
+        \\n (field separator)
+        \\r (for compatibility)
+  """
+  def __init__(self, socket):
+    self._socket = socket
+
+  def send_field(self, to_send):
+    if type(to_send) is not str:
+      raise ValueError, 'Value is not of str type: %r' % (type(to_send), )
+    if '\n' in to_send:
+      raise ValueError, '\\n is a forbidden value.'
+    self._socket.send(to_send)
+    self._socket.send('\n')
+
+  def recv_field(self):
+    received = None
+    result = []
+    append = result.append
+    while received != '\n':
+      received = self._socket.recv(1)
+      if len(received) == 0:
+        raise ClientDisconnected
+      if received != '\r':
+        append(received)
+    return ''.join(result[:-1])
+
+  def send_int(self, to_send):
+    self.send_field(str(to_send))
+
+  def recv_int(self):
+    return int(self.recv_field())
+
+  def send_list(self, to_send, send_length=True):
+    assert isinstance(to_send, (tuple, list))
+    if send_length:
+      self.send_int(len(to_send))
+    for field in to_send:
+      self.send_field(field)
+
+  def send_int_list(self, to_send, *args, **kw):
+    self.send_list([str(x) for x in to_send], *args, **kw)
+
+  def recv_list(self, length=None):
+    result = []
+    append = result.append
+    if length is None:
+      length = int(self.recv_field())
+    for field_number in xrange(length):
+      append(self.recv_field())
+    return result
+
+  def recv_int_list(self, *args, **kw):
+    return [int(x) for x in self.recv_list(*args, **kw)]
+
+  def send_dict(self, to_send):
+    """
+      Key: string
+      Value: int
+    """
+    assert isinstance(to_send, (dict))
+    if len(to_send) == 0:
+      key_list = value_list = []
+    else:
+      key_list, value_list = zip(*to_send.items())
+    self.send_list(key_list)
+    self.send_int_list(value_list, send_length=False)
+
+  def recv_dict(self):
+    """
+      Key: string
+      Value: int
+    """
+    key_list = self.recv_list()
+    value_list = self.recv_int_list(len(key_list))
+    result = dict(zip(key_list, value_list))
+    return result
+

Added: erp5/trunk/products/TIDStorage/README
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/README?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/README (added)
+++ erp5/trunk/products/TIDStorage/README [utf8] Thu Nov  6 17:14:24 2008
@@ -1,0 +1,54 @@
+1) Protocole:
+ Tous caractères autorisés dans les données, à l'exception de \n et \r.
+ Tout champ se termine par un \n (\r ignoré).
+ Pas d'échappement.
+ Lors du transfert de listes, la liste est précédée par le nombre de champs qu'elle contient.
+  Ex:
+  3\n
+  foo\n
+  bar\n
+  baz\n
+
+2) Commande de début de commit:
+
+BEGIN\n
+<identifiant du commit>\n
+<liste des storages concernés>
+
+ <identifiant du commit>: doit être identique à celui fourni à la fin de l'opération (que ce soit un ABORT ou un COMMIT)
+ <liste des storages concernés>: liste des identifiants des storages concernés par le commit
+  NB: la liste se termine par un \n, il n'est donc pas répété ici.
+
+Réponse: (rien)
+
+3) Commande d'annulation de la transaction:
+
+ABORT\n
+<identifiant du commit>\n
+
+  <identifiant du commit>: (cf. BEGIN)
+
+Réponse: (rien)
+
+4) Commande de finalisation de la transaction:
+
+COMMIT\n
+<identifiant du commit>\n
+<liste des storages concernés>
+<liste des TIDs commités>
+
+ <identifiant du commit>: (cf. BEGIN)
+ <liste des storages concernés>: (cf. BEGIN)
+ <liste des TIDs commités>: De même longueur que la liste des storages concernés. L'ordre doit correspondre à cette dernière.
+  NB: la liste se termine par un \n, il n'est donc pas répété ici.
+
+Réponse: (rien)
+
+5) Commande de lecture des données:
+
+DUMP\n
+
+Réponse:
+<liste des storages>
+<liste des TIDs>
+

Added: erp5/trunk/products/TIDStorage/ZEOClientStorage.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/ZEOClientStorage.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/ZEOClientStorage.py (added)
+++ erp5/trunk/products/TIDStorage/ZEOClientStorage.py [utf8] Thu Nov  6 17:14:24 2008
@@ -1,0 +1,53 @@
+############################################################################
+#
+# Copyright (c) 2007 Nexedi SARL and Contributors. All Rights Reserved.
+#                    Vincent Pelletier <vincent at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+#
+##############################################################################
+
+from ZEO.ClientStorage import ClientStorage
+
+# Attribute set on ClientStorage instances to remember the last committed TID.
+# (The misspelling "commited" is part of the public name and is kept as-is.)
+LAST_COMMITED_TID_PROPERTY_ID = '_last_commited_tid'
+
+# Hook tpc_finish's hook method.
+# New hook must be a local method because it must access tpc_finish's "self"
+# and original hook.
+
+original_tpc_finish = ClientStorage.tpc_finish
+def tpc_finish(self, txn, f=None):
+  # Wrap the caller-provided callback so the freshly allocated TID is also
+  # recorded on the storage instance before delegating to the original.
+  def saveTIDOnInstance(tid):
+    if f is not None:
+      f(tid)
+    setattr(self, LAST_COMMITED_TID_PROPERTY_ID, tid)
+  return original_tpc_finish(self, txn, f=saveTIDOnInstance)
+ClientStorage.tpc_finish = tpc_finish
+
+def getLastCommitedTID(self):
+  """
+    Return last committed tid for this storage, or None if no transaction
+    was committed yet.
+  """
+  return getattr(self, LAST_COMMITED_TID_PROPERTY_ID, None)
+ClientStorage.getLastCommitedTID = getLastCommitedTID
+

Added: erp5/trunk/products/TIDStorage/__init__.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/__init__.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/__init__.py (added)
+++ erp5/trunk/products/TIDStorage/__init__.py [utf8] Thu Nov  6 17:14:24 2008
@@ -1,0 +1,32 @@
+############################################################################
+#
+# Copyright (c) 2007, 2008 Nexedi SARL and Contributors. All Rights Reserved.
+#                    Vincent Pelletier <vincent at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+#
+##############################################################################
+
+# Load monkey patches
+import transaction_transaction
+import ZEOClientStorage
+

Added: erp5/trunk/products/TIDStorage/bin/server.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/bin/server.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/bin/server.py (added)
+++ erp5/trunk/products/TIDStorage/bin/server.py [utf8] Thu Nov  6 17:14:24 2008
@@ -1,0 +1,766 @@
+#!/usr/bin/python
+
+##############################################################################
+#
+# Copyright (c) 2007, 2008 Nexedi SARL and Contributors. All Rights Reserved.
+#                    Vincent Pelletier <vincent at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+#
+##############################################################################
+
+# About errors in TIDStorage logs:
+# - CRITICAL: Decreasing update ignored
+#   This error means that any backup started prior to this error can contain
+#   incomplete transaction data.
+#   This error can happen when TIDStorage did not handle received data in the
+#   right order.
+#   Example:
+#     3 storages (S1, S2, S3):
+#       They all start at TID=1 value.
+#     2 transaction (T1, T2):
+#       T1 commits TID 3 on S2, TID 2 on S3
+#       T2 commits TID 2 on S1, TID 2 on S2
+#     Due to those TIDs, TIDStorage *should* handle data in this order:
+#       T2begin, T2commit, T1begin, T1commit
+#     Or:
+#       T2begin, T1begin, T2commit, T1commit
+#     Or even, though it denotes a late handling of T2commit:
+#       T2begin, T1begin, T1commit, T2commit
+#     But, if TIDStorage handles data in the following order:
+#       T1begin, T1commit, T2begin, T2commit
+#     *AND* a backup dumps TIDStorage content at a point between T1commit and
+#     T2commit, then the backup will contain T2's commit on S2, which has a
+#     lower TID than T1's commit on that same storage.
+#
+# - Abort received, but never began
+# and
+# - Commit received, but never began
+#   These errors mean that packets were lost/never received.
+#   This should not happen, since network connection is TCP, and TCP
+#   retransmits data.
+#   But it happens frequently if TIDStorage is started when Zope is under
+#   load. This is because Zope attempted to contact TIDStorage at "begin"
+#   step, but could not reach it. Then, at commit (or abort) it could reach
+#   TIDStorage, causing the error message.
+#   This error is benign, because:
+#     - Until bootstrap is complete, no TID is available for backup
+#     - When bootstrap is complete, it means that every ZODB got unlocked
+#       at some point (since TIDStorage commit happens after ZODB tpc_finish
+#       lock release).
+#     - When a transaction sends data to multiple ZODBs, there is a point in
+#       time when ALL impacted ZODBs are locked.
+#     The conclusion of all this is that any transaction started before
+#     TIDStorage was available has necessarily finished (commit or abort) at
+#     the time bootstrap finished.
+#     So no backup can be affected by such message (but backup feasibility can
+#     get delayed as locks would delay bootstrap end, and hence TID data
+#     availability).
+
+import os
+import imp
+import sys
+import pwd
+import grp
+import sets
+import time
+import urllib
+import socket
+import signal
+import getopt
+import SocketServer
+import threading
+import traceback
+from ExchangeProtocol import ClientDisconnected, ExchangeProtocol
+
+class TransactionException(Exception):
+  """Raised on illegal transaction sequences (double begin, finish
+  without a former begin)."""
+  pass
+
+class AlwaysIncreasingDict(dict):
+  """
+    When inserting/updating a value, check that the new one is strictly
+    greater than existing key (or integer 0 value if no value existed for
+    given key).
+    Values are compared as-is against the stored value (0 when absent).
+    
+    TODO:
+     - Do not descend from dict to prevent users from avoiding checks.
+  """
+  def __init__(self, strict=False, *args, **kw):
+    # strict: log (rather than silently ignore) any non-increasing update.
+    dict.__init__(self, *args, **kw)
+    self._strict = strict
+ 
+  def __setitem__(self, key, value):
+    # Only strictly greater values are stored; equal values are ignored too.
+    if self.get(key, 0) < value:
+      dict.__setitem__(self, key, value)
+    else:
+      if self._strict:
+        log('CRITICAL: Decreasing update ignored: key=%r %r <= %r' % \
+            (key, value, self.get(key, 0)))
+
+  def update(self, other):
+    """
+      To check for decreases.
+    """
+    # Route every item through __setitem__ so the checks above apply.
+    for key, value in other.iteritems():
+      self[key] = value
+ 
+class TransactionTracker:
+  """
+    Implement transaction tracking.
+    This class is not thread-safe.
+    A transaction starts with a call to "begin" and ends with a call to
+    "finish" with the same identifier.
+    "finish" returns payload provided at begin (or True if no payload was
+    given) if nothing illegal was detected, otherwise returns False.
+
+    Illegal cases:
+     - "begin" called twice without intermediate "finish" call
+     - "finish" called without a corresponding "begin" call (this includes
+       calling "finish" twice)
+  """
+  def __init__(self):
+    # identifier -> payload, for transactions begun but not yet finished.
+    self._container = {}
+
+  def begin(self, identifier, payload=True):
+    if identifier in self._container:
+      raise TransactionException, 'Begin called twice in a row.'
+    self._container[identifier] = payload
+
+  def finish(self, identifier):
+    # NOTE(review): illegal calls raise TransactionException; they never
+    # return False, contrary to what the class docstring suggests.
+    if identifier not in self._container:
+      raise TransactionException, 'Finish called without former "begin" call.'
+    return self._container.pop(identifier)
+
+class TIDServer(SocketServer.BaseRequestHandler):
+  """
+    Exchange data with connected peer.
+
+    Commands (case-insensitive): begin, abort, commit, dump, quit.
+
+    TODO:
+     - Implement socket buffering.
+  """
+  def log(self, message):
+    # Prefix every log line with the peer's address.
+    log('%r: %s' % (self.client_address, message))
+
+  def dump(self):
+    # Send the current storage_id -> TID mapping to the client.
+    tid_dict = self._tid_storage.dump()
+    self._field_exchange.send_dict(tid_dict)
+
+  def begin(self):
+    # Receive transaction identifier and impacted storage ids; no reply.
+    identifier = self._field_exchange.recv_field()
+    storage_id_list = self._field_exchange.recv_list()
+    self._tid_storage.begin(identifier, storage_id_list)
+
+  def abort(self):
+    # Receive transaction identifier; no reply.
+    identifier = self._field_exchange.recv_field()
+    self._tid_storage.abort(identifier)
+
+  def commit(self):
+    # Receive transaction identifier and storage_id -> TID dict; no reply.
+    identifier = self._field_exchange.recv_field()
+    tid_dict = self._field_exchange.recv_dict()
+    self._tid_storage.commit(identifier, tid_dict)
+
+  def handle(self):
+    # Serve commands from this client until "quit" or disconnection.
+    global tid_storage
+    self._tid_storage = tid_storage
+    self._field_exchange = ExchangeProtocol(socket=self.request)
+    command_mapping = {
+      'begin': self.begin,
+      'abort': self.abort,
+      'commit': self.commit,
+      'dump': self.dump
+    }
+    self.log('Connected')
+    try:
+      # Intercept ClientDisconnected exception to stop thread nicely instead
+      # of crashing.
+      # Log all others exceptions.
+      while True:
+        received = self._field_exchange.recv_field()
+        command_str = received.lower()
+        if command_str == 'quit':
+          break
+        method = command_mapping.get(command_str)
+        if method is not None:
+          # Intercept all errors to log it instead of causing disconnection.
+          # Except, of course, the ClientDisconnected exception itself.
+          try:
+            method()
+          except ClientDisconnected:
+            raise
+          except:
+            self.log('\n'.join(traceback.format_exception(*sys.exc_info())))
+    except ClientDisconnected:
+      pass
+    except:
+      self.log('\n'.join(traceback.format_exception(*sys.exc_info())))
+    self.log('Client disconnected')
+    self.request.shutdown(socket.SHUT_RDWR)
+    self.request.close()
+    return
+
+class TIDStorage:
+  """
+    Store ZODB TIDs for multiple ZODBs.
+    Designed to be a singleton.
+    Thread-safe.
+    
+    Consequently, transactions are not bound to a specific connection: If a
+    connection is cut after a "begin", reconnecting and issuing "abort" or
+    "commit" is valid.
+
+    TODO:
+     - Use smaller locking areas
+     - Improve decision taking algorithm in _unregisterTransactionID (implies
+       modifying _registerTransactionIDAndStorageID).
+  """
+  _storage_id_lock = threading.RLock()
+  _next_full_dump = None
+  _next_dump = None
+  _tid_file = None
+  _burst_period = None
+  _full_dump_period = None
+  
+  def __init__(self, tid_file_path=None, burst_period=None, full_dump_period=None):
+    # _storage: TIDs eligible for dumping; _transcient: TIDs of finished
+    # transactions still overlapping with in-flight ones (see
+    # _unregisterTransactionID, which moves entries to _storage).
+    self._transaction_tracker = TransactionTracker()
+    self._storage = AlwaysIncreasingDict(strict=True)
+    self._transcient = AlwaysIncreasingDict()
+    self._storage_id_to_transaction_id_list_dict = {}
+    self._transaction_id_to_storage_id_list_dict = {}
+    self._storage_id_to_storage_id_set_dict = {}
+    if tid_file_path is not None:
+      self._tid_file = LogFile(tid_file_path)
+      self._burst_period = burst_period
+      self._full_dump_period = full_dump_period
+      now = time.time()
+      if full_dump_period is not None:
+        self._next_full_dump = now
+      if burst_period is not None:
+        self._next_dump = now
+        self._since_last_burst = sets.Set()
+
+  def __repr__(self):
+    # Snapshot of all internal mappings; used by the SIGUSR1 handler.
+    result = []
+    append = result.append
+    self._storage_id_lock.acquire()
+    try:
+      append('_storage_id_to_transaction_id_list_dict=' + \
+             repr(self._storage_id_to_transaction_id_list_dict))
+      append('_transaction_id_to_storage_id_list_dict=' + \
+             repr(self._transaction_id_to_storage_id_list_dict))
+      append('_storage_id_to_storage_id_set_dict=' + \
+             repr(self._storage_id_to_storage_id_set_dict))
+      append('_transcient=' + repr(self._transcient))
+      append('_storage=' + repr(self._storage))
+    finally:
+      self._storage_id_lock.release()
+    return '\n'.join(result)
+
+  def _registerTransactionIDAndStorageID(self, transaction_id, storage_id_list):
+    assert len(storage_id_list) != 0
+    # NOTE(review): this assert relies on acquire()'s side effect (re-entrant
+    # grab of the lock already held by the public caller); running with -O
+    # strips it, leaving the matching release() below unbalanced.
+    assert self._storage_id_lock.acquire(False)
+    try:
+      # Update transaction_id -> storage_id_list
+      assert transaction_id not in self._transaction_id_to_storage_id_list_dict
+      self._transaction_id_to_storage_id_list_dict[transaction_id] = storage_id_list
+      storage_id_set = sets.Set(storage_id_list)
+      storage_id_set_id_set = sets.Set()
+      for storage_id in storage_id_list:
+        # Update storage_id -> transaction_id_list
+        identifier_set = self._storage_id_to_transaction_id_list_dict.get(storage_id)
+        if identifier_set is None:
+          identifier_set = self._storage_id_to_transaction_id_list_dict[storage_id] = sets.Set()
+        assert transaction_id not in identifier_set
+        identifier_set.add(transaction_id)
+        # Prepare the update storage_id -> storage_id_set
+        existing_storage_id_set = self._storage_id_to_storage_id_set_dict.get(storage_id, None)
+        if existing_storage_id_set is not None:
+          storage_id_set.union_update(existing_storage_id_set)
+          storage_id_set_id_set.add(id(existing_storage_id_set))
+      # Update storage_id -> storage_id_set
+      # Cannot use iteritems because dict is modified in the loop.
+      for key, value in self._storage_id_to_storage_id_set_dict.items():
+        if id(value) in storage_id_set_id_set:
+          self._storage_id_to_storage_id_set_dict[key] = storage_id_set
+      for storage_id in storage_id_set:
+        self._storage_id_to_storage_id_set_dict[storage_id] = storage_id_set
+    finally:
+      self._storage_id_lock.release()
+
+  def _unregisterTransactionID(self, transaction_id):
+    """
+      Also transfers from self._transcient to self._storage.
+    """
+    # NOTE(review): same assert-with-side-effect pattern as
+    # _registerTransactionIDAndStorageID; broken under -O.
+    assert self._storage_id_lock.acquire(False)
+    try:
+      # Update transaction_id -> storage_id_list and retrieve storage_id_list
+      # Raises if not found
+      storage_id_list = self._transaction_id_to_storage_id_list_dict.pop(transaction_id)
+      # Update storage_id -> transaction_id_list
+      for storage_id in storage_id_list:
+        identifier_set = self._storage_id_to_transaction_id_list_dict[storage_id]
+        # Raises if not found
+        identifier_set.remove(transaction_id)
+        if len(identifier_set) == 0:
+          del self._storage_id_to_transaction_id_list_dict[storage_id]
+          # Update storage_id -> storage_id_set
+          # Raises if not found
+          storage_id_set = self._storage_id_to_storage_id_set_dict[storage_id]
+          # Raises if not found
+          storage_id_set.remove(storage_id)
+      # Decide whether this unregistration should trigger a burst dump or a
+      # full dump to the TID log file.
+      if self._tid_file is not None:
+        now = time.time()
+        can_full_dump = has_bootstraped and (self._next_full_dump is not None) and (self._next_full_dump < now)
+        can_dump = (not can_full_dump) and (self._next_dump is not None) and (self._next_dump < now)
+        record_for_dump = can_dump or (self._next_dump is not None)
+        append_to_file = has_bootstraped and (can_dump or can_full_dump)
+      else:
+        append_to_file = record_for_dump = can_dump = can_full_dump = False
+      # Storages whose overlap set emptied out get their transcient TID
+      # promoted to _storage (it is now safe to expose).
+      for key, value in self._storage_id_to_storage_id_set_dict.iteritems():
+        if len(value) == 0 and key in self._transcient:
+          self._storage[key] = self._transcient.pop(key)
+          if record_for_dump:
+            self._since_last_burst.add(key)
+      if append_to_file:
+        if can_full_dump:
+          to_dump_dict = self._storage
+          dump_code = 'f'
+        else:
+          to_dump_dict = dict([(key, self._storage[key]) for key in self._since_last_burst])
+          dump_code = 'd'
+        if len(to_dump_dict):
+          self._tid_file.write('%.02f %s %r\n' % (now, dump_code, to_dump_dict))
+          if can_full_dump:
+            self._next_full_dump = now + self._full_dump_period
+          if self._next_dump is not None:
+            self._next_dump = now + self._burst_period
+            self._since_last_burst.clear()
+      if not has_bootstraped:
+        doBootstrap()
+      #if len(self._storage_id_to_transaction_id_list_dict) == 0:
+      #  self._storage.update(self._transcient)
+      #  self._transcient.clear()
+    finally:
+      self._storage_id_lock.release()
+
+  def dump(self):
+    """Return a copy of the exposed storage_id -> TID mapping."""
+    self._storage_id_lock.acquire()
+    try:
+      return self._storage.copy()
+    finally:
+      self._storage_id_lock.release()
+
+  def begin(self, transaction_id, storage_id_list):
+    """Register a transaction touching the given storages."""
+    self._storage_id_lock.acquire()
+    try:
+      self._transaction_tracker.begin(transaction_id, storage_id_list)
+      self._registerTransactionIDAndStorageID(transaction_id, storage_id_list)
+    finally:
+      self._storage_id_lock.release()
+
+  def abort(self, transaction_id):
+    """Forget a began transaction without recording any TID."""
+    self._storage_id_lock.acquire()
+    try:
+      try:
+        self._transaction_tracker.finish(transaction_id)
+      except TransactionException:
+        # Overwrite exception message
+        raise TransactionException, 'Abort received, but never began'
+      self._unregisterTransactionID(transaction_id)
+    finally:
+      self._storage_id_lock.release()
+
+  def commit(self, transaction_id, tid_dict):
+    """Record the TIDs of a finished transaction (must match its begin)."""
+    self._storage_id_lock.acquire()
+    try:
+      try:
+        storage_id_list = self._transaction_tracker.finish(transaction_id)
+      except TransactionException:
+        # Overwrite exception message
+        raise TransactionException, 'Commit received, but never began'
+      # Verify tid_dict covers exactly the storages announced at begin.
+      check_dict = tid_dict.copy()
+      for storage_id in storage_id_list:
+        del check_dict[storage_id]
+      assert len(check_dict) == 0
+      self._transcient.update(tid_dict)
+      self._unregisterTransactionID(transaction_id)
+    finally:
+      self._storage_id_lock.release()
+
+class BootstrapContent(threading.Thread):
+  """
+    Thread used to bootstrap TIDStorage content.
+    This must be started at first client request, and must be run only once.
+    Global boolean "has_bootstraped" is set to true once it succeeded.
+  """
+
+  def __init__(self, *args, **kw):
+    threading.Thread.__init__(self, *args, **kw)
+    self.setDaemon(True)
+
+  def run(self):
+    """
+      Contact all zopes to serialize all their storages.
+    """
+    global has_bootstraped
+    base_url = options.base_url
+    if base_url is not None:
+      storage_id_to_object_path_dict = dict([(key, value[2]) for key, value
+        in options.known_tid_storage_identifier_dict.iteritems()
+        if value[2] is not None])
+      target_storage_id_set = sets.ImmutableSet(storage_id_to_object_path_dict.keys())
+      known_storage_id_set = sets.ImmutableSet(tid_storage.dump().keys())
+      to_check_storage_id_set = target_storage_id_set - known_storage_id_set
+      while len(to_check_storage_id_set) and can_bootstrap:
+        serialize_url = None
+        for storage_id in to_check_storage_id_set:
+          if can_bootstrap and storage_id not in tid_storage.dump().keys():
+            serialize_url = base_url % (storage_id_to_object_path_dict[storage_id], )
+            try:
+              # Query a Zope, which will contact this process in return to store
+              # the new TID number, making the given storage known.
+              page = urllib.urlopen(serialize_url)
+            except Exception, message:
+              log('Exception during bootstrap (%r):\n%s' % (serialize_url, ''.join(traceback.format_exception(*sys.exc_info()))))
+            else:
+              log('Opened %r: %r' % (serialize_url, page.read()))
+        # Let some time for zope to contact TIDStorage back and fill the gaps.
+        time.sleep(5)
+        known_storage_id_set = sets.ImmutableSet(tid_storage.dump().keys())
+        to_check_storage_id_set = target_storage_id_set - known_storage_id_set
+        if len(to_check_storage_id_set):
+          log('Bootstrap in progress... Mising storages: %r' % (to_check_storage_id_set, ))
+          # Retry a bit later
+          time.sleep(60)
+      if len(to_check_storage_id_set) == 0:
+        log('Bootstrap done (%i storages).' % (len(target_storage_id_set), ))
+        has_bootstraped = True
+    else:
+      has_bootstraped = True
+
+# Singleton bootstrap thread, started lazily by doBootstrap.
+bootstrap_content = BootstrapContent()
+# Set to True by BootstrapContent.run once all known storages reported a TID.
+has_bootstraped = False
+# Cleared by main() at shutdown so the bootstrap thread stops retrying.
+can_bootstrap = True
+bootstrap_lock = threading.RLock()
+
+def doBootstrap():
+  """Start the bootstrap thread if it is not already running."""
+  # Non-blocking acquire: if another thread is already in here, skip.
+  acquired = bootstrap_lock.acquire(False)
+  if acquired:
+    try:
+      # NOTE(review): a threading.Thread can only be start()ed once; if the
+      # bootstrap thread dies before setting has_bootstraped, this raises.
+      if not bootstrap_content.isAlive():
+        bootstrap_content.start()
+    finally:
+      bootstrap_lock.release()
+
+def log(message):
+  """Write a timestamped message to stdout (presumably rebound to a LogFile
+  by startup code -- not visible here, confirm)."""
+  print >> sys.stdout, '%s: %s' % (time.asctime(), message)
+
+class PoliteThreadingTCPServer(SocketServer.ThreadingTCPServer):
+  """Threading TCP server which shuts its listening socket down explicitly."""
+  def server_close(self):
+    # Make the port reusable for listening as soon as the socket closes.
+    # NOTE(review): SO_REUSEADDR normally takes effect when set before
+    # bind() (cf. allow_reuse_address); confirm setting it at close time
+    # actually helps the next listener.
+    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
+    self.socket.shutdown(socket.SHUT_RDWR)
+    self.socket.close()
+
+def main(address, port):
+  """Listen on (address, port) and serve TIDServer clients until
+  KeyboardInterrupt (also raised by the SIGTERM handler)."""
+  server = PoliteThreadingTCPServer((address, port), TIDServer)
+  try:
+    try:
+      log('Server listening.')
+      server.serve_forever()
+    except KeyboardInterrupt:
+      log('Shuting down (received KeyboardInterrupt).')
+  finally:
+    global can_bootstrap
+    # Tell the bootstrap thread to stop retrying before closing the socket.
+    can_bootstrap = False
+    log('Waiting for clients to disconnect...')
+    server.server_close()
+
# All live LogFile instances, so HUPHandler can rotate them all at once.
log_file_set = sets.Set()

class LogFile:
  """
    Logging-to-file class.
    Allows rotating file.
    Can be used as stdout/stderr file: no write from any thread is lost during
    rotation. There is an unavoidable (?) race condition if anything gets
    raised by the rotating thread between "self._to_buffer = False" completion
    and following log flush.
  """
  _file = None

  def __init__(self, file_name):
    self._lock = threading.RLock()
    self._file_name = file_name
    # While True, writes accumulate in self._buffer instead of reaching the
    # file (used during rotation so no entry is lost).
    self._to_buffer = False
    self._buffer = []
    self._open()
    # Register globally so HUPHandler can rotate this file on SIGHUP.
    log_file_set.add(self)

  def _open(self):
    # Unbuffered append so log entries hit the disk immediately.
    self._file = open(self._file_name, 'a', 0)

  def write(self, value):
    # File-like write; thread-safe via the instance lock.
    self._lock.acquire()
    try:
      if self._to_buffer:
        self._buffer.append(value)
      else:
        self._file.write(value)
    finally:
      self._lock.release()

  def close(self):
    # BUGFIX: "self" was missing from the signature, so calling close() on
    # an instance raised TypeError (and the body's "self" was a NameError).
    self._lock.acquire()
    try:
      log_file_set.remove(self)
      self._file.close()
      self._file = None
    finally:
      self._lock.release()

  def rotate(self):
    # Close and reopen the underlying file (e.g. after logrotate moved it).
    self._lock.acquire()
    try:
      self._to_buffer = True
      self._file.close()
      self._open()
      # XXX: race condition below if rotating stderr: Any exception thrown
      # here will be out-of-order in resulting log.
      self._to_buffer = False
      # BUGFIX: flush AND reset the buffer. It was never emptied before, so
      # each subsequent rotation re-wrote every previously buffered entry.
      buffered = ''.join(self._buffer)
      self._buffer = []
      self.write(buffered)
      # End of race condition.
    finally:
      self._lock.release()
+
def HUPHandler(signal_number, stack):
  """SIGHUP handler: rotate every registered logfile."""
  log('Rotating logfiles...')
  rotated = 0
  for handle in log_file_set:
    handle.rotate()
    rotated += 1
  log('Logfiles rotated (%i).' % (rotated, ))
+
def USR1Handler(signal_number, stack):
  # SIGUSR1 handler: dump the in-memory TID state (repr of the global
  # TIDStorage instance) to the log for inspection.
  log(repr(tid_storage))
+
def TERMHandler(signal_number, stack):
  """SIGTERM handler: log, then raise KeyboardInterrupt so main() takes
  the same clean-shutdown path as Ctrl-C."""
  log('Received SIGTERM, exiting.')
  raise KeyboardInterrupt('Killed by SIGTERM')
+
def usage():
  # Print the command-line help to stdout. Defaults shown in the text are
  # read from the Options class defined below.
  print """
Usage: %(arg0)s [-h] [-n|--nofork|--fg] [-l|--log] [-p|--port] [-a|--address]
       [--pidfile] [--user] [--group] [-s|--status-file] [-b|--burst-period]
       [-F|--full-dump-period]

  -h
    Display this help.

  -n
  --nofork
  --fg
    Do not fork in background.

  -l filename
  --log filename
    Log to given filename, instead of default %(logfile_name)s.

  -p number
  --port number
    Listen to given port number, intead of default %(port)i.

  -a address
  --address address
    Listen to interface runing given address, instead of default %(address)s.

  --pidfile file_path
    If forking, this file will contain the pid of forked process.
    If this argument is not provided, pid is written to %(pidfile_name)s.

  --user user_name
    Run as specified user.
    Also, retrieve user's group and run as this group.

  --group group_name
    Run as specified group.
    If both --user and --group are specified, --group must come last.

  -s file_name
  --status-file file_name
    Append stored TIDs to file.
    See also "burst-period" and "full-dump-period".
    If not provided, no dump ever happens.

  -b seconds
  --burst-period seconds
    Defines the age of last write after which an incremental write can happen.
    Such write only contain what changed since last write.
    If not provided, no incremental write is done.

  -F seconds
  --full-dump-period seconds
    How many seconds must separate complete dumps to status file.
    Those writes contain the complete current state.
    If both a full dump and an incremental write can happen, full dump takes
    precedence.
    If not provided, no full dump is done.

  -c file_name
  --config file_name
    Use given file as options file.
    It must be a python file. See sample_options.py for possible values.
    If provided and if configuration file defines base_url and
    known_tid_storage_identifier_dict variables, this program will cause
    generation of all tids before first write to status file.
""" % {'arg0': sys.argv[0],
       'logfile_name': Options.logfile_name,
       'pidfile_name': Options.pidfile_name,
       'port': Options.port,
       'address': Options.address}
+
class Options:
  # Default server configuration. Command-line options override these, and
  # the optional --config module fills in values not set on the command line.
  port = 9001                             # TCP port to listen on
  address = '0.0.0.0'                     # interface address to bind to
  logfile_name = 'tidstorage.log'         # daemon log destination
  pidfile_name = 'tidstorage.pid'         # where the daemon pid is written
  fork = True                             # daemonize by default
  setuid = None                           # numeric uid to switch to, if any
  setgid = None                           # numeric gid to switch to, if any
  status_file = None                      # TID dump file; no dumps when None
  burst_period = None                     # min age (s) before incremental dump
  full_dump_period = None                 # min delay (s) between full dumps
  known_tid_storage_identifier_dict = {}  # storage id -> paths, for bootstrap
  base_url = None                         # zope url template used at bootstrap
+
# --- Command-line parsing, privilege drop and daemonization entry point ---

# Path of the optional python configuration file (-c/--config).
config_file_name = None

options = Options()

try:
  # NOTE(review): 'f' appears in the short option string but no branch
  # below handles '-f' — it is silently accepted and ignored.
  opts, args = getopt.getopt(sys.argv[1:],
                             'hnfl:p:a:s:b:F:c:',
                             ['help', 'nofork', 'fg', 'log=', 'port=',
                              'address=', 'pidfile=', 'user=', 'group=',
                              'status-file=', 'burst-period=',
                              'full-dump-period=', 'config='])
except:
  usage()
  raise

for opt, arg in opts:
  if opt in ('-h', '--help'):
    usage()
    sys.exit()
  elif opt in ('-n', '--fg', '--nofork'):
    options.fork = False
  elif opt in ('-l', '--log'):
    options.logfile_name = arg
  elif opt in ('-p', '--port'):
    options.port = int(arg)
  elif opt in ('-a', '--address'):
    options.address = arg
  elif opt == '--pidfile':
    options.pidfile_name = arg
  elif opt == '--user':
    # --user also sets the group to the user's primary group; a later
    # --group overrides it (hence the "--group must come last" help note).
    pw = pwd.getpwnam(arg)
    options.setuid = pw.pw_uid
    options.setgid = pw.pw_gid
  elif opt == '--group':
    options.setgid = grp.getgrnam(arg).gr_gid
  elif opt in ('-s', '--status-file'):
    options.status_file = arg
  elif opt in ('-b', '--burst-period'):
    options.burst_period = int(arg)
  elif opt in ('-F', '--full-dump-period'):
    options.full_dump_period = int(arg)
  elif opt in ('-c', '--config'):
    config_file_name = arg

if config_file_name is not None:
  # Import the configuration file as a python module. Values it defines
  # only fill options NOT already set on the command line: the instance
  # __dict__ check below only sees attributes assigned above.
  config_file = os.path.splitext(os.path.basename(config_file_name))[0]
  config_path = os.path.dirname(config_file_name)
  if len(config_path):
    config_path = [config_path]
  else:
    config_path = sys.path
  file, path, description = imp.find_module(config_file, config_path)
  module = imp.load_module(config_file, file, path, description)
  file.close()
  for option_id in [x for x in dir(Options) if x[:1] != '_']:
    if option_id not in options.__dict__ and hasattr(module, option_id):
      setattr(options, option_id, getattr(module, option_id))

# Absolute paths: the daemon chdirs to '/' below.
if options.logfile_name is not None:
  options.logfile_name = os.path.abspath(options.logfile_name)
if options.status_file is not None:
  options.status_file = os.path.abspath(options.status_file)

# Drop privileges: group first (must happen while uid still allows it).
if options.setgid is not None:
  os.setgid(options.setgid)

if options.setuid is not None:
  os.setuid(options.setuid)

tid_storage = TIDStorage(tid_file_path=options.status_file,
                         burst_period=options.burst_period,
                         full_dump_period=options.full_dump_period)

# SIGHUP rotates logs, SIGUSR1 dumps state, SIGTERM triggers clean shutdown.
signal.signal(signal.SIGHUP, HUPHandler)
signal.signal(signal.SIGUSR1, USR1Handler)
signal.signal(signal.SIGTERM, TERMHandler)

if options.fork:
  # Classic double-fork daemonization: detach from the controlling
  # terminal, redirect stdio, record the grandchild's pid.
  pid = os.fork()
  if pid == 0:
    os.umask(027)
    os.setsid()
    pid = os.fork()
    if pid == 0:
      # Grandchild (the daemon): point fds 0/1/2 at /dev/null, then route
      # python-level stdout/stderr through the rotatable LogFile.
      os.close(0)
      os.close(1)
      os.close(2)
      os.open('/dev/null', os.O_RDWR)
      os.dup2(0, 1)
      os.dup2(0, 2)
      sys.stdout = sys.stderr = LogFile(options.logfile_name)
      os.chdir('/')
      try:
        main(options.address, options.port)
      except:
        # Log exception before returning.
        log('Exception caught outside of "main". Previous log entries might ' \
            'be out of order because of this exception.\n%s' % (
            ''.join(traceback.format_exception(*sys.exc_info())), ))
      else:
        log('Exited normaly.')
    else:
      # First child: write the daemon's pid for init scripts, then exit.
      pidfile = open(options.pidfile_name, 'w')
      pidfile.write(str(pid))
      pidfile.close()
      os._exit(0)
  else:
    # TODO: monitor child startup to make it easier to use.
    os._exit(0)
else:
  main(options.address, options.port)
+

Propchange: erp5/trunk/products/TIDStorage/bin/server.py
------------------------------------------------------------------------------
    svn:executable = *

Added: erp5/trunk/products/TIDStorage/repozo/from_z2.8.8_repozo.diff
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/repozo/from_z2.8.8_repozo.diff?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/repozo/from_z2.8.8_repozo.diff (added)
+++ erp5/trunk/products/TIDStorage/repozo/from_z2.8.8_repozo.diff [utf8] Thu Nov  6 17:14:24 2008
@@ -1,0 +1,112 @@
+--- /home/vincent/bin/zope2.8/bin/repozo.py	2007-02-09 13:52:35.000000000 +0100
++++ repozo.py	2007-10-26 15:30:43.311046075 +0200
+@@ -50,6 +50,12 @@
+         Compress with gzip the backup files.  Uses the default zlib
+         compression level.  By default, gzip compression is not used.
+ 
++    -m / --max-tid
++        Stop at given TID when saving the Data.fs.
++
++    -M / --print-max-tid
++        Print the last saved transaction's tid.
++
+ Options for -R/--recover:
+     -D str
+     --date=str
+@@ -70,6 +76,7 @@
+ import time
+ import errno
+ import getopt
++import base64
+ 
+ from ZODB.FileStorage import FileStorage
+ 
+@@ -104,10 +111,11 @@
+ def parseargs():
+     global VERBOSE
+     try:
+-        opts, args = getopt.getopt(sys.argv[1:], 'BRvhf:r:FD:o:Qz',
++        opts, args = getopt.getopt(sys.argv[1:], 'BRvhf:r:FD:o:Qzm:M',
+                                    ['backup', 'recover', 'verbose', 'help',
+                                     'file=', 'repository=', 'full', 'date=',
+-                                    'output=', 'quick', 'gzip'])
++                                    'output=', 'quick', 'gzip', 'max-tid=',
++                                    'print-max-tid'])
+     except getopt.error, msg:
+         usage(1, msg)
+ 
+@@ -120,6 +128,8 @@
+         output = None       # where to write recovered data; None = stdout
+         quick = False       # -Q flag state
+         gzip = False        # -z flag state
++        print_tid = False   # -M flag state
++        max_tid = None      # -m argument, if any
+ 
+     options = Options()
+ 
+@@ -150,6 +160,10 @@
+             options.output = arg
+         elif opt in ('-z', '--gzip'):
+             options.gzip = True
++        elif opt in ('-M', '--print-max-tid'):
++             options.print_tid = True
++        elif opt in ('-m', '--max-tid'):
++             options.max_tid = base64.decodestring(arg)
+         else:
+             assert False, (opt, arg)
+ 
+@@ -174,6 +188,12 @@
+         if options.file is not None:
+             log('--file option is ignored in recover mode')
+             options.file = None
++        if options.print_tid:
++            log('--print-max-tid is ignored in recover mode')
++            options.print_tid = False
++        if options.max_tid is not None:
++            log('--max-tid is ignored in recover mode')
++            options.max_tid = None
+     return options
+ 
+ 
+@@ -349,13 +369,19 @@
+ 
+ def do_full_backup(options):
+     # Find the file position of the last completed transaction.
+-    fs = FileStorage(options.file, read_only=True)
++    fs = FileStorage(options.file, read_only=True, stop=options.max_tid)
+     # Note that the FileStorage ctor calls read_index() which scans the file
+     # and returns "the position just after the last valid transaction record".
+     # getSize() then returns this position, which is exactly what we want,
+     # because we only want to copy stuff from the beginning of the file to the
+     # last valid transaction record.
+     pos = fs.getSize()
++    if options.print_tid:
++      undo_log = fs.undoLog(last=-1)
++      if len(undo_log):
++        print >> sys.stdout, 'Last TID: %s' % (undo_log[0]['id'], )
++      else:
++        print >> sys.stderr, 'Cannot get latest TID'
+     fs.close()
+     options.full = True
+     dest = os.path.join(options.repository, gen_filename(options))
+@@ -375,13 +401,19 @@
+ 
+ def do_incremental_backup(options, reposz, repofiles):
+     # Find the file position of the last completed transaction.
+-    fs = FileStorage(options.file, read_only=True)
++    fs = FileStorage(options.file, read_only=True, stop=options.max_tid)
+     # Note that the FileStorage ctor calls read_index() which scans the file
+     # and returns "the position just after the last valid transaction record".
+     # getSize() then returns this position, which is exactly what we want,
+     # because we only want to copy stuff from the beginning of the file to the
+     # last valid transaction record.
+     pos = fs.getSize()
++    if options.print_tid:
++      undo_log = fs.undoLog(last=-1)
++      if len(undo_log):
++        print >> sys.stdout, 'Last TID: %s' % (undo_log[0]['id'], )
++      else:
++        print >> sys.stderr, 'Cannot get latest TID'
+     fs.close()
+     options.full = False
+     dest = os.path.join(options.repository, gen_filename(options))

Added: erp5/trunk/products/TIDStorage/repozo/repozo_tidstorage.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/repozo/repozo_tidstorage.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/repozo/repozo_tidstorage.py (added)
+++ erp5/trunk/products/TIDStorage/repozo/repozo_tidstorage.py [utf8] Thu Nov  6 17:14:24 2008
@@ -1,0 +1,353 @@
+#!/usr/bin/python
+
+##############################################################################
+#
+# Copyright (c) 2007 Nexedi SARL. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+# Parts of this file are borrowed from Zope 2.8.8 repozo.py script.
+# Essentialy "usage" and "parseargs" methods.
+# So it's released under the ZPL v2.0, as is Zope 2.8.8 .
+
+""" repozo wrapper to backup for multiple Data.fs files in a consistent way.
+
+Usage: %(program)s [-h|--help] [-c|--config configuration_file]
+       [--repozo repozo_command] [-R|--recover|--recover_check]
+       [-H|--host address] [-p|--port port_number] [-u|--url formated_url]
+       [...]
+
+  -h
+  --help
+    Display this help and exit.
+
+  -c configuration_file
+  --config configuration_file
+    Use given file as configuration file.
+    It must be a python file. See sample_configuration.py for required values.
+    Recquired if neither -h nor --help are given.
+
+  --repozo repozo_command
+    Use given executable as repozo command.
+    Default: repozo.py
+
+  -R
+  --recover
+    Instead of saving existing Data.fs, perform an automated recovery from
+    backups + timestamp file.
+
+  --recover_check
+    Similar to above, except that it restores file to temp folder and compares
+    with existing file.
+    Files restored this way are automaticaly deleted after check.
+  
+  -H address
+  --host address
+    TIDStorage server host address.
+    Overrides setting found in configuration_file.
+    Not required if recovering (see above).
+
+  -p port_number
+  --port port_number
+    TIDStorage port nuber.
+    Overrides setting found in configuration_file.
+    Not required if recovering (see above).
+
+  -u formated_url
+  --url formated_url
+    Zope base url, optionnaly with credentials.
+    Overrides setting found in configuration_file.
+    Not required if recovering (see above).
+
+  All others parameters are transmitted to repozo but are partly processed by
+  getopt. To transmit unprocessed parameters to repozo, pass them as an
+  argument.
+"""
+
+from ExchangeProtocol import ExchangeProtocol
+import socket
+import base64
+import imp
+import getopt
+import sys
+import os
+# urllib2 does not support (?) urls containing credentials
+# (http://login:password@...) but it's fine with urllib.
+from urllib import urlopen
+import traceback
+import md5
+import time
+import tempfile
+from struct import pack, unpack
+
+program = sys.argv[0]
+
def log(message):
  """Print a message to stdout (this script logs to the console only)."""
  sys.stdout.write('%s\n' % (message, ))
+
class TIDClient:
  """Minimal client for the TIDStorage wire protocol."""

  def __init__(self, address):
    # TODO: handle diconnections nicely
    self._address = address
    sock = socket.socket()
    sock.connect(address)
    self._socket = sock
    self._protocol_handler = ExchangeProtocol(socket=sock)

  def __call__(self):
    """
      Return dict currently stored on the server.
    """
    handler = self._protocol_handler
    handler.send_field('dump')
    return handler.recv_dict()
+
+def backup(address, known_tid_storage_identifier_dict, repozo_formated_command, zope_formated_url=None):
+  connection = TIDClient(address)
+  to_load = known_tid_storage_identifier_dict.keys()
+  load_count = 2
+  while len(to_load):
+    if load_count < 1:
+      raise ValueError, 'It was impossible to retrieve all required TIDs. Missing: %s' (to_load, )
+    to_load = []
+    load_count -= 1
+    stored_tid_dict = connection()
+    #log(stored_tid_dict)
+    for key, (file_path, storage_path, object_path) in known_tid_storage_identifier_dict.iteritems():
+      if key not in stored_tid_dict and zope_formated_url is not None:
+        to_load.append(key)
+        if object_path is not None:
+          serialize_url = zope_formated_url % (object_path, )
+          log(serialize_url)
+          try:
+            response = urlopen(serialize_url)
+          except Exception, message:
+            # Prevent exceptions from interrupting the backup.
+            # We don't care about how well the web server is working, the only
+            # important thing is to get all TIDs in TIDStorage, and it's checked
+            # later.
+            log(''.join(traceback.format_exception(*sys.exc_info())))
+
+  backup_count = 0
+  total_count = len(known_tid_storage_identifier_dict)
+  for key, (file_path, storage_path, object_path) in known_tid_storage_identifier_dict.iteritems():
+    tid_as_int = stored_tid_dict[key] + 1
+    tid = base64.encodestring(pack('>Q', tid_as_int)).rstrip()
+    repozo_command = repozo_formated_command % (storage_path, file_path, tid)
+    if not os.access(storage_path, os.R_OK):
+      os.makedirs(storage_path)
+    log('Runing %r...' % (repozo_command, ))
+    status = os.system(repozo_command)
+    status = os.WEXITSTATUS(status)
+    if status == 0:
+      backup_count += 1
+    else:
+      log('Error occured while saving %s: exit status=%i' % (file_path, status))
+  log('Saved %i FileStorages out of %i.' % (backup_count, total_count))
+  return total_count - backup_count
+
def get_md5_diggest(file_instance, length):
  """Return the hex md5 digest of the first `length` bytes of file_instance.

  Reads from the start of the file in BLOCK_SIZE chunks. If the file turns
  out shorter than `length`, a warning is logged and the digest covers
  only what was actually read.
  """
  BLOCK_SIZE=512
  file_instance.seek(0)
  md5sum = md5.new()
  # Bind the methods once: they are called for every 512-byte block.
  read = file_instance.read
  update = md5sum.update
  while length > 0:
    to_read = min(BLOCK_SIZE, length)
    buffer = read(to_read)
    if len(buffer) != to_read:
      # BUGFIX: typo in the log message ("requiested").
      log('Warning: read %i instead of requested %i, stopping read' % (len(buffer), to_read))
      length = 0
    else:
      length -= to_read
    update(buffer)
  return md5sum.hexdigest()
+
def recover(known_tid_storage_identifier_dict, repozo_formated_command, check=False):
  """
    Restore (or, with check=True, only verify) every known Data.fs from its
    repozo repository.
    Returns the number of storages which could not be restored.
  """
  recovered_count = 0
  total_count = len(known_tid_storage_identifier_dict)
  for key, (file_path, storage_path, object_path) in known_tid_storage_identifier_dict.iteritems():
    if not os.access(storage_path, os.R_OK):
      log('Warning: unable to recover %s because %s is missing/unreadable.' % (file_path, storage_path))
      continue
    if check:
      # Restore to a temporary location and compare with the original
      # instead of overwriting it.
      original_file_path = file_path
      file_path = os.path.join(tempfile.gettempdir(), os.path.basename(file_path))
    repozo_command = repozo_formated_command % (storage_path, file_path)
    status = os.system(repozo_command)
    status = os.WEXITSTATUS(status)
    if status == 0:
      recovered_count += 1
    else:
      log('Error occured while recovering %s: exit status=%i' % (file_path, status))
    if check:
      log('Info: Comparing restored %s with original %s' % (file_path, original_file_path))
      recovered_file = open(file_path, 'r')
      original_file = open(original_file_path, 'r')
      try:
        # Compare sizes first: a restored file may legitimately be shorter
        # than the original (transactions committed after the backup), but
        # never longer.
        recovered_file.seek(0, 2)
        original_file.seek(0, 2)
        recovered_file_length = recovered_file.tell()
        original_file_length = original_file.tell()
        checked_length = recovered_file_length
        if recovered_file_length < original_file_length:
          log('Info: Shorter than original: -%i bytes (-%.02f%%)' % \
              (original_file_length - recovered_file_length,
               1 - (float(recovered_file_length) / original_file_length)))
        elif recovered_file_length > original_file_length:
          log('ERROR: Longer than original: +%i bytes (+%.02f%%). Was original packed since backup ?' % \
              (recovered_file_length - original_file_length,
               float(recovered_file_length) / original_file_length))
          checked_length = None
        if checked_length is not None:
          # md5-compare only the common prefix.
          recovered_file_diggest = get_md5_diggest(recovered_file, checked_length)
          original_file_diggest = get_md5_diggest(original_file, checked_length)
          if recovered_file_diggest != original_file_diggest:
            log('ERROR: Recovered md5 does not match original: %s != %s.' % \
                (recovered_file_diggest, original_file_diggest))
      finally:
        recovered_file.close()
        original_file.close()
      os.unlink(file_path)

  log('Restored %i FileStorages out of %i.' % (recovered_count, total_count))
  return total_count - recovered_count
+
def usage(code, msg=''):
  # Print the module docstring as help text, then exit with `code`.
  # Errors (code != 0) go to stderr, plain help to stdout.
  outfp = sys.stderr
  if code == 0:
    outfp = sys.stdout

  print >> outfp, __doc__ % globals()
  if msg:
    print >> outfp, msg

  sys.exit(code)
+
+def parseargs():
+  try:
+    opts, args = getopt.getopt(sys.argv[1:], 'vQr:FhzMRc:H:p:u:',
+                               ['help', 'verbose', 'quick', 'full',
+                                'gzip', 'print-max-tid', 'repository',
+                                'repozo=', 'config=', 'host=', 'port=',
+                                'url=', 'recover', 'recover_check'])
+  except getopt.error, msg:
+    usage(1, msg)
+
+  class Options:
+    timestamp_file_path = None
+    repozo_file_name = 'repozo.py'
+    configuration_file_name = None
+    repozo_opts = ['-B']
+    host = None
+    port = None
+    base_url = None
+    known_tid_storage_identifier_dict = {}
+    recover = False
+    dry_run = False
+
+  options = Options()
+
+  if args:
+    options.repozo_opts.extend(args)
+
+  for opt, arg in opts:
+    if opt in ('-h', '--help'):
+      usage(0)
+    elif opt in ('-c', '--config'):
+      options.configuration_file_name = arg
+    elif opt == '--repozo':
+      options.repozo_file_name = arg
+    elif opt in ('-R', '--recover', '--recover_check'):
+      options.repozo_opts[0] = '-R'
+      options.recover = True
+      if opt == '--recover_check':
+        options.dry_run = True
+    elif opt in ('-H', '--host'):
+      options.host = arg
+    elif opt in ('-p', '--port'):
+      try:
+        options.port = int(port)
+      except ValueError, msg:
+        usage(1, msg)
+    elif opt in ('-u', '--url'):
+      options.url = arg
+    elif opt in ('-r', '--repository'):
+      options.repozo_opts.append('%s %s' % (opt, arg))
+    else:
+      options.repozo_opts.append(opt)
+
+  if options.configuration_file_name is None:
+    usage(1, 'Either -c or --config is required.')
+
+  configuration_filename, ext = os.path.splitext(os.path.basename(options.configuration_file_name))
+  configuration_path = os.path.dirname(options.configuration_file_name)
+  if len(configuration_path):
+    configuration_path = [configuration_path]
+  else:
+    configuration_path = sys.path
+  file, path, description = imp.find_module(configuration_filename, configuration_path)
+  module = imp.load_module(configuration_filename, file, path, description)
+  file.close()
+  try:
+    options.known_tid_storage_identifier_dict = module.known_tid_storage_identifier_dict
+    options.timestamp_file_path = module.timestamp_file_path
+  except AttributeError, msg:
+    usage(1, msg)
+  for option_id in ('port', 'host', 'base_url'):
+    if getattr(options, option_id) is None:
+      setattr(options, option_id, getattr(module, option_id, None))
+  # XXX: we do not check any option this way, it's too dangerous.
+  #options.repozo_opts.extend(getattr(module, 'repozo_opts', []))
+  if options.port is None:
+    options.port = 9001
+
+  if options.host is None:
+    usage(1, 'Either -H or --host is required (or host value should be set in configuration file).')
+
+  return options
+
# Script body: build the repozo command template, then either recover or
# back up depending on the command line.
options = parseargs()
address = (options.host, options.port)
zope_formated_url = options.base_url
if options.base_url is not None and '%s' not in zope_formated_url:
  raise ValueError, 'Given base url (%r) is not properly formated, it must contain one \'%%s\'.' % (zope_formated_url, )
repozo_formated_command = '%s %s -r "%%s"' % (options.repozo_file_name, ' '.join(options.repozo_opts))
if options.recover:
  # Use the last (most recent) line of the timestamp file as repozo's -D
  # date bound.
  timestamp_file = open(options.timestamp_file_path, 'r')
  timestamp = ''
  read_line = ' '
  while len(read_line):
    timestamp = read_line
    read_line = timestamp_file.readline()
  timestamp = timestamp.strip('\r\n \t')
  # NOTE(review): timestamp is always a string here, so this test is always
  # true — an empty timestamp file yields an empty -D value instead of
  # omitting the option.
  if timestamp is not None:
    repozo_formated_command += ' -o "%%s" -D %s' % (timestamp, )
  result = recover(
    known_tid_storage_identifier_dict=options.known_tid_storage_identifier_dict,
    repozo_formated_command=repozo_formated_command,
    check=options.dry_run)
else:
  repozo_formated_command += ' -f "%s" -m "%s"'
  result = backup(
    address=address,
    known_tid_storage_identifier_dict=options.known_tid_storage_identifier_dict,
    zope_formated_url=zope_formated_url,
    repozo_formated_command=repozo_formated_command)
  if result == 0:
    # Paranoid mode:
    # Issue a system-wide "sync" command to make sure all files which were saved
    # are really present on disk.
    os.system('sync')
    # Record the backup completion time (unbuffered append) for later -R.
    timestamp_file = open(options.timestamp_file_path, 'a', 0)
    try:
      # Borrowed from repozo.
      timestamp_file.write('\n%04d-%02d-%02d-%02d-%02d-%02d' % time.gmtime()[:6])
    finally:
      timestamp_file.close()

sys.exit(result)

Propchange: erp5/trunk/products/TIDStorage/repozo/repozo_tidstorage.py
------------------------------------------------------------------------------
    svn:executable = *

Added: erp5/trunk/products/TIDStorage/repozo/restore_tidstorage.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/repozo/restore_tidstorage.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/repozo/restore_tidstorage.py (added)
+++ erp5/trunk/products/TIDStorage/repozo/restore_tidstorage.py [utf8] Thu Nov  6 17:14:24 2008
@@ -1,0 +1,188 @@
+#!/usr/bin/python
+
+##############################################################################
+#
+# Copyright (c) 2007 Nexedi SARL. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+# Parts of this file are borrowed from Zope 2.8.8 repozo.py script.
+# Essentialy "usage", "parseargs" and parts of "restore" methods.
+# So it's released under the ZPL v2.0, as is Zope 2.8.8 .
+
+"""
+Usage: %(program)s [-h|--help] [-c|--config configuration_file]
+
+  -h
+  --help
+    Display this help and exit.
+
+  -c configuration_file
+  --config configuration_file
+    Use given file as configuration file.
+    It must be a python file.
+    Recquired if neither -h nor --help are given.
+"""
+
+import imp
+import getopt
+import sys
+import os
+# urllib2 does not support (?) urls containing credentials
+# (http://login:password@...) but it's fine with urllib.
+from struct import pack
+import shutil
+from ZODB.FileStorage import FileStorage
+
+program = sys.argv[0]
+
def log(message):
  """Print a message to stdout (this script logs to the console only)."""
  sys.stdout.write('%s\n' % (message, ))
+
def parse(status_file):
  """Parse a TIDStorage status file and return the final
  {storage_id: last_tid} dict.

  Each line is "<timestamp> <f|d> <dict repr>", where 'f' marks a full
  dump and 'd' an incremental one. Timestamps must strictly increase and
  TIDs must never decrease; violations raise AssertionError.
  """
  tid_log = open(status_file)
  content = {}
  last_timestamp = None

  line = tid_log.readline()
  while line != '':
    split_line = line.split(' ', 2)
    assert len(split_line) == 3, repr(split_line)
    line_timestamp, line_type, line_dict = split_line
    line_timestamp = float(line_timestamp)
    assert line_type in ('f', 'd'), repr(line_type)
    if last_timestamp is not None:
      assert last_timestamp < line_timestamp, '%r < %r' % (last_timestamp, line_timestamp)
    # BUGFIX: last_timestamp was only assigned once, so every line was
    # compared against the FIRST timestamp instead of the previous one.
    last_timestamp = line_timestamp
    # NOTE(review): eval of file content — the status file must be trusted;
    # do not run this on files from an untrusted source.
    line_dict = eval(line_dict, None)
    assert isinstance(line_dict, dict), type(line_dict)
    assert len(line_dict), repr(line_dict)
    if line_type == 'd':
      # Incremental dump: merge, checking each TID strictly increases.
      for key, value in line_dict.items():
        if key in content:
          assert content[key] < value, '%r < %r' % (content[key], value)
        content[key] = value
    elif line_type == 'f':
      # Full dump: must cover every previously-seen key with >= TIDs.
      for key, value in content.items():
        assert key in line_dict, repr(key)
        assert value <= line_dict[key], '%r <= %r' % (value, line_dict[key])
      content = line_dict
    line = tid_log.readline()
  return content
+
+READCHUNK = 10 * 1024 * 1024
+
+def recover(data_fs_backup_path_dict, status_file):
+  last_tid_dict = parse(status_file)
+  for storage_id, (file_path, backup_path) in data_fs_backup_path_dict.iteritems():
+    # Derived from repozo (function=do_full_backup)
+    # TODO: optimise to read backup only once.
+    can_restore = False
+    if os.path.exists(backup_path):
+      if os.path.exists(file_path):
+        print 'Both original and backup files exist for %r. If previous restoration was successful, you should delete the backup for this restoration to take place. Original: %r Backup: %r' % (storage_id, file_path, backup_path)
+      else:
+        print 'Only backup file is available for %r: %r. Assuming it\'s ok and restoring to %r' % (storage_id, backup_path, file_path)
+        can_restore = True
+    else:
+      if os.path.exists(file_path):
+        sys.stdout.write('Copying %r to %r... ' % (file_path, backup_path))
+        shutil.copy(file_path, backup_path)
+        initial_size = stat(file_path).st_size
+        final_size = stat(backup_path).st_size
+        if initial_size == final_size:
+          can_restore = True
+          print 'Done.'
+        else:
+          print 'Backup size %i differs from original size %i. Is the original file (%r) still in use ? Is there enough free disk space at destination (%r) ?' % (final_size, initial_size, file_path, backup_path)
+      else:
+        print 'Cannot find any file for %r: %r and %r do not exist.' % (storage_id, file_path, backup_path)
+    if can_restore:
+      last_tid = last_tid_dict[storage_id] + 1
+      tid = pack('>Q', last_tid)
+      # Find the file position of the last completed transaction.
+      fs = FileStorage(backup_path, read_only=True, stop=tid)
+      # Note that the FileStorage ctor calls read_index() which scans the file
+      # and returns "the position just after the last valid transaction record".
+      # getSize() then returns this position, which is exactly what we want,
+      # because we only want to copy stuff from the beginning of the file to the
+      # last valid transaction record.
+      pos = fs.getSize()
+      fs.close()
+      print 'Restoring backup: %s bytes (transaction %r) from %s to %s' % (pos, tid, backup_path, file_path)
+      source_file = open(backup_path, 'rb')
+      destination_file = open(file_path, 'wb')
+      while pos:
+        todo = min(READCHUNK, pos)
+        data = source_file.read(todo)
+        if not data:
+          print 'Unexpected end of data stream (should contain %i more bytes)' % (pos, )
+          break
+        destination_file.write(data)
+        pos -= len(data)
+      destination_file.close()
+      source_file.close()
+    else:
+      print 'Skipping restoration of %r (%r).' % (file_path, storage_id)
+
def usage(code, msg=''):
  # Print the module docstring as help text, then exit with `code`.
  # Errors (code != 0) go to stderr, plain help to stdout.
  outfp = sys.stderr
  if code == 0:
    outfp = sys.stdout

  print >> outfp, __doc__ % globals()
  if msg:
    print >> outfp, msg

  sys.exit(code)
+
def parseargs():
  # Parse the command line, then import the -c/--config python module,
  # which must define data_fs_backup_path_dict and status_file.
  try:
    opts, args = getopt.getopt(sys.argv[1:], 'hc:',
                               ['help', 'config='])
  except getopt.error, msg:
    usage(1, msg)

  class Options:
    # Path of the -c/--config module.
    configuration_file_name = None
    # TIDStorage status file path, filled from the config module below.
    status_file = None

  options = Options()

  for opt, arg in opts:
    if opt in ('-h', '--help'):
      usage(0)
    elif opt in ('-c', '--config'):
      options.configuration_file_name = arg

  if options.configuration_file_name is None:
    usage(1, 'Either -c or --config is required.')

  # Import the configuration file as a regular python module.
  configuration_filename, ext = os.path.splitext(os.path.basename(options.configuration_file_name))
  configuration_path = os.path.dirname(options.configuration_file_name)
  if len(configuration_path):
    configuration_path = [configuration_path]
  else:
    configuration_path = sys.path
  file, path, description = imp.find_module(configuration_filename, configuration_path)
  module = imp.load_module(configuration_filename, file, path, description)
  file.close()
  try:
    options.data_fs_backup_path_dict = module.data_fs_backup_path_dict
    options.status_file = module.status_file
  except AttributeError, msg:
    usage(1, msg)
  return options
+
# Script entry point: load the configuration, then restore every Data.fs.
options = parseargs()
recover(
  data_fs_backup_path_dict=options.data_fs_backup_path_dict,
  status_file=options.status_file)
+

Propchange: erp5/trunk/products/TIDStorage/repozo/restore_tidstorage.py
------------------------------------------------------------------------------
    svn:executable = *

Added: erp5/trunk/products/TIDStorage/repozo/sample_configuration.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/repozo/sample_configuration.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/repozo/sample_configuration.py (added)
+++ erp5/trunk/products/TIDStorage/repozo/sample_configuration.py [utf8] Thu Nov  6 17:14:24 2008
@@ -1,0 +1,35 @@
+# COMMON
+# This part is used both by server_v2.py and repozo_tidstorage_v2.py
+# Maps a storage identifier - repr of ((ZEO address tuples), storage name),
+# the same shape produced by transaction_transaction's
+# getFilestorageToTIDMapping - to a 3-tuple.
+# NOTE(review): the tuple appears to be (Data.fs path, repozo backup
+# directory, id substituted into base_url) - confirm against server_v2.py.
+known_tid_storage_identifier_dict = {
+  "((('localhost', 8200),), '2')":
+    ('/home/vincent/zeo2/var2/Data.fs',
+     '/home/vincent/tmp/repozo/z22',
+     'foo_test'),
+  "((('localhost', 8200),), '1')":
+    ('/home/vincent/zeo2/var/Data.fs',
+     '/home/vincent/tmp/repozo/z21',
+     'bar_test'),
+  "((('localhost', 8100),), '1')":
+    ('/home/vincent/zeo1/var/Data.fs',
+     '/home/vincent/tmp/repozo/z11',
+     'baz_test'),
+}
+# NOTE(review): %s in base_url is presumably filled with the third element
+# of the tuples above - confirm.
+base_url = 'http://localhost:5080/erp5/%s/modifyContext'
+# Address the TIDStorage server listens on (matches TID_STORAGE_ADDRESS in
+# transaction_transaction.py).
+port = 9001
+host = '127.0.0.1'
+
+# SERVER
+# This part is only used by server_v2.py
+#logfile_name = 'tidstorage.log'
+#pidfile_name = 'tidstorage.pid'
+#fork = False
+#setuid = None
+#setgid = None
+status_file = 'tidstorage.tid'
+# Periods in seconds; presumably the intervals between incremental ('d')
+# and full ('f') records in the TID log (see utils/check_tid_log.py for the
+# log format) - TODO confirm against server_v2.py.
+burst_period = 30
+full_dump_period = 300
+
+# REPOZO_TIDSTORAGE
+# This part is only used by repozo_tidstorage_v2.py
+timestamp_file_path = 'repozo_tidstorage_timestamp.log'
+

Added: erp5/trunk/products/TIDStorage/tests/testTIDServer.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/tests/testTIDServer.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/tests/testTIDServer.py (added)
+++ erp5/trunk/products/TIDStorage/tests/testTIDServer.py [utf8] Thu Nov  6 17:14:24 2008
@@ -1,0 +1,256 @@
+#!/usr/bin/python
+
+from ExchangeProtocol import ExchangeProtocol
+import traceback
+import socket
+import sys
+
+assert len(sys.argv) == 3, 'Requires exactly 2 arguments: <address> <port>'
+
+address = sys.argv[1]
+port = int(sys.argv[2])
+
+class TIDClient:
+  """
+    Thin test client for the TIDStorage protocol: wraps a TCP connection
+    with ExchangeProtocol and exposes dump/begin/commit/abort commands.
+  """
+  def __init__(self, address):
+    # address is an (ip, port) tuple.
+    self._to_server = socket.socket()
+    self._to_server.connect(address)
+    self._exchange_protocol = ExchangeProtocol(self._to_server)
+
+  def _dump(self, test_id=None):
+    # Ask the server for its whole tid dict. When test_id is given, keep
+    # only the entries of that test and strip the "<test_id>_" key prefix.
+    self._exchange_protocol.send_field('dump')
+    received_dict = self._exchange_protocol.recv_dict()
+    if test_id is None:
+      result = received_dict
+    else:
+      id_len = len(test_id) + 1 # Add 1 to strip underscore.
+      result = dict([(key[id_len:], value) \
+                     for key, value in received_dict.iteritems() \
+                     if key.startswith(test_id)])
+    # Values arrive in string form over the wire; normalise to int so the
+    # test assertions can compare against plain integer dicts.
+    return dict([(key, int(value)) for key, value in result.iteritems()])
+
+  def dump(self, test_id):
+    # Dump only the entries belonging to test_id.
+    return self._dump(test_id=test_id)
+
+  def dump_all(self):
+    # Dump every entry known to the server.
+    return self._dump()
+
+  def begin(self, test_id, transaction_id, storage_id_list):
+    # Declare a transaction spanning the given storages. Storage ids are
+    # namespaced with test_id so test methods do not interfere.
+    self._exchange_protocol.send_field('begin')
+    self._exchange_protocol.send_field(transaction_id)
+    internal_storage_id_list = ['%s_%s' % (test_id, x) \
+                                for x in storage_id_list]
+    self._exchange_protocol.send_list(internal_storage_id_list)
+
+  def abort(self, test_id, transaction_id):
+    # Abort the given transaction. test_id is unused here: the server
+    # tracks transactions by transaction_id alone.
+    self._exchange_protocol.send_field('abort')
+    self._exchange_protocol.send_field(transaction_id)
+
+  def commit(self, test_id, transaction_id, storage_tid_dict):
+    # Commit the given transaction, reporting the tid reached on each
+    # (namespaced) storage.
+    self._exchange_protocol.send_field('commit')
+    self._exchange_protocol.send_field(transaction_id)
+    internal_storage_tid_dict = {}
+    for key, value in storage_tid_dict.iteritems():
+      internal_storage_tid_dict['%s_%s' % (test_id, key)] = value
+    self._exchange_protocol.send_dict(internal_storage_tid_dict)
+
+class TestTIDServerV2:
+  def __init__(self, address, port):
+    self._client = TIDClient((address, port))
+ 
+  def assertEqual(self, value, target):
+    assert value == target, 'value %r does not match target %r' % (value, target)
+ 
+  def testInitialValue(self, test_id):
+    """
+      Check that the storage is empty
+    """
+    self.assertEqual(self._client.dump_all(), {})
+
+  def testScenario1(self, test_id):
+    """
+      Simple begin - commit case.
+    """
+    storage_tid_dict = {'s1': 1}
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't1', storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't1', storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), storage_tid_dict)
+
+  def testScenario2(self, test_id):
+    """
+      Simple begin - abort case.
+    """
+    storage_tid_dict = {'s1': 1}
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't1', storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.abort(test_id, 't1')
+    self.assertEqual(self._client.dump(test_id), {})
+
+  def testScenario3(self, test_id):
+    """
+      2 concurent transactions impacting a common storage.
+      Second transaction begins after first does, and commits before
+      first does.
+    """
+    t1_storage_tid_dict = {'s1': 1, 's2': 1}
+    t2_storage_tid_dict = {'s1': 2, 's3': 1}
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't2', t2_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't1', t1_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 1, 's3': 1})
+
+  def testScenario4(self, test_id):
+    """
+      3 concurent transactions.
+      Transactions 1 and 2 impact same storage s1.
+      Transaction 3 impacts storage s3 after transaction 2 commited.
+      Still, as storage 3  was part of a non-commitable-yet transaction,
+      it must not be commited untill all blockable (here, t1) transaction have
+      ended.
+    """
+    t1_storage_tid_dict = {'s1': 1, 's2': 2}
+    t2_storage_tid_dict = {'s1': 2, 's3': 1}
+    t3_storage_tid_dict = {'s3': 1}
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't2', t2_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't3', t3_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't3', t3_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't1', t1_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 2, 's3': 1})
+
+  def testScenario4bis(self, test_id):
+    """
+      3 concurent transactions.
+      Transactions 1 and 2 impact same storage s1.
+      Transaction 3 impacts storage s3 after transaction 2 commited.
+      Still, as storage 3  was part of a non-commitable-yet transaction,
+      it must not be commited untill all blockable (here, t1) transaction have
+      ended.
+      In this version, t1 aborts: for example, tpc_vote failed. As the data
+      was already sent to storage, it might be already present on disk (and
+      anyway, tid is not to be used anymore), so it's valid for t2 to commit
+      with tid 2 even if t1 aborted tid 1.
+    """
+    t1_storage_tid_dict = {'s1': 1, 's2': 2}
+    t2_storage_tid_dict = {'s1': 2, 's3': 1}
+    t3_storage_tid_dict = {'s3': 1}
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't2', t2_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't3', t3_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't3', t3_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.abort(test_id, 't1')
+    self.assertEqual(self._client.dump(test_id), {'s1': 2, 's3': 1})
+
+  def testScenario5(self, test_id):
+    """
+      2 concurent transactions impacting a common storage.
+      Second transaction begins after first does, and commits after
+      first does.
+    """
+    t1_storage_tid_dict = {'s1': 2}
+    t2_storage_tid_dict = {'s1': 1, 's2': 1}
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't1', t1_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't2', t2_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 1})
+
+  def testScenario6(self, test_id):
+    """
+      2 concurent transactions impacting separate sets of storages.
+      Check that the first commit impacts dump data immediately.
+    """
+    t1_storage_tid_dict = {'s1': 1}
+    t2_storage_tid_dict = {'s2': 1}
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't1', t1_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {'s1': 1})
+    self._client.commit(test_id, 't2', t2_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {'s1': 1, 's2': 1})
+
+  def testScenario7(self, test_id):
+    """
+      3 concurent transactions.
+      t1 and t2 impact a set of different storages.
+      t3 impacts a set of storage containing the ones from t1 and the ones
+      from t2.
+      Check that nothing impacts dump data until everything is commited.
+    """
+    t1_storage_tid_dict = {'s1': 1}
+    t2_storage_tid_dict = {'s2': 2}
+    t3_storage_tid_dict = {'s1': 2, 's2': 2}
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't3', t3_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't1', t1_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't2', t2_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't3', t3_storage_tid_dict)
+    self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 2})
+
+  def testScenario8(self, test_id):
+    """
+      Simple increase case.
+    """
+    self.assertEqual(self._client.dump(test_id), {})
+    t1_storage_tid_dict = {}
+    for s1_value in (1, 2):
+      previous_t1_storage_tid_dict = t1_storage_tid_dict
+      t1_storage_tid_dict = {'s1': s1_value}
+      self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
+      self.assertEqual(self._client.dump(test_id), previous_t1_storage_tid_dict)
+      self._client.commit(test_id, 't1', t1_storage_tid_dict)
+      self.assertEqual(self._client.dump(test_id), t1_storage_tid_dict)
+
+  def run(self):
+    for test_method_id in [x for x in dir(self) if x.startswith('test')]:
+      self.log("Runing %s..." % (test_method_id, ))
+      try:
+        try:
+          getattr(self, test_method_id)(test_id=test_method_id)
+        except AssertionError:
+          self.log('F\n')
+          self.log('\n'.join(traceback.format_exception(*sys.exc_info())))
+      finally:
+        self.log('\n')
+
+  def log(self, message):
+    sys.stdout.write(message)
+
+# Run the whole suite against the server given on the command line.
+test = TestTIDServerV2(address, port)
+test.run()
+

Propchange: erp5/trunk/products/TIDStorage/tests/testTIDServer.py
------------------------------------------------------------------------------
    svn:executable = *

Added: erp5/trunk/products/TIDStorage/transaction_transaction.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/transaction_transaction.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/transaction_transaction.py (added)
+++ erp5/trunk/products/TIDStorage/transaction_transaction.py [utf8] Thu Nov  6 17:14:24 2008
@@ -1,0 +1,254 @@
+############################################################################
+#
+# Copyright (c) 2007, 2008 Nexedi SARL and Contributors. All Rights Reserved.
+#                    Vincent Pelletier <vincent at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+#
+##############################################################################
+
+from ExchangeProtocol import ExchangeProtocol
+from transaction._transaction import Transaction
+from zLOG import LOG, WARNING
+import socket
+import thread
+import struct
+import sys
+
+# Name of the method looked up on storages to fetch their last committed
+# TID. NOTE(review): presumably provided by the ZEOClientStorage patch
+# shipped with this product - confirm.
+GET_LAST_COMMITED_TID_METHOD_ID = 'getLastCommitedTID'
+# Address of the TIDStorage server collecting per-storage TIDs.
+TID_STORAGE_ADDRESS = ('127.0.0.1', 9001)
+
+# Module-level singletons, lazily initialised on first use (see
+# _commitResources and getZopeId below).
+tid_storage = None
+zope_identifier = None
+
+# Borrowed from CMFActivity.ActivityTool.getCurrentNode
+def getZopeId():
+  """ Return current node in form ip:port """
+  # The result is cached in the module-global zope_identifier after the
+  # first successful call; the asyncore scan only happens once.
+  global zope_identifier
+  if zope_identifier is None:
+    port = ''
+    from asyncore import socket_map
+    for k, v in socket_map.items():
+      if hasattr(v, 'port'):
+        # see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
+        type = str(getattr(v, '__class__', 'unknown'))
+        if type == 'ZServer.HTTPServer.zhttp_server':
+          port = v.port
+          break
+    assert port != '', 'zhttp_server not started yet'
+    ip = socket.gethostbyname(socket.gethostname())
+    if TID_STORAGE_ADDRESS[0] != '127.0.0.1':
+      # A loopback self-address would be meaningless to a remote TIDStorage.
+      assert ip != '127.0.0.1', 'self address must not be 127.0.0.1 if TIDStorage is remote'
+    zope_identifier = '%s:%s' %(ip, port)
+  return zope_identifier
+
+def getFilestorageList(resource_list):
+  return getFilestorageToTIDMapping(resource_list).keys()
+
+def getFilestorageToTIDMapping(resource_list):
+  """
+    Map each TID-aware storage found in resource_list to the integer TID of
+    its last committed transaction (None when the storage reported no TID).
+    Only resources whose _storage exposes GET_LAST_COMMITED_TID_METHOD_ID
+    are considered; everything else is silently skipped.
+  """
+  datafs_tid_update_dict = {}
+  for resource in resource_list:
+    storage = getattr(resource, '_storage', None)
+    if storage is not None:
+      getLastCommitedTID = getattr(storage, GET_LAST_COMMITED_TID_METHOD_ID,
+                                   None)
+      if getLastCommitedTID is not None:
+        tid = getLastCommitedTID()
+        # Identifier is repr of (ZEO address tuples, storage name) -
+        # NOTE(review): presumably matches the keys of
+        # known_tid_storage_identifier_dict in the server configuration.
+        _addr = tuple([tuple(x) for x in getattr(storage, '_addr', [])])
+        _storage = getattr(storage, '_storage', '')
+        datafs_id = repr((_addr, _storage))
+        assert datafs_id not in datafs_tid_update_dict
+        if tid is None:
+          datafs_tid_update_dict[datafs_id] = None
+        else:
+          # unpack stolen from ZODB/utils.py:u64
+          datafs_tid_update_dict[datafs_id] = struct.unpack(">Q", tid)[0]
+  return datafs_tid_update_dict
+
+class BufferedSocket:
+  """
+    Write-only thread-safe buffered socket.
+    Attemps to reconnect at most once per flush.
+  """
+
+  _socket_lock = thread.allocate_lock()
+  _connected = False
+  
+  def __init__(self, address):
+    self._socket = socket.socket()
+    self._address = address
+    self._send_buffer_dict = {}
+
+  def _connect(self):
+    try:
+      self._socket.connect(self._address)
+      self._notifyConnected()
+    except socket.error, message:
+      # We don't want to have an error line per failed connection attemp, to
+      # avoid flooding the logfile.
+      pass
+
+  def _getSendBuffer(self, ident):
+    send_buffer = self._send_buffer_dict.get(ident)
+    if send_buffer is None:
+      send_buffer = self._send_buffer_dict[ident] = []
+    return send_buffer
+
+  def _notifyDisconnected(self, message):
+    if self._connected:
+      self._connected = False
+      LOG('TIDStorage', WARNING, 'Disconnected: %s' % (message, ))
+
+  def _notifyConnected(self):
+    if not self._connected:
+      self._connected = True
+      # Display a log message at WARNING level, so that reconnection message
+      # are visible when disconnection messages are visible, even if it is
+      # not a warning, properly speaking.
+      LOG('TIDStorage', WARNING, 'Connected')
+
+  def send(self, to_send):
+    send_buffer = self._getSendBuffer(thread.get_ident())
+    send_buffer.append(to_send)
+
+  def flush(self):
+    """
+      Flush send buffer and actually send data, with extra checks to behave
+      nicely if connection is broken.
+      Do not retry to send if something goes wrong (data is then lost !).
+      Here, most important thing is speed, not data.
+      Serialize usage.
+    """
+    ident = thread.get_ident()
+    self._socket_lock.acquire()
+    try:
+      if not self._connected:
+        self._connect()
+      if self._connected:
+        try:
+          self._socket.sendall(''.join(self._getSendBuffer(ident)))
+        except socket.error, message:
+          self._notifyDisconnected(message)
+          try:
+            self._socket.shutdown(socket.SHUT_RDWR)
+          except socket.error:
+            self._socket.close()
+          self._socket = socket.socket()
+    finally:
+      self._socket_lock.release()
+    self._send_buffer_dict[ident] = []
+
+class TIDClient:
+  """
+    Zope-side client of the TIDStorage notification protocol.
+    All writes go through a BufferedSocket, so methods never raise on
+    connection loss: notifications are best-effort (see BufferedSocket).
+  """
+  def __init__(self, address):
+    self._buffered_socket = BufferedSocket(address)
+    self._field_exchange = ExchangeProtocol(socket=self._buffered_socket)
+
+  def commit(self, tid_update_dict):
+    """
+      Send given dict to TIDStorage server.
+    """
+    self._send_command('commit')
+    self._field_exchange.send_dict(tid_update_dict)
+    self._buffered_socket.flush()
+
+  def begin(self, storage_id_list):
+    """
+      Inform TIDStorage connection tracking that commit was initiated.
+    """
+    self._send_command('begin')
+    self._field_exchange.send_list(storage_id_list)
+    self._buffered_socket.flush()
+
+  def abort(self):
+    """
+      Inform TIDStorage connection tracking that commit was aborted.
+    """
+    self._send_command('abort')
+    self._buffered_socket.flush()
+
+  def _send_command(self, command):
+    """
+      Every command must be followed by an identifier.
+      This identifier is used to track transactions, so the same identifier
+      must not be used twice at the same time, but can be reused later.
+      Here: "<ip:port>_<thread id in hex>", unique per Zope worker thread.
+    """
+    self._field_exchange.send_field(command)
+    self._field_exchange.send_field('%s_%x' % (getZopeId(), thread.get_ident()))
+
+# Keep a reference to the unpatched method so the hook can delegate to it.
+original__commitResources = Transaction._commitResources
+def _commitResources(self, *args, **kw):
+  """
+    Hook Transaction's _commitResources.
+
+    Before:
+     - Initialise TIDClient if needed
+     - Check if there is any storage we are interested in in current commit
+     - If so, issue a begin
+
+    After (2 cases):
+     - original__commitResources raised:
+       - Issue an abort
+     - otherwise:
+       - Issue a commit
+
+    Note to editors: Prevent your code from raising anything ! This method
+    MUST NOT raise any exception, except that it MUST NOT hide any exception
+    raised by original__commitResources.
+    """
+  has_storages = False
+  try:
+    global tid_storage
+    if tid_storage is None:
+      # Lazy singleton: create the client on first commit, not at import.
+      tid_storage = TIDClient(TID_STORAGE_ADDRESS)
+    filestorage_list = getFilestorageList(self._resources)
+    if len(filestorage_list):
+      has_storages = True
+      tid_storage.begin(filestorage_list)
+  except:
+    LOG('TIDStorage _commitResources', WARNING, 'Exception in begin phase', error=sys.exc_info())
+  try:
+    result = original__commitResources(self, *args, **kw)
+  except:
+    if has_storages:
+      exception = sys.exc_info()
+      try:
+        tid_storage.abort()
+      except:
+        LOG('TIDStorage _commitResources', WARNING, 'Exception in abort phase', error=sys.exc_info())
+      # Re-raise original exception, in case the abort notification tainted
+      # the last exception value.
+      raise exception[0], exception[1], exception[2]
+    else:
+      raise
+  else:
+    if has_storages:
+      # Now that everything has been committed, all exceptions relative to
+      # added code must be swallowed (but still reported) to avoid confusing
+      # the transaction system.
+      try:
+        tid_storage.commit(getFilestorageToTIDMapping(self._resources))
+      except:
+        LOG('TIDStorage _commitResources', WARNING, 'Exception in commit phase', error=sys.exc_info())
+  return result
+
+# Install the monkeypatch.
+Transaction._commitResources = _commitResources
+

Added: erp5/trunk/products/TIDStorage/utils/check_tid_log.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/utils/check_tid_log.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/utils/check_tid_log.py (added)
+++ erp5/trunk/products/TIDStorage/utils/check_tid_log.py [utf8] Thu Nov  6 17:14:24 2008
@@ -1,0 +1,71 @@
+#!/usr/bin/python
+##############################################################################
+#
+# Copyright (c) 2008 Nexedi SARL and Contributors. All Rights Reserved.
+#                    Vincent Pelletier <vincent at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+#
+##############################################################################
+
+# Checks the sanity of a tidstorage TID log provided via stdin.
+# Exit status:
+#  0 Success
+#  1 Failure
+# Error is displayed on stderr.
+# On success, final tid values for each storage is displayed on stdout.
+
+import sys
+content = {}
+last_timestamp = None
+
+line = sys.stdin.readline()
+while line != '':
+  split_line = line.split(' ', 2)
+  assert len(split_line) == 3, repr(split_line)
+  line_timestamp, line_type, line_dict = split_line
+  line_timestamp = float(line_timestamp)
+  assert line_type in ('f', 'd'), repr(line_type)
+  if last_timestamp is None:
+    last_timestamp = line_timestamp
+  else:
+    assert last_timestamp < line_timestamp, '%r < %r' % (last_timestamp, line_timestamp)
+  line_dict = eval(line_dict, None)
+  assert isinstance(line_dict, dict), type(line_dict)
+  assert len(line_dict), repr(line_dict)
+  if line_type == 'd':
+    for key, value in line_dict.iteritems():
+      if key in content:
+        assert content[key] < value, '%r < %r' % (content[key], value)
+      content[key] = value
+  elif line_type == 'f':
+    for key, value in content.iteritems():
+      assert key in line_dict, repr(key)
+      assert value <= line_dict[key], '%r <= %r' % (value, line_dict[key])
+    content = line_dict
+  line = sys.stdin.readline()
+
+key_list = content.keys()
+key_list.sort()
+for key in key_list:
+  print '%r %r' % (key, content[key])
+

Propchange: erp5/trunk/products/TIDStorage/utils/check_tid_log.py
------------------------------------------------------------------------------
    svn:executable = *

Added: erp5/trunk/products/TIDStorage/utils/tid_log_to_full.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/utils/tid_log_to_full.py?rev=24513&view=auto
==============================================================================
--- erp5/trunk/products/TIDStorage/utils/tid_log_to_full.py (added)
+++ erp5/trunk/products/TIDStorage/utils/tid_log_to_full.py [utf8] Thu Nov  6 17:14:24 2008
@@ -1,0 +1,69 @@
+#!/usr/bin/python
+##############################################################################
+#
+# Copyright (c) 2008 Nexedi SARL and Contributors. All Rights Reserved.
+#                    Vincent Pelletier <vincent at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+#
+##############################################################################
+
+# Transforms a TIDStorage TID log provided on stdin, which might contain
+# full statuses and/or incremental changes, and provides a version on stdout
+# containing only full statuses.
+# Also, does sanity checks on the given file.
+# Exit status:
+#  0 Success
+#  1 Failure
+
+import sys
+content = {}
+last_timestamp = None
+
+line = sys.stdin.readline()
+while line != '':
+  split_line = line.split(' ', 2)
+  assert len(split_line) == 3, repr(split_line)
+  line_timestamp, line_type, line_dict = split_line
+  line_timestamp = float(line_timestamp)
+  assert line_type in ('f', 'd'), repr(line_type)
+  if last_timestamp is None:
+    last_timestamp = line_timestamp
+  else:
+    assert last_timestamp < line_timestamp, '%r < %r' % (last_timestamp, line_timestamp)
+  line_dict = eval(line_dict, None)
+  assert isinstance(line_dict, dict), type(line_dict)
+  assert len(line_dict), repr(line_dict)
+  if line_type == 'd':
+    for key, value in line_dict.iteritems():
+      if key in content:
+        assert content[key] < value, '%r < %r' % (content[key], value)
+      content[key] = value
+    print '%r f %r' % (line_timestamp, content)
+  elif line_type == 'f':
+    for key, value in content.iteritems():
+      assert key in line_dict, repr(key)
+      assert value <= line_dict[key], '%r <= %r' % (value, line_dict[key])
+    content = line_dict
+    print line.strip()
+  line = sys.stdin.readline()
+

Propchange: erp5/trunk/products/TIDStorage/utils/tid_log_to_full.py
------------------------------------------------------------------------------
    svn:executable = *




More information about the Erp5-report mailing list