[Neo-report] r2711 jm - in /trunk/neo: client/ client/handlers/ storage/database/

nobody at svn.erp5.org
Tue Apr 12 18:15:59 CEST 2011


Author: jm
Date: Tue Apr 12 18:15:58 2011
New Revision: 2711

Log:
Rewrite storage cache of client

- Stop using a list of (in)validated tid (hence removal of RevisionIndex),
  because it can't work in all cases and would even cause memory leaks.
  For example, this bug could lead to ConflictError with a single client.
  Fixing it also requires that database backends always return the next serial.
- Several performance improvements. The most important one applies when the
  latest version of an object is cached: it inherits the access counter of
  the previous one (for the same oid), whose counter is in turn reset
  (see the sketch after this list).
- Do not waste CPU evaluating the real size taken by an entry in memory.
  Just use 'len' on the value (which is always pickle data, i.e. a string).
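
For illustration, a minimal usage sketch of the new cache (the API is the
one added in neo/client/cache.py below; the oid/tid values are made up,
and the calls mirror those app.py now makes):

    from neo.client.cache import ClientCache

    cache = ClientCache()
    oid = '\x00' * 8
    tid1 = '\x00' * 7 + '\x01'
    tid2 = '\x00' * 7 + '\x02'
    # cache the current revision; its size is accounted as len(data)
    cache.store(oid, 'pickle data 1', tid1, None)
    assert cache.load(oid) == ('pickle data 1', tid1, None)
    # a later transaction modifies oid: close the old record, then store
    # the new one, which inherits the access counter of the previous one
    cache.invalidate(oid, tid2)
    cache.store(oid, 'pickle data 2', tid2, None)
    assert cache.load(oid, tid2) == ('pickle data 1', tid1, tid2)
    assert cache.load(oid) == ('pickle data 2', tid2, None)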

Removed:
    trunk/neo/client/mq_index.py
Modified:
    trunk/neo/client/app.py
    trunk/neo/client/cache.py
    trunk/neo/client/handlers/master.py
    trunk/neo/storage/database/btree.py
    trunk/neo/storage/database/manager.py
    trunk/neo/storage/database/mysqldb.py

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Tue Apr 12 18:15:58 2011
@@ -42,12 +42,11 @@ from neo.client.handlers import storage,
 from neo.lib.dispatcher import Dispatcher, ForgottenPacket
 from neo.client.poll import ThreadedPoll, psThreadedPoll
 from neo.client.iterator import Iterator
-from neo.client.mq import MQ
+from neo.client.cache import ClientCache
 from neo.client.pool import ConnectionPool
 from neo.lib.util import u64, parseMasterList
 from neo.lib.profiling import profiler_decorator, PROFILING_ENABLED
 from neo.lib.debug import register as registerLiveDebugger
-from neo.client.mq_index import RevisionIndex
 from neo.client.container import ThreadContainer, TransactionContainer
 
 if PROFILING_ENABLED:
@@ -93,9 +92,7 @@ class Application(object):
 
         # no self-assigned UUID, primary master will supply us one
         self.uuid = None
-        self.mq_cache = MQ()
-        self.cache_revision_index = RevisionIndex()
-        self.mq_cache.addIndex(self.cache_revision_index)
+        self._cache = ClientCache()
         self.new_oid_list = []
         self.last_oid = '\0' * 8
         self.storage_event_handler = storage.StorageEventHandler(self)
@@ -432,20 +429,17 @@ class Application(object):
 
         self._load_lock_acquire()
         try:
-            try:
-                return self._loadFromCache(oid, serial, tid)
-            except KeyError:
-                pass
-            data, start_serial, end_serial = self._loadFromStorage(oid, serial,
-                tid)
-            self._cache_lock_acquire()
-            try:
-                self.mq_cache[(oid, start_serial)] = data, end_serial
-            finally:
-                self._cache_lock_release()
-            if data == '':
-                raise NEOStorageCreationUndoneError(dump(oid))
-            return data, start_serial, end_serial
+            result = self._loadFromCache(oid, serial, tid)
+            if not result:
+                result = self._loadFromStorage(oid, serial, tid)
+                self._cache_lock_acquire()
+                try:
+                    self._cache.store(oid, *result)
+                finally:
+                    self._cache_lock_release()
+                if result[0] == '':
+                    raise NEOStorageCreationUndoneError(dump(oid))
+            return result
         finally:
             self._load_lock_release()
 
@@ -479,24 +473,17 @@ class Application(object):
         return data, tid, next_tid
 
     @profiler_decorator
-    def _loadFromCache(self, oid, at_tid, before_tid):
+    def _loadFromCache(self, oid, at_tid=None, before_tid=None):
         """
-        Load from local cache, raising KeyError if not found.
+        Load from local cache, return None if not found.
         """
         self._cache_lock_acquire()
         try:
-            if at_tid is not None:
-                tid = at_tid
-            elif before_tid is not None:
-                tid = self.cache_revision_index.getSerialBefore(oid,
-                    before_tid)
-            else:
-                tid = self.cache_revision_index.getLatestSerial(oid)
-            if tid is None:
-                raise KeyError
-            # Raises KeyError on miss
-            data, next_tid = self.mq_cache[(oid, tid)]
-            return (data, tid, next_tid)
+            if at_tid:
+                result = self._cache.load(oid, at_tid + '*')
+                assert not result or result[1] == at_tid
+                return result
+            return self._cache.load(oid, before_tid)
         finally:
             self._cache_lock_release()
 
@@ -808,14 +795,7 @@ class Application(object):
             # Update cache
             self._cache_lock_acquire()
             try:
-                mq_cache = self.mq_cache
-                update = mq_cache.update
-                def updateNextSerial(value):
-                    data, next_tid = value
-                    assert next_tid is None, (dump(oid), dump(base_tid),
-                        dump(next_tid))
-                    return (data, tid)
-                get_baseTID = txn_context['object_base_serial_dict'].get
+                cache = self._cache
                 for oid, data in txn_context['data_dict'].iteritems():
                     if data is None:
                         # this is just a remain of
@@ -823,16 +803,10 @@ class Application(object):
                         # was modified).
                         continue
                     # Update ex-latest value in cache
-                    base_tid = get_baseTID(oid)
-                    try:
-                        update((oid, base_tid), updateNextSerial)
-                    except KeyError:
-                        pass
-                    if data == '':
-                        self.cache_revision_index.invalidate([oid], tid)
-                    else:
+                    cache.invalidate(oid, tid)
+                    if data:
                         # Store in cache with no next_tid
-                        mq_cache[(oid, tid)] = (data, None)
+                        cache.store(oid, data, tid, None)
             finally:
                 self._cache_lock_release()
             txn_container.delete(transaction)
@@ -1105,7 +1079,7 @@ class Application(object):
         # by a pack), so don't bother invalidating on other clients.
         self._cache_lock_acquire()
         try:
-            self.mq_cache.clear()
+            self._cache.clear()
         finally:
             self._cache_lock_release()
 

Modified: trunk/neo/client/cache.py
==============================================================================
--- trunk/neo/client/cache.py [iso-8859-1] (original)
+++ trunk/neo/client/cache.py [iso-8859-1] Tue Apr 12 18:15:58 2011
@@ -1,13 +1,13 @@
 ##############################################################################
 #
-# Copyright (c) 2005 Nexedi SARL and Contributors. All Rights Reserved.
-#                    Yoshinori Okuji <yo at nexedi.com>
+# Copyright (c) 2011 Nexedi SARL and Contributors. All Rights Reserved.
+#                    Julien Muchembled <jm at nexedi.com>
 #
 # WARNING: This program as such is intended to be used by professional
-# programmers who take the whole responsability of assessing all potential
+# programmers who take the whole responsibility of assessing all potential
 # consequences resulting from its eventual inadequacies and bugs
 # End users who are looking for a ready-to-use solution with commercial
-# garantees and support are strongly adviced to contract a Free Software
+# guarantees and support are strongly advised to contract a Free Software
 # Service Company
 #
 # This program is Free Software; you can redistribute it and/or
@@ -17,140 +17,41 @@
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 #
 ##############################################################################
 
-"""
-Multi-Queue Cache Algorithm.
-"""
+import math
 
-from math import log
+class CacheItem(object):
 
-class MQIndex(object):
-    """
-    Virtual base class for MQ cache external indexes.
-    """
-    def clear(self):
-        """
-        Called when MQ is cleared.
-        """
-        raise NotImplementedError
-
-    def remove(self, key):
-        """
-        Called when key's value is removed from cache, and key is pushed to
-        history buffer.
-        """
-        raise NotImplementedError
-
-    def add(self, key):
-        """
-        Called when key is added into cache.
-        It is either a new key, or a know key comming back from history
-        buffer.
-        """
-        raise NotImplementedError
-
-class Element(object):
-    """
-      This class defines an element of a FIFO buffer.
-    """
-    pass
-
-class FIFO(object):
-    """
-      This class implements a FIFO buffer.
-    """
-
-    def __init__(self):
-        self.head = None
-        self.tail = None
-        self._len = 0
-        self.prev = None
-        self.data = None
-        self.next = None
-        self.level = None
-        self.counter = None
-        self.value = None
-        self.element = None
-        self.key = None
-        self.expire_time = None
-
-    def __len__(self):
-        return self._len
-
-    def append(self):
-        element = Element()
-        element.next = None
-        element.prev = self.tail
-        if self.tail is not None:
-            self.tail.next = element
-        self.tail = element
-        if self.head is None:
-            self.head = element
-        self._len += 1
-        return element
-
-    def shift(self):
-        element = self.head
-        if element is None:
-            return None
-        del self[element]
-        del element.next
-        del element.prev
-        return element
-
-    def __delitem__(self, element):
-        if element.next is None:
-            self.tail = element.prev
-        else:
-            element.next.prev = element.prev
-
-        if element.prev is None:
-            self.head = element.next
-        else:
-            element.prev.next = element.next
-
-        self._len -= 1
-
-class Data(object):
-    """
-      Data for each element in a FIFO buffer.
-    """
-    pass
-
-def sizeof(o):
-    """This function returns the estimated size of an object."""
-    if isinstance(o, tuple):
-        return sum((sizeof(s) for s in o))
-    elif o is None:
-        # XXX: unknown size (arch pointer ?)
-        return 0
-    else:
-        return len(o)+16
-
-class MQ(object):
-    """
-      This class manages cached data by a variant of Multi-Queue.
-
-      This class caches various sizes of objects. Here are some considerations:
-
-      - Expired objects are not really deleted immediately. But if GC is invoked too often,
-        it degrades the performance significantly.
-
-      - If large objects are cached, the number of cached objects decreases. This might affect
-        the cache hit ratio. It might be better to tweak a buffer level according to the size of
-        an object.
-
-      - Stored values must be strings.
+    __slots__ = ('oid', 'tid', 'next_tid', 'data',
+                 'counter', 'level', 'expire',
+                 'prev', 'next')
+
+    def __repr__(self):
+        s = ''
+        for attr in self.__slots__:
+            try:
+                value = getattr(self, attr)
+                if value:
+                    if attr in ('prev', 'next'):
+                        s += ' %s=<...>' % attr
+                        continue
+                    elif attr == 'data':
+                        value = '...'
+                s += ' %s=%r' % (attr, value)
+            except AttributeError:
+                pass
+        return '<%s%s>' % (self.__class__.__name__, s)
 
-      - The size calculation is not accurate.
+class ClientCache(object):
+    """In-memory pickle cache based on Multi-Queue cache algorithm
 
       Quick description of Multi-Queue algorithm:
       - There are multiple "regular" queues, plus a history queue
@@ -159,224 +60,179 @@ class MQ(object):
         longer lifespan)
         -> The more often an object is accessed, the higher lifespan it will
            have
-      - Upon a cache miss, the oldest entry of first non-empty queue is
-        transfered to history queue
       - Upon cache or history hit, object frequency is increased and object
         might get moved to longer-lived queue
       - Each access "ages" objects in cache, and an aging object is moved to
-        shorter-lived queue as it ages without being accessed
+        shorter-lived queue as it ages without being accessed, or to the
+        history queue if it is really too old.
     """
 
-    def __init__(self, life_time=10000, buffer_levels=9,
-            max_history_size=100000, max_size=20*1024*1024):
-        self._index_list = []
+    __slots__ = ('_life_time', '_max_history_size', '_max_size',
+                 '_queue_list', '_oid_dict', '_time', '_size', '_history_size')
+
+    def __init__(self, life_time=10000, max_history_size=100000,
+                                        max_size=20*1024*1024):
         self._life_time = life_time
-        self._buffer_levels = buffer_levels
         self._max_history_size = max_history_size
         self._max_size = max_size
         self.clear()
 
-    def addIndex(self, index, reindex=True):
-        """
-        Add an index ot this cache.
-        index
-            Object implementing methods from MQIndex class.
-        reindex (True)
-            If True, give all existing keys as new to index.
-        """
-        if reindex:
-            # Index existing entries
-            add = index.add
-            for key in self._data:
-                add(key)
-        self._index_list.append(index)
-
-    def _mapIndexes(self, method_id, args=(), kw={}):
-        for index in self._index_list:
-            getattr(index, method_id)(*args, **kw)
-
     def clear(self):
-        self._history_buffer = FIFO()
-        self._cache_buffers = []
-        for level in range(self._buffer_levels):
-            self._cache_buffers.append(FIFO())
-        self._data = {}
+        """Reset cache"""
+        self._queue_list = [None] # first is history
+        self._oid_dict = {}
         self._time = 0
         self._size = 0
-        self._mapIndexes('clear')
-
-    def has_key(self, key):
-        if key in self._data:
-            data = self._data[key]
-            if data.level >= 0:
-                return 1
-        return 0
-
-    __contains__ = has_key
-
-    def fetch(self, key):
-        """
-          Fetch a value associated with the key.
-        """
-        data = self._data[key]
-        if data.level >= 0:
-            value = data.value
-            self._size -= sizeof(value)
-            self.store(key, value)
-            return value
-        raise KeyError(key)
-
-    __getitem__ = fetch
-
-    def get(self, key, d=None):
-        try:
-            return self.fetch(key)
-        except KeyError:
-            return d
-
-    def _evict(self, key):
-        """
-          Evict an element to the history buffer.
-        """
-        self._mapIndexes('remove', (key, ))
-        data = self._data[key]
-        self._size -= sizeof(data.value)
-        del self._cache_buffers[data.level][data.element]
-        element = self._history_buffer.append()
-        data.level = -1
-        data.element = element
-        del data.value
-        del data.expire_time
-        element.data = data
-        if len(self._history_buffer) > self._max_history_size:
-            element = self._history_buffer.shift()
-            del self._data[element.data.key]
+        self._history_size = 0
 
-    def store(self, key, value):
-        cache_buffers = self._cache_buffers
+    def _iterQueue(self, level):
+        """for debugging purpose"""
+        if level < len(self._queue_list):
+            item = head = self._queue_list[level]
+            if item:
+                while 1:
+                    yield item
+                    item = item.next
+                    if item is head:
+                        break
 
+    def _add(self, item):
+        level = item.level
         try:
-            data = self._data[key]
-        except KeyError:
-            counter = 1
-            self._mapIndexes('add', (key, ))
+            head = self._queue_list[level]
+        except IndexError:
+            assert len(self._queue_list) == level
+            self._queue_list.append(item)
+            item.prev = item.next = item
         else:
-            level, element, counter = data.level, data.element, data.counter + 1
-            if level >= 0:
-                del cache_buffers[level][element]
+            if head:
+                item.prev = tail = head.prev
+                tail.next = head.prev = item
+                item.next = head
             else:
-                del self._history_buffer[element]
-                self._mapIndexes('add', (key, ))
-
+                self._queue_list[level] = item
+                item.prev = item.next = item
+        if level:
+            item.expire = self._time + self._life_time
+        else:
+            self._size -= len(item.data)
+            item.data = None
+            if self._history_size < self._max_history_size:
+                self._history_size += 1
+            else:
+                self._remove(head)
+                item_list = self._oid_dict[head.oid]
+                item_list.remove(head)
+                if not item_list:
+                    del self._oid_dict[head.oid]
+
+    def _remove(self, item):
+        level = item.level
+        if level is not None:
+            item.level = level - 1
+            next = item.next
+            if next is item:
+                self._queue_list[level] = next = None
+            else:
+                item.prev.next = next
+                next.prev = item.prev
+                if self._queue_list[level] is item:
+                    self._queue_list[level] = next
+            return next
+
+    def _fetched(self, item, _log=math.log):
+        self._remove(item)
+        item.counter = counter = item.counter + 1
         # XXX It might be better to adjust the level according to the object
         # size.
-        level = min(int(log(counter, 2)), self._buffer_levels - 1)
-        element = cache_buffers[level].append()
-        data = Data()
-        data.key = key
-        data.expire_time = self._time + self._life_time
-        data.level = level
-        data.element = element
-        data.value = value
-        data.counter = counter
-        element.data = data
-        self._data[key] = data
-        self._size += sizeof(value)
-        del value
-
-        self._time += 1
-
-        # Expire old elements.
-        time = self._time
-        for level, cache_buffer in enumerate(cache_buffers):
-            head = cache_buffer.head
-            if head is not None and head.data.expire_time < time:
-                del cache_buffer[head]
-                data = head.data
-                if level > 0:
-                    new_level = level - 1
-                    element = cache_buffers[new_level].append()
-                    element.data = data
-                    data.expire_time = time + self._life_time
-                    data.level = new_level
-                    data.element = element
-                else:
-                    self._evict(data.key)
+        item.level = 1 + int(_log(counter, 2))
+        self._add(item)
 
-        # Limit the size.
+        self._time = time = self._time + 1
+        for head in self._queue_list[1:]:
+            if head and head.expire < time:
+                self._remove(head)
+                self._add(head)
+                break
+
+    def _load(self, oid, before_tid=None):
+        item_list = self._oid_dict.get(oid)
+        if item_list:
+            if before_tid:
+                for item in reversed(item_list):
+                    if item.tid < before_tid:
+                        next_tid = item.next_tid
+                        if next_tid and next_tid < before_tid:
+                            break
+                        return item
+            else:
+                item = item_list[-1]
+                if not item.next_tid:
+                    return item
+
+    def load(self, oid, before_tid=None):
+        """Return a revision of oid that was current before given tid"""
+        item = self._load(oid, before_tid)
+        if item:
+            data = item.data
+            if data is not None:
+                self._fetched(item)
+                return data, item.tid, item.next_tid
+
+    def store(self, oid, data, tid, next_tid):
+        """Store a new data record in the cache"""
+        size = len(data)
         max_size = self._max_size
-        if self._size > max_size:
-            for cache_buffer in cache_buffers:
-                while self._size > max_size:
-                    element = cache_buffer.head
-                    if element is None:
-                        break
-                    self._evict(element.data.key)
-                if self._size <= max_size:
-                    break
-
-    __setitem__ = store
-
-    def invalidate(self, key):
-        if key in self._data:
-            data = self._data[key]
-            if data.level >= 0:
-                del self._cache_buffers[data.level][data.element]
-                self._evict(key)
-                return
-        raise KeyError, "%s was not found in the cache" % (key, )
-
-    __delitem__ = invalidate
-
-    def update(self, key, callback):
-        """
-        Update entry without changing its level.
-        """
-        data = self._data[key]
-        if data.level < 0:
-            raise KeyError(key)
-        data.value = callback(data.value)
-
-# Here is a test.
-if __name__ == '__main__':
-    import hotshot, hotshot.stats
-
-    def test():
-        cache = MQ(life_time=100, buffer_levels=9, max_history_size=10000,
-                   max_size=2*1024*1024)
-
-        cache[0] = 'foo'
-        assert cache[0] == 'foo', '0 should be present'
-        del cache[0]
-        assert cache.get(0) is None, '0 should not be present'
-
-        for i in xrange(10000):
-            assert cache.get(i) is None, '%d should not be present' % i
-
-        for i in xrange(10000):
-            cache[i] = str(i)
-            assert cache.get(i) == str(i), '%d does not exist' % i
-
-        for i in xrange(10000 - 100 - 1):
-            assert cache.get(i) is None, '%d should not be present' % i
-
-        for i in xrange(10):
-            cache[i] = str(i)
-
-        for _ in xrange(1000):
-            for i in xrange(10):
-                assert cache.get(i) == str(i), '%d does not exist' % i
-
-        for i in xrange(10, 500):
-            cache[i] = str(i)
-
-        for i in xrange(10):
-            assert cache.get(i) == str(i), '%d does not exist' % i
-
-    prof = hotshot.Profile("mq.prof")
-    prof.runcall(test)
-    prof.close()
-    stats = hotshot.stats.load("mq.prof")
-    stats.strip_dirs()
-    stats.sort_stats('time', 'calls')
-    stats.print_stats(20)
+        if size < max_size:
+            item = self._load(oid, next_tid)
+            if item:
+                assert not (item.data or item.level)
+                assert item.tid == tid and item.next_tid == next_tid
+                self._history_size -= 1
+            else:
+                item = CacheItem()
+                item.oid = oid
+                item.tid = tid
+                item.next_tid = next_tid
+                item.counter = 0
+                item.level = None
+                try:
+                    item_list = self._oid_dict[oid]
+                except KeyError:
+                    self._oid_dict[oid] = [item]
+                else:
+                    if next_tid:
+                        for i, x in enumerate(item_list):
+                            if tid < x.tid:
+                                break
+                        item_list.insert(i, item)
+                    else:
+                        if item_list:
+                            prev = item_list[-1]
+                            item.counter = prev.counter
+                            prev.counter = 0
+                            if prev.level > 1:
+                                self._fetched(prev)
+                        item_list.append(item)
+            item.data = data
+            self._fetched(item)
+            self._size += size
+            if max_size < self._size:
+                for head in self._queue_list[1:]:
+                    while head:
+                        next = self._remove(head)
+                        head.level = 0
+                        self._add(head)
+                        if self._size <= max_size:
+                            return
+                        head = next
+
+    def invalidate(self, oid, tid):
+        """Mark data record as being valid only up to given tid"""
+        try:
+            item = self._oid_dict[oid][-1]
+        except KeyError:
+            pass
+        else:
+            if item.next_tid is None:
+                item.next_tid = tid
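
A side note on the algorithm described in the docstring above: entries are
promoted to a queue level that grows with the logarithm of their access
counter, so each doubling of accesses buys one longer-lived queue. A tiny
standalone sketch of the mapping applied by _fetched() (level 0 being the
history queue):

    import math

    def queue_level(counter):
        # same promotion formula as in _fetched(): entries enter at
        # level 1 and climb with the log2 of their access counter
        return 1 + int(math.log(counter, 2))

    assert [queue_level(c) for c in (1, 2, 3, 4, 8, 100)] == [1, 2, 2, 3, 4, 7]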

Modified: trunk/neo/client/handlers/master.py
==============================================================================
--- trunk/neo/client/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/master.py [iso-8859-1] Tue Apr 12 18:15:58 2011
@@ -121,11 +121,12 @@ class PrimaryNotificationsHandler(BaseHa
         app = self.app
         app._cache_lock_acquire()
         try:
-            # ZODB required a dict with oid as key, so create it
-            app.cache_revision_index.invalidate(oid_list, tid)
+            invalidate = app._cache.invalidate
+            for oid in oid_list:
+                invalidate(oid, tid)
             db = app.getDB()
             if db is not None:
-                db.invalidate(tid, dict.fromkeys(oid_list, tid))
+                db.invalidate(tid, oid_list)
         finally:
             app._cache_lock_release()
 

Removed: trunk/neo/client/mq_index.py
==============================================================================
--- trunk/neo/client/mq_index.py [iso-8859-1] (original)
+++ trunk/neo/client/mq_index.py (removed)
@@ -1,152 +0,0 @@
-#
-# Copyright (C) 2010-2011  Nexedi SA
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-from neo.client.mq import MQIndex
-
-class RevisionIndex(MQIndex):
-    """
-    This cache index allows accessing a specific revision of a cached object.
-    It requires cache key to be a 2-tuple, composed of oid and revision.
-
-    Note: it is expected that rather few revisions are held in cache, with few
-    lookups for old revisions, so they are held in a simple sorted list
-    Note2: all methods here must be called with cache lock acquired.
-    """
-    def __init__(self):
-        # key: oid
-        # value: tid list, from highest to lowest
-        self._oid_dict = {}
-        # key: oid
-        # value: tid list, from lowest to highest
-        self._invalidated = {}
-
-    def clear(self):
-        self._oid_dict.clear()
-        self._invalidated.clear()
-
-    def remove(self, key):
-        oid_dict = self._oid_dict
-        oid, tid = key
-        tid_list = oid_dict[oid]
-        tid_list.remove(tid)
-        if not tid_list:
-            # No more serial known for this object, drop entirely
-            del oid_dict[oid]
-            self._invalidated.pop(oid, None)
-
-    def add(self, key):
-        oid_dict = self._oid_dict
-        oid, tid = key
-        try:
-            serial_list = oid_dict[oid]
-        except KeyError:
-            serial_list = oid_dict[oid] = []
-        else:
-            assert tid not in serial_list
-        if not(serial_list) or tid > serial_list[0]:
-            serial_list.insert(0, tid)
-        else:
-            serial_list.insert(0, tid)
-            serial_list.sort(reverse=True)
-        invalidated = self._invalidated
-        try:
-            tid_list = invalidated[oid]
-        except KeyError:
-            pass
-        else:
-            try:
-                tid_list.remove(tid)
-            except ValueError:
-                pass
-            else:
-                if not tid_list:
-                    del invalidated[oid]
-
-    def invalidate(self, oid_list, tid):
-        """
-        Mark object invalidated by given transaction.
-        Must be called with increasing TID values (which is standard for
-        ZODB).
-        """
-        invalidated = self._invalidated
-        oid_dict = self._oid_dict
-        for oid in (x for x in oid_list if x in oid_dict):
-            try:
-                tid_list = invalidated[oid]
-            except KeyError:
-                tid_list = invalidated[oid] = []
-            assert not tid_list or tid > tid_list[-1], (dump(oid), dump(tid),
-                dump(tid_list[-1]))
-            tid_list.append(tid)
-
-    def getSerialBefore(self, oid, tid):
-        """
-        Get the first tid in cache which value is lower that given tid.
-        """
-        # WARNING: return-intensive to save on indentation
-        oid_list = self._oid_dict.get(oid)
-        if oid_list is None:
-            # Unknown oid
-            return None
-        for result in oid_list:
-            if result < tid:
-                # Candidate found
-                break
-        else:
-            # No candidate in cache.
-            return None
-        # Check if there is a chance that an intermediate revision would
-        # exist, while missing from cache.
-        try:
-            inv_tid_list = self._invalidated[oid]
-        except KeyError:
-            return result
-        # Remember: inv_tid_list is sorted in ascending order.
-        for inv_tid in inv_tid_list:
-            if tid < inv_tid:
-                # We don't care about invalidations past requested TID.
-                break
-            elif result < inv_tid < tid:
-                # An invalidation was received between candidate revision,
-                # and before requested TID: there is a matching revision we
-                # don't know of, so we cannot answer.
-                return None
-        return result
-
-    def getLatestSerial(self, oid):
-        """
-        Get the latest tid for given object.
-        """
-        result = self._oid_dict.get(oid)
-        if result is not None:
-            result = result[0]
-            try:
-                tid_list = self._invalidated[oid]
-            except KeyError:
-                pass
-            else:
-                if result < tid_list[-1]:
-                    # An invalidation happened from a transaction later than
-                    # our most recent view of this object, so we cannot answer.
-                    result = None
-        return result
-
-    def getSerialList(self, oid):
-        """
-        Get the list of all serials cache knows about for given object.
-        """
-        return self._oid_dict.get(oid, [])[:]
-

Modified: trunk/neo/storage/database/btree.py
==============================================================================
--- trunk/neo/storage/database/btree.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/btree.py [iso-8859-1] Tue Apr 12 18:15:58 2011
@@ -275,37 +275,22 @@ class BTreeDatabaseManager(DatabaseManag
 
     def _getObject(self, oid, tid=None, before_tid=None):
         tserial = self._obj.get(oid)
-        if tserial is None:
-            result = None
-        else:
+        if tserial is not None:
             if tid is None:
-                if before_tid is None:
-                    try:
+                try:
+                    if before_tid is None:
                         tid = tserial.maxKey()
-                    except ValueError:
-                        tid = None
-                else:
-                    before_tid -= 1
-                    try:
-                        tid = tserial.maxKey(before_tid)
-                    except ValueError:
-                        tid = None
-            if tid is None:
-                result = None
-            else:
-                result = tserial.get(tid, None)
+                    else:
+                        tid = tserial.maxKey(before_tid - 1)
+                except ValueError:
+                    return
+            result = tserial.get(tid)
             if result:
-                compression, checksum, data, value_serial = result
-                if before_tid is None:
+                try:
+                    next_serial = tserial.minKey(tid + 1)
+                except ValueError:
                     next_serial = None
-                else:
-                    try:
-                        next_serial = tserial.minKey(tid + 1)
-                    except ValueError:
-                        next_serial = None
-                result = (tid, next_serial, compression, checksum, data,
-                    value_serial)
-        return result
+                return (tid, next_serial) + result
 
     def doSetPartitionTable(self, ptid, cell_list, reset):
         pt = self._pt
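
Per the log message, the point of this hunk is that the backend now always
returns the next serial, which the rewritten client cache needs to bound
the validity of each record. For illustration only, the minKey(tid + 1)
lookup restated with bisect over a sorted list of integer tids
(hypothetical helper, not part of the commit):

    import bisect

    def next_serial(serials, tid):
        # equivalent of tserial.minKey(tid + 1): the smallest serial
        # strictly greater than tid, or None if tid is the latest
        i = bisect.bisect_right(serials, tid)
        return serials[i] if i < len(serials) else None

    assert next_serial([1, 3, 7], 3) == 7
    assert next_serial([1, 3, 7], 7) is None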

Modified: trunk/neo/storage/database/manager.py
==============================================================================
--- trunk/neo/storage/database/manager.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/manager.py [iso-8859-1] Tue Apr 12 18:15:58 2011
@@ -255,6 +255,8 @@ class DatabaseManager(object):
         else:
             serial, next_serial, compression, checksum, data, data_serial = \
                 result
+            assert before_tid is None or next_serial is None or \
+                   before_tid <= next_serial
             if data is None and resolve_data:
                 try:
                     _, compression, checksum, data = self._getObjectData(oid,
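
The new assertion makes explicit the contract that the cache rewrite relies
on: a record returned for a given before_tid must still be the revision that
was current just before that tid. Restated as a hypothetical standalone
check (not part of the commit):

    def check_before_tid(before_tid, serial, next_serial):
        # the returned record was written before the requested tid...
        assert serial < before_tid
        # ...and was not superseded by another record before it either
        assert next_serial is None or before_tid <= next_serial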

Modified: trunk/neo/storage/database/mysqldb.py
==============================================================================
--- trunk/neo/storage/database/mysqldb.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/mysqldb.py [iso-8859-1] Tue Apr 12 18:15:58 2011
@@ -335,50 +335,30 @@ class MySQLDatabaseManager(DatabaseManag
     def _getObject(self, oid, tid=None, before_tid=None):
         q = self.query
         partition = self._getPartition(oid)
+        sql = """SELECT serial, compression, checksum, value, value_serial
+                    FROM obj
+                    WHERE partition = %d
+                    AND oid = %d""" % (partition, oid)
         if tid is not None:
-            r = q("""SELECT serial, compression, checksum, value, value_serial
-                        FROM obj
-                        WHERE partition = %d AND oid = %d AND serial = %d""" \
-                    % (partition, oid, tid))
-            try:
-                serial, compression, checksum, data, value_serial = r[0]
-                next_serial = None
-            except IndexError:
-                return None
+            sql += ' AND serial = %d' % tid
         elif before_tid is not None:
-            r = q("""SELECT serial, compression, checksum, value, value_serial
-                        FROM obj
-                        WHERE partition = %d
-                        AND oid = %d AND serial < %d
-                        ORDER BY serial DESC LIMIT 1""" \
-                    % (partition, oid, before_tid))
-            try:
-                serial, compression, checksum, data, value_serial = r[0]
-            except IndexError:
-                return None
-            r = q("""SELECT serial FROM obj_short
-                        WHERE partition = %d
-                        AND oid = %d AND serial >= %d
-                        ORDER BY serial LIMIT 1""" \
-                    % (partition, oid, before_tid))
-            try:
-                next_serial = r[0][0]
-            except IndexError:
-                next_serial = None
+            sql += ' AND serial < %d ORDER BY serial DESC LIMIT 1' % before_tid
         else:
             # XXX I want to express "HAVING serial = MAX(serial)", but
             # MySQL does not use an index for a HAVING clause!
-            r = q("""SELECT serial, compression, checksum, value, value_serial
-                        FROM obj
-                        WHERE partition = %d AND oid = %d
-                        ORDER BY serial DESC LIMIT 1""" \
-                    % (partition, oid))
-            try:
-                serial, compression, checksum, data, value_serial = r[0]
-                next_serial = None
-            except IndexError:
-                return None
-
+            sql += ' ORDER BY serial DESC LIMIT 1'
+        r = q(sql)
+        try:
+            serial, compression, checksum, data, value_serial = r[0]
+        except IndexError:
+            return None
+        r = q("""SELECT serial FROM obj_short
+                    WHERE partition = %d AND oid = %d AND serial > %d
+                    ORDER BY serial LIMIT 1""" % (partition, oid, serial))
+        try:
+            next_serial = r[0][0]
+        except IndexError:
+            next_serial = None
         return serial, next_serial, compression, checksum, data, value_serial
 
     def doSetPartitionTable(self, ptid, cell_list, reset):



