[Neo-report] r2711 jm - in /trunk/neo: client/ client/handlers/ storage/database/
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Apr 12 18:15:59 CEST 2011
Author: jm
Date: Tue Apr 12 18:15:58 2011
New Revision: 2711
Log:
Rewrite storage cache of client
- Stop using a list of (in)validated tid (hence removal of RevisionIndex),
because it can't work in all cases and would even cause memory leaks.
For example, this bug could lead to ConflictError with a single client.
Fixing it also requires that database backends always return the next serial.
- Several performance improvements. The most important one is when the latest
version of an object is cached: it inherits the access counter of the
previous one (for the same oid), which gets in turn its counter reset.
- Do not waste CPU evaluating the real size taken by an entry in memory.
Just use 'len' on the value (which is always a pickle data, i.e. a string).
Removed:
trunk/neo/client/mq_index.py
Modified:
trunk/neo/client/app.py
trunk/neo/client/cache.py
trunk/neo/client/handlers/master.py
trunk/neo/storage/database/btree.py
trunk/neo/storage/database/manager.py
trunk/neo/storage/database/mysqldb.py
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Tue Apr 12 18:15:58 2011
@@ -42,12 +42,11 @@ from neo.client.handlers import storage,
from neo.lib.dispatcher import Dispatcher, ForgottenPacket
from neo.client.poll import ThreadedPoll, psThreadedPoll
from neo.client.iterator import Iterator
-from neo.client.mq import MQ
+from neo.client.cache import ClientCache
from neo.client.pool import ConnectionPool
from neo.lib.util import u64, parseMasterList
from neo.lib.profiling import profiler_decorator, PROFILING_ENABLED
from neo.lib.debug import register as registerLiveDebugger
-from neo.client.mq_index import RevisionIndex
from neo.client.container import ThreadContainer, TransactionContainer
if PROFILING_ENABLED:
@@ -93,9 +92,7 @@ class Application(object):
# no self-assigned UUID, primary master will supply us one
self.uuid = None
- self.mq_cache = MQ()
- self.cache_revision_index = RevisionIndex()
- self.mq_cache.addIndex(self.cache_revision_index)
+ self._cache = ClientCache()
self.new_oid_list = []
self.last_oid = '\0' * 8
self.storage_event_handler = storage.StorageEventHandler(self)
@@ -432,20 +429,17 @@ class Application(object):
self._load_lock_acquire()
try:
- try:
- return self._loadFromCache(oid, serial, tid)
- except KeyError:
- pass
- data, start_serial, end_serial = self._loadFromStorage(oid, serial,
- tid)
- self._cache_lock_acquire()
- try:
- self.mq_cache[(oid, start_serial)] = data, end_serial
- finally:
- self._cache_lock_release()
- if data == '':
- raise NEOStorageCreationUndoneError(dump(oid))
- return data, start_serial, end_serial
+ result = self._loadFromCache(oid, serial, tid)
+ if not result:
+ result = self._loadFromStorage(oid, serial, tid)
+ self._cache_lock_acquire()
+ try:
+ self._cache.store(oid, *result)
+ finally:
+ self._cache_lock_release()
+ if result[0] == '':
+ raise NEOStorageCreationUndoneError(dump(oid))
+ return result
finally:
self._load_lock_release()
@@ -479,24 +473,17 @@ class Application(object):
return data, tid, next_tid
@profiler_decorator
- def _loadFromCache(self, oid, at_tid, before_tid):
+ def _loadFromCache(self, oid, at_tid=None, before_tid=None):
"""
- Load from local cache, raising KeyError if not found.
+ Load from local cache, return None if not found.
"""
self._cache_lock_acquire()
try:
- if at_tid is not None:
- tid = at_tid
- elif before_tid is not None:
- tid = self.cache_revision_index.getSerialBefore(oid,
- before_tid)
- else:
- tid = self.cache_revision_index.getLatestSerial(oid)
- if tid is None:
- raise KeyError
- # Raises KeyError on miss
- data, next_tid = self.mq_cache[(oid, tid)]
- return (data, tid, next_tid)
+ if at_tid:
+ result = self._cache.load(oid, at_tid + '*')
+ assert not result or result[1] == at_tid
+ return result
+ return self._cache.load(oid, before_tid)
finally:
self._cache_lock_release()
@@ -808,14 +795,7 @@ class Application(object):
# Update cache
self._cache_lock_acquire()
try:
- mq_cache = self.mq_cache
- update = mq_cache.update
- def updateNextSerial(value):
- data, next_tid = value
- assert next_tid is None, (dump(oid), dump(base_tid),
- dump(next_tid))
- return (data, tid)
- get_baseTID = txn_context['object_base_serial_dict'].get
+ cache = self._cache
for oid, data in txn_context['data_dict'].iteritems():
if data is None:
# this is just a remain of
@@ -823,16 +803,10 @@ class Application(object):
# was modified).
continue
# Update ex-latest value in cache
- base_tid = get_baseTID(oid)
- try:
- update((oid, base_tid), updateNextSerial)
- except KeyError:
- pass
- if data == '':
- self.cache_revision_index.invalidate([oid], tid)
- else:
+ cache.invalidate(oid, tid)
+ if data:
# Store in cache with no next_tid
- mq_cache[(oid, tid)] = (data, None)
+ cache.store(oid, data, tid, None)
finally:
self._cache_lock_release()
txn_container.delete(transaction)
@@ -1105,7 +1079,7 @@ class Application(object):
# by a pack), so don't bother invalidating on other clients.
self._cache_lock_acquire()
try:
- self.mq_cache.clear()
+ self._cache.clear()
finally:
self._cache_lock_release()
Modified: trunk/neo/client/cache.py
==============================================================================
--- trunk/neo/client/cache.py [iso-8859-1] (original)
+++ trunk/neo/client/cache.py [iso-8859-1] Tue Apr 12 18:15:58 2011
@@ -1,13 +1,13 @@
##############################################################################
#
-# Copyright (c) 2005 Nexedi SARL and Contributors. All Rights Reserved.
-# Yoshinori Okuji <yo at nexedi.com>
+# Copyright (c) 2011 Nexedi SARL and Contributors. All Rights Reserved.
+# Julien Muchembled <jm at nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
-# programmers who take the whole responsability of assessing all potential
+# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
-# garantees and support are strongly adviced to contract a Free Software
+# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
@@ -17,140 +17,41 @@
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
-"""
-Multi-Queue Cache Algorithm.
-"""
+import math
-from math import log
+class CacheItem(object):
-class MQIndex(object):
- """
- Virtual base class for MQ cache external indexes.
- """
- def clear(self):
- """
- Called when MQ is cleared.
- """
- raise NotImplementedError
-
- def remove(self, key):
- """
- Called when key's value is removed from cache, and key is pushed to
- history buffer.
- """
- raise NotImplementedError
-
- def add(self, key):
- """
- Called when key is added into cache.
- It is either a new key, or a know key comming back from history
- buffer.
- """
- raise NotImplementedError
-
-class Element(object):
- """
- This class defines an element of a FIFO buffer.
- """
- pass
-
-class FIFO(object):
- """
- This class implements a FIFO buffer.
- """
-
- def __init__(self):
- self.head = None
- self.tail = None
- self._len = 0
- self.prev = None
- self.data = None
- self.next = None
- self.level = None
- self.counter = None
- self.value = None
- self.element = None
- self.key = None
- self.expire_time = None
-
- def __len__(self):
- return self._len
-
- def append(self):
- element = Element()
- element.next = None
- element.prev = self.tail
- if self.tail is not None:
- self.tail.next = element
- self.tail = element
- if self.head is None:
- self.head = element
- self._len += 1
- return element
-
- def shift(self):
- element = self.head
- if element is None:
- return None
- del self[element]
- del element.next
- del element.prev
- return element
-
- def __delitem__(self, element):
- if element.next is None:
- self.tail = element.prev
- else:
- element.next.prev = element.prev
-
- if element.prev is None:
- self.head = element.next
- else:
- element.prev.next = element.next
-
- self._len -= 1
-
-class Data(object):
- """
- Data for each element in a FIFO buffer.
- """
- pass
-
-def sizeof(o):
- """This function returns the estimated size of an object."""
- if isinstance(o, tuple):
- return sum((sizeof(s) for s in o))
- elif o is None:
- # XXX: unknown size (arch pointer ?)
- return 0
- else:
- return len(o)+16
-
-class MQ(object):
- """
- This class manages cached data by a variant of Multi-Queue.
-
- This class caches various sizes of objects. Here are some considerations:
-
- - Expired objects are not really deleted immediately. But if GC is invoked too often,
- it degrades the performance significantly.
-
- - If large objects are cached, the number of cached objects decreases. This might affect
- the cache hit ratio. It might be better to tweak a buffer level according to the size of
- an object.
-
- - Stored values must be strings.
+ __slots__ = ('oid', 'tid', 'next_tid', 'data',
+ 'counter', 'level', 'expire',
+ 'prev', 'next')
+
+ def __repr__(self):
+ s = ''
+ for attr in self.__slots__:
+ try:
+ value = getattr(self, attr)
+ if value:
+ if attr in ('prev', 'next'):
+ s += ' %s=<...>' % attr
+ continue
+ elif attr == 'data':
+ value = '...'
+ s += ' %s=%r' % (attr, value)
+ except AttributeError:
+ pass
+ return '<%s%s>' % (self.__class__.__name__, s)
- - The size calculation is not accurate.
+class ClientCache(object):
+ """In-memory pickle cache based on Multi-Queue cache algorithm
Quick description of Multi-Queue algorithm:
- There are multiple "regular" queues, plus a history queue
@@ -159,224 +60,179 @@ class MQ(object):
longer lifespan)
-> The more often an object is accessed, the higher lifespan it will
have
- - Upon a cache miss, the oldest entry of first non-empty queue is
- transfered to history queue
- Upon cache or history hit, object frequency is increased and object
might get moved to longer-lived queue
- Each access "ages" objects in cache, and an aging object is moved to
- shorter-lived queue as it ages without being accessed
+ shorter-lived queue as it ages without being accessed, or in the
+ history queue if it's really too old.
"""
- def __init__(self, life_time=10000, buffer_levels=9,
- max_history_size=100000, max_size=20*1024*1024):
- self._index_list = []
+ __slots__ = ('_life_time', '_max_history_size', '_max_size',
+ '_queue_list', '_oid_dict', '_time', '_size', '_history_size')
+
+ def __init__(self, life_time=10000, max_history_size=100000,
+ max_size=20*1024*1024):
self._life_time = life_time
- self._buffer_levels = buffer_levels
self._max_history_size = max_history_size
self._max_size = max_size
self.clear()
- def addIndex(self, index, reindex=True):
- """
- Add an index ot this cache.
- index
- Object implementing methods from MQIndex class.
- reindex (True)
- If True, give all existing keys as new to index.
- """
- if reindex:
- # Index existing entries
- add = index.add
- for key in self._data:
- add(key)
- self._index_list.append(index)
-
- def _mapIndexes(self, method_id, args=(), kw={}):
- for index in self._index_list:
- getattr(index, method_id)(*args, **kw)
-
def clear(self):
- self._history_buffer = FIFO()
- self._cache_buffers = []
- for level in range(self._buffer_levels):
- self._cache_buffers.append(FIFO())
- self._data = {}
+ """Reset cache"""
+ self._queue_list = [None] # first is history
+ self._oid_dict = {}
self._time = 0
self._size = 0
- self._mapIndexes('clear')
-
- def has_key(self, key):
- if key in self._data:
- data = self._data[key]
- if data.level >= 0:
- return 1
- return 0
-
- __contains__ = has_key
-
- def fetch(self, key):
- """
- Fetch a value associated with the key.
- """
- data = self._data[key]
- if data.level >= 0:
- value = data.value
- self._size -= sizeof(value)
- self.store(key, value)
- return value
- raise KeyError(key)
-
- __getitem__ = fetch
-
- def get(self, key, d=None):
- try:
- return self.fetch(key)
- except KeyError:
- return d
-
- def _evict(self, key):
- """
- Evict an element to the history buffer.
- """
- self._mapIndexes('remove', (key, ))
- data = self._data[key]
- self._size -= sizeof(data.value)
- del self._cache_buffers[data.level][data.element]
- element = self._history_buffer.append()
- data.level = -1
- data.element = element
- del data.value
- del data.expire_time
- element.data = data
- if len(self._history_buffer) > self._max_history_size:
- element = self._history_buffer.shift()
- del self._data[element.data.key]
+ self._history_size = 0
- def store(self, key, value):
- cache_buffers = self._cache_buffers
+ def _iterQueue(self, level):
+ """for debugging purpose"""
+ if level < len(self._queue_list):
+ item = head = self._queue_list[level]
+ if item:
+ while 1:
+ yield item
+ item = item.next
+ if item is head:
+ break
+ def _add(self, item):
+ level = item.level
try:
- data = self._data[key]
- except KeyError:
- counter = 1
- self._mapIndexes('add', (key, ))
+ head = self._queue_list[level]
+ except IndexError:
+ assert len(self._queue_list) == level
+ self._queue_list.append(item)
+ item.prev = item.next = item
else:
- level, element, counter = data.level, data.element, data.counter + 1
- if level >= 0:
- del cache_buffers[level][element]
+ if head:
+ item.prev = tail = head.prev
+ tail.next = head.prev = item
+ item.next = head
else:
- del self._history_buffer[element]
- self._mapIndexes('add', (key, ))
-
+ self._queue_list[level] = item
+ item.prev = item.next = item
+ if level:
+ item.expire = self._time + self._life_time
+ else:
+ self._size -= len(item.data)
+ item.data = None
+ if self._history_size < self._max_history_size:
+ self._history_size += 1
+ else:
+ self._remove(head)
+ item_list = self._oid_dict[head.oid]
+ item_list.remove(head)
+ if not item_list:
+ del self._oid_dict[head.oid]
+
+ def _remove(self, item):
+ level = item.level
+ if level is not None:
+ item.level = level - 1
+ next = item.next
+ if next is item:
+ self._queue_list[level] = next = None
+ else:
+ item.prev.next = next
+ next.prev = item.prev
+ if self._queue_list[level] is item:
+ self._queue_list[level] = next
+ return next
+
+ def _fetched(self, item, _log=math.log):
+ self._remove(item)
+ item.counter = counter = item.counter + 1
# XXX It might be better to adjust the level according to the object
# size.
- level = min(int(log(counter, 2)), self._buffer_levels - 1)
- element = cache_buffers[level].append()
- data = Data()
- data.key = key
- data.expire_time = self._time + self._life_time
- data.level = level
- data.element = element
- data.value = value
- data.counter = counter
- element.data = data
- self._data[key] = data
- self._size += sizeof(value)
- del value
-
- self._time += 1
-
- # Expire old elements.
- time = self._time
- for level, cache_buffer in enumerate(cache_buffers):
- head = cache_buffer.head
- if head is not None and head.data.expire_time < time:
- del cache_buffer[head]
- data = head.data
- if level > 0:
- new_level = level - 1
- element = cache_buffers[new_level].append()
- element.data = data
- data.expire_time = time + self._life_time
- data.level = new_level
- data.element = element
- else:
- self._evict(data.key)
+ item.level = 1 + int(_log(counter, 2))
+ self._add(item)
- # Limit the size.
+ self._time = time = self._time + 1
+ for head in self._queue_list[1:]:
+ if head and head.expire < time:
+ self._remove(head)
+ self._add(head)
+ break
+
+ def _load(self, oid, before_tid=None):
+ item_list = self._oid_dict.get(oid)
+ if item_list:
+ if before_tid:
+ for item in reversed(item_list):
+ if item.tid < before_tid:
+ next_tid = item.next_tid
+ if next_tid and next_tid < before_tid:
+ break
+ return item
+ else:
+ item = item_list[-1]
+ if not item.next_tid:
+ return item
+
+ def load(self, oid, before_tid=None):
+ """Return a revision of oid that was current before given tid"""
+ item = self._load(oid, before_tid)
+ if item:
+ data = item.data
+ if data is not None:
+ self._fetched(item)
+ return data, item.tid, item.next_tid
+
+ def store(self, oid, data, tid, next_tid):
+ """Store a new data record in the cache"""
+ size = len(data)
max_size = self._max_size
- if self._size > max_size:
- for cache_buffer in cache_buffers:
- while self._size > max_size:
- element = cache_buffer.head
- if element is None:
- break
- self._evict(element.data.key)
- if self._size <= max_size:
- break
-
- __setitem__ = store
-
- def invalidate(self, key):
- if key in self._data:
- data = self._data[key]
- if data.level >= 0:
- del self._cache_buffers[data.level][data.element]
- self._evict(key)
- return
- raise KeyError, "%s was not found in the cache" % (key, )
-
- __delitem__ = invalidate
-
- def update(self, key, callback):
- """
- Update entry without changing its level.
- """
- data = self._data[key]
- if data.level < 0:
- raise KeyError(key)
- data.value = callback(data.value)
-
-# Here is a test.
-if __name__ == '__main__':
- import hotshot, hotshot.stats
-
- def test():
- cache = MQ(life_time=100, buffer_levels=9, max_history_size=10000,
- max_size=2*1024*1024)
-
- cache[0] = 'foo'
- assert cache[0] == 'foo', '0 should be present'
- del cache[0]
- assert cache.get(0) is None, '0 should not be present'
-
- for i in xrange(10000):
- assert cache.get(i) is None, '%d should not be present' % i
-
- for i in xrange(10000):
- cache[i] = str(i)
- assert cache.get(i) == str(i), '%d does not exist' % i
-
- for i in xrange(10000 - 100 - 1):
- assert cache.get(i) is None, '%d should not be present' % i
-
- for i in xrange(10):
- cache[i] = str(i)
-
- for _ in xrange(1000):
- for i in xrange(10):
- assert cache.get(i) == str(i), '%d does not exist' % i
-
- for i in xrange(10, 500):
- cache[i] = str(i)
-
- for i in xrange(10):
- assert cache.get(i) == str(i), '%d does not exist' % i
-
- prof = hotshot.Profile("mq.prof")
- prof.runcall(test)
- prof.close()
- stats = hotshot.stats.load("mq.prof")
- stats.strip_dirs()
- stats.sort_stats('time', 'calls')
- stats.print_stats(20)
+ if size < max_size:
+ item = self._load(oid, next_tid)
+ if item:
+ assert not (item.data or item.level)
+ assert item.tid == tid and item.next_tid == next_tid
+ self._history_size -= 1
+ else:
+ item = CacheItem()
+ item.oid = oid
+ item.tid = tid
+ item.next_tid = next_tid
+ item.counter = 0
+ item.level = None
+ try:
+ item_list = self._oid_dict[oid]
+ except KeyError:
+ self._oid_dict[oid] = [item]
+ else:
+ if next_tid:
+ for i, x in enumerate(item_list):
+ if tid < x.tid:
+ break
+ item_list.insert(i, item)
+ else:
+ if item_list:
+ prev = item_list[-1]
+ item.counter = prev.counter
+ prev.counter = 0
+ if prev.level > 1:
+ self._fetched(prev)
+ item_list.append(item)
+ item.data = data
+ self._fetched(item)
+ self._size += size
+ if max_size < self._size:
+ for head in self._queue_list[1:]:
+ while head:
+ next = self._remove(head)
+ head.level = 0
+ self._add(head)
+ if self._size <= max_size:
+ return
+ head = next
+
+ def invalidate(self, oid, tid):
+ """Mark data record as being valid only up to given tid"""
+ try:
+ item = self._oid_dict[oid][-1]
+ except KeyError:
+ pass
+ else:
+ if item.next_tid is None:
+ item.next_tid = tid
Modified: trunk/neo/client/handlers/master.py
==============================================================================
--- trunk/neo/client/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/master.py [iso-8859-1] Tue Apr 12 18:15:58 2011
@@ -121,11 +121,12 @@ class PrimaryNotificationsHandler(BaseHa
app = self.app
app._cache_lock_acquire()
try:
- # ZODB required a dict with oid as key, so create it
- app.cache_revision_index.invalidate(oid_list, tid)
+ invalidate = app._cache.invalidate
+ for oid in oid_list:
+ invalidate(oid, tid)
db = app.getDB()
if db is not None:
- db.invalidate(tid, dict.fromkeys(oid_list, tid))
+ db.invalidate(tid, oid_list)
finally:
app._cache_lock_release()
Removed: trunk/neo/client/mq_index.py
==============================================================================
--- trunk/neo/client/mq_index.py [iso-8859-1] (original)
+++ trunk/neo/client/mq_index.py (removed)
@@ -1,152 +0,0 @@
-#
-# Copyright (C) 2010-2011 Nexedi SA
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-from neo.client.mq import MQIndex
-
-class RevisionIndex(MQIndex):
- """
- This cache index allows accessing a specific revision of a cached object.
- It requires cache key to be a 2-tuple, composed of oid and revision.
-
- Note: it is expected that rather few revisions are held in cache, with few
- lookups for old revisions, so they are held in a simple sorted list
- Note2: all methods here must be called with cache lock acquired.
- """
- def __init__(self):
- # key: oid
- # value: tid list, from highest to lowest
- self._oid_dict = {}
- # key: oid
- # value: tid list, from lowest to highest
- self._invalidated = {}
-
- def clear(self):
- self._oid_dict.clear()
- self._invalidated.clear()
-
- def remove(self, key):
- oid_dict = self._oid_dict
- oid, tid = key
- tid_list = oid_dict[oid]
- tid_list.remove(tid)
- if not tid_list:
- # No more serial known for this object, drop entirely
- del oid_dict[oid]
- self._invalidated.pop(oid, None)
-
- def add(self, key):
- oid_dict = self._oid_dict
- oid, tid = key
- try:
- serial_list = oid_dict[oid]
- except KeyError:
- serial_list = oid_dict[oid] = []
- else:
- assert tid not in serial_list
- if not(serial_list) or tid > serial_list[0]:
- serial_list.insert(0, tid)
- else:
- serial_list.insert(0, tid)
- serial_list.sort(reverse=True)
- invalidated = self._invalidated
- try:
- tid_list = invalidated[oid]
- except KeyError:
- pass
- else:
- try:
- tid_list.remove(tid)
- except ValueError:
- pass
- else:
- if not tid_list:
- del invalidated[oid]
-
- def invalidate(self, oid_list, tid):
- """
- Mark object invalidated by given transaction.
- Must be called with increasing TID values (which is standard for
- ZODB).
- """
- invalidated = self._invalidated
- oid_dict = self._oid_dict
- for oid in (x for x in oid_list if x in oid_dict):
- try:
- tid_list = invalidated[oid]
- except KeyError:
- tid_list = invalidated[oid] = []
- assert not tid_list or tid > tid_list[-1], (dump(oid), dump(tid),
- dump(tid_list[-1]))
- tid_list.append(tid)
-
- def getSerialBefore(self, oid, tid):
- """
- Get the first tid in cache which value is lower that given tid.
- """
- # WARNING: return-intensive to save on indentation
- oid_list = self._oid_dict.get(oid)
- if oid_list is None:
- # Unknown oid
- return None
- for result in oid_list:
- if result < tid:
- # Candidate found
- break
- else:
- # No candidate in cache.
- return None
- # Check if there is a chance that an intermediate revision would
- # exist, while missing from cache.
- try:
- inv_tid_list = self._invalidated[oid]
- except KeyError:
- return result
- # Remember: inv_tid_list is sorted in ascending order.
- for inv_tid in inv_tid_list:
- if tid < inv_tid:
- # We don't care about invalidations past requested TID.
- break
- elif result < inv_tid < tid:
- # An invalidation was received between candidate revision,
- # and before requested TID: there is a matching revision we
- # don't know of, so we cannot answer.
- return None
- return result
-
- def getLatestSerial(self, oid):
- """
- Get the latest tid for given object.
- """
- result = self._oid_dict.get(oid)
- if result is not None:
- result = result[0]
- try:
- tid_list = self._invalidated[oid]
- except KeyError:
- pass
- else:
- if result < tid_list[-1]:
- # An invalidation happened from a transaction later than
- # our most recent view of this object, so we cannot answer.
- result = None
- return result
-
- def getSerialList(self, oid):
- """
- Get the list of all serials cache knows about for given object.
- """
- return self._oid_dict.get(oid, [])[:]
-
Modified: trunk/neo/storage/database/btree.py
==============================================================================
--- trunk/neo/storage/database/btree.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/btree.py [iso-8859-1] Tue Apr 12 18:15:58 2011
@@ -275,37 +275,22 @@ class BTreeDatabaseManager(DatabaseManag
def _getObject(self, oid, tid=None, before_tid=None):
tserial = self._obj.get(oid)
- if tserial is None:
- result = None
- else:
+ if tserial is not None:
if tid is None:
- if before_tid is None:
- try:
+ try:
+ if before_tid is None:
tid = tserial.maxKey()
- except ValueError:
- tid = None
- else:
- before_tid -= 1
- try:
- tid = tserial.maxKey(before_tid)
- except ValueError:
- tid = None
- if tid is None:
- result = None
- else:
- result = tserial.get(tid, None)
+ else:
+ tid = tserial.maxKey(before_tid - 1)
+ except ValueError:
+ return
+ result = tserial.get(tid)
if result:
- compression, checksum, data, value_serial = result
- if before_tid is None:
+ try:
+ next_serial = tserial.minKey(tid + 1)
+ except ValueError:
next_serial = None
- else:
- try:
- next_serial = tserial.minKey(tid + 1)
- except ValueError:
- next_serial = None
- result = (tid, next_serial, compression, checksum, data,
- value_serial)
- return result
+ return (tid, next_serial) + result
def doSetPartitionTable(self, ptid, cell_list, reset):
pt = self._pt
Modified: trunk/neo/storage/database/manager.py
==============================================================================
--- trunk/neo/storage/database/manager.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/manager.py [iso-8859-1] Tue Apr 12 18:15:58 2011
@@ -255,6 +255,8 @@ class DatabaseManager(object):
else:
serial, next_serial, compression, checksum, data, data_serial = \
result
+ assert before_tid is None or next_serial is None or \
+ before_tid <= next_serial
if data is None and resolve_data:
try:
_, compression, checksum, data = self._getObjectData(oid,
Modified: trunk/neo/storage/database/mysqldb.py
==============================================================================
--- trunk/neo/storage/database/mysqldb.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/mysqldb.py [iso-8859-1] Tue Apr 12 18:15:58 2011
@@ -335,50 +335,30 @@ class MySQLDatabaseManager(DatabaseManag
def _getObject(self, oid, tid=None, before_tid=None):
q = self.query
partition = self._getPartition(oid)
+ sql = """SELECT serial, compression, checksum, value, value_serial
+ FROM obj
+ WHERE partition = %d
+ AND oid = %d""" % (partition, oid)
if tid is not None:
- r = q("""SELECT serial, compression, checksum, value, value_serial
- FROM obj
- WHERE partition = %d AND oid = %d AND serial = %d""" \
- % (partition, oid, tid))
- try:
- serial, compression, checksum, data, value_serial = r[0]
- next_serial = None
- except IndexError:
- return None
+ sql += ' AND serial = %d' % tid
elif before_tid is not None:
- r = q("""SELECT serial, compression, checksum, value, value_serial
- FROM obj
- WHERE partition = %d
- AND oid = %d AND serial < %d
- ORDER BY serial DESC LIMIT 1""" \
- % (partition, oid, before_tid))
- try:
- serial, compression, checksum, data, value_serial = r[0]
- except IndexError:
- return None
- r = q("""SELECT serial FROM obj_short
- WHERE partition = %d
- AND oid = %d AND serial >= %d
- ORDER BY serial LIMIT 1""" \
- % (partition, oid, before_tid))
- try:
- next_serial = r[0][0]
- except IndexError:
- next_serial = None
+ sql += ' AND serial < %d ORDER BY serial DESC LIMIT 1' % before_tid
else:
# XXX I want to express "HAVING serial = MAX(serial)", but
# MySQL does not use an index for a HAVING clause!
- r = q("""SELECT serial, compression, checksum, value, value_serial
- FROM obj
- WHERE partition = %d AND oid = %d
- ORDER BY serial DESC LIMIT 1""" \
- % (partition, oid))
- try:
- serial, compression, checksum, data, value_serial = r[0]
- next_serial = None
- except IndexError:
- return None
-
+ sql += ' ORDER BY serial DESC LIMIT 1'
+ r = q(sql)
+ try:
+ serial, compression, checksum, data, value_serial = r[0]
+ except IndexError:
+ return None
+ r = q("""SELECT serial FROM obj_short
+ WHERE partition = %d AND oid = %d AND serial > %d
+ ORDER BY serial LIMIT 1""" % (partition, oid, serial))
+ try:
+ next_serial = r[0][0]
+ except IndexError:
+ next_serial = None
return serial, next_serial, compression, checksum, data, value_serial
def doSetPartitionTable(self, ptid, cell_list, reset):
More information about the Neo-report
mailing list