🔍 Code Extractor

class DocxMerger

Maturity: 15

Merges tracked changes and comments from a revision .docx into a base .docx and writes the result to a new file.

File: /tf/active/vicechatdev/word_merge.py
Lines: 41 - 1150
Complexity: moderate

Purpose

Merges a reviewed copy of a Word document back into the original at the Office Open XML level. Both .docx packages are extracted to temporary directories; tracked changes (w:ins/w:del), comments, footnotes, endnotes and header/footer edits from the revision are spliced into the base document's XML parts. Footnote, endnote and relationship IDs are remapped to avoid collisions, while comment IDs from the revision are preserved, and the merged parts are then repackaged into the output .docx.
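
For orientation, a .docx file is an OPC ZIP package of XML parts, and those parts are what this class edits. A minimal sketch of inspecting that package layout (the file name is a placeholder):

import zipfile

# List the OOXML parts inside a .docx package (path is a placeholder).
with zipfile.ZipFile("base.docx") as z:
    for name in z.namelist():
        # DocxMerger operates on parts such as word/document.xml,
        # word/comments.xml, word/footnotes.xml, word/endnotes.xml,
        # word/_rels/document.xml.rels and [Content_Types].xml.
        print(name)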

Source Code

class DocxMerger:
    def __init__(self, base_docx, revision_docx, output_docx):
        """Initialize with paths to the base document, revision document, and output path"""
        self.base_docx = base_docx
        self.revision_docx = revision_docx
        self.output_docx = output_docx
        self.temp_dirs = {}
        self.id_maps = {
            'comments': {},
            'bookmarks': {},
            'rels': {},
            'footnotes': {},
            'endnotes': {}
        }
        
        # Track maximum IDs
        self.max_ids = {
            'comments': 0,
            'bookmarks': 0,
            'rels': 0,
            'footnotes': 0,
            'endnotes': 0
        }
        
        # Register all namespaces for proper XML handling
        for prefix, uri in NAMESPACES.items():
            etree.register_namespace(prefix, uri)
    
    def setup_temp_dirs(self):
        """Set up temporary directories for processing"""
        for name in ['base', 'revision', 'output']:
            self.temp_dirs[name] = tempfile.mkdtemp(prefix=f"docx_merge_{name}_")
            logger.info(f"Created temporary directory: {self.temp_dirs[name]}")
    
    def cleanup_temp_dirs(self):
        """Clean up temporary directories"""
        for name, path in self.temp_dirs.items():
            if os.path.exists(path):
                shutil.rmtree(path)
                logger.info(f"Removed temporary directory: {path}")
    
    def extract_docx(self):
        """Extract docx files to temporary directories"""
        with zipfile.ZipFile(self.base_docx, 'r') as zip_ref:
            zip_ref.extractall(self.temp_dirs['base'])
            logger.info(f"Extracted base document to {self.temp_dirs['base']}")
            
        with zipfile.ZipFile(self.revision_docx, 'r') as zip_ref:
            zip_ref.extractall(self.temp_dirs['revision'])
            logger.info(f"Extracted revision document to {self.temp_dirs['revision']}")
        
        # Copy base document to output directory
        shutil.copytree(self.temp_dirs['base'], self.temp_dirs['output'], dirs_exist_ok=True)
        logger.info(f"Created output document structure in {self.temp_dirs['output']}")
    
    def get_max_id(self, xml_path, xpath, id_attr):
        """Get the maximum ID used in an XML file"""
        if not os.path.exists(xml_path):
            return 0
            
        tree = etree.parse(xml_path)
        root = tree.getroot()
        
        max_id = 0
        for item in root.xpath(xpath, namespaces=NAMESPACES):
            try:
                # Handle attributes with or without namespace prefix
                if '}' in id_attr:
                    item_id = int(item.get(id_attr))
                else:
                    item_id = int(item.get(f"{{{NAMESPACES['w']}}}{id_attr}"))
                max_id = max(max_id, item_id)
            except (ValueError, TypeError):
                pass
        
        return max_id
    
    def initialize_id_maps(self):
        """Initialize ID maps and get maximum IDs from base document"""
        # Comments
        comments_path = os.path.join(self.temp_dirs['base'], "word", "comments.xml")
        if os.path.exists(comments_path):
            self.max_ids['comments'] = self.get_max_id(
                comments_path, 
                ".//w:comment", 
                "id"
            )
            
        # Bookmarks
        document_path = os.path.join(self.temp_dirs['base'], "word", "document.xml")
        if os.path.exists(document_path):
            self.max_ids['bookmarks'] = self.get_max_id(
                document_path, 
                ".//w:bookmarkStart", 
                "id"
            )
        
        # Footnotes
        footnotes_path = os.path.join(self.temp_dirs['base'], "word", "footnotes.xml")
        if os.path.exists(footnotes_path):
            self.max_ids['footnotes'] = self.get_max_id(
                footnotes_path, 
                ".//w:footnote", 
                "id"
            )
            
        # Endnotes
        endnotes_path = os.path.join(self.temp_dirs['base'], "word", "endnotes.xml")
        if os.path.exists(endnotes_path):
            self.max_ids['endnotes'] = self.get_max_id(
                endnotes_path, 
                ".//w:endnote", 
                "id"
            )
        
        # Relationships
        rels_path = os.path.join(self.temp_dirs['base'], "word", "_rels", "document.xml.rels")
        if os.path.exists(rels_path):
            tree = etree.parse(rels_path)
            root = tree.getroot()
            
            for rel in root.xpath("//rel:Relationship", namespaces=NAMESPACES):
                rel_id = rel.get("Id")
                if rel_id.startswith("rId"):
                    try:
                        id_num = int(rel_id[3:])
                        self.max_ids['rels'] = max(self.max_ids['rels'], id_num)
                    except ValueError:
                        pass
        
        logger.info(f"Initialized ID maps with max IDs: {self.max_ids}")
    
    def merge_comments(self):
        """Merge comments from revision document to base document"""
        base_comments_path = os.path.join(self.temp_dirs['base'], "word", "comments.xml")
        revision_comments_path = os.path.join(self.temp_dirs['revision'], "word", "comments.xml")
        output_comments_path = os.path.join(self.temp_dirs['output'], "word", "comments.xml")
        
        # Check if comments files exist
        has_base_comments = os.path.exists(base_comments_path)
        has_revision_comments = os.path.exists(revision_comments_path)
        
        if not has_revision_comments:
            logger.info("No comments found in revision document")
            return
        
        # Create a new comments file if needed, or use existing
        if has_base_comments:
            # Use direct file reading to preserve exact format
            with open(base_comments_path, 'rb') as f:
                base_content = f.read()
            
            # Create parser that preserves whitespace exactly
            parser = etree.XMLParser(remove_blank_text=False, remove_comments=False, resolve_entities=False)
            base_comments = etree.fromstring(base_content, parser)
        else:
            # Create a minimal but correctly formatted comments XML
            base_content = (
                b'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\r\n'
                b'<w:comments xmlns:w="http://schemas.openxmlformats.org/wordprocessingml/2006/main" '
                b'xmlns:w14="http://schemas.microsoft.com/office/word/2010/wordml" '
                b'xmlns:w15="http://schemas.microsoft.com/office/word/2012/wordml" '
                b'xmlns:r="http://schemas.openxmlformats.org/officeDocument/2006/relationships" '
                b'xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006">'
                b'</w:comments>'
            )
            parser = etree.XMLParser(remove_blank_text=False, remove_comments=False, resolve_entities=False)
            base_comments = etree.fromstring(base_content, parser)
        
        # Read revision comments with exact preservation of format
        with open(revision_comments_path, 'rb') as f:
            revision_content = f.read()
        
        parser = etree.XMLParser(remove_blank_text=False, remove_comments=False, resolve_entities=False)
        revision_comments = etree.fromstring(revision_content, parser)
        
        # Extract all comments from revision document
        comments_to_add = revision_comments.xpath(".//w:comment", namespaces=NAMESPACES)
        if not comments_to_add:
            logger.info("No comments found in revision document's comments.xml")
            return
        
        # Track any added comments for debugging
        added_comments = []
        
        # Process each comment from revision
        for comment in comments_to_add:
            old_id = comment.get(f"{{{NAMESPACES['w']}}}id")
            self.max_ids['comments'] += 1
            new_id = str(self.max_ids['comments'])
            
            # Create deep copy to avoid modifying original
            comment_copy = deepcopy(comment)
            
            # Update the ID
            comment_copy.set(f"{{{NAMESPACES['w']}}}id", new_id)
            self.id_maps['comments'][old_id] = new_id
            
            # Ensure required attributes exist
            required_attrs = ['author', 'date', 'initials']
            for attr in required_attrs:
                attr_full = f"{{{NAMESPACES['w']}}}{attr}"
                if attr_full not in comment_copy.attrib:
                    if attr == 'author':
                        comment_copy.set(attr_full, "Unknown")
                    elif attr == 'date':
                        now = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
                        comment_copy.set(attr_full, now)
                    elif attr == 'initials':
                        comment_copy.set(attr_full, "??")
            
            # Append to base comments
            base_comments.append(comment_copy)
            added_comments.append(f"Comment {old_id} -> {new_id}")
        
        # Create output directory
        os.makedirs(os.path.dirname(output_comments_path), exist_ok=True)
        
        # Convert back to string with exact formatting Word expects
        comments_xml = etree.tostring(
            base_comments,
            encoding="UTF-8",
            xml_declaration=False,
            pretty_print=False
        )
        
        # Write with explicit XML declaration that Word expects
        with open(output_comments_path, 'wb') as f:
            f.write(b'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\r\n')
            f.write(comments_xml)
        
        logger.info(f"Merged {len(added_comments)} comments: {', '.join(added_comments)}")
        
        # Make sure the comments relationships file is handled correctly
        self.merge_comment_relationships()
    
    def get_paragraph_text(self, paragraph):
        """Extract text content from a paragraph for comparison"""
        text = ""
        for run in paragraph.xpath(".//w:r/w:t", namespaces=NAMESPACES):
            text += run.text or ""
        return text
    
    def create_paragraph_map(self, doc_root):
        """Create a map of paragraphs by their textual content"""
        para_map = {}
        for i, para in enumerate(doc_root.xpath(".//w:p", namespaces=NAMESPACES)):
            text = self.get_paragraph_text(para)
            # Use hash of text to avoid issues with very long text
            text_hash = hash(text)
            # Store position and paragraph
            if text_hash not in para_map:
                para_map[text_hash] = []
            para_map[text_hash].append((i, para, text))
        return para_map
    
    def find_matching_paragraph(self, para, base_para_map):
        """Find the most similar paragraph in the base document"""
        text = self.get_paragraph_text(para)
        text_hash = hash(text)
        
        # Direct match by hash
        if text_hash in base_para_map and base_para_map[text_hash]:
            return base_para_map[text_hash][0]  # Return first match
        
        # Try fuzzy matching for similar paragraphs
        best_match = None
        best_ratio = 0.8  # Threshold for fuzzy matching
        
        for hash_key, para_list in base_para_map.items():
            for pos, base_para, base_text in para_list:
                ratio = difflib.SequenceMatcher(None, text, base_text).ratio()
                if ratio > best_ratio:
                    best_ratio = ratio
                    best_match = (pos, base_para, base_text)
        
        return best_match
    
    def update_comment_references(self, element):
        """Update comment references in an element using the ID mapping"""
        if not self.id_maps['comments']:
            return  # No comment mappings to apply
        
        try:
            # Handle standard comment references
            for comment_ref in element.xpath(".//w:commentReference", namespaces=NAMESPACES):
                ref_id = comment_ref.get(f"{{{NAMESPACES['w']}}}id")
                if ref_id in self.id_maps['comments']:
                    comment_ref.set(f"{{{NAMESPACES['w']}}}id", self.id_maps['comments'][ref_id])
            
            # Handle comment range start markers
            for comment_range_start in element.xpath(".//w:commentRangeStart", namespaces=NAMESPACES):
                ref_id = comment_range_start.get(f"{{{NAMESPACES['w']}}}id")
                if ref_id in self.id_maps['comments']:
                    comment_range_start.set(f"{{{NAMESPACES['w']}}}id", self.id_maps['comments'][ref_id])
            
            # Handle comment range end markers
            for comment_range_end in element.xpath(".//w:commentRangeEnd", namespaces=NAMESPACES):
                ref_id = comment_range_end.get(f"{{{NAMESPACES['w']}}}id")
                if ref_id in self.id_maps['comments']:
                    comment_range_end.set(f"{{{NAMESPACES['w']}}}id", self.id_maps['comments'][ref_id])
            
            # Handle any other elements with comment IDs
            id_attr = f"{{{NAMESPACES['w']}}}id"
            for elem in element.xpath(".//*[@w:id]", namespaces=NAMESPACES):
                ref_id = elem.get(id_attr)
                if ref_id in self.id_maps['comments']:
                    elem.set(id_attr, self.id_maps['comments'][ref_id])
        
        except Exception as e:
            logger.warning(f"Error updating comment references: {e}")
    
    def merge_track_changes(self):
        """Merge track changes from revision document to base document"""
        base_doc_path = os.path.join(self.temp_dirs['base'], "word", "document.xml")
        revision_doc_path = os.path.join(self.temp_dirs['revision'], "word", "document.xml")
        output_doc_path = os.path.join(self.temp_dirs['output'], "word", "document.xml")
        
        try:
            # Use direct file reading to preserve exact format
            with open(base_doc_path, 'rb') as f:
                base_content = f.read()
            
            # Create parser that preserves whitespace exactly
            parser = etree.XMLParser(remove_blank_text=False, remove_comments=False, resolve_entities=False)
            base_doc = etree.fromstring(base_content, parser)
            
            # Do the same for revision document
            with open(revision_doc_path, 'rb') as f:
                revision_content = f.read()
            
            revision_doc = etree.fromstring(revision_content, parser)
            
            # Create paragraph maps
            base_para_map = self.create_paragraph_map(base_doc)
            
            # Get all paragraphs with track changes
            changed_paras = revision_doc.xpath(".//w:p[.//w:ins or .//w:del]", namespaces=NAMESPACES)
            logger.info(f"Found {len(changed_paras)} paragraphs with track changes")
            
            # Find body element
            base_body = base_doc.find(".//w:body", namespaces=NAMESPACES)
            if base_body is None:
                logger.error("Could not find body element in base document")
                return
            
            # Track unmatched paragraphs
            unmatched_paras = []
            
            # Process each paragraph with track changes
            for para in changed_paras:
                # Create deep copy to avoid modifying original
                para_copy = deepcopy(para)
                
                # Update comment references
                self.update_comment_references(para_copy)
                
                # Find matching paragraph in base document
                match = self.find_matching_paragraph(para_copy, base_para_map)
                
                if match is not None:
                    # Replace the matching paragraph
                    pos, base_para, _ = match
                    parent = base_para.getparent()
                    if parent is not None:
                        parent.replace(base_para, para_copy)
                        logger.info(f"Replaced paragraph at position {pos}")
                    else:
                        unmatched_paras.append(para_copy)
                else:
                    unmatched_paras.append(para_copy)
            
            # Handle unmatched paragraphs
            if unmatched_paras:
                logger.warning(f"Adding {len(unmatched_paras)} unmatched paragraphs to document")
                
                # Try to find section properties
                section_props = base_body.find(".//w:sectPr", namespaces=NAMESPACES)
                
                if section_props is not None and section_props.getparent() == base_body:
                    idx = base_body.index(section_props)
                    for para in unmatched_paras:
                        base_body.insert(idx, para)
                else:
                    for para in unmatched_paras:
                        base_body.append(para)
            
            # Create output directory
            os.makedirs(os.path.dirname(output_doc_path), exist_ok=True)
            
            # Convert back to string with exact formatting Word expects
            doc_xml = etree.tostring(
                base_doc,
                encoding="UTF-8",
                xml_declaration=False,
                pretty_print=False
            )
            
            # Write with explicit XML declaration that Word expects
            with open(output_doc_path, 'wb') as f:
                f.write(b'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\r\n')
                f.write(doc_xml)
            
            logger.info(f"Saved merged document to {output_doc_path}")
            
        except Exception as e:
            logger.error(f"Error merging track changes: {e}", exc_info=True)
            # Fallback - just copy the base document
            shutil.copy(base_doc_path, output_doc_path)
            logger.info("Fallback: copied base document as output")
    
    def merge_header_footer(self):
        """Merge track changes in headers and footers"""
        # Get list of header and footer files in revision document
        revision_word_dir = os.path.join(self.temp_dirs['revision'], "word")
        output_word_dir = os.path.join(self.temp_dirs['output'], "word")
        
        # Process headers
        for item in os.listdir(revision_word_dir):
            if item.startswith("header") or item.startswith("footer"):
                revision_path = os.path.join(revision_word_dir, item)
                output_path = os.path.join(output_word_dir, item)
                
                # Only process files with track changes
                try:
                    revision_tree = etree.parse(revision_path)
                    revision_root = revision_tree.getroot()
                    
                    # Check if there are track changes
                    has_changes = len(revision_root.xpath(".//*[self::w:ins or self::w:del]", namespaces=NAMESPACES)) > 0
                    
                    if has_changes:
                        logger.info(f"Processing track changes in {item}")
                        
                        # Update comment references
                        self.update_comment_references(revision_root)
                        
                        # Save to output
                        revision_tree.write(output_path, xml_declaration=True, encoding='UTF-8')
                except Exception as e:
                    logger.error(f"Error processing {item}: {e}")
    
    def merge_footnotes_endnotes(self):
        """Merge track changes in footnotes and endnotes"""
        for note_type in ['footnotes', 'endnotes']:
            base_path = os.path.join(self.temp_dirs['base'], "word", f"{note_type}.xml")
            revision_path = os.path.join(self.temp_dirs['revision'], "word", f"{note_type}.xml")
            output_path = os.path.join(self.temp_dirs['output'], "word", f"{note_type}.xml")
            
            # Skip if files don't exist
            if not os.path.exists(revision_path):
                continue
                
            if not os.path.exists(base_path):
                # If base doesn't have footnotes/endnotes but revision does, copy the file
                shutil.copy(revision_path, output_path)
                logger.info(f"Copied {note_type} from revision to output")
                continue
            
            # Load XML
            base_tree = etree.parse(base_path)
            base_root = base_tree.getroot()
            
            revision_tree = etree.parse(revision_path)
            revision_root = revision_tree.getroot()
            
            # Get element name (footnote or endnote)
            element_name = f"{{{NAMESPACES['w']}}}{note_type[:-1]}"  # Remove 's' to get singular
            
            # Find notes with track changes
            xpath = f".//{element_name}[.//w:ins or .//w:del]"
            changed_notes = revision_root.xpath(xpath, namespaces=NAMESPACES)
            
            if not changed_notes:
                continue
                
            logger.info(f"Found {len(changed_notes)} {note_type} with track changes")
            
            # Get maximum ID
            max_id = self.max_ids[note_type]
            
            # Process each note with track changes
            for note in changed_notes:
                old_id = note.get(f"{{{NAMESPACES['w']}}}id")
                
                # Check if this is a special ID (like -1 for separator)
                try:
                    id_int = int(old_id)
                    if id_int < 0:
                        # Special ID, keep as is and skip
                        continue
                except ValueError:
                    # Not a number, skip
                    continue
                
                # Create new ID
                max_id += 1
                new_id = str(max_id)
                
                # Update note ID
                note.set(f"{{{NAMESPACES['w']}}}id", new_id)
                self.id_maps[note_type][old_id] = new_id
                
                # Update comment references
                self.update_comment_references(note)
                
                # Append to base
                base_root.append(note)
            
            # Save updated file
            base_tree.write(output_path, xml_declaration=True, encoding='UTF-8')
            logger.info(f"Merged {note_type} with ID mapping: {self.id_maps[note_type]}")
    
    def update_document_relationships(self):
        """Update relationship IDs in the document.xml.rels file"""
        base_rels_path = os.path.join(self.temp_dirs['base'], "word", "_rels", "document.xml.rels")
        revision_rels_path = os.path.join(self.temp_dirs['revision'], "word", "_rels", "document.xml.rels")
        output_rels_path = os.path.join(self.temp_dirs['output'], "word", "_rels", "document.xml.rels")
        
        if not os.path.exists(revision_rels_path):
            return
            
        if not os.path.exists(base_rels_path):
            # If base doesn't have relationships but revision does, copy the file
            os.makedirs(os.path.dirname(output_rels_path), exist_ok=True)
            shutil.copy(revision_rels_path, output_rels_path)
            logger.info(f"Copied relationship file from revision to output")
            return
        
        # Load XML
        base_tree = etree.parse(base_rels_path)
        base_root = base_tree.getroot()
        
        revision_tree = etree.parse(revision_rels_path)
        revision_root = revision_tree.getroot()
        
        # Get existing relationship targets in base
        base_targets = {rel.get("Target"): rel for rel in base_root.xpath("//rel:Relationship", namespaces=NAMESPACES)}
        
        # Process relationships in revision
        for rel in revision_root.xpath("//rel:Relationship", namespaces=NAMESPACES):
            target = rel.get("Target")
            rel_type = rel.get("Type")
            
            # Skip if relationship already exists in base
            if target in base_targets:
                continue
            
            # Create new relationship ID
            self.max_ids['rels'] += 1
            new_id = f"rId{self.max_ids['rels']}"
            
            # Add relationship to base
            rel.set("Id", new_id)
            base_root.append(rel)
            
            # Copy related files if needed (like images, embedded objects)
            if rel_type.endswith("/image") or rel_type.endswith("/oleObject"):
                # Handle media files
                source_path = os.path.join(self.temp_dirs['revision'], "word", target)
                target_path = os.path.join(self.temp_dirs['output'], "word", target)
                
                if os.path.exists(source_path):
                    os.makedirs(os.path.dirname(target_path), exist_ok=True)
                    shutil.copy(source_path, target_path)
                    logger.info(f"Copied media file from {source_path} to {target_path}")
        
        # Save updated relationships
        os.makedirs(os.path.dirname(output_rels_path), exist_ok=True)
        base_tree.write(output_rels_path, xml_declaration=True, encoding='UTF-8')
        logger.info(f"Updated document relationships")
    
    def merge_comment_relationships(self):
        """Merge relationship files for comments to ensure proper linking"""
        base_rels_path = os.path.join(self.temp_dirs['base'], "word", "_rels", "comments.xml.rels")
        revision_rels_path = os.path.join(self.temp_dirs['revision'], "word", "_rels", "comments.xml.rels")
        output_rels_path = os.path.join(self.temp_dirs['output'], "word", "_rels", "comments.xml.rels")
        
        if not os.path.exists(revision_rels_path):
            logger.info("No comment relationships found in revision document")
            return
        
        # Create output directory
        os.makedirs(os.path.dirname(output_rels_path), exist_ok=True)
        
        if not os.path.exists(base_rels_path):
            # Simple case: just copy the relationships file with no changes
            # This avoids unnecessary XML parsing/manipulation
            shutil.copy(revision_rels_path, output_rels_path)
            logger.info("Copied comment relationships file from revision document")
            return
        
        # Complex case: need to merge relationship files
        try:
            # Load base relationship file
            with open(base_rels_path, 'rb') as f:
                base_rels_content = f.read()
            
            base_rels_tree = etree.fromstring(
                base_rels_content, 
                etree.XMLParser(remove_blank_text=False)
            )
            
            # Load revision relationship file
            with open(revision_rels_path, 'rb') as f:
                revision_rels_content = f.read()
            
            revision_rels_tree = etree.fromstring(
                revision_rels_content,
                etree.XMLParser(remove_blank_text=False)
            )
            
            # Get existing targets to avoid duplicates
            base_targets = {}
            for rel in base_rels_tree.xpath("//rel:Relationship", namespaces=NAMESPACES):
                target = rel.get("Target")
                base_targets[target] = rel
            
            # Find max rId in base
            max_id = 0
            for rel in base_rels_tree.xpath("//rel:Relationship", namespaces=NAMESPACES):
                rel_id = rel.get("Id")
                if rel_id.startswith("rId"):
                    try:
                        id_num = int(rel_id[3:])
                        max_id = max(max_id, id_num)
                    except ValueError:
                        pass
            
            # Process each relationship from revision
            for rel in revision_rels_tree.xpath("//rel:Relationship", namespaces=NAMESPACES):
                target = rel.get("Target")
                rel_type = rel.get("Type")
                
                # Skip if relationship with the same target already exists
                if target in base_targets:
                    continue
                
                # Create new relationship ID
                max_id += 1
                new_id = f"rId{max_id}"
                
                # Add the relationship
                rel_clone = deepcopy(rel)
                rel_clone.set("Id", new_id)
                base_rels_tree.append(rel_clone)
                
                # Copy related file if needed
                if target.startswith("../"):
                    # External path (like media)
                    rel_path = target.replace("../", "")
                    source_path = os.path.join(self.temp_dirs['revision'], "word", rel_path)
                    target_path = os.path.join(self.temp_dirs['output'], "word", rel_path)
                else:
                    # Internal path
                    source_path = os.path.join(self.temp_dirs['revision'], "word", target)
                    target_path = os.path.join(self.temp_dirs['output'], "word", target)
                
                if os.path.exists(source_path):
                    os.makedirs(os.path.dirname(target_path), exist_ok=True)
                    shutil.copy(source_path, target_path)
                    logger.info(f"Copied related file: {source_path} -> {target_path}")
            
            # Format XML properly
            rels_xml = etree.tostring(
                base_rels_tree, 
                encoding="UTF-8", 
                xml_declaration=True, 
                standalone=True,
                pretty_print=False
            )
            
            # Ensure proper XML declaration
            rels_xml = rels_xml.replace(
                b'<?xml version="1.0" encoding="UTF-8"?>', 
                b'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
            )
            
            # Write the output file
            with open(output_rels_path, 'wb') as f:
                f.write(rels_xml)
            
            logger.info("Merged comment relationships successfully")
            
        except Exception as e:
            # Fallback to direct copy if anything goes wrong
            logger.error(f"Error merging comment relationships: {e}")
            shutil.copy(revision_rels_path, output_rels_path)
            logger.info("Copied comment relationships file from revision document as fallback")
    
    def create_output_docx(self):
        """Create the final output docx file"""
        # Create a new ZIP file
        with zipfile.ZipFile(self.output_docx, 'w', compression=zipfile.ZIP_DEFLATED) as output_zip:
            # Walk through the output directory
            for root, _, files in os.walk(self.temp_dirs['output']):
                for file in files:
                    file_path = os.path.join(root, file)
                    # Calculate the archive path (relative to the temp dir)
                    arcname = os.path.relpath(file_path, self.temp_dirs['output'])
                    # Add the file to the ZIP
                    output_zip.write(file_path, arcname)
        
        logger.info(f"Created output document: {self.output_docx}")
    
    def fix_xml_for_word(self, xml_content):
        """Ensure XML content is formatted exactly as Word expects it"""
        # Make sure the XML declaration is exactly right
        if b'<?xml' in xml_content:
            # Replace any XML declaration with Word's preferred format
            xml_content = re.sub(
                b'<\\?xml[^>]*\\?>',
                b'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>',
                xml_content
            )
        else:
            # Add declaration if missing
            xml_content = b'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\n' + xml_content
        
        return xml_content

    def save_xml_file(self, tree_or_element, output_path):
        """Save XML content with proper Word formatting"""
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        
        # Get the XML content
        xml_content = etree.tostring(
            tree_or_element,
            encoding="UTF-8",
            xml_declaration=True,
            pretty_print=False
        )
        
        # Fix the XML content for Word
        xml_content = self.fix_xml_for_word(xml_content)
        
        # Write to file
        with open(output_path, 'wb') as f:
            f.write(xml_content)
    
    def verify_output_docx(self):
        """Verify that the output document has the expected structure"""
        try:
            # Check that critical files exist
            output_dir = self.temp_dirs['output']
            
            # Required Office Open XML files
            required_files = [
                "[Content_Types].xml",
                "_rels/.rels",
                "word/document.xml"
            ]
            
            for req_file in required_files:
                file_path = os.path.join(output_dir, req_file)
                if not os.path.exists(file_path):
                    logger.error(f"Missing required file: {req_file}")
                    return False
            
            # Check if we have comments
            comments_path = os.path.join(output_dir, "word/comments.xml")
            if os.path.exists(comments_path):
                # Check the comments XML for validity
                try:
                    with open(comments_path, 'rb') as f:
                        comments_content = f.read()
                    
                    if b'<w:comments' not in comments_content or not comments_content.startswith(b'<?xml'):
                        logger.error("Comments XML has invalid structure")
                        # Fix it by recreating with correct structure
                        comments_xml = (
                            b'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\r\n'
                            b'<w:comments xmlns:w="http://schemas.openxmlformats.org/wordprocessingml/2006/main" '
                            b'xmlns:w14="http://schemas.microsoft.com/office/word/2010/wordml" '
                            b'xmlns:w15="http://schemas.microsoft.com/office/word/2012/wordml" '
                            b'xmlns:r="http://schemas.openxmlformats.org/officeDocument/2006/relationships">'
                        )
                        
                        # Extract comment elements from original
                        parser = etree.XMLParser(recover=True)
                        try:
                            root = etree.fromstring(comments_content, parser)
                            comments = root.xpath(".//w:comment", namespaces=NAMESPACES)
                            
                            for comment in comments:
                                comment_xml = etree.tostring(comment, encoding="UTF-8")
                                comments_xml += comment_xml
                        except:
                            logger.error("Could not extract comments from invalid XML")
                        
                        # Close the comments tag
                        comments_xml += b'</w:comments>'
                        
                        # Write corrected XML
                        with open(comments_path, 'wb') as f:
                            f.write(comments_xml)
                except Exception as e:
                    logger.error(f"Error checking comments XML: {e}")
                    return False
            
            # Check document.xml
            document_path = os.path.join(output_dir, "word/document.xml")
            try:
                with open(document_path, 'rb') as f:
                    document_content = f.read()
                
                if b'<w:document' not in document_content or not document_content.startswith(b'<?xml'):
                    logger.error("Document XML has invalid structure")
                    return False
            except Exception as e:
                logger.error(f"Error checking document XML: {e}")
                return False
            
            return True
        
        except Exception as e:
            logger.error(f"Error verifying output document: {e}")
            return False
    
    def merge(self):
        """Execute the complete merge process with more robust comment handling"""
        try:
            logger.info(f"Starting merge process: {self.base_docx} + {self.revision_docx} -> {self.output_docx}")
            
            # Set up temporary directories
            self.setup_temp_dirs()
            
            # Extract docx files
            self.extract_docx()
            
            # Initialize ID maps - do this first to get existing IDs
            self.initialize_id_maps()
            
            # Process track changes first - this is more reliable
            self.merge_track_changes()
            
            # Rebuild comments completely instead of merging
            self.repair_comments_xml()
            
            # Fix all document references to comments
            self.update_document_references()
            
            # Ensure proper content types are registered
            self.fix_content_types()
            
            # Process other parts
            self.merge_header_footer()
            self.merge_footnotes_endnotes()
            self.update_document_relationships()
            
            # Create the output docx
            self.create_output_docx()
            
            logger.info("Merge completed successfully")
            return True
            
        except Exception as e:
            logger.error(f"Error during merge: {str(e)}", exc_info=True)
            return False
            
        finally:
            # Clean up temporary directories
            self.cleanup_temp_dirs()

    def repair_comments_xml(self):
        """Complete rewrite to fix comment handling"""
        
        # Let's start by simply keeping track of any issues
        logger.info("Starting comment repair with new approach")
        
        revision_comments_path = os.path.join(self.temp_dirs['revision'], "word", "comments.xml")
        if not os.path.exists(revision_comments_path):
            logger.info("No comments file found in revision document")
            return
        
        # First, let's COPY the comments.xml file exactly as-is from the revision document
        # This preserves all its structure, namespaces, and formatting
        output_comments_path = os.path.join(self.temp_dirs['output'], "word", "comments.xml")
        
        # Make directory if it doesn't exist
        os.makedirs(os.path.dirname(output_comments_path), exist_ok=True)
        
        # Copy the file directly - this is important to preserve EXACT structure
        shutil.copy(revision_comments_path, output_comments_path)
        logger.info(f"Copied comments.xml directly from revision document to preserve structure")
        
        # Also copy the comments.xml.rels file if it exists
        revision_rels_path = os.path.join(self.temp_dirs['revision'], "word", "_rels", "comments.xml.rels")
        if os.path.exists(revision_rels_path):
            output_rels_path = os.path.join(self.temp_dirs['output'], "word", "_rels", "comments.xml.rels")
            os.makedirs(os.path.dirname(output_rels_path), exist_ok=True)
            shutil.copy(revision_rels_path, output_rels_path)
            logger.info("Copied comments.xml.rels file")
        
        # Now map the comment IDs (don't modify the comments.xml file!)
        try:
            # Parse the document to get the comment IDs
            tree = etree.parse(output_comments_path)
            for comment in tree.xpath("//w:comment", namespaces=NAMESPACES):
                comment_id = comment.get(f"{{{NAMESPACES['w']}}}id")
                if comment_id:
                    # Map the ID to itself - we're keeping original IDs!
                    self.id_maps['comments'][comment_id] = comment_id
                    
            logger.info(f"Mapped {len(self.id_maps['comments'])} comment IDs")
        except Exception as e:
            logger.error(f"Error mapping comment IDs: {e}", exc_info=True)
        
        # Ensure the relationship file in document references comments
        self.ensure_comments_relationship()

    def ensure_comments_relationship(self):
        """Make sure document has a relationship to comments.xml"""
        rels_path = os.path.join(self.temp_dirs['output'], "word", "_rels", "document.xml.rels")
        
        if not os.path.exists(rels_path):
            logger.error("Missing document.xml.rels file")
            return
        
        try:
            # Parse relationships file
            tree = etree.parse(rels_path)
            root = tree.getroot()
            
            # Check if comments relationship exists
            comments_rel_type = "http://schemas.openxmlformats.org/officeDocument/2006/relationships/comments"
            has_comments_rel = False
            
            for rel in root.xpath("//rel:Relationship", namespaces=NAMESPACES):
                if rel.get("Type") == comments_rel_type:
                    has_comments_rel = True
                    break
            
            # Add relationship if it doesn't exist
            if not has_comments_rel:
                # Find highest rId
                max_id = 1
                for rel in root.xpath("//rel:Relationship", namespaces=NAMESPACES):
                    rel_id = rel.get("Id")
                    if rel_id.startswith("rId"):
                        try:
                            id_num = int(rel_id[3:])
                            max_id = max(max_id, id_num + 1)
                        except ValueError:
                            pass
                
                # Create new relationship
                new_rel = etree.SubElement(root, f"{{{NAMESPACES['rel']}}}Relationship")
                new_rel.set("Id", f"rId{max_id}")
                new_rel.set("Type", comments_rel_type)
                new_rel.set("Target", "comments.xml")
                
                # Save changes
                tree.write(rels_path, xml_declaration=True, encoding="UTF-8")
                logger.info("Added comments relationship to document.xml.rels")
        
        except Exception as e:
            logger.error(f"Error ensuring comments relationship: {e}", exc_info=True)

    def fix_content_types(self):
        """Ensure all required content types are registered"""
        content_types_path = os.path.join(self.temp_dirs['output'], "[Content_Types].xml")
        
        if not os.path.exists(content_types_path):
            logger.error("Missing [Content_Types].xml file")
            return
        
        try:
            # Load content types XML
            with open(content_types_path, 'rb') as f:
                content = f.read()
            
            # Parse with a parser that preserves formatting
            parser = etree.XMLParser(remove_blank_text=False)
            root = etree.fromstring(content, parser)
            
            # Required content types
            required_types = {
                "/word/comments.xml": "application/vnd.openxmlformats-officedocument.wordprocessingml.comments+xml"
            }
            
            # Get existing overrides
            existing_overrides = {}
            for override in root.xpath("//*[@PartName]"):
                part_name = override.get("PartName")
                existing_overrides[part_name] = override
            
            # Check if we need to add comments content type
            modified = False
            for part_name, content_type in required_types.items():
                if part_name not in existing_overrides:
                    file_path = os.path.join(self.temp_dirs['output'], part_name.lstrip('/'))
                    if os.path.exists(file_path):
                        # Add the content type
                        ns = "{http://schemas.openxmlformats.org/package/2006/content-types}"
                        override = etree.SubElement(root, f"{ns}Override")
                        override.set("PartName", part_name)
                        override.set("ContentType", content_type)
                        modified = True
                        logger.info(f"Added content type for {part_name}")
            
            # Save if modified
            if modified:
                xml_content = etree.tostring(
                    root,
                    encoding="UTF-8",
                    xml_declaration=True
                )
                
                # Replace XML declaration with Word's exact format
                xml_content = xml_content.replace(
                    b'<?xml version="1.0" encoding="UTF-8"?>',
                    b'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
                )
                
                with open(content_types_path, 'wb') as f:
                    f.write(xml_content)
        
        except Exception as e:
            logger.error(f"Error fixing content types: {e}", exc_info=True)

    def update_document_references(self):
        """Update all document references to comments - now just keeping original IDs"""
        # Since we're keeping the original comment IDs, we don't need to update references
        # This only becomes relevant if we're moving comments from one document ID space to another
        
        # Just log that we're keeping original references
        logger.info("Keeping original comment references (no updates needed)")
        
        # Instead, let's update the document references to include proper comment references
        self.ensure_document_comment_references()

    def ensure_document_comment_references(self):
        """Make sure document has proper comment reference structure"""
        document_path = os.path.join(self.temp_dirs['output'], "word", "document.xml")
        
        if not os.path.exists(document_path):
            logger.error("Missing document.xml file")
            return
        
        # The approach here is different - we're not trying to change existing references
        # Just ensuring the structure is correct for Word to recognize comments
        try:
            # Check for commentRangeStart/End tags
            with open(document_path, 'rb') as f:
                content = f.read()
            
            # If the document already has comment references, we're good
            if b'commentReference' in content:
                logger.info("Document already has comment references")
                return
            
            # Otherwise, need to inject comment references
            parser = etree.XMLParser(remove_blank_text=False)
            root = etree.fromstring(content, parser)
            
            # Get all comment IDs
            comment_ids = list(self.id_maps['comments'].keys())
            
            if not comment_ids:
                logger.info("No comments to reference")
                return
            
            # Find the last paragraph in the body
            body = root.find(".//w:body", namespaces=NAMESPACES)
            if body is None:
                logger.error("Could not find body element")
                return
            
            paragraphs = body.findall(".//w:p", namespaces=NAMESPACES)
            if not paragraphs:
                logger.error("No paragraphs found in document")
                return
            
            # Add comment references to last paragraph
            last_para = paragraphs[-1]
            
            # Get all runs or create one if needed
            runs = last_para.findall(".//w:r", namespaces=NAMESPACES)
            if not runs:
                # Create a run if none exists
                run = etree.SubElement(last_para, f"{{{NAMESPACES['w']}}}r")
            else:
                run = runs[-1]
            
            # For each comment, add a reference
            for comment_id in comment_ids:
                ref = etree.SubElement(run, f"{{{NAMESPACES['w']}}}commentReference")
                ref.set(f"{{{NAMESPACES['w']}}}id", comment_id)
            
            # Save changes
            xml_content = etree.tostring(
                root,
                encoding="UTF-8",
                xml_declaration=True
            )
            
            # Replace XML declaration with Word's exact format
            xml_content = xml_content.replace(
                b'<?xml version="1.0" encoding="UTF-8"?>',
                b'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
            )
            
            with open(document_path, 'wb') as f:
                f.write(xml_content)
            
            logger.info(f"Added {len(comment_ids)} comment references to document")
        
        except Exception as e:
            logger.error(f"Error ensuring document comment references: {e}", exc_info=True)

Parameters

Name           Type  Default  Kind
base_docx      path  -        positional
revision_docx  path  -        positional
output_docx    path  -        positional

Parameter Details

base_docx: Path to the base .docx document that receives the merged changes
revision_docx: Path to the revision .docx document containing the tracked changes and comments to merge
output_docx: Path where the merged .docx document is written

Return Value

Constructing the class returns a DocxMerger instance; merge() returns True on success and False on failure

Class Interface

Methods

__init__(self, base_docx, revision_docx, output_docx)

Purpose: Initialize with paths to the base document, revision document, and output path

Parameters:

  • base_docx: Path to the base .docx document
  • revision_docx: Path to the revision .docx document whose changes are merged in
  • output_docx: Path for the merged output .docx document

Returns: None

setup_temp_dirs(self)

Purpose: Set up temporary directories for processing

Returns: None

cleanup_temp_dirs(self)

Purpose: Clean up temporary directories

Returns: None

extract_docx(self)

Purpose: Extract docx files to temporary directories

Returns: None

get_max_id(self, xml_path, xpath, id_attr)

Purpose: Get the maximum ID used in an XML file

Parameters:

  • xml_path: Path to the XML part to scan
  • xpath: XPath expression selecting the elements that carry the ID
  • id_attr: Name of the ID attribute, with or without a namespace prefix

Returns: int (the highest ID found, or 0 if the file does not exist)

initialize_id_maps(self)

Purpose: Initialize ID maps and get maximum IDs from base document

Returns: None

merge_comments(self)

Purpose: Merge comments from revision document to base document

Returns: None

get_paragraph_text(self, paragraph)

Purpose: Extract text content from a paragraph for comparison

Parameters:

  • paragraph: Paragraph (w:p) element

Returns: str (the concatenated text of the paragraph's runs)

create_paragraph_map(self, doc_root)

Purpose: Create a map of paragraphs by their textual content

Parameters:

  • doc_root: Root element of a parsed document.xml

Returns: dict mapping text hashes to lists of (position, paragraph element, text) tuples

find_matching_paragraph(self, para, base_para_map)

Purpose: Find the most similar paragraph in the base document

Parameters:

  • para: Paragraph (w:p) element from the revision document
  • base_para_map: Paragraph map built by create_paragraph_map from the base document

Returns: The best matching (position, paragraph element, text) tuple, or None if no candidate exceeds the 0.8 similarity threshold
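
The matching criterion can be illustrated on plain strings. A minimal sketch of the same difflib ratio test (the 0.8 threshold mirrors the method above; the sample sentences are invented):

import difflib

base_text = "The committee approved the budget for the next fiscal year."
revised_text = "The committee approved the revised budget for the next fiscal year."

# find_matching_paragraph first tries an exact match on the text hash,
# then accepts the best fuzzy candidate whose ratio exceeds 0.8.
ratio = difflib.SequenceMatcher(None, revised_text, base_text).ratio()
print(round(ratio, 3), ratio > 0.8)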

update_comment_references(self, element)

Purpose: Update comment references in an element using the ID mapping

Parameters:

  • element: XML element whose comment reference IDs are rewritten using the comment ID map

Returns: None

merge_track_changes(self)

Purpose: Merge track changes from revision document to base document

Returns: None

merge_header_footer(self)

Purpose: Merge track changes in headers and footers

Returns: None

merge_footnotes_endnotes(self)

Purpose: Merge track changes in footnotes and endnotes

Returns: None

update_document_relationships(self)

Purpose: Update relationship IDs in the document.xml.rels file

Returns: None

merge_comment_relationships(self)

Purpose: Merge relationship files for comments to ensure proper linking

Returns: None

create_output_docx(self)

Purpose: Create the final output docx file

Returns: None

fix_xml_for_word(self, xml_content)

Purpose: Ensure XML content is formatted exactly as Word expects it

Parameters:

  • xml_content: Raw XML content as bytes

Returns: bytes (the content with the XML declaration Word expects)

save_xml_file(self, tree_or_element, output_path)

Purpose: Save XML content with proper Word formatting

Parameters:

  • tree_or_element: lxml tree or element to serialize
  • output_path: Destination file path

Returns: None

verify_output_docx(self)

Purpose: Verify that the output document has the expected structure

Returns: bool (True if the output structure is valid, False otherwise)

merge(self)

Purpose: Execute the complete merge process with more robust comment handling

Returns: bool (True if the merge completed, False if an error occurred)

repair_comments_xml(self)

Purpose: Copy comments.xml (and its .rels file) verbatim from the revision document and record its comment IDs

Returns: None

ensure_comments_relationship(self)

Purpose: Make sure document has a relationship to comments.xml

Returns: None

fix_content_types(self)

Purpose: Ensure all required content types are registered

Returns: None

update_document_references(self)

Purpose: Keep original comment IDs unchanged and ensure the document contains the corresponding comment references

Returns: None

ensure_document_comment_references(self)

Purpose: Make sure document has proper comment reference structure

Returns: None

Required Imports

import zipfile
import shutil
import os
import tempfile
import re
import difflib
from copy import deepcopy
from datetime import datetime

from lxml import etree
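
The class also references two module-level names that are not part of this excerpt: a NAMESPACES prefix map and a logger. A minimal sketch of what they might look like; the 'w', 'r' and 'mc' URIs appear verbatim in the source above, while the 'rel' entry and the logger name are assumptions:

import logging

# Assumed module-level globals referenced throughout DocxMerger.
NAMESPACES = {
    "w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main",
    "r": "http://schemas.openxmlformats.org/officeDocument/2006/relationships",
    "mc": "http://schemas.openxmlformats.org/markup-compatibility/2006",
    # Package-relationship namespace used by the "//rel:Relationship" XPaths.
    "rel": "http://schemas.openxmlformats.org/package/2006/relationships",
}

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("word_merge")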

Usage Example

# Example usage:
# merger = DocxMerger("base.docx", "revision.docx", "merged.docx")
# success = merger.merge()
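
A slightly fuller sketch, assuming the placeholder file names below exist; merge() returns False rather than raising when something goes wrong:

import logging

logging.basicConfig(level=logging.INFO)

merger = DocxMerger(
    base_docx="contract_base.docx",          # placeholder input
    revision_docx="contract_reviewed.docx",  # placeholder input
    output_docx="contract_merged.docx",      # placeholder output
)

if merger.merge():
    print("Merged document written to contract_merged.docx")
else:
    print("Merge failed; check the log output")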

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class DocumentMerger 58.0% similar

    A class that merges PDF documents with audit trail pages, combining an original PDF with an audit page and updating metadata to reflect the audit process.

    From: /tf/active/vicechatdev/document_auditor/src/document_merger.py
  • class EnhancedMeetingMinutesGenerator 52.6% similar

    A class named EnhancedMeetingMinutesGenerator

    From: /tf/active/vicechatdev/leexi/enhanced_meeting_minutes_generator.py
  • class DocumentDetail_v1 49.7% similar

    Document detail view component

    From: /tf/active/vicechatdev/document_detail_old.py
  • class DocumentDetail 48.9% similar

    Document detail view component

    From: /tf/active/vicechatdev/document_detail_backup.py
  • class DocumentDetail_v2 48.8% similar

    Document detail view component

    From: /tf/active/vicechatdev/CDocs/ui/document_detail.py