[Erp5-report] r46031 nicolas - in /erp5/trunk/utils/xupdate_processor/src/xupdate_processor...
nobody at svn.erp5.org
nobody at svn.erp5.org
Mon Oct 31 09:47:06 CET 2011
Author: nicolas
Date: Mon Oct 31 09:47:05 2011
New Revision: 46031
URL: http://svn.erp5.org?rev=46031&view=rev
Log:
Add support for mixed-content updates.
Modified:
erp5/trunk/utils/xupdate_processor/src/xupdate_processor/content_handler.py
erp5/trunk/utils/xupdate_processor/src/xupdate_processor/tests/xupdate_processor_test_suite.py
Modified: erp5/trunk/utils/xupdate_processor/src/xupdate_processor/content_handler.py
URL: http://svn.erp5.org/erp5/trunk/utils/xupdate_processor/src/xupdate_processor/content_handler.py?rev=46031&r1=46030&r2=46031&view=diff
==============================================================================
--- erp5/trunk/utils/xupdate_processor/src/xupdate_processor/content_handler.py [utf8] (original)
+++ erp5/trunk/utils/xupdate_processor/src/xupdate_processor/content_handler.py [utf8] Mon Oct 31 09:47:05 2011
@@ -7,6 +7,11 @@ from copy import deepcopy
import re
attribute_axes_regex = re.compile(r'(?P<parent_node>^.*(?=/(attribute::|@)))(?P<attribute_axes>/(attribute::|@))(?P<attribute_id>[\w\-:]*)$')
+text_axes_regex = re.compile(r'(?P<parent_node>^.*(?=/text\(\)))(/text\(\))(?P<predicate>\[(?P<position>(\d))])?')
+
+TEXT_ME_FLAG = 1
+TAIL_ME_FLAG = 2
+
XUPDATE_NS = 'http://www.xmldb.org/xupdate'
@@ -38,44 +43,68 @@ class XUpdateHandler(ContentHandler):
self._parent_node_append_stack = []
self._current_position_to_append_stack = []
self._attr_name_stack = []
+ self._capture_string = False
+
+ def pop(self):
+ return self._node_stack.pop()
+
+ def push(self, node):
+ self._node_stack.append(node)
def startPrefixMapping(self, prefix, uri):
self.nsmap[prefix] = uri
def startElementNS(self, name, qname, attrs):
uri, localname = name
- #store position of current string_stack
- #to join all characters contents for the current element only
+ # store position of current string_stack
+ # to join all characters contents for the current element only
self._current_position_string_stack.append(len(self._string_stack))
if uri == XUPDATE_NS:
- #This is an xupdate action
+ # This is an xupdate action
if localname == 'modifications':
if self._modification_starts:
raise XUpdateValidationException('{%s}modifications already read' % XUPDATE_NS)
else:
self.modification_starts = True
elif localname == 'update':
- #get the node fromp xpath expression
+ # get the node from the xpath expression
xpath_expression = attrs.getValueByQName('select')
- matched_re = attribute_axes_regex.match(xpath_expression)
- if matched_re is not None:
- #Unfortunately, etree manage attributes as smart_string results (they known their parents)
- # But they do not know in which attribute they are linked
- #so, use a workaround to update attribute nodes like dictionaries
- #('/attribute::' or '/@' axes)
- #XXX This code will be removed after release of lxml 2.2.4
- parent_node_expression = matched_re.group('parent_node')
- attribute_id = matched_re.group('attribute_id')
+ attribute_match_re = attribute_axes_regex.match(xpath_expression)
+ text_match_re = text_axes_regex.match(xpath_expression)
+ if attribute_match_re is not None:
+ # Unfortunately, etree returns attributes as smart_string results: they know their parent element
+ # but they do not know which attribute they belong to,
+ # so use a workaround to update attribute nodes like dictionaries
+ # ('/attribute::' or '/@' axes)
+ # XXX This code will be removed after release of lxml 2.2.4
+ parent_node_expression = attribute_match_re.group('parent_node')
+ attribute_id = attribute_match_re.group('attribute_id')
if ':' in attribute_id:
prefix, local_name = attribute_id.split(':')
uri = self.nsmap[prefix]
attribute_id = '{%s}%s' % (uri, local_name,)
node = self.result_tree.xpath(parent_node_expression, namespaces=self.nsmap)[0]
- self._node_stack.append((node, attribute_id))
+ self.push((node, attribute_id))
+ self._current_position_to_append_stack.append(len(self._node_stack))
+ elif text_match_re is not None:
+ self._capture_string = True
+ # The target is a text node. Fetch its parent or preceding sibling node in order
+ # to attach the updated text to that node's text or tail attribute
+ parent_node_xpath = text_match_re.group('parent_node')
+ # Mixed Content, fetch its sibling node
+ previous_node_xpath = xpath_expression + '/preceding-sibling::*[1]'
+ previous_node_list = self.result_tree.xpath(previous_node_xpath, namespaces=self.nsmap)
+ if previous_node_list:
+ previous_node = previous_node_list[0]
+ self.push((previous_node, TAIL_ME_FLAG))
+ else:
+ # this is text of parent_node
+ parent_node = self.result_tree.xpath(parent_node_xpath, namespaces=self.nsmap)[0]
+ self.push((parent_node, TEXT_ME_FLAG))
self._current_position_to_append_stack.append(len(self._node_stack))
else:
node = self.result_tree.xpath(xpath_expression, namespaces=self.nsmap)[0]
- self._node_stack.append(node)
+ self.push(node)
self._current_position_to_append_stack.append(len(self._node_stack))
elif localname == 'remove':
xpath_expression = attrs.getValueByQName('select')
@@ -106,11 +135,11 @@ class XUpdateHandler(ContentHandler):
tag_name = '{%s}%s' % (uri, local_name,)
nsmap[prefix] = uri
node = Element(tag_name, nsmap=nsmap)
- self._node_stack.append(node)
+ self.push(node)
self._current_position_to_append_stack.append(len(self._node_stack))
elif localname == 'attribute':
node = self._node_stack[-1]
- self._node_stack.append(node)
+ self.push(node)
attr_name = attrs.getValueByQName('name')
if ':' in attr_name:
uri = attrs.getValueByQName('namespace')
@@ -120,21 +149,21 @@ class XUpdateHandler(ContentHandler):
elif localname == 'insert-before':
node = self.result_tree.xpath(attrs.getValueByQName('select'), namespaces=self.nsmap)[0]
self._current_position_to_append_stack.append(len(self._node_stack))
- self._node_stack.append(node)
+ self.push(node)
elif localname == 'insert-after':
node = self.result_tree.xpath(attrs.getValueByQName('select'), namespaces=self.nsmap)[0]
self._current_position_to_append_stack.append(len(self._node_stack))
- self._node_stack.append(node)
+ self.push(node)
elif localname == 'rename':
node = self.result_tree.xpath(attrs.getValueByQName('select'), namespaces=self.nsmap)[0]
- self._node_stack.append(node)
+ self.push(node)
elif localname == 'processing-instruction':
node_name = attrs.getValueByQName('name')
node = ProcessingInstruction(node_name)
- self._node_stack.append(node)
+ self.push(node)
elif localname == 'comment':
node = Comment()
- self._node_stack.append(node)
+ self.push(node)
elif localname == 'variable':
name = attrs.getValueByQName('name')
node = self.result_tree.xpath(attrs.getValueByQName('select'), namespaces=self.nsmap)[0]
@@ -142,12 +171,15 @@ class XUpdateHandler(ContentHandler):
elif localname == 'value-of':
select = attrs.getValueByQName('select')
if select[0] == '$':
- #This is a variable
+ # This is a variable
node = self._variable_dict[select[1:]]
else:
- #This is an xpath expression
+ # This is an xpath expression
node = self.result_tree.xpath(select, namespaces=self.nsmap)[0]
- self._node_stack.append(deepcopy(node))
+ self.push(deepcopy(node))
+ elif localname == 'text':
+ self._capture_string = True
+ self.push(self._node_stack[-1])
else:
raise NotImplementedError(localname)
else:
@@ -163,34 +195,49 @@ class XUpdateHandler(ContentHandler):
attr_name = '{%s}%s' % (uri, attr_name)
new_attr_dict[attr_name] = value
new_node.attrib.update(new_attr_dict)
- self._node_stack.append(new_node)
+ self.push(new_node)
self._current_position_to_append_stack.append(len(self._node_stack))
+ parent_position = self._current_position_to_append_stack[-2]-1
+ if self._string_stack[parent_position:]:
+ # means there is text before current node,
+ # so append current string_stack to parent_node.text
+ token_list = [self._string_stack.pop() for token in self._string_stack[parent_position:]]
+ token_list.reverse()
+ content = ''.join(token_list)
+ self._node_stack[-2].text = content
def characters(self, content):
- if content.strip():
+ if content.strip() or self._capture_string:
self._string_stack.append(content)
def endElementNS(self, name, qname):
uri, localname = name
last_position = self._current_position_string_stack.pop()
token_list = [self._string_stack.pop() for token in self._string_stack[last_position:]]
- token_list.reverse()
- content = ''.join(token_list)
- if not content.strip():
+ if token_list:
+ token_list.reverse()
+ content = ''.join(token_list)
+ else:
content = None
if uri == XUPDATE_NS:
- #This is an xupdate action
+ # This is an xupdate action
+ self._capture_string = False
if localname == 'modifications':
pass
elif localname == 'update':
- #get the node fromp xpath expression
+ # get the node from xpath expression
node_stack_position = self._current_position_to_append_stack.pop()
- node_to_append_list = [self._node_stack.pop() for node in self._node_stack[node_stack_position:]]
- node = self._node_stack.pop()
+ node_to_append_list = [self.pop() for node in self._node_stack[node_stack_position:]]
+ node = self.pop()
if isinstance(node, tuple):
- #This is a attribute update
- node, attribute_id = node
- node.attrib.update({attribute_id: content})
+ if node[1] is TEXT_ME_FLAG:
+ node[0].text = content
+ elif node[1] is TAIL_ME_FLAG:
+ node[0].tail = content
+ else:
+ # This is an attribute update
+ node, attribute_id = node
+ node.attrib.update({attribute_id: content})
else:
node.extend(node_to_append_list)
if len(node):
@@ -202,17 +249,17 @@ class XUpdateHandler(ContentHandler):
elif localname == 'append':
xpath_position, node_stack_position = self._current_position_to_append_stack.pop()
parent_node = self._parent_node_append_stack.pop()
- node_to_append_list = [self._node_stack.pop() for node in self._node_stack[node_stack_position:]]
+ node_to_append_list = [self.pop() for node in self._node_stack[node_stack_position:]]
if xpath_position == 'first()':
for node in node_to_append_list:
parent_node.insert(0, node)
else:
- #Include last()
+ # Include last()
node_to_append_list.reverse()
parent_node.extend(node_to_append_list)
elif localname == 'element':
node_stack_position = self._current_position_to_append_stack.pop()
- node_to_append_list = [self._node_stack.pop() for node in self._node_stack[node_stack_position:]]
+ node_to_append_list = [self.pop() for node in self._node_stack[node_stack_position:]]
node_to_append_list.reverse()
node = self._node_stack[-1]
node.extend(node_to_append_list)
@@ -221,31 +268,31 @@ class XUpdateHandler(ContentHandler):
else:
node.text = content
elif localname == 'attribute':
- node = self._node_stack.pop()
+ node = self.pop()
attr_name = self._attr_name_stack.pop()
node.attrib.update({attr_name: content})
elif localname == 'insert-before':
node_stack_position = self._current_position_to_append_stack.pop()
- node_to_append_list = [self._node_stack.pop() for node in self._node_stack[node_stack_position+1:]]
+ node_to_append_list = [self.pop() for node in self._node_stack[node_stack_position+1:]]
node_to_append_list.reverse()
- sibling_node = self._node_stack.pop()
+ sibling_node = self.pop()
for node in node_to_append_list:
sibling_node.addprevious(node)
elif localname == 'insert-after':
node_stack_position = self._current_position_to_append_stack.pop()
- node_to_append_list = [self._node_stack.pop() for node in self._node_stack[node_stack_position+1:]]
- sibling_node = self._node_stack.pop()
+ node_to_append_list = [self.pop() for node in self._node_stack[node_stack_position+1:]]
+ sibling_node = self.pop()
for node in node_to_append_list:
sibling_node.addnext(node)
elif localname == 'rename':
- node = self._node_stack.pop()
+ node = self.pop()
if ':' in content:
- #Transform QName to compatible Tag name for lxml
+ # Transform QName to compatible Tag name for lxml
prefix, localname = content.split(':')
uri = self.nsmap.get(prefix)
content = '{%s}%s' % (uri, localname)
- #Update nsmap of existing node is forbiden,
- #So create a new node and replace it
+ # Updating the nsmap of an existing node is forbidden,
+ # so create a new node and replace it
new_node = Element(content, nsmap={prefix: uri})
for child in node:
new_node.append(deepcopy(child))
@@ -257,32 +304,40 @@ class XUpdateHandler(ContentHandler):
if parent is not None:
node.getparent().replace(node, new_node)
else:
- #if root Element
+ # if root Element
self.result_tree._setroot(new_node)
else:
node.tag = content
elif localname == 'processing-instruction':
- #Stay orphans do not remove from stack
+ # Stay orphans do not remove from stack
node = self._node_stack[-1]
node.text = content
elif localname == 'comment':
- #Stay orphans do not remove from stack
+ # Stay orphans do not remove from stack
node = self._node_stack[-1]
node.text = content
elif localname == 'variable':
pass
elif localname == 'value-of':
pass
+ elif localname == 'text':
+ node = self.pop()
+ node.tail = content
else:
raise NotImplementedError(localname)
else:
node_stack_position = self._current_position_to_append_stack.pop()
- node_to_append_list = [self._node_stack.pop() for node in self._node_stack[node_stack_position:]]
- #Stay orphans do not remove from stack
+ node_to_append_list = [self.pop() for node in self._node_stack[node_stack_position:]]
+ # Stay orphans do not remove from stack
node_to_append_list.reverse()
node = self._node_stack[-1]
+ mode = None
+ if type(node) is tuple:
+ node, mode = node
node.extend(node_to_append_list)
- if len(node):
+ if mode is TEXT_ME_FLAG:
+ node.text = content
+ elif len(node):
node[-1].tail = content
else:
node.text = content
Modified: erp5/trunk/utils/xupdate_processor/src/xupdate_processor/tests/xupdate_processor_test_suite.py
URL: http://svn.erp5.org/erp5/trunk/utils/xupdate_processor/src/xupdate_processor/tests/xupdate_processor_test_suite.py?rev=46031&r1=46030&r2=46031&view=diff
==============================================================================
--- erp5/trunk/utils/xupdate_processor/src/xupdate_processor/tests/xupdate_processor_test_suite.py [utf8] (original)
+++ erp5/trunk/utils/xupdate_processor/src/xupdate_processor/tests/xupdate_processor_test_suite.py [utf8] Mon Oct 31 09:47:05 2011
@@ -626,6 +626,47 @@ class TestXUpdateProcessor(unittest.Test
self._assertXUprocWorks(xml_xu_string, xml_doc_string,
expected_result_string)
+ def test_mixed_content(self):
+ """handle mixed content
+ """
+ xml_doc_string = """
+<ul>
+ <node/>blablabla
+ <node>AAA<blank/>BBB</node>
+ <node>AAA<blank/>BBB</node>
+ <node>AAA<blank/>BBB<blank/>BBB</node>CCC
+ <a_node/>AAA
+ <b_node/>
+ <b_node/>
+ <node/>
+</ul>
+"""
+
+ xml_xu_string = """<xupdate:modifications xmlns:xupdate="http://www.xmldb.org/xupdate" version="1.0">
+ <xupdate:update select="/ul/text()[1]">yayaya
+ </xupdate:update>
+ <xupdate:update select="/ul/node[2]/text()[1]">C</xupdate:update>
+ <xupdate:update select="/ul/node[3]/text()[2]">D</xupdate:update>
+ <xupdate:update select="/ul/text()[2]">F
+ </xupdate:update>
+ <xupdate:update select="/ul/node[4]/text()[3]">E</xupdate:update>
+ <xupdate:update select="/ul/text()[3]"/>
+ <xupdate:insert-after select="/ul/b_node[2]">
+ <xupdate:text>G
+ </xupdate:text>
+ </xupdate:insert-after>
+ <xupdate:update select="/ul/node[5]">BBB<blank/>H</xupdate:update>
+</xupdate:modifications>
+"""
+ expected_result_string = """<ul><node/>yayaya
+ <node>C<blank/>BBB</node><node>AAA<blank/>D</node><node>AAA<blank/>BBB<blank/>E</node>F
+ <a_node/><b_node/><b_node/>G
+ <node>BBB<blank/>H</node></ul>
+"""
+
+ self._assertXUprocWorks(xml_xu_string, xml_doc_string,
+ expected_result_string)
+
def test_OOofiles1(self):
"""
"""
More information about the Erp5-report
mailing list