[Erp5-report] r31150 gregory - in /experimental/FileSystem: ./ tests/
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Dec 8 13:46:43 CET 2009
Author: gregory
Date: Tue Dec 8 13:46:43 2009
New Revision: 31150
URL: http://svn.erp5.org?rev=31150&view=rev
Log:
Initial import of experimental FileSystem product.
Added:
experimental/FileSystem/
experimental/FileSystem/Directory.py
experimental/FileSystem/File.py
experimental/FileSystem/README
experimental/FileSystem/__init__.py
experimental/FileSystem/tests/
experimental/FileSystem/tests/testDirectory.py (with props)
Added: experimental/FileSystem/Directory.py
URL: http://svn.erp5.org/experimental/FileSystem/Directory.py?rev=31150&view=auto
==============================================================================
--- experimental/FileSystem/Directory.py (added)
+++ experimental/FileSystem/Directory.py [utf8] Tue Dec 8 13:46:43 2009
@@ -1,0 +1,286 @@
+##############################################################################
+#
+# Copyright (c) 2008 Nexedi SARL and Contributors. All Rights Reserved.
+# Vincent Pelletier <vincent at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+from Shared.DC.ZRDB.TM import TM
+import os
+import sys
+import shutil
+import tempfile
+from Globals import InitializeClass
+from AccessControl import ClassSecurityInfo
+from zLOG import LOG, WARNING
+
+from File import File
+
+class Directory(TM):
+ """
+ Implement transactional directory access.
+
+ An instance can span over multiple transactions, but must not be used by
+ multiple transactions at a time.
+ Each transaction causes the creation of a temporary folder, which is
+ deleted at commit.
+
+ TODO:
+ - declare decent security
+ - handle subdirectories, not just files
+ """
+
+ _tmp_path = None
+ security=ClassSecurityInfo()
+
+ def __init__(self, path, cleanup_at_abort=True):
+ self._path = os.path.normpath(path)
+ self._action_dict = {}
+ self._cleanup_path_list = []
+ self._cleanup_at_abort = cleanup_at_abort
+
+ def _sanitizeRelativePath(self, initial_path):
+ # TODO: improve
+ path = os.path.normpath(initial_path)
+ if os.path.isabs(path):
+ if not(path.startswith(self._path)):
+ raise ValueError, 'Path can not be absolute: %r' % (initial_path, )
+ path = path[len(self._path) + 1:]
+ try:
+ is_same_file = os.path.samefile(self._path, os.path.join(self._path, path))
+ except OSError:
+ is_same_file = False
+ if is_same_file:
+ raise ValueError, 'Path must contain a non-empty filename: %r is the same as %r' % (initial_path, self._path)
+ return path
+
+ def _getTmpPath(self, relative_path):
+ assert self._tmp_path is not None
+ return os.path.join(self._tmp_path, relative_path)
+
+ def _getPath(self, relative_path):
+ return os.path.join(self._path, relative_path)
+
+ def _copyToTmp(self, filename):
+ path = self._getPath(filename)
+ tmp_path = self._getTmpPath(filename)
+ shutil.copy2(path, tmp_path)
+ return tmp_path
+
+ def _register(self, mode=0):
+ """
+ Register to transaction manager.
+ Also, unconditionaly check if it is possible to access self._path in
+ desired mode. Otherwise, raise IOError.
+ Possible values for mode are:
+ 0
+ No special mode required.
+ os.R_OK
+ Read access required.
+ os.W_OK
+ Write access required.
+ os.R_OK | os.W_OK
+ Read and write access both required.
+ Execution right is always required for any operation on the directory.
+ """
+ if not os.access(self._path, mode | os.X_OK):
+ raise IOError, 'Impossible to access %r for %s%sx' % (self._path,
+ (mode & os.R_OK) and 'r' or '-',
+ (mode & os.W_OK) and 'w' or '-'
+ )
+ TM._register(self)
+ assert self._registered
+
+ def _setAction(self, source, destination):
+ self._action_dict[destination] = source
+
+ security.declarePublic('open')
+ def open(self, *args):
+ """
+ Return a file descriptor for given file name.
+ """
+ args = list(args)
+ args_len = len(args)
+ if args_len == 0:
+ raise TypeError, 'open() takes at least 1 argument (0 given)'
+ if args_len > 1:
+ if len(args[1]) == 0:
+ raise IOError, 'invalid mode: %s' % (args[1], )
+ mode = args[1][0]
+ else:
+ mode = 'r'
+ filename = self._sanitizeRelativePath(args[0])
+ path = self._getPath(filename)
+ if mode == 'w':
+ self._register(os.W_OK)
+ tmp_path = self._getTmpPath(filename)
+ self._setAction(tmp_path, path)
+ elif mode == 'r':
+ self._register()
+ tmp_path = self._copyToTmp(filename)
+ self._cleanup_path_list.append(tmp_path)
+ elif mode == 'a':
+ self._register(os.W_OK)
+ if os.path.exists(self._getPath(filename)):
+ tmp_path = self._copyToTmp(filename)
+ else:
+ tmp_path = self._getTmpPath(filename)
+ self._setAction(tmp_path, path)
+ else:
+ raise ValueError, "%r is not a valid open mode" % (mode, )
+ args[0] = tmp_path
+ return File(*args)
+
+ security.declarePublic('rename')
+ def rename(self, source, destination):
+ """
+ Rename file.
+ """
+ source = self._sanitizeRelativePath(source)
+ destination = self._sanitizeRelativePath(destination)
+ self._register(os.W_OK)
+ source_path = self._getPath(source)
+ source_tmp_path = self._copyToTmp(source)
+ destination_path = self._getPath(destination)
+ self._setAction(source_tmp_path, destination_path)
+ self._setAction(None, source_path)
+
+ security.declarePublic('unlink')
+ def unlink(self, filename):
+ """
+ Delete file.
+ NOOP if file does not exist.
+ """
+ filename = self._sanitizeRelativePath(filename)
+ self._register(os.W_OK)
+ to_delete = self._getPath(filename)
+ self._setAction(None, to_delete)
+
+ def _begin(self):
+ assert self._tmp_path is None
+ assert len(self._action_dict) == 0
+ assert len(self._cleanup_path_list) == 0
+ self._tmp_path = tempfile.mkdtemp()
+
+ def tpc_vote(self, *ignored):
+ """
+ Check as many parameters as possible to lower the probability of an
+ error happening in _finish.
+ If there is nothing in _action_dict, there is no check to do.
+ Otherwise, check:
+ - Check if destination is writable
+ - Check if source files exist
+ - Check if there is enough free space at destination to receive data
+ Note: it's just an approximation due to filesystem block and metadata
+ size.
+ Also, filesystem status can change while this method is runing, so
+ values can be inconsistent.
+ - Check if there are enough free inodes at destination
+ - Check if destination filesystem is mounted read-only
+ - Check filename length
+ """
+ if len(self._action_dict):
+ size_change = 0
+ inode_count_change = 0
+ filesystem_stat = os.statvfs(self._path)
+ if filesystem_stat.f_flag & 1: # XXX 1 == ST_RDONLY
+ raise Exception, 'Destination is mounted read-only'
+ if not os.access(self._path, os.W_OK):
+ raise Exception, 'Destination is read-only'
+ for destination, source in self._action_dict.iteritems():
+ if source is None:
+ try:
+ destination_stat = os.stat(destination)
+ except OSError:
+ pass
+ else:
+ size_change -= destination_stat.st_size
+ inode_count_change -= 1
+ else:
+ if len(source) > filesystem_stat.f_namemax:
+ raise Exception, 'Filename too long: %r %r > %r' % \
+ (source, len(source), filesystem_stat.f_namemax)
+ # Raises if file is not found
+ source_stat = os.stat(source)
+ size_change += source_stat.st_size
+ try:
+ destination_stat = os.stat(destination)
+ except OSError:
+ inode_count_change += 1
+ else:
+ size_change -= destination_stat.st_size
+ free_space = filesystem_stat.f_bsize * filesystem_stat.f_bavail
+ if size_change > free_space:
+ raise Exception, 'Not enough space available at destination: '\
+ '%r > %r' % (size_change, free_space)
+ if filesystem_stat.f_files != 0 and \
+ inode_count_change > filesystem_stat.f_favail:
+ raise Exception, 'Not enough file descriptors are available at '\
+ 'destination: %r > %r' % (inode_count_change,
+ filesystem_stat.f_favail)
+ self._finalize = 1
+
+ def _do_unlink(self, path):
+ try:
+ os.unlink(path)
+ except OSError:
+ LOG('FileSystem.Directory', WARNING, 'unlink raised', error=sys.exc_info())
+
+ def _finish(self):
+ for destination, source in self._action_dict.iteritems():
+ if source is None:
+ self._do_unlink(destination)
+ else:
+ shutil.copyfile(source, destination)
+ self._do_unlink(source)
+ for path in self._cleanup_path_list:
+ self._do_unlink(path)
+ try:
+ os.rmdir(self._tmp_path)
+ except OSError:
+ LOG('FileSystem.Directory', WARNING, 'rmdir raised', error=sys.exc_info())
+ self._action_dict.clear()
+ self._tmp_path = None
+ self._cleanup_path_list = []
+
+ def _abort(self):
+ if self._cleanup_at_abort:
+ for destination, source in self._action_dict.iteritems():
+ if source is None:
+ continue
+ self._do_unlink(source)
+ for path in self._cleanup_path_list:
+ self._do_unlink(path)
+ try:
+ os.rmdir(self._tmp_path)
+ except OSError:
+ LOG('FileSystem.Directory', WARNING, 'rmdir raised', error=sys.exc_info())
+ else:
+ LOG('FileSystem.Directory', WARNING, 'Transaction aborted. Temporary folder left behind: %r' % (self._tmp_path, ))
+ self._action_dict.clear()
+ self._tmp_path = None
+ self._cleanup_path_list = []
+
+InitializeClass(Directory)
+
Added: experimental/FileSystem/File.py
URL: http://svn.erp5.org/experimental/FileSystem/File.py?rev=31150&view=auto
==============================================================================
--- experimental/FileSystem/File.py (added)
+++ experimental/FileSystem/File.py [utf8] Tue Dec 8 13:46:43 2009
@@ -1,0 +1,41 @@
+##############################################################################
+#
+# Copyright (c) 2008 Nexedi SARL and Contributors. All Rights Reserved.
+# Vincent Pelletier <vincent at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+from Products.PythonScripts.Utility import allow_class
+
+class File(file):
+ """
+ Dummy class.
+ Its only purpose is to allow accessing file class methods.
+ This module MUST NOT be allowed for import into unrestricted python code.
+ Only instances of this class should be accessible, as returned by
+ Directory class.
+ """
+ pass
+
+allow_class(File)
Added: experimental/FileSystem/README
URL: http://svn.erp5.org/experimental/FileSystem/README?rev=31150&view=auto
==============================================================================
--- experimental/FileSystem/README (added)
+++ experimental/FileSystem/README [utf8] Tue Dec 8 13:46:43 2009
@@ -1,0 +1,63 @@
+FileSystem product
+------------------
+
+/!\ WARNING /!\
+
+ Before using this product, please read all the documentation and source code
+ to ensure it fits your needs and requirements.
+
+
+Purpose:
+
+ Add transactional capabilities to file system access.
+
+
+When to use it?
+
+ When file accesses within a Zope transaction should happen at transaction
+ commit, and nothing should happen on abort.
+
+
+How does it work?
+
+ It implements two-phase commit by storing files in a temporary
+ directory during the transaction, then moving or dropping them at commit or abort.
+
+
+Features:
+
+ - Open/create a file in read/write/append modes
+ - Rename/unlink a file
+ - Can be imported from restricted python
+
+Usage example:
+
+ # open a directory with FileSystem
+ from FileSystem.Directory import Directory
+ my_directory = Directory('/my_directory')
+
+ # create a new file and write some data
+ my_file = my_directory.open('my_file_name', 'w')
+ my_file.write('my_content')
+
+ # close the file then commit to change my_directory's content
+ my_file.close()
+ get_transaction().commit()
+
+
+Known issues:
+
+ - Transactionality on top of file system cannot be perfectly achieved, so this
+ product *will* break in some cases. It tries to detect those cases at tpc_vote
+ time, but there is no way to be sure nothing will change until tpc_finish:
+ - lack of disk space or inodes since vote
+ - any change happening on temporary directory
+ - (...many other cases)
+ - No conflict detection: Concurrent accesses are solved by "last write
+ (commit) wins".
+ - Only single-level-directory file accesses are implemented.
+ - No file access rights management accessible from user
+ - All file accesses are done with zope process user
+ - No path jail mechanism (the developer can access whatever the Zope process
+ user can access)
+ - Naive Zope security declarations
Added: experimental/FileSystem/__init__.py
URL: http://svn.erp5.org/experimental/FileSystem/__init__.py?rev=31150&view=auto
==============================================================================
--- experimental/FileSystem/__init__.py (added)
+++ experimental/FileSystem/__init__.py [utf8] Tue Dec 8 13:46:43 2009
@@ -1,0 +1,31 @@
+##############################################################################
+#
+# Copyright (c) 2008 Nexedi SARL and Contributors. All Rights Reserved.
+# Vincent Pelletier <vincent at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+from AccessControl.SecurityInfo import allow_module
+allow_module('Products.FileSystem.Directory') # XXX: There should be a cleaner way
+
Added: experimental/FileSystem/tests/testDirectory.py
URL: http://svn.erp5.org/experimental/FileSystem/tests/testDirectory.py?rev=31150&view=auto
==============================================================================
--- experimental/FileSystem/tests/testDirectory.py (added)
+++ experimental/FileSystem/tests/testDirectory.py [utf8] Tue Dec 8 13:46:43 2009
@@ -1,0 +1,453 @@
+#!/usr/bin/python
+##############################################################################
+#
+# Copyright (c) 2008 Nexedi SARL and Contributors. All Rights Reserved.
+# Vincent Pelletier <vincent at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+import unittest
+import tempfile
+import os
+from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
+from AccessControl.SecurityManagement import newSecurityManager
+from Products.FileSystem.Directory import Directory
+
+class TestDirectory(ERP5TypeTestCase):
+
+ def getTitle(self):
+ return "Directory"
+
+ def getBusinessTemplateList(self):
+ return ()
+
+ def test_01_create(self):
+ path_1 = tempfile.mkdtemp()
+ try:
+ file_1_name = 'foo'
+ file_1_content = 'this is a test'
+ file_1_final_path = os.path.join(path_1, file_1_name)
+ directory_1 = Directory(path_1)
+ file_1 = directory_1.open(file_1_name, 'w')
+ file_1.write(file_1_content)
+ file_1.close()
+ self.assertRaises(IOError, open, file_1_final_path, 'r')
+ get_transaction().commit()
+ file_1_final = open(file_1_final_path, 'r') # Must succeed now
+ self.assertEqual(file_1_final.read(64), file_1_content)
+ file_1_final.close()
+ finally:
+ try:
+ os.unlink(file_1_final_path)
+ except OSError:
+ pass
+ os.rmdir(path_1)
+
+ def test_02_createAndAbort(self):
+ path_1 = tempfile.mkdtemp()
+ try:
+ file_1_name = 'foo'
+ file_1_content = 'this is a test'
+ file_1_final_path = os.path.join(path_1, file_1_name)
+ directory_1 = Directory(path_1, cleanup_at_abort=True)
+ file_1 = directory_1.open(file_1_name, 'w')
+ file_1.write(file_1_content)
+ file_1.close()
+ self.assertRaises(IOError, open, file_1_final_path, 'r')
+ get_transaction().abort()
+ self.assertRaises(IOError, open, file_1_final_path, 'r')
+ finally:
+ try:
+ os.unlink(file_1_final_path)
+ except OSError:
+ pass
+ os.rmdir(path_1)
+
+ def test_03_rename(self):
+ path_1 = tempfile.mkdtemp()
+ try:
+ file_1_initial_name = 'foo'
+ file_1_final_name = 'bar'
+ file_1_content = 'this is a test'
+ file_1_initial_path = os.path.join(path_1, file_1_initial_name)
+ file_1_final_path = os.path.join(path_1, file_1_final_name)
+ # Create initial file
+ file_1 = open(file_1_initial_path, 'w')
+ file_1.write(file_1_content)
+ file_1.close()
+ # Rename
+ directory_1 = Directory(path_1)
+ directory_1.rename(file_1_initial_name, file_1_final_name)
+ self.assertRaises(IOError, open, file_1_final_path, 'r')
+ get_transaction().commit()
+ self.assertRaises(IOError, open, file_1_initial_path, 'r')
+ file_1_final = open(file_1_final_path, 'r') # Must succeed now
+ self.assertEqual(file_1_final.read(64), file_1_content)
+ file_1_final.close()
+ finally:
+ try:
+ os.unlink(file_1_initial_path)
+ except OSError:
+ pass
+ try:
+ os.unlink(file_1_final_path)
+ except OSError:
+ pass
+ os.rmdir(path_1)
+
+ def test_04_renameAndAbort(self):
+ path_1 = tempfile.mkdtemp()
+ try:
+ file_1_initial_name = 'foo'
+ file_1_final_name = 'bar'
+ file_1_content = 'this is a test'
+ file_1_initial_path = os.path.join(path_1, file_1_initial_name)
+ file_1_final_path = os.path.join(path_1, file_1_final_name)
+ # Create initial file
+ file_1 = open(file_1_initial_path, 'w')
+ file_1.write(file_1_content)
+ file_1.close()
+ # Rename
+ directory_1 = Directory(path_1, cleanup_at_abort=True)
+ directory_1.rename(file_1_initial_name, file_1_final_name)
+ self.assertRaises(IOError, open, file_1_final_path, 'r')
+ get_transaction().abort()
+ self.assertRaises(IOError, open, file_1_final_path, 'r')
+ file_1_final = open(file_1_initial_path, 'r') # Must still succeed
+ self.assertEqual(file_1_final.read(64), file_1_content)
+ file_1_final.close()
+ finally:
+ try:
+ os.unlink(file_1_initial_path)
+ except OSError:
+ pass
+ try:
+ os.unlink(file_1_final_path)
+ except OSError:
+ pass
+ os.rmdir(path_1)
+
+ def test_05_unlink(self):
+ path_1 = tempfile.mkdtemp()
+ try:
+ file_1_name = 'foo'
+ file_1_content = 'this is a test'
+ file_1_path = os.path.join(path_1, file_1_name)
+ # Create initial file
+ file_1 = open(file_1_path, 'w')
+ file_1.write(file_1_content)
+ file_1.close()
+ # Unlink
+ directory_1 = Directory(path_1)
+ directory_1.unlink(file_1_name)
+ get_transaction().commit()
+ self.assertRaises(IOError, open, file_1_path, 'r')
+ finally:
+ try:
+ os.unlink(file_1_path)
+ except OSError:
+ pass
+ os.rmdir(path_1)
+
+ def test_06_unlinkAndAbort(self):
+ path_1 = tempfile.mkdtemp()
+ try:
+ file_1_name = 'foo'
+ file_1_content = 'this is a test'
+ file_1_path = os.path.join(path_1, file_1_name)
+ # Create initial file
+ file_1 = open(file_1_path, 'w')
+ file_1.write(file_1_content)
+ file_1.close()
+ # Unlink
+ directory_1 = Directory(path_1, cleanup_at_abort=True)
+ directory_1.unlink(file_1_name)
+ get_transaction().abort()
+ file_1_final = open(file_1_path, 'r') # Must still succeed
+ self.assertEqual(file_1_final.read(64), file_1_content)
+ file_1_final.close()
+ finally:
+ try:
+ os.unlink(file_1_path)
+ except OSError:
+ pass
+ os.rmdir(path_1)
+
+ def test_07_unlinkConcurency(self):
+ path_1 = tempfile.mkdtemp()
+ try:
+ file_1_name = 'foo'
+ file_1_path = os.path.join(path_1, file_1_name)
+ # Create initial file
+ file_1 = open(file_1_path, 'w')
+ file_1.close()
+ # Unlink via Directory class
+ directory_1 = Directory(path_1)
+ directory_1.unlink(file_1_name)
+ # Unlink directly
+ os.unlink(file_1_path)
+ get_transaction().commit() # Must not raise
+ finally:
+ try:
+ os.unlink(file_1_path)
+ except OSError:
+ pass
+ os.rmdir(path_1)
+
+ def test_08_renameConcurency(self):
+ path_1 = tempfile.mkdtemp()
+ try:
+ file_1_name = 'foo'
+ file_1_final_name = 'bar'
+ file_1_path = os.path.join(path_1, file_1_name)
+ file_1_final_path = os.path.join(path_1, file_1_final_name)
+ # Create initial file
+ file_1 = open(file_1_path, 'w')
+ file_1.close()
+ # Rename
+ directory_1 = Directory(path_1)
+ directory_1.rename(file_1_name, file_1_final_name)
+ # Unlink source file
+ os.unlink(file_1_path)
+ get_transaction().commit() # Must not raise
+ file_1_final = open(file_1_final_path, 'r') # Must not raise
+ finally:
+ try:
+ os.unlink(file_1_path)
+ except OSError:
+ pass
+ try:
+ os.unlink(file_1_final_path)
+ except OSError:
+ pass
+ os.rmdir(path_1)
+
+ def test_09_openForRead(self):
+ path_1 = tempfile.mkdtemp()
+ try:
+ file_1_name = 'foo'
+ file_1_content = 'this is a test'
+ file_1_path = os.path.join(path_1, file_1_name)
+ # Create initial file
+ file_1 = open(file_1_path, 'w')
+ file_1.write(file_1_content)
+ file_1.close()
+ # Open with Directory
+ directory_1 = Directory(path_1)
+ directory_file_1 = directory_1.open(file_1_name, 'r')
+ # Modify (empty) file content
+ file_1 = open(file_1_path, 'w')
+ file_1.close()
+ # Check that Directory file content remained the same
+ self.assertEqual(directory_file_1.read(64), file_1_content)
+ directory_file_1.close()
+ get_transaction().commit()
+ finally:
+ try:
+ os.unlink(file_1_path)
+ except OSError:
+ pass
+ os.rmdir(path_1)
+
+ def test_10_openForAppend(self):
+ path_1 = tempfile.mkdtemp()
+ try:
+ file_1_name = 'foo'
+ file_1_content = 'this is a test'
+ file_1_path = os.path.join(path_1, file_1_name)
+ # Create initial file
+ file_1 = open(file_1_path, 'w')
+ file_1.write(file_1_content)
+ file_1.close()
+ # Open with Directory
+ directory_1 = Directory(path_1)
+ directory_file_1 = directory_1.open(file_1_name, 'a')
+ directory_file_1.write(file_1_content)
+ directory_file_1.close()
+ # Check that original file content did not change yet
+ file_1 = open(file_1_path, 'r')
+ self.assertEqual(file_1.read(64), file_1_content)
+ file_1.close()
+ get_transaction().commit()
+ # Check that original file content is updated
+ file_1 = open(file_1_path, 'r')
+ self.assertEqual(file_1.read(128), file_1_content * 2)
+ file_1.close()
+ finally:
+ try:
+ os.unlink(file_1_path)
+ except OSError:
+ pass
+ os.rmdir(path_1)
+
+ def test_11_openWithValidAbsolutePath(self):
+ path_1 = tempfile.mkdtemp()
+ try:
+ file_1_name = 'foo'
+ file_1_path = os.path.join(path_1, file_1_name)
+ directory_1 = Directory(path_1)
+ # Check that it's allowed to open for write.
+ file_1 = open(file_1_path, 'w')
+ file_1.close()
+ # Do the actual test
+ directory_file_1 = directory_1.open(file_1_path, 'w') # Must not raise.
+ directory_file_1.close()
+ get_transaction().commit()
+ finally:
+ try:
+ os.unlink(file_1_path)
+ except OSError:
+ pass
+ os.rmdir(path_1)
+
+ def test_12_openWithInvalidAbsolutePath(self):
+ path_1 = tempfile.mkdtemp()
+ path_2 = tempfile.mkdtemp()
+ try:
+ file_1_name = 'foo'
+ file_1_path = os.path.join(path_1, file_1_name)
+ file_2_path = os.path.join(path_2, file_1_name)
+ directory_1 = Directory(path_1)
+ # Check that it's allowed to open for write.
+ file_1 = open(file_2_path, 'w')
+ file_1.close()
+ # Do the actual test
+ self.assertRaises(ValueError, directory_1.open, file_2_path, 'w') # Must raise.
+ get_transaction().commit()
+ finally:
+ try:
+ os.unlink(file_1_path)
+ except OSError:
+ pass
+ os.rmdir(path_1)
+ try:
+ os.unlink(file_2_path)
+ except OSError:
+ pass
+ os.rmdir(path_2)
+
+ def test_13_checkNoFilenameCorruptionWithUncleanDirectoryPath(self):
+ path_1 = tempfile.mkdtemp()
+ try:
+ file_1_name = 'foo'
+ file_1_path = os.path.join(path_1, file_1_name)
+ directory_1 = Directory(path_1 + os.sep)
+ directory_file_1 = directory_1.open(file_1_path, 'w')
+ directory_file_1.close()
+ get_transaction().commit()
+ file_1 = open(file_1_path, 'r') # Must not raise
+ file_1.close()
+ finally:
+ try:
+ os.unlink(file_1_path)
+ except OSError:
+ pass
+ os.rmdir(path_1)
+
+ def test_14_checkExceptionsWithMissingRights(self):
+ path_1 = tempfile.mkdtemp()
+ try:
+ file_1_name = 'foo'
+ file_1_path = os.path.join(path_1, file_1_name)
+ directory_1 = Directory(path_1 + os.sep)
+ # For each right combination, list open modes which must succeed.
+ # All others are supposed to fail.
+ rights_dict = {
+ os.R_OK | os.W_OK | os.X_OK : ['w', 'a', 'r'],
+ os.R_OK | os.W_OK : [],
+ os.R_OK | os.X_OK : ['r'],
+ os.R_OK : [],
+ os.W_OK | os.X_OK : ['w', 'a', 'r'],
+ os.W_OK : [],
+ os.X_OK : ['r'],
+ 0 : []
+ }
+ open(file_1_path, 'w').close()
+ # Strip inode type from mode (directory, pipe, socket, etc...) as they
+ # are forbidden as chmod values.
+ initial_file_mode = os.stat(path_1).st_mode & 007777
+ # Only keep suid/sgid/sticky part of mode.
+ clean_file_mode = initial_file_mode & 07000
+ for rights, mode_list in rights_dict.iteritems():
+ os.chmod(path_1, clean_file_mode | (rights << 6))
+ for mode, access in (('w', os.W_OK | os.X_OK), ('a', os.W_OK | os.X_OK), ('r', os.X_OK)):
+ if mode in mode_list:
+ self.assertTrue(os.access(path_1, access))
+ directory_1.open(file_1_name, mode).close()
+ else:
+ self.assertFalse(os.access(path_1, access))
+ self.assertRaises(IOError, directory_1.open, file_1_name, mode)
+ get_transaction().commit()
+ finally:
+ os.chmod(path_1, initial_file_mode)
+ try:
+ os.unlink(file_1_path)
+ except OSError:
+ pass
+ os.rmdir(path_1)
+
+ def test_15_testEmptyFilenameCannotBeUsed(self):
+ path_1 = tempfile.mkdtemp()
+ try:
+ directory_1 = Directory(path_1)
+ for filename in ('', path_1):
+ self.assertRaises(ValueError, directory_1.open, filename, 'r')
+ self.assertRaises(ValueError, directory_1.open, filename, 'w')
+ self.assertRaises(ValueError, directory_1.open, filename, 'a')
+ self.assertRaises(ValueError, directory_1.unlink, filename)
+ self.assertRaises(ValueError, directory_1.rename, filename, 'foo')
+ self.assertRaises(ValueError, directory_1.rename, 'foo', filename)
+ get_transaction().commit() # This must not create any file
+ finally:
+ os.rmdir(path_1)
+
+ def test_16_openForAppend(self):
+ path_1 = tempfile.mkdtemp()
+ try:
+ file_1_name = 'foo'
+ file_1_content = 'this is a test'
+ file_1_path = os.path.join(path_1, file_1_name)
+ # Ensure the file doesn't exists
+ self.assertRaises(IOError, open, file_1_path)
+ # Open the file with Directory in append mode
+ directory_1 = Directory(path_1)
+ directory_file_1 = directory_1.open(file_1_name, 'a')
+ directory_file_1.write(file_1_content)
+ directory_file_1.close()
+ # Check that original file is unkwown
+ self.assertRaises(IOError, open, file_1_path)
+ # Check that original file content is updated after commit
+ get_transaction().commit()
+ file_1 = open(file_1_path, 'r')
+ self.assertEqual(file_1.read(128), file_1_content)
+ file_1.close()
+ finally:
+ try:
+ os.unlink(file_1_path)
+ except OSError:
+ pass
+ os.rmdir(path_1)
+
+if __name__ == '__main__':
+ unittest.main()
Propchange: experimental/FileSystem/tests/testDirectory.py
------------------------------------------------------------------------------
svn:executable = *
More information about the Erp5-report
mailing list