[Erp5-report] r24541 - in /erp5/trunk/products/TIDStorage: bin/ tests/

nobody at svn.erp5.org
Mon Nov 10 11:54:38 CET 2008


Author: vincent
Date: Mon Nov 10 11:54:36 2008
New Revision: 24541

URL: http://svn.erp5.org?rev=24541&view=rev
Log:
No TID must be made available to backup tools until bootstrap is over.
 - Do not update _storage during bootstrap
 - Make the bootstrap process check _transcient (requires a new accessor)
 - Add a test (requires a new protocol command to check bootstrap status)
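
The idea behind the change, as a minimal sketch (this is not the actual TIDStorage code; only the attribute and accessor names are borrowed from the patch): committed TIDs first land in a transient dict and are only promoted to the dict served by dump() once bootstrap has finished and no transaction touching that storage is still pending, which is why the new test needs one more transaction before values become visible.

class BootstrapGatedStorage(object):
  """Simplified illustration only; the real logic is in tidstorage.py below."""
  def __init__(self):
    self._transcient = {}   # TIDs received before/while bootstrapping
    self._storage = {}      # TIDs exposed to backup tools via dump()
    self._bootstraped = False

  def commit(self, storage_id, tid):
    # New TIDs always land in the transient dict first.
    self._transcient[storage_id] = tid

  def promote(self, storage_id):
    # Called once no transaction involving storage_id is pending; values
    # only become dumpable after bootstrap is over.
    if self._bootstraped and storage_id in self._transcient:
      self._storage[storage_id] = self._transcient.pop(storage_id)

  def dump(self):
    return self._storage.copy()        # what backup tools see

  def dump_transcient(self):
    return self._transcient.copy()     # what the bootstrap loop checks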

Modified:
    erp5/trunk/products/TIDStorage/bin/tidstorage.py
    erp5/trunk/products/TIDStorage/tests/testTIDServer.py

Modified: erp5/trunk/products/TIDStorage/bin/tidstorage.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/bin/tidstorage.py?rev=24541&r1=24540&r2=24541&view=diff
==============================================================================
--- erp5/trunk/products/TIDStorage/bin/tidstorage.py [utf8] (original)
+++ erp5/trunk/products/TIDStorage/bin/tidstorage.py [utf8] Mon Nov 10 11:54:36 2008
@@ -179,6 +179,9 @@
     tid_dict = self._field_exchange.recv_dict()
     self._tid_storage.commit(identifier, tid_dict)
 
+  def bootstraped(self):
+    self._field_exchange.send_int(has_bootstraped and 1 or 0)
+
   def handle(self):
     global tid_storage
     self._tid_storage = tid_storage
@@ -187,7 +190,8 @@
       'begin': self.begin,
       'abort': self.abort,
       'commit': self.commit,
-      'dump': self.dump
+      'dump': self.dump,
+      'bootstraped': self.bootstraped,
     }
     self.log('Connected')
     try:
@@ -327,38 +331,36 @@
           storage_id_set = self._storage_id_to_storage_id_set_dict[storage_id]
           # Raises if not found
           storage_id_set.remove(storage_id)
-      if self._tid_file is not None:
-        now = time.time()
-        can_full_dump = has_bootstraped and (self._next_full_dump is not None) and (self._next_full_dump < now)
-        can_dump = (not can_full_dump) and (self._next_dump is not None) and (self._next_dump < now)
-        record_for_dump = can_dump or (self._next_dump is not None)
-        append_to_file = has_bootstraped and (can_dump or can_full_dump)
+      if has_bootstraped:
+        if self._tid_file is not None:
+          now = time.time()
+          can_full_dump = (self._next_full_dump is not None) and (self._next_full_dump < now)
+          can_dump = (not can_full_dump) and (self._next_dump is not None) and (self._next_dump < now)
+          record_for_dump = can_dump or (self._next_dump is not None)
+          append_to_file = (can_dump or can_full_dump)
+        else:
+          append_to_file = record_for_dump = can_dump = can_full_dump = False
+        for key, value in self._storage_id_to_storage_id_set_dict.iteritems():
+          if len(value) == 0 and key in self._transcient:
+            self._storage[key] = self._transcient.pop(key)
+            if record_for_dump:
+              self._since_last_burst.add(key)
+        if append_to_file:
+          if can_full_dump:
+            to_dump_dict = self._storage
+            dump_code = 'f'
+          else:
+            to_dump_dict = dict([(key, self._storage[key]) for key in self._since_last_burst])
+            dump_code = 'd'
+          if len(to_dump_dict):
+            self._tid_file.write('%.02f %s %r\n' % (now, dump_code, to_dump_dict))
+            if can_full_dump:
+              self._next_full_dump = now + self._full_dump_period
+            if self._next_dump is not None:
+              self._next_dump = now + self._burst_period
+              self._since_last_burst.clear()
       else:
-        append_to_file = record_for_dump = can_dump = can_full_dump = False
-      for key, value in self._storage_id_to_storage_id_set_dict.iteritems():
-        if len(value) == 0 and key in self._transcient:
-          self._storage[key] = self._transcient.pop(key)
-          if record_for_dump:
-            self._since_last_burst.add(key)
-      if append_to_file:
-        if can_full_dump:
-          to_dump_dict = self._storage
-          dump_code = 'f'
-        else:
-          to_dump_dict = dict([(key, self._storage[key]) for key in self._since_last_burst])
-          dump_code = 'd'
-        if len(to_dump_dict):
-          self._tid_file.write('%.02f %s %r\n' % (now, dump_code, to_dump_dict))
-          if can_full_dump:
-            self._next_full_dump = now + self._full_dump_period
-          if self._next_dump is not None:
-            self._next_dump = now + self._burst_period
-            self._since_last_burst.clear()
-      if not has_bootstraped:
         doBootstrap()
-      #if len(self._storage_id_to_transaction_id_list_dict) == 0:
-      #  self._storage.update(self._transcient)
-      #  self._transcient.clear()
     finally:
       self._storage_id_lock.release()
 
@@ -366,6 +368,13 @@
     self._storage_id_lock.acquire()
     try:
       return self._storage.copy()
+    finally:
+      self._storage_id_lock.release()
+
+  def dump_transcient(self):
+    self._storage_id_lock.acquire()
+    try:
+      return self._transcient.copy()
     finally:
       self._storage_id_lock.release()
 
@@ -433,12 +442,12 @@
         else:
           storage_id_to_object_path_dict[key] = mountpoint
       target_storage_id_set = sets.ImmutableSet(storage_id_to_object_path_dict.keys())
-      known_storage_id_set = sets.ImmutableSet(tid_storage.dump().keys())
+      known_storage_id_set = sets.ImmutableSet(tid_storage.dump_transcient().keys())
       to_check_storage_id_set = target_storage_id_set - known_storage_id_set
       while len(to_check_storage_id_set) and can_bootstrap:
         serialize_url = None
         for storage_id in to_check_storage_id_set:
-          if can_bootstrap and storage_id not in tid_storage.dump().keys():
+          if can_bootstrap and storage_id not in tid_storage.dump_transcient().keys():
             serialize_url = base_url % (storage_id_to_object_path_dict[storage_id], )
             try:
               # Query a Zope, which will contact this process in return to store
@@ -450,7 +459,7 @@
               log('Opened %r: %r' % (serialize_url, page.read()))
         # Let some time for zope to contact TIDStorage back and fill the gaps.
         time.sleep(5)
-        known_storage_id_set = sets.ImmutableSet(tid_storage.dump().keys())
+        known_storage_id_set = sets.ImmutableSet(tid_storage.dump_transcient().keys())
         to_check_storage_id_set = target_storage_id_set - known_storage_id_set
         if len(to_check_storage_id_set):
           log('Bootstrap in progress... Mising storages: %r' % (to_check_storage_id_set, ))
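
For reference, a quick sketch of what the lines written by self._tid_file.write() above look like; the timestamp and TID values here are made up, only the format string comes from the code:

import time

now = time.time()
# 'f' marks a periodic full dump of _storage, 'd' an incremental dump of
# only the storages touched since the last burst.
full_line = '%.02f %s %r\n' % (now, 'f', {'s0': 1, 's1': 1})
incremental_line = '%.02f %s %r\n' % (now, 'd', {'s1': 2})
# e.g. "1226314476.00 f {'s0': 1, 's1': 1}" and "1226314476.00 d {'s1': 2}"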

Modified: erp5/trunk/products/TIDStorage/tests/testTIDServer.py
URL: http://svn.erp5.org/erp5/trunk/products/TIDStorage/tests/testTIDServer.py?rev=24541&r1=24540&r2=24541&view=diff
==============================================================================
--- erp5/trunk/products/TIDStorage/tests/testTIDServer.py [utf8] (original)
+++ erp5/trunk/products/TIDStorage/tests/testTIDServer.py [utf8] Mon Nov 10 11:54:36 2008
@@ -3,6 +3,7 @@
 from ExchangeProtocol import ExchangeProtocol
 import traceback
 import socket
+import time
 import sys
 
 assert len(sys.argv) == 3, 'Requires exactly 2 arguments: <address> <port>'
@@ -53,6 +54,14 @@
       internal_storage_tid_dict['%s_%s' % (test_id, key)] = value
     self._exchange_protocol.send_dict(internal_storage_tid_dict)
 
+  def bootstraped(self):
+    self._exchange_protocol.send_field('bootstraped')
+    return self._exchange_protocol.recv_int()
+
+  def waitForBootstrap(self):
+    while not self.bootstraped():
+      time.sleep(0.1)
+
 class TestTIDServerV2:
   def __init__(self, address, port):
     self._client = TIDClient((address, port))
@@ -60,13 +69,35 @@
   def assertEqual(self, value, target):
     assert value == target, 'value %r does not match target %r' % (value, target)
  
-  def testInitialValue(self, test_id):
+  def test_01_InitialValue(self, test_id):
     """
       Check that the storage is empty
     """
     self.assertEqual(self._client.dump_all(), {})
-
-  def testScenario1(self, test_id):
+  
+  def test_02_Bootstrap(self, test_id):
+    """
+      Trigger bootstrap and check that no value is visible until bootstrap is
+      done.
+    """
+    t1_storage_tid_dict = {'s0': 1}
+    t2_storage_tid_dict = {'s1': 1}
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't1', t1_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't1', t1_storage_tid_dict)
+    # Bootstrap is running on the server, nothing is visible yet.
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.waitForBootstrap()
+    # Nothing is available yet, we need one more transaction to happen.
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.begin(test_id, 't2', t2_storage_tid_dict.keys())
+    self.assertEqual(self._client.dump(test_id), {})
+    self._client.commit(test_id, 't2', t2_storage_tid_dict)
+    # Now everything must be available.
+    self.assertEqual(self._client.dump(test_id), {'s0': 1, 's1': 1})
+
+  def test_03_Scenario1(self, test_id):
     """
       Simple begin - commit case.
     """
@@ -77,7 +108,7 @@
     self._client.commit(test_id, 't1', storage_tid_dict)
     self.assertEqual(self._client.dump(test_id), storage_tid_dict)
 
-  def testScenario2(self, test_id):
+  def test_04_Scenario2(self, test_id):
     """
       Simple begin - abort case.
     """
@@ -88,7 +119,7 @@
     self._client.abort(test_id, 't1')
     self.assertEqual(self._client.dump(test_id), {})
 
-  def testScenario3(self, test_id):
+  def test_05_Scenario3(self, test_id):
     """
       2 concurent transactions impacting a common storage.
       Second transaction begins after first does, and commits before
@@ -106,7 +137,7 @@
     self._client.commit(test_id, 't1', t1_storage_tid_dict)
     self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 1, 's3': 1})
 
-  def testScenario4(self, test_id):
+  def test_06_Scenario4(self, test_id):
     """
       3 concurent transactions.
       Transactions 1 and 2 impact same storage s1.
@@ -132,7 +163,7 @@
     self._client.commit(test_id, 't1', t1_storage_tid_dict)
     self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 2, 's3': 1})
 
-  def testScenario4bis(self, test_id):
+  def test_07_Scenario4bis(self, test_id):
     """
       3 concurent transactions.
       Transactions 1 and 2 impact same storage s1.
@@ -162,7 +193,7 @@
     self._client.abort(test_id, 't1')
     self.assertEqual(self._client.dump(test_id), {'s1': 2, 's3': 1})
 
-  def testScenario5(self, test_id):
+  def test_08_Scenario5(self, test_id):
     """
       2 concurent transactions impacting a common storage.
       Second transaction begins after first does, and commits after
@@ -180,7 +211,7 @@
     self._client.commit(test_id, 't2', t2_storage_tid_dict)
     self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 1})
 
-  def testScenario6(self, test_id):
+  def test_09_Scenario6(self, test_id):
     """
       2 concurent transactions impacting separate sets of storages.
       Check that the first commit impacts dump data immediately.
@@ -197,7 +228,7 @@
     self._client.commit(test_id, 't2', t2_storage_tid_dict)
     self.assertEqual(self._client.dump(test_id), {'s1': 1, 's2': 1})
 
-  def testScenario7(self, test_id):
+  def test_10_Scenario7(self, test_id):
     """
       3 concurent transactions.
       t1 and t2 impact a set of different storages.
@@ -222,7 +253,7 @@
     self._client.commit(test_id, 't3', t3_storage_tid_dict)
     self.assertEqual(self._client.dump(test_id), {'s1': 2, 's2': 2})
 
-  def testScenario8(self, test_id):
+  def test_11_Scenario8(self, test_id):
     """
       Simple increase case.
     """



