class DocxMerger
A class named DocxMerger
/tf/active/vicechatdev/word_merge.py
41 - 1150
moderate
Purpose
Merges a revision .docx into a base .docx: extracts both OPC packages, carries tracked changes, comments, footnotes/endnotes and relationships into the base with collision-free IDs, and repackages the result as the output document.
Source Code
class DocxMerger:
def __init__(self, base_docx, revision_docx, output_docx):
    """Set up merger state for combining two .docx files into one output.

    Args:
        base_docx: path to the document used as the merge baseline.
        revision_docx: path to the document whose changes are merged in.
        output_docx: path where the merged document will be written.
    """
    self.base_docx = base_docx
    self.revision_docx = revision_docx
    self.output_docx = output_docx
    # role name ('base'/'revision'/'output') -> extracted temp directory
    self.temp_dirs = {}
    # Old-ID -> new-ID translation tables, one per ID space.
    id_spaces = ('comments', 'bookmarks', 'rels', 'footnotes', 'endnotes')
    self.id_maps = {space: {} for space in id_spaces}
    # Highest ID seen so far in each space (used to mint fresh IDs).
    self.max_ids = {space: 0 for space in id_spaces}
    # Register all namespaces so serialized XML keeps its prefixes.
    for prefix, uri in NAMESPACES.items():
        etree.register_namespace(prefix, uri)
def setup_temp_dirs(self):
    """Create one scratch directory each for base, revision and output."""
    for role in ('base', 'revision', 'output'):
        path = tempfile.mkdtemp(prefix=f"docx_merge_{role}_")
        self.temp_dirs[role] = path
        logger.info(f"Created temporary directory: {path}")
def cleanup_temp_dirs(self):
    """Delete every scratch directory created by setup_temp_dirs()."""
    for path in self.temp_dirs.values():
        if not os.path.exists(path):
            continue
        shutil.rmtree(path)
        logger.info(f"Removed temporary directory: {path}")
def extract_docx(self):
    """Unzip both input documents and seed the output tree from the base."""
    for role, source in (('base', self.base_docx), ('revision', self.revision_docx)):
        with zipfile.ZipFile(source, 'r') as archive:
            archive.extractall(self.temp_dirs[role])
        logger.info(f"Extracted {role} document to {self.temp_dirs[role]}")
    # The output starts as an exact copy of the base; merge steps mutate it.
    shutil.copytree(self.temp_dirs['base'], self.temp_dirs['output'], dirs_exist_ok=True)
    logger.info(f"Created output document structure in {self.temp_dirs['output']}")
def get_max_id(self, xml_path, xpath, id_attr):
    """Return the largest integer ID at `xpath` in `xml_path` (0 if none).

    Args:
        xml_path: path to the XML part to scan; missing file yields 0.
        xpath: XPath selecting the ID-bearing elements.
        id_attr: attribute name; a fully-qualified '{uri}name' is used
            verbatim, otherwise the 'w' namespace is assumed.
    """
    if not os.path.exists(xml_path):
        return 0
    root = etree.parse(xml_path).getroot()
    highest = 0
    for node in root.xpath(xpath, namespaces=NAMESPACES):
        key = id_attr if '}' in id_attr else f"{{{NAMESPACES['w']}}}{id_attr}"
        try:
            highest = max(highest, int(node.get(key)))
        except (ValueError, TypeError):
            # Missing or non-numeric IDs are simply ignored.
            pass
    return highest
def initialize_id_maps(self):
    """Scan the base document and record the highest in-use ID per ID space.

    Populates self.max_ids so subsequent merge steps can mint fresh,
    collision-free IDs for comments, bookmarks, footnotes, endnotes and
    relationships copied from the revision document. (The four w:id
    scans were previously four copy-pasted blocks; they are data-driven
    here with identical behavior.)
    """
    base_word = os.path.join(self.temp_dirs['base'], "word")
    # (ID space, part holding the IDs, xpath selecting ID-bearing nodes)
    scans = [
        ('comments', os.path.join(base_word, "comments.xml"), ".//w:comment"),
        ('bookmarks', os.path.join(base_word, "document.xml"), ".//w:bookmarkStart"),
        ('footnotes', os.path.join(base_word, "footnotes.xml"), ".//w:footnote"),
        ('endnotes', os.path.join(base_word, "endnotes.xml"), ".//w:endnote"),
    ]
    for space, path, xpath in scans:
        # get_max_id also tolerates a missing file, but the explicit check
        # keeps the original behavior of leaving max_ids untouched.
        if os.path.exists(path):
            self.max_ids[space] = self.get_max_id(path, xpath, "id")
    # Relationship IDs use the "rId<N>" convention rather than w:id.
    rels_path = os.path.join(base_word, "_rels", "document.xml.rels")
    if os.path.exists(rels_path):
        root = etree.parse(rels_path).getroot()
        for rel in root.xpath("//rel:Relationship", namespaces=NAMESPACES):
            rel_id = rel.get("Id")
            if rel_id.startswith("rId"):
                try:
                    self.max_ids['rels'] = max(self.max_ids['rels'], int(rel_id[3:]))
                except ValueError:
                    # Non-numeric suffix (unusual but legal) is ignored.
                    pass
    logger.info(f"Initialized ID maps with max IDs: {self.max_ids}")
def merge_comments(self):
    """Merge comments from revision document to base document.

    Loads the revision's word/comments.xml (creating a minimal comments
    part when the base has none), assigns every revision comment a fresh
    w:id continuing from the base document's highest comment ID, records
    the old->new mapping in self.id_maps['comments'], fills in any missing
    author/date/initials attributes, and writes the combined part to the
    output package. Finishes by delegating to merge_comment_relationships().
    """
    base_comments_path = os.path.join(self.temp_dirs['base'], "word", "comments.xml")
    revision_comments_path = os.path.join(self.temp_dirs['revision'], "word", "comments.xml")
    output_comments_path = os.path.join(self.temp_dirs['output'], "word", "comments.xml")
    # Check if comments files exist
    has_base_comments = os.path.exists(base_comments_path)
    has_revision_comments = os.path.exists(revision_comments_path)
    if not has_revision_comments:
        logger.info("No comments found in revision document")
        return
    # Create a new comments file if needed, or use existing
    if has_base_comments:
        # Use direct file reading to preserve exact format
        with open(base_comments_path, 'rb') as f:
            base_content = f.read()
        # Create parser that preserves whitespace exactly
        parser = etree.XMLParser(remove_blank_text=False, remove_comments=False, resolve_entities=False)
        base_comments = etree.fromstring(base_content, parser)
    else:
        # Create a minimal but correctly formatted comments XML
        base_content = (
            b'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\r\n'
            b'<w:comments xmlns:w="http://schemas.openxmlformats.org/wordprocessingml/2006/main" '
            b'xmlns:w14="http://schemas.microsoft.com/office/word/2010/wordml" '
            b'xmlns:w15="http://schemas.microsoft.com/office/word/2012/wordml" '
            b'xmlns:r="http://schemas.openxmlformats.org/officeDocument/2006/relationships" '
            b'xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006">'
            b'</w:comments>'
        )
        parser = etree.XMLParser(remove_blank_text=False, remove_comments=False, resolve_entities=False)
        base_comments = etree.fromstring(base_content, parser)
    # Read revision comments with exact preservation of format
    with open(revision_comments_path, 'rb') as f:
        revision_content = f.read()
    parser = etree.XMLParser(remove_blank_text=False, remove_comments=False, resolve_entities=False)
    revision_comments = etree.fromstring(revision_content, parser)
    # Extract all comments from revision document
    comments_to_add = revision_comments.xpath(".//w:comment", namespaces=NAMESPACES)
    if not comments_to_add:
        logger.info("No comments found in revision document's comments.xml")
        return
    # Track any added comments for debugging
    added_comments = []
    # Process each comment from revision
    for comment in comments_to_add:
        old_id = comment.get(f"{{{NAMESPACES['w']}}}id")
        self.max_ids['comments'] += 1
        new_id = str(self.max_ids['comments'])
        # Create deep copy to avoid modifying original
        comment_copy = deepcopy(comment)
        # Update the ID
        comment_copy.set(f"{{{NAMESPACES['w']}}}id", new_id)
        self.id_maps['comments'][old_id] = new_id
        # Ensure required attributes exist; Word expects comments to carry
        # author/date/initials, so missing ones get placeholder values.
        required_attrs = ['author', 'date', 'initials']
        for attr in required_attrs:
            attr_full = f"{{{NAMESPACES['w']}}}{attr}"
            if attr_full not in comment_copy.attrib:
                if attr == 'author':
                    comment_copy.set(attr_full, "Unknown")
                elif attr == 'date':
                    # NOTE(review): uses local time but appends 'Z' (UTC
                    # marker) — confirm whether UTC was intended.
                    now = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
                    comment_copy.set(attr_full, now)
                elif attr == 'initials':
                    comment_copy.set(attr_full, "??")
        # Append to base comments
        base_comments.append(comment_copy)
        added_comments.append(f"Comment {old_id} -> {new_id}")
    # Create output directory
    os.makedirs(os.path.dirname(output_comments_path), exist_ok=True)
    # Convert back to string with exact formatting Word expects
    comments_xml = etree.tostring(
        base_comments,
        encoding="UTF-8",
        xml_declaration=False,
        pretty_print=False
    )
    # Write with explicit XML declaration that Word expects
    with open(output_comments_path, 'wb') as f:
        f.write(b'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\r\n')
        f.write(comments_xml)
    logger.info(f"Merged {len(added_comments)} comments: {', '.join(added_comments)}")
    # Make sure the comments relationships file is handled correctly
    self.merge_comment_relationships()
def get_paragraph_text(self, paragraph):
    """Concatenate the text of every w:r/w:t run inside `paragraph`."""
    runs = paragraph.xpath(".//w:r/w:t", namespaces=NAMESPACES)
    return "".join(run.text or "" for run in runs)
def create_paragraph_map(self, doc_root):
    """Index paragraphs by the hash of their text content.

    Returns a dict mapping hash(text) -> list of (position, element, text)
    tuples; duplicate paragraphs coexist in the list under one key.
    """
    para_map = {}
    paragraphs = doc_root.xpath(".//w:p", namespaces=NAMESPACES)
    for position, para in enumerate(paragraphs):
        text = self.get_paragraph_text(para)
        # Hashing keeps the keys small even for very long paragraphs.
        para_map.setdefault(hash(text), []).append((position, para, text))
    return para_map
def find_matching_paragraph(self, para, base_para_map):
    """Locate the base paragraph that best matches `para`.

    Tries an exact text-hash lookup first, then falls back to fuzzy
    matching with a 0.8 similarity threshold. Returns a
    (position, element, text) tuple, or None when nothing is close enough.
    """
    text = self.get_paragraph_text(para)
    exact = base_para_map.get(hash(text))
    if exact:
        # First occurrence wins on an exact match.
        return exact[0]
    best_match = None
    best_ratio = 0.8  # anything at or below this is considered unrelated
    for candidates in base_para_map.values():
        for pos, base_para, base_text in candidates:
            ratio = difflib.SequenceMatcher(None, text, base_text).ratio()
            if ratio > best_ratio:
                best_ratio = ratio
                best_match = (pos, base_para, base_text)
    return best_match
def update_comment_references(self, element):
    """Update comment references in an element using the ID mapping.

    Rewrites the w:id attribute of commentReference, commentRangeStart
    and commentRangeEnd descendants of `element` according to
    self.id_maps['comments']. Errors are logged and swallowed so a bad
    reference never aborts a merge (best-effort by design).
    """
    if not self.id_maps['comments']:
        return  # No comment mappings to apply
    try:
        # Handle standard comment references
        for comment_ref in element.xpath(".//w:commentReference", namespaces=NAMESPACES):
            ref_id = comment_ref.get(f"{{{NAMESPACES['w']}}}id")
            if ref_id in self.id_maps['comments']:
                comment_ref.set(f"{{{NAMESPACES['w']}}}id", self.id_maps['comments'][ref_id])
        # Handle comment range start markers
        for comment_range_start in element.xpath(".//w:commentRangeStart", namespaces=NAMESPACES):
            ref_id = comment_range_start.get(f"{{{NAMESPACES['w']}}}id")
            if ref_id in self.id_maps['comments']:
                comment_range_start.set(f"{{{NAMESPACES['w']}}}id", self.id_maps['comments'][ref_id])
        # Handle comment range end markers
        for comment_range_end in element.xpath(".//w:commentRangeEnd", namespaces=NAMESPACES):
            ref_id = comment_range_end.get(f"{{{NAMESPACES['w']}}}id")
            if ref_id in self.id_maps['comments']:
                comment_range_end.set(f"{{{NAMESPACES['w']}}}id", self.id_maps['comments'][ref_id])
        # Handle any other elements with comment IDs
        # NOTE(review): this catch-all rewrites the w:id of ANY descendant
        # whose ID happens to equal an old comment ID (e.g. bookmarkStart,
        # footnotes), which may over-apply the mapping — confirm intent.
        id_attr = f"{{{NAMESPACES['w']}}}id"
        for elem in element.xpath(".//*[@w:id]", namespaces=NAMESPACES):
            ref_id = elem.get(id_attr)
            if ref_id in self.id_maps['comments']:
                elem.set(id_attr, self.id_maps['comments'][ref_id])
    except Exception as e:
        logger.warning(f"Error updating comment references: {e}")
def merge_track_changes(self):
    """Merge track changes from revision document to base document.

    Copies every revision paragraph that contains w:ins or w:del markup
    into the base document: a paragraph replaces its best-matching base
    paragraph when one is found, otherwise it is inserted before the
    final sectPr (or appended). The merged tree is written to the output
    package; on any error the base document.xml is copied unchanged as a
    fallback so an output document always exists.
    """
    base_doc_path = os.path.join(self.temp_dirs['base'], "word", "document.xml")
    revision_doc_path = os.path.join(self.temp_dirs['revision'], "word", "document.xml")
    output_doc_path = os.path.join(self.temp_dirs['output'], "word", "document.xml")
    try:
        # Use direct file reading to preserve exact format
        with open(base_doc_path, 'rb') as f:
            base_content = f.read()
        # Create parser that preserves whitespace exactly
        parser = etree.XMLParser(remove_blank_text=False, remove_comments=False, resolve_entities=False)
        base_doc = etree.fromstring(base_content, parser)
        # Do the same for revision document
        with open(revision_doc_path, 'rb') as f:
            revision_content = f.read()
        revision_doc = etree.fromstring(revision_content, parser)
        # Create paragraph maps
        base_para_map = self.create_paragraph_map(base_doc)
        # Get all paragraphs with track changes
        changed_paras = revision_doc.xpath(".//w:p[.//w:ins or .//w:del]", namespaces=NAMESPACES)
        logger.info(f"Found {len(changed_paras)} paragraphs with track changes")
        # Find body element
        base_body = base_doc.find(".//w:body", namespaces=NAMESPACES)
        if base_body is None:
            logger.error("Could not find body element in base document")
            return
        # Track unmatched paragraphs
        unmatched_paras = []
        # Process each paragraph with track changes
        for para in changed_paras:
            # Create deep copy to avoid modifying original
            para_copy = deepcopy(para)
            # Update comment references
            self.update_comment_references(para_copy)
            # Find matching paragraph in base document
            match = self.find_matching_paragraph(para_copy, base_para_map)
            if match is not None:
                # Replace the matching paragraph
                pos, base_para, _ = match
                parent = base_para.getparent()
                if parent is not None:
                    parent.replace(base_para, para_copy)
                    logger.info(f"Replaced paragraph at position {pos}")
                else:
                    # Detached node (no parent) cannot be replaced in place.
                    unmatched_paras.append(para_copy)
            else:
                unmatched_paras.append(para_copy)
        # Handle unmatched paragraphs
        if unmatched_paras:
            logger.warning(f"Adding {len(unmatched_paras)} unmatched paragraphs to document")
            # Try to find section properties
            section_props = base_body.find(".//w:sectPr", namespaces=NAMESPACES)
            if section_props is not None and section_props.getparent() == base_body:
                # Insert before the trailing sectPr so section settings stay last.
                idx = base_body.index(section_props)
                for para in unmatched_paras:
                    base_body.insert(idx, para)
            else:
                for para in unmatched_paras:
                    base_body.append(para)
        # Create output directory
        os.makedirs(os.path.dirname(output_doc_path), exist_ok=True)
        # Convert back to string with exact formatting Word expects
        doc_xml = etree.tostring(
            base_doc,
            encoding="UTF-8",
            xml_declaration=False,
            pretty_print=False
        )
        # Write with explicit XML declaration that Word expects
        with open(output_doc_path, 'wb') as f:
            f.write(b'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\r\n')
            f.write(doc_xml)
        logger.info(f"Saved merged document to {output_doc_path}")
    except Exception as e:
        logger.error(f"Error merging track changes: {e}", exc_info=True)
        # Fallback - just copy the base document
        shutil.copy(base_doc_path, output_doc_path)
        logger.info("Fallback: copied base document as output")
def merge_header_footer(self):
    """Copy headers/footers that contain track changes into the output."""
    revision_word_dir = os.path.join(self.temp_dirs['revision'], "word")
    output_word_dir = os.path.join(self.temp_dirs['output'], "word")
    for item in os.listdir(revision_word_dir):
        # Only headerN.xml / footerN.xml parts are of interest.
        if not (item.startswith("header") or item.startswith("footer")):
            continue
        revision_path = os.path.join(revision_word_dir, item)
        output_path = os.path.join(output_word_dir, item)
        try:
            revision_tree = etree.parse(revision_path)
            revision_root = revision_tree.getroot()
            # Skip parts that carry no tracked insertions or deletions.
            changes = revision_root.xpath(".//*[self::w:ins or self::w:del]", namespaces=NAMESPACES)
            if changes:
                logger.info(f"Processing track changes in {item}")
                # Remap any comment IDs carried inside the header/footer.
                self.update_comment_references(revision_root)
                revision_tree.write(output_path, xml_declaration=True, encoding='UTF-8')
        except Exception as e:
            logger.error(f"Error processing {item}: {e}")
def merge_footnotes_endnotes(self):
    """Merge track changes in footnotes and endnotes.

    For each of footnotes.xml / endnotes.xml: every revision note that
    contains w:ins or w:del markup is appended to the base part under a
    freshly minted ID, and the old->new mapping is recorded in
    self.id_maps. Notes with negative IDs (Word's separator/continuation
    notes) are left untouched.
    """
    for note_type in ['footnotes', 'endnotes']:
        base_path = os.path.join(self.temp_dirs['base'], "word", f"{note_type}.xml")
        revision_path = os.path.join(self.temp_dirs['revision'], "word", f"{note_type}.xml")
        output_path = os.path.join(self.temp_dirs['output'], "word", f"{note_type}.xml")
        # Skip if files don't exist
        if not os.path.exists(revision_path):
            continue
        if not os.path.exists(base_path):
            # If base doesn't have footnotes/endnotes but revision does, copy the file
            shutil.copy(revision_path, output_path)
            logger.info(f"Copied {note_type} from revision to output")
            continue
        # Load XML
        base_tree = etree.parse(base_path)
        base_root = base_tree.getroot()
        revision_tree = etree.parse(revision_path)
        revision_root = revision_tree.getroot()
        # Get element name (footnote or endnote)
        element_name = f"{{{NAMESPACES['w']}}}{note_type[:-1]}"  # Remove 's' to get singular
        # Find notes with track changes
        xpath = f".//{element_name}[.//w:ins or .//w:del]"
        changed_notes = revision_root.xpath(xpath, namespaces=NAMESPACES)
        if not changed_notes:
            continue
        logger.info(f"Found {len(changed_notes)} {note_type} with track changes")
        # Get maximum ID
        max_id = self.max_ids[note_type]
        # Process each note with track changes
        for note in changed_notes:
            old_id = note.get(f"{{{NAMESPACES['w']}}}id")
            # Check if this is a special ID (like -1 for separator)
            # NOTE(review): a note with no w:id would make int(None) raise
            # TypeError, which is not caught here — assumed IDs are present.
            try:
                id_int = int(old_id)
                if id_int < 0:
                    # Special ID, keep as is and skip
                    continue
            except ValueError:
                # Not a number, skip
                continue
            # Create new ID
            max_id += 1
            new_id = str(max_id)
            # Update note ID
            note.set(f"{{{NAMESPACES['w']}}}id", new_id)
            self.id_maps[note_type][old_id] = new_id
            # Update comment references
            self.update_comment_references(note)
            # Append to base. NOTE(review): with lxml, append() re-parents
            # the node, moving it out of the revision tree rather than
            # copying it — confirm the revision tree is not reused after.
            base_root.append(note)
        # Save updated file
        base_tree.write(output_path, xml_declaration=True, encoding='UTF-8')
        logger.info(f"Merged {note_type} with ID mapping: {self.id_maps[note_type]}")
def update_document_relationships(self):
    """Update relationship IDs in the document.xml.rels file.

    Appends each revision relationship whose Target is not already present
    in the base rels file, minting a fresh rId from self.max_ids['rels'],
    and copies referenced media/OLE payload files into the output tree.

    NOTE(review): the new rIds minted here are not written back into
    document.xml's r:id references — confirm revision content copied into
    the document does not rely on its original rIds.
    """
    base_rels_path = os.path.join(self.temp_dirs['base'], "word", "_rels", "document.xml.rels")
    revision_rels_path = os.path.join(self.temp_dirs['revision'], "word", "_rels", "document.xml.rels")
    output_rels_path = os.path.join(self.temp_dirs['output'], "word", "_rels", "document.xml.rels")
    if not os.path.exists(revision_rels_path):
        return
    if not os.path.exists(base_rels_path):
        # If base doesn't have relationships but revision does, copy the file
        os.makedirs(os.path.dirname(output_rels_path), exist_ok=True)
        shutil.copy(revision_rels_path, output_rels_path)
        logger.info(f"Copied relationship file from revision to output")
        return
    # Load XML
    base_tree = etree.parse(base_rels_path)
    base_root = base_tree.getroot()
    revision_tree = etree.parse(revision_rels_path)
    revision_root = revision_tree.getroot()
    # Get existing relationship targets in base
    base_targets = {rel.get("Target"): rel for rel in base_root.xpath("//rel:Relationship", namespaces=NAMESPACES)}
    # Process relationships in revision
    for rel in revision_root.xpath("//rel:Relationship", namespaces=NAMESPACES):
        target = rel.get("Target")
        rel_type = rel.get("Type")
        # Skip if relationship already exists in base
        if target in base_targets:
            continue
        # Create new relationship ID
        self.max_ids['rels'] += 1
        new_id = f"rId{self.max_ids['rels']}"
        # Add relationship to base (re-parents the node from the revision tree)
        rel.set("Id", new_id)
        base_root.append(rel)
        # Copy related files if needed (like images, embedded objects)
        if rel_type.endswith("/image") or rel_type.endswith("/oleObject"):
            # Handle media files
            source_path = os.path.join(self.temp_dirs['revision'], "word", target)
            target_path = os.path.join(self.temp_dirs['output'], "word", target)
            if os.path.exists(source_path):
                os.makedirs(os.path.dirname(target_path), exist_ok=True)
                shutil.copy(source_path, target_path)
                logger.info(f"Copied media file from {source_path} to {target_path}")
    # Save updated relationships
    os.makedirs(os.path.dirname(output_rels_path), exist_ok=True)
    base_tree.write(output_rels_path, xml_declaration=True, encoding='UTF-8')
    logger.info(f"Updated document relationships")
def merge_comment_relationships(self):
    """Merge relationship files for comments to ensure proper linking.

    If the base has no comments.xml.rels the revision's is copied as-is.
    Otherwise the two rels files are merged: revision relationships with
    new Targets are cloned in under fresh rIds, referenced payload files
    are copied into the output tree, and the merged file is written with
    the XML declaration Word expects. Any failure falls back to copying
    the revision's rels file verbatim.
    """
    base_rels_path = os.path.join(self.temp_dirs['base'], "word", "_rels", "comments.xml.rels")
    revision_rels_path = os.path.join(self.temp_dirs['revision'], "word", "_rels", "comments.xml.rels")
    output_rels_path = os.path.join(self.temp_dirs['output'], "word", "_rels", "comments.xml.rels")
    if not os.path.exists(revision_rels_path):
        logger.info("No comment relationships found in revision document")
        return
    # Create output directory
    os.makedirs(os.path.dirname(output_rels_path), exist_ok=True)
    if not os.path.exists(base_rels_path):
        # Simple case: just copy the relationships file with no changes
        # This avoids unnecessary XML parsing/manipulation
        shutil.copy(revision_rels_path, output_rels_path)
        logger.info("Copied comment relationships file from revision document")
        return
    # Complex case: need to merge relationship files
    try:
        # Load base relationship file
        with open(base_rels_path, 'rb') as f:
            base_rels_content = f.read()
        base_rels_tree = etree.fromstring(
            base_rels_content,
            etree.XMLParser(remove_blank_text=False)
        )
        # Load revision relationship file
        with open(revision_rels_path, 'rb') as f:
            revision_rels_content = f.read()
        revision_rels_tree = etree.fromstring(
            revision_rels_content,
            etree.XMLParser(remove_blank_text=False)
        )
        # Get existing targets to avoid duplicates
        base_targets = {}
        for rel in base_rels_tree.xpath("//rel:Relationship", namespaces=NAMESPACES):
            target = rel.get("Target")
            base_targets[target] = rel
        # Find max rId in base
        max_id = 0
        for rel in base_rels_tree.xpath("//rel:Relationship", namespaces=NAMESPACES):
            rel_id = rel.get("Id")
            if rel_id.startswith("rId"):
                try:
                    id_num = int(rel_id[3:])
                    max_id = max(max_id, id_num)
                except ValueError:
                    pass
        # Process each relationship from revision
        for rel in revision_rels_tree.xpath("//rel:Relationship", namespaces=NAMESPACES):
            target = rel.get("Target")
            rel_type = rel.get("Type")
            # Skip if relationship with the same target already exists
            if target in base_targets:
                continue
            # Create new relationship ID
            max_id += 1
            new_id = f"rId{max_id}"
            # Add the relationship (deepcopy keeps the revision tree intact)
            rel_clone = deepcopy(rel)
            rel_clone.set("Id", new_id)
            base_rels_tree.append(rel_clone)
            # Copy related file if needed
            if target.startswith("../"):
                # External path (like media): resolve relative to word/
                rel_path = target.replace("../", "")
                source_path = os.path.join(self.temp_dirs['revision'], "word", rel_path)
                target_path = os.path.join(self.temp_dirs['output'], "word", rel_path)
            else:
                # Internal path
                source_path = os.path.join(self.temp_dirs['revision'], "word", target)
                target_path = os.path.join(self.temp_dirs['output'], "word", target)
            if os.path.exists(source_path):
                os.makedirs(os.path.dirname(target_path), exist_ok=True)
                shutil.copy(source_path, target_path)
                logger.info(f"Copied related file: {source_path} -> {target_path}")
        # Format XML properly
        rels_xml = etree.tostring(
            base_rels_tree,
            encoding="UTF-8",
            xml_declaration=True,
            standalone=True,
            pretty_print=False
        )
        # Ensure proper XML declaration
        rels_xml = rels_xml.replace(
            b'<?xml version="1.0" encoding="UTF-8"?>',
            b'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
        )
        # Write the output file
        with open(output_rels_path, 'wb') as f:
            f.write(rels_xml)
        logger.info("Merged comment relationships successfully")
    except Exception as e:
        # Fallback to direct copy if anything goes wrong
        logger.error(f"Error merging comment relationships: {e}")
        shutil.copy(revision_rels_path, output_rels_path)
        logger.info("Copied comment relationships file from revision document as fallback")
def create_output_docx(self):
    """Zip the assembled output tree into the final .docx file."""
    out_root = self.temp_dirs['output']
    with zipfile.ZipFile(self.output_docx, 'w', compression=zipfile.ZIP_DEFLATED) as output_zip:
        for folder, _, filenames in os.walk(out_root):
            for filename in filenames:
                full_path = os.path.join(folder, filename)
                # Store entries relative to the output root so the archive
                # layout matches the package structure Word expects.
                output_zip.write(full_path, os.path.relpath(full_path, out_root))
    logger.info(f"Created output document: {self.output_docx}")
def fix_xml_for_word(self, xml_content):
    """Normalize an XML byte string to the declaration Word expects.

    Ensures the content begins with
    '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>', rewriting
    an existing leading declaration or prepending one when absent.

    Args:
        xml_content: serialized XML as bytes.

    Returns:
        The bytes with a normalized leading XML declaration.
    """
    declaration = b'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
    if xml_content.startswith(b'<?xml'):
        # Only rewrite the leading declaration. The previous unanchored
        # re.sub would also clobber later processing instructions such as
        # <?xml-stylesheet ...?>, since '<\?xml[^>]*\?>' matches them too.
        xml_content = re.sub(rb'^<\?xml[^>]*\?>', declaration, xml_content, count=1)
    else:
        # No declaration: prepend one, using CRLF for consistency with the
        # declarations written elsewhere in this class.
        xml_content = declaration + b'\r\n' + xml_content
    return xml_content
def save_xml_file(self, tree_or_element, output_path):
    """Serialize an lxml tree/element to `output_path` in Word-safe form."""
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    serialized = etree.tostring(
        tree_or_element,
        encoding="UTF-8",
        xml_declaration=True,
        pretty_print=False
    )
    # Normalize the XML declaration before the bytes hit disk.
    with open(output_path, 'wb') as f:
        f.write(self.fix_xml_for_word(serialized))
def verify_output_docx(self):
    """Sanity-check the assembled output package before zipping.

    Verifies the required OPC parts exist, that comments.xml (if present)
    is structurally valid — rebuilding it from recoverable w:comment
    elements when it is not — and that document.xml looks well-formed.

    Returns:
        True when the package passes all checks, False otherwise.
    """
    try:
        output_dir = self.temp_dirs['output']
        # Required Office Open XML files
        required_files = [
            "[Content_Types].xml",
            "_rels/.rels",
            "word/document.xml"
        ]
        for req_file in required_files:
            if not os.path.exists(os.path.join(output_dir, req_file)):
                logger.error(f"Missing required file: {req_file}")
                return False
        # Check if we have comments
        comments_path = os.path.join(output_dir, "word/comments.xml")
        if os.path.exists(comments_path):
            try:
                with open(comments_path, 'rb') as f:
                    comments_content = f.read()
                if b'<w:comments' not in comments_content or not comments_content.startswith(b'<?xml'):
                    logger.error("Comments XML has invalid structure")
                    # Rebuild the part with a known-good wrapper plus any
                    # comment elements that can be salvaged.
                    comments_xml = (
                        b'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\r\n'
                        b'<w:comments xmlns:w="http://schemas.openxmlformats.org/wordprocessingml/2006/main" '
                        b'xmlns:w14="http://schemas.microsoft.com/office/word/2010/wordml" '
                        b'xmlns:w15="http://schemas.microsoft.com/office/word/2012/wordml" '
                        b'xmlns:r="http://schemas.openxmlformats.org/officeDocument/2006/relationships">'
                    )
                    # Extract comment elements from original (recover mode
                    # lets lxml salvage what it can from broken XML).
                    parser = etree.XMLParser(recover=True)
                    try:
                        root = etree.fromstring(comments_content, parser)
                        comments = root.xpath(".//w:comment", namespaces=NAMESPACES)
                        for comment in comments:
                            # Fix: no encoding argument here. With
                            # encoding="UTF-8" lxml prepends an XML
                            # declaration to EACH serialized comment,
                            # corrupting the rebuilt part.
                            comments_xml += etree.tostring(comment)
                    except Exception:
                        # Fix: was a bare `except:`, which also swallowed
                        # KeyboardInterrupt/SystemExit.
                        logger.error("Could not extract comments from invalid XML")
                    # Close the comments tag
                    comments_xml += b'</w:comments>'
                    # Write corrected XML
                    with open(comments_path, 'wb') as f:
                        f.write(comments_xml)
            except Exception as e:
                logger.error(f"Error checking comments XML: {e}")
                return False
        # Check document.xml
        document_path = os.path.join(output_dir, "word/document.xml")
        try:
            with open(document_path, 'rb') as f:
                document_content = f.read()
            if b'<w:document' not in document_content or not document_content.startswith(b'<?xml'):
                logger.error("Document XML has invalid structure")
                return False
        except Exception as e:
            logger.error(f"Error checking document XML: {e}")
            return False
        return True
    except Exception as e:
        logger.error(f"Error verifying output document: {e}")
        return False
def merge(self):
    """Execute the complete merge process with more robust comment handling.

    Pipeline: extract both packages, record existing max IDs, merge
    tracked changes, carry over comments and their relationships, fix
    content types, then merge headers/footers, notes and relationships
    before zipping the output. Temporary directories are always removed,
    even on failure.

    Returns:
        True on success, False if any step raised an exception.
    """
    try:
        logger.info(f"Starting merge process: {self.base_docx} + {self.revision_docx} -> {self.output_docx}")
        # Set up temporary directories
        self.setup_temp_dirs()
        # Extract docx files
        self.extract_docx()
        # Initialize ID maps - do this first to get existing IDs
        self.initialize_id_maps()
        # Process track changes first - this is more reliable
        self.merge_track_changes()
        # Rebuild comments completely instead of merging
        self.repair_comments_xml()
        # Fix all document references to comments
        self.update_document_references()
        # Ensure proper content types are registered
        self.fix_content_types()
        # Process other parts
        self.merge_header_footer()
        self.merge_footnotes_endnotes()
        self.update_document_relationships()
        # Create the output docx
        self.create_output_docx()
        logger.info("Merge completed successfully")
        return True
    except Exception as e:
        logger.error(f"Error during merge: {str(e)}", exc_info=True)
        return False
    finally:
        # Clean up temporary directories
        self.cleanup_temp_dirs()
def repair_comments_xml(self):
    """Carry the revision's comments part into the output verbatim.

    Instead of merging and re-numbering, copies comments.xml (and its
    .rels, when present) byte-for-byte from the revision package, then
    maps each comment ID to itself so later reference updates are no-ops.
    Finishes by ensuring document.xml.rels points at the comments part.
    """
    # Let's start by simply keeping track of any issues
    logger.info("Starting comment repair with new approach")
    revision_comments_path = os.path.join(self.temp_dirs['revision'], "word", "comments.xml")
    if not os.path.exists(revision_comments_path):
        logger.info("No comments file found in revision document")
        return
    # First, let's COPY the comments.xml file exactly as-is from the revision document
    # This preserves all its structure, namespaces, and formatting
    output_comments_path = os.path.join(self.temp_dirs['output'], "word", "comments.xml")
    # Make directory if it doesn't exist
    os.makedirs(os.path.dirname(output_comments_path), exist_ok=True)
    # Copy the file directly - this is important to preserve EXACT structure.
    # NOTE(review): this overwrites any comments the base document had —
    # confirm that losing base-only comments is acceptable.
    shutil.copy(revision_comments_path, output_comments_path)
    logger.info(f"Copied comments.xml directly from revision document to preserve structure")
    # Also copy the comments.xml.rels file if it exists
    revision_rels_path = os.path.join(self.temp_dirs['revision'], "word", "_rels", "comments.xml.rels")
    if os.path.exists(revision_rels_path):
        output_rels_path = os.path.join(self.temp_dirs['output'], "word", "_rels", "comments.xml.rels")
        os.makedirs(os.path.dirname(output_rels_path), exist_ok=True)
        shutil.copy(revision_rels_path, output_rels_path)
        logger.info("Copied comments.xml.rels file")
    # Now map the comment IDs (don't modify the comments.xml file!)
    try:
        # Parse the document to get the comment IDs
        tree = etree.parse(output_comments_path)
        for comment in tree.xpath("//w:comment", namespaces=NAMESPACES):
            comment_id = comment.get(f"{{{NAMESPACES['w']}}}id")
            if comment_id:
                # Map the ID to itself - we're keeping original IDs!
                self.id_maps['comments'][comment_id] = comment_id
        logger.info(f"Mapped {len(self.id_maps['comments'])} comment IDs")
    except Exception as e:
        logger.error(f"Error mapping comment IDs: {e}", exc_info=True)
    # Ensure the relationship file in document references comments
    self.ensure_comments_relationship()
def ensure_comments_relationship(self):
    """Make sure document has a relationship to comments.xml.

    Scans the output package's document.xml.rels for a relationship of the
    comments type and appends one (Target="comments.xml") under a fresh
    rId when it is missing. Failures are logged, not raised.
    """
    rels_path = os.path.join(self.temp_dirs['output'], "word", "_rels", "document.xml.rels")
    if not os.path.exists(rels_path):
        logger.error("Missing document.xml.rels file")
        return
    try:
        # Parse relationships file
        tree = etree.parse(rels_path)
        root = tree.getroot()
        # Check if comments relationship exists
        comments_rel_type = "http://schemas.openxmlformats.org/officeDocument/2006/relationships/comments"
        has_comments_rel = False
        for rel in root.xpath("//rel:Relationship", namespaces=NAMESPACES):
            if rel.get("Type") == comments_rel_type:
                has_comments_rel = True
                break
        # Add relationship if it doesn't exist
        if not has_comments_rel:
            # Find highest rId. Unlike the other rel-ID scans in this
            # class, max_id here holds the NEXT free number (id_num + 1),
            # so rId{max_id} is guaranteed unused; starts at 1 for an
            # empty relationships file.
            max_id = 1
            for rel in root.xpath("//rel:Relationship", namespaces=NAMESPACES):
                rel_id = rel.get("Id")
                if rel_id.startswith("rId"):
                    try:
                        id_num = int(rel_id[3:])
                        max_id = max(max_id, id_num + 1)
                    except ValueError:
                        pass
            # Create new relationship
            new_rel = etree.SubElement(root, f"{{{NAMESPACES['rel']}}}Relationship")
            new_rel.set("Id", f"rId{max_id}")
            new_rel.set("Type", comments_rel_type)
            new_rel.set("Target", "comments.xml")
            # Save changes
            tree.write(rels_path, xml_declaration=True, encoding="UTF-8")
            logger.info("Added comments relationship to document.xml.rels")
    except Exception as e:
        logger.error(f"Error ensuring comments relationship: {e}", exc_info=True)
def fix_content_types(self):
    """Ensure all required content types are registered.

    Checks [Content_Types].xml in the output package for an Override
    entry covering /word/comments.xml and appends one when the part
    exists on disk but is not yet declared. Failures are logged, not
    raised.
    """
    content_types_path = os.path.join(self.temp_dirs['output'], "[Content_Types].xml")
    if not os.path.exists(content_types_path):
        logger.error("Missing [Content_Types].xml file")
        return
    try:
        # Load content types XML
        with open(content_types_path, 'rb') as f:
            content = f.read()
        # Parse with a parser that preserves formatting
        parser = etree.XMLParser(remove_blank_text=False)
        root = etree.fromstring(content, parser)
        # Required content types (part name -> MIME content type)
        required_types = {
            "/word/comments.xml": "application/vnd.openxmlformats-officedocument.wordprocessingml.comments+xml"
        }
        # Get existing overrides
        existing_overrides = {}
        for override in root.xpath("//*[@PartName]"):
            part_name = override.get("PartName")
            existing_overrides[part_name] = override
        # Check if we need to add comments content type
        modified = False
        for part_name, content_type in required_types.items():
            if part_name not in existing_overrides:
                # Only declare the part if it actually exists in the package.
                file_path = os.path.join(self.temp_dirs['output'], part_name.lstrip('/'))
                if os.path.exists(file_path):
                    # Add the content type
                    ns = "{http://schemas.openxmlformats.org/package/2006/content-types}"
                    override = etree.SubElement(root, f"{ns}Override")
                    override.set("PartName", part_name)
                    override.set("ContentType", content_type)
                    modified = True
                    logger.info(f"Added content type for {part_name}")
        # Save if modified
        if modified:
            xml_content = etree.tostring(
                root,
                encoding="UTF-8",
                xml_declaration=True
            )
            # Replace XML declaration with Word's exact format
            xml_content = xml_content.replace(
                b'<?xml version="1.0" encoding="UTF-8"?>',
                b'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
            )
            with open(content_types_path, 'wb') as f:
                f.write(xml_content)
    except Exception as e:
        logger.error(f"Error fixing content types: {e}", exc_info=True)
def update_document_references(self):
    """Update all document references to comments - now just keeping original IDs"""
    # Original comment IDs are preserved end-to-end, so no reference
    # rewriting is required; remapping would only matter if comments
    # were moved into a different document's ID space.
    logger.info("Keeping original comment references (no updates needed)")
    # Instead, make sure the document carries a well-formed comment
    # reference structure that Word will recognize.
    self.ensure_document_comment_references()
def ensure_document_comment_references(self):
    """Make sure document has proper comment reference structure.

    If document.xml contains no <w:commentReference> elements at all,
    inject one reference per merged comment ID into the last run of the
    last paragraph so Word associates the comments with the document.
    Existing references are left untouched.

    Bug fix: the paragraph/run lookups previously used
    ``namespaces[NAMESPACES]`` (subscripting an undefined name) instead
    of the ``namespaces=NAMESPACES`` keyword argument, which raised a
    NameError that was swallowed by the broad except — so references
    were silently never injected.
    """
    document_path = os.path.join(self.temp_dirs['output'], "word", "document.xml")
    if not os.path.exists(document_path):
        logger.error("Missing document.xml file")
        return
    # The approach here is different - we're not trying to change existing
    # references, just ensuring the structure is correct for Word to
    # recognize comments.
    try:
        # Cheap byte-level check before parsing: if any commentReference
        # already exists, nothing to do.
        with open(document_path, 'rb') as f:
            content = f.read()
        if b'commentReference' in content:
            logger.info("Document already has comment references")
            return
        # Otherwise, need to inject comment references.
        parser = etree.XMLParser(remove_blank_text=False)
        root = etree.fromstring(content, parser)
        # Get all comment IDs that were merged in.
        comment_ids = list(self.id_maps['comments'].keys())
        if not comment_ids:
            logger.info("No comments to reference")
            return
        # Find the body, then the last paragraph in it.
        body = root.find(".//w:body", namespaces=NAMESPACES)
        if body is None:
            logger.error("Could not find body element")
            return
        # FIX: keyword argument, not namespaces[NAMESPACES]
        paragraphs = body.findall(".//w:p", namespaces=NAMESPACES)
        if not paragraphs:
            logger.error("No paragraphs found in document")
            return
        last_para = paragraphs[-1]
        # Reuse the last run of the paragraph, or create one if needed.
        # FIX: keyword argument, not namespaces[NAMESPACES]
        runs = last_para.findall(".//w:r", namespaces=NAMESPACES)
        if not runs:
            run = etree.SubElement(last_para, f"{{{NAMESPACES['w']}}}r")
        else:
            run = runs[-1]
        # Append one commentReference per merged comment.
        for comment_id in comment_ids:
            ref = etree.SubElement(run, f"{{{NAMESPACES['w']}}}commentReference")
            # str() guards against integer IDs — attribute values must be strings.
            ref.set(f"{{{NAMESPACES['w']}}}id", str(comment_id))
        # Save changes with Word's exact XML declaration.
        xml_content = etree.tostring(
            root,
            encoding="UTF-8",
            xml_declaration=True
        )
        xml_content = xml_content.replace(
            b'<?xml version="1.0" encoding="UTF-8"?>',
            b'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
        )
        with open(document_path, 'wb') as f:
            f.write(xml_content)
        logger.info(f"Added {len(comment_ids)} comment references to document")
    except Exception as e:
        logger.error(f"Error ensuring document comment references: {e}", exc_info=True)
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| base_docx | str (path) | - | positional |
| revision_docx | str (path) | - | positional |
| output_docx | str (path) | - | positional |
Parameter Details
base_docx: path to the base .docx document. revision_docx: path to the revision .docx whose comments and tracked changes are merged in. output_docx: path where the merged .docx is written.
Return Value
The constructor returns a DocxMerger instance; merge() writes the merged document to output_docx.
Class Interface
Methods
__init__(self, base_docx, revision_docx, output_docx)
Purpose: Initialize with paths to the base document, revision document, and output path
Parameters:
base_docx: path to the base document; revision_docx: path to the revision document; output_docx: path for the merged output
Returns: None
setup_temp_dirs(self)
Purpose: Set up temporary directories for processing
Returns: None
cleanup_temp_dirs(self)
Purpose: Clean up temporary directories
Returns: None
extract_docx(self)
Purpose: Extract docx files to temporary directories
Returns: None
get_max_id(self, xml_path, xpath, id_attr)
Purpose: Get the maximum ID used in an XML file
Parameters:
xml_path: Parameterxpath: Parameterid_attr: Parameter
Returns: None
initialize_id_maps(self)
Purpose: Initialize ID maps and get maximum IDs from base document
Returns: None
merge_comments(self)
Purpose: Merge comments from revision document to base document
Returns: None
get_paragraph_text(self, paragraph)
Purpose: Extract text content from a paragraph for comparison
Parameters:
paragraph: Parameter
Returns: None
create_paragraph_map(self, doc_root)
Purpose: Create a map of paragraphs by their textual content
Parameters:
doc_root: Parameter
Returns: None
find_matching_paragraph(self, para, base_para_map)
Purpose: Find the most similar paragraph in the base document
Parameters:
para: Parameterbase_para_map: Parameter
Returns: None
update_comment_references(self, element)
Purpose: Update comment references in an element using the ID mapping
Parameters:
element: Parameter
Returns: None
merge_track_changes(self)
Purpose: Merge track changes from revision document to base document
Returns: None
merge_header_footer(self)
Purpose: Merge track changes in headers and footers
Returns: None
merge_footnotes_endnotes(self)
Purpose: Merge track changes in footnotes and endnotes
Returns: None
update_document_relationships(self)
Purpose: Update relationship IDs in the document.xml.rels file
Returns: None
merge_comment_relationships(self)
Purpose: Merge relationship files for comments to ensure proper linking
Returns: None
create_output_docx(self)
Purpose: Create the final output docx file
Returns: None
fix_xml_for_word(self, xml_content)
Purpose: Ensure XML content is formatted exactly as Word expects it
Parameters:
xml_content: Parameter
Returns: None
save_xml_file(self, tree_or_element, output_path)
Purpose: Save XML content with proper Word formatting
Parameters:
tree_or_element: Parameteroutput_path: Parameter
Returns: None
verify_output_docx(self)
Purpose: Verify that the output document has the expected structure
Returns: None
merge(self)
Purpose: Execute the complete merge process with more robust comment handling
Returns: None
repair_comments_xml(self)
Purpose: Complete rewrite to fix comment handling
Returns: None
ensure_comments_relationship(self)
Purpose: Make sure document has a relationship to comments.xml
Returns: None
fix_content_types(self)
Purpose: Ensure all required content types are registered
Returns: None
update_document_references(self)
Purpose: Update all document references to comments - now just keeping original IDs
Returns: None
ensure_document_comment_references(self)
Purpose: Make sure document has proper comment reference structure
Returns: None
Required Imports
import zipfile
import shutil
import os
import tempfile
import re
import logging
from lxml import etree
Usage Example
# Example usage:
# merger = DocxMerger("base.docx", "revision.docx", "merged.docx")
# merger.merge()
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
-
class DocumentMerger 58.0% similar
-
class EnhancedMeetingMinutesGenerator 52.6% similar
-
class DocumentDetail_v1 49.7% similar
-
class DocumentDetail 48.9% similar
-
class DocumentDetail_v2 48.8% similar