class SharePointFileCloudSync
Orchestrates synchronization of documents from SharePoint to FileCloud, managing the complete sync lifecycle including document retrieval, comparison, upload, and folder structure creation.
File: /tf/active/vicechatdev/SPFCsync/sync_service.py
Lines: 10-410
Complexity: complex
Purpose
This class serves as the main coordinator for syncing documents between SharePoint and FileCloud. It initializes and manages client connections to both services, retrieves documents from SharePoint, compares modification dates, downloads content, uploads to FileCloud, and tracks detailed statistics. It supports both single-run and continuous synchronization modes, handles rate limiting and retries, creates empty folder structures, and provides comprehensive logging of all operations.
Source Code
class SharePointFileCloudSync:
    """
    Main synchronization class for SharePoint to FileCloud sync.
    """

    def __init__(self):
        """Initialize the sync service with configuration."""
        # Validate configuration
        Config.validate_config()
        Config.setup_logging()
        self.logger = logging.getLogger(__name__)

        # Initialize clients
        self.sp_client = SharePointGraphClient(
            Config.SHAREPOINT_SITE_URL,
            Config.AZURE_CLIENT_ID,
            Config.AZURE_CLIENT_SECRET
        )
        self.fc_client = FileCloudClient(
            Config.FILECLOUD_SERVER_URL,
            Config.FILECLOUD_USERNAME,
            Config.FILECLOUD_PASSWORD
        )
        self.logger.info("SharePoint to FileCloud sync service initialized")
    def sync_documents(self, max_documents: int = None) -> Dict[str, int]:
        """
        Perform a full synchronization of documents from SharePoint to FileCloud.

        Args:
            max_documents: Maximum number of documents to process (for debugging)

        Returns:
            Dictionary with sync statistics
        """
        stats = {
            'total_documents': 0,
            'total_folders': 0,
            'new_uploads': 0,
            'updated_files': 0,
            'skipped_files': 0,
            'skipped_same_date': 0,
            'updated_newer_source': 0,
            'empty_folders_created': 0,
            'errors': 0
        }

        try:
            self.logger.info("Starting SharePoint to FileCloud synchronization")

            # Get all documents from SharePoint.
            # Note: Our Graph client is already connected to the Documents drive,
            # so we start from root ("/") rather than using SHAREPOINT_DOCUMENTS_PATH
            if max_documents:
                print(f"🔍 Limiting to {max_documents} documents for debugging")
                sp_documents = []
                self.sp_client._get_documents_recursive("/", sp_documents, max_files=max_documents)
            else:
                sp_documents = self.sp_client.get_all_documents("/")

            stats['total_documents'] = len(sp_documents)
            self.logger.info(f"Found {len(sp_documents)} documents in SharePoint")

            # Create empty folder structure if configured
            if Config.CREATE_EMPTY_FOLDERS:
                stats['empty_folders_created'] = self._create_empty_folders(sp_documents)

            # Process each document
            for doc in sp_documents:
                try:
                    result = self._sync_single_document(doc)

                    # Enhanced statistics tracking
                    if result == 'skipped_same_date':
                        stats['skipped_same_date'] += 1
                    elif result == 'updated_newer_source':
                        stats['updated_newer_source'] += 1
                        stats['updated_files'] += 1  # Also count in general updated
                    else:
                        stats[result] += 1
                except Exception as e:
                    self.logger.error(f"Error syncing document {doc['name']}: {e}")
                    stats['errors'] += 1

            # Enhanced logging with detailed statistics
            self.logger.info(f"Synchronization completed. Detailed stats: {stats}")
            if stats['updated_files'] > 0:
                self.logger.info(f"Files updated breakdown: Same date skipped: {stats['skipped_same_date']}, Newer source: {stats['updated_newer_source']}")
            return stats
        except Exception as e:
            self.logger.error(f"Error during synchronization: {e}")
            stats['errors'] += 1
            return stats
    def _sync_single_document(self, doc: Dict) -> str:
        """
        Sync a single document from SharePoint to FileCloud.

        Args:
            doc: Document information dictionary from SharePoint

        Returns:
            String indicating the action taken ('new_uploads', 'updated_files',
            or 'skipped_same_date')
        """
        # Construct FileCloud path
        fc_path = self._get_filecloud_path(doc)

        # Check if file exists in FileCloud
        fc_file_info = self.fc_client.get_file_info(fc_path)

        # Parse SharePoint modification date
        sp_modified = self._parse_sharepoint_date(doc['modified'])

        # Determine if we need to upload
        if fc_file_info is None:
            # File doesn't exist in FileCloud, upload it
            action = 'new_uploads'
            self.logger.info(f"New file detected: {doc['name']}")
        elif self.fc_client.file_needs_update(sp_modified, fc_file_info):
            # File exists but the SharePoint copy is newer, update it
            action = 'updated_files'
            self.logger.info(f"File update detected: {doc['name']} (SharePoint newer)")
        else:
            # File is up to date, skip it
            self.logger.debug(f"File up to date, skipping: {doc['name']}")
            return 'skipped_same_date'

        # Download file content from SharePoint
        file_content = self._download_document_content(doc)
        if file_content is None:
            raise Exception(f"Failed to download file content from SharePoint: {doc['name']}")

        # Upload to FileCloud
        success = self.fc_client.upload_file(file_content, fc_path, sp_modified)
        if not success:
            raise Exception(f"Failed to upload file to FileCloud: {doc['name']}")
        return action
    def _create_empty_folders(self, documents: List[Dict]) -> int:
        """
        Create empty folder structure in FileCloud based on SharePoint documents.

        Args:
            documents: List of document information from SharePoint

        Returns:
            Number of empty folders created
        """
        folders_created = 0
        unique_folders = set()

        # Extract all unique folder paths from documents
        for doc in documents:
            folder_path = doc.get('folder_path', '/')
            if folder_path and folder_path != '/':
                # Build the full FileCloud path
                fc_folder_path = f"{Config.FILECLOUD_BASE_PATH}/{folder_path}"
                fc_folder_path = '/'.join(filter(None, fc_folder_path.split('/')))
                if not fc_folder_path.startswith('/'):
                    fc_folder_path = '/' + fc_folder_path
                unique_folders.add(fc_folder_path)

        # Create each unique folder
        for folder_path in sorted(unique_folders):
            try:
                if self.fc_client.create_folder(folder_path):
                    folders_created += 1
                    self.logger.debug(f"Created empty folder: {folder_path}")
            except Exception as e:
                self.logger.warning(f"Failed to create folder {folder_path}: {e}")

        if folders_created > 0:
            self.logger.info(f"Created {folders_created} empty folders in FileCloud")
        return folders_created
    def _make_download_request_with_retry(self, url, max_retries=3):
        """Make a download request with retry logic for rate limiting."""
        headers = {
            'Authorization': f'Bearer {self.sp_client.access_token}',
        }
        for attempt in range(max_retries + 1):
            try:
                response = requests.get(url, headers=headers, timeout=30)
                if response.status_code == 200:
                    return response
                elif response.status_code == 429:
                    # Rate limited - check for Retry-After header
                    retry_delay = self._get_retry_delay(response, attempt)
                    self.logger.warning(f"Rate limited on download, waiting {retry_delay}s (attempt {attempt + 1}/{max_retries + 1})")
                    time.sleep(retry_delay)
                    continue
                elif response.status_code == 401:
                    # Authentication error - likely expired download URL, don't retry
                    self.logger.debug("Download URL expired for request (status 401) - will fall back to Graph API")
                    return response
                else:
                    self.logger.warning(f"Download request failed with status {response.status_code}")
                    return response
            except requests.exceptions.RequestException as e:
                self.logger.error(f"Request exception during download: {e}")
                if attempt < max_retries:
                    time.sleep(2 ** attempt)  # Exponential backoff
                    continue
                else:
                    return None
        return None
    def _get_retry_delay(self, response, attempt):
        """Extract retry delay from response headers or use exponential backoff."""
        from email.utils import parsedate_to_datetime
        from datetime import timezone

        retry_after = response.headers.get('Retry-After')
        if retry_after:
            try:
                # Try parsing as seconds
                return min(int(retry_after), 60)
            except ValueError:
                try:
                    # Try parsing as HTTP date; compare against an aware UTC "now"
                    # so the subtraction never mixes naive and aware datetimes
                    retry_date = parsedate_to_datetime(retry_after)
                    delay = (retry_date - datetime.now(timezone.utc)).total_seconds()
                    return max(1, min(delay, 60))
                except (TypeError, ValueError):
                    pass
        # Fallback to exponential backoff
        return min(2 ** attempt, 60)
    def _download_document_content(self, doc: Dict) -> Optional[bytes]:
        """
        Download document content using the document's download URL.

        Args:
            doc: Document information dictionary from SharePoint

        Returns:
            File content as bytes, or None if failed
        """
        try:
            # Try direct download URL first (these URLs may expire quickly)
            download_url = doc.get('download_url')
            if download_url and not Config.SKIP_DIRECT_DOWNLOAD:
                # Use SharePoint client's retry mechanism for direct downloads
                response = self._make_download_request_with_retry(download_url)
                if response and response.status_code == 200:
                    return response.content
                elif response and response.status_code == 401:
                    self.logger.debug(f"Direct download URL expired for {doc['name']} - using fallback method")
                else:
                    self.logger.debug(f"Direct download failed for {doc['name']} - using fallback method")

            # Fallback: Use Graph API content endpoint with retry
            # (or primary method if direct downloads are disabled)
            headers = {
                'Authorization': f'Bearer {self.sp_client.access_token}',
            }
            graph_id = doc.get('graph_id')
            if graph_id:
                content_url = f"https://graph.microsoft.com/v1.0/sites/{self.sp_client.site_id}/drives/{self.sp_client.drive_id}/items/{graph_id}/content"
                response = self.sp_client._make_request_with_retry('GET', content_url, headers=headers)
                if response and response.status_code == 200:
                    return response.content
                else:
                    self.logger.error(f"Graph API download failed for {doc['name']}")
            return None
        except Exception as e:
            self.logger.error(f"Error downloading file content for {doc['name']}: {e}")
            return None
    def _get_filecloud_path(self, doc: Dict) -> str:
        """
        Generate FileCloud path for a SharePoint document.

        Args:
            doc: Document information dictionary

        Returns:
            FileCloud path string
        """
        # Use the relative path from SharePoint and combine with FileCloud base path
        relative_path = doc.get('relative_path', doc['name'])

        # Ensure path starts with base path
        if relative_path:
            fc_path = f"{Config.FILECLOUD_BASE_PATH}/{relative_path}"
        else:
            fc_path = f"{Config.FILECLOUD_BASE_PATH}/{doc['name']}"

        # Normalize path (remove double slashes, etc.)
        fc_path = '/'.join(filter(None, fc_path.split('/')))
        if not fc_path.startswith('/'):
            fc_path = '/' + fc_path
        return fc_path
    def _parse_sharepoint_date(self, date_str: str) -> datetime:
        """
        Parse SharePoint date string to datetime object.

        Args:
            date_str: Date string from SharePoint

        Returns:
            datetime object with UTC timezone
        """
        from datetime import timezone

        try:
            if not date_str:
                # Use current time with UTC timezone
                return datetime.now(timezone.utc)

            # Handle year-only dates like "2024" or "2022"
            if date_str.isdigit() and len(date_str) == 4:
                # Use January 1st of that year, UTC timezone
                return datetime(int(date_str), 1, 1, tzinfo=timezone.utc)

            # SharePoint typically returns ISO format dates
            if 'T' in date_str:
                # Handle different timezone formats
                if date_str.endswith('Z'):
                    date_str = date_str.replace('Z', '+00:00')
                elif '+' not in date_str and '-' not in date_str[-6:]:
                    # No timezone info, assume UTC
                    date_str = date_str + '+00:00'
                parsed_date = datetime.fromisoformat(date_str)

                # Ensure we have UTC timezone
                if parsed_date.tzinfo is None:
                    parsed_date = parsed_date.replace(tzinfo=timezone.utc)
                return parsed_date
            else:
                # Fallback parsing for other formats
                try:
                    parsed_date = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
                    return parsed_date.replace(tzinfo=timezone.utc)
                except ValueError:
                    # Try date-only format; attach UTC so this path is
                    # timezone-aware like every other return path
                    return datetime.strptime(date_str, '%Y-%m-%d').replace(tzinfo=timezone.utc)
        except Exception as e:
            self.logger.warning(f"Error parsing date {date_str}: {e}")
            # Return an aware datetime so later comparisons never mix naive and aware values
            return datetime.now(timezone.utc)
    def run_continuous_sync(self, max_documents: int = None):
        """
        Run continuous synchronization at configured intervals.

        Args:
            max_documents: Optional limit on number of documents to process per cycle
        """
        self.logger.info(f"Starting continuous sync with {Config.SYNC_INTERVAL_MINUTES} minute intervals")
        if max_documents:
            self.logger.info(f"Document limit per cycle: {max_documents}")

        while True:
            try:
                start_time = time.time()
                stats = self.sync_documents(max_documents=max_documents)
                duration = time.time() - start_time
                self.logger.info(f"Sync cycle completed in {duration:.2f} seconds: {stats}")

                # Wait for next sync interval
                self.logger.info(f"Waiting {Config.SYNC_INTERVAL_MINUTES} minutes until next sync")
                time.sleep(Config.SYNC_INTERVAL_MINUTES * 60)
            except KeyboardInterrupt:
                self.logger.info("Sync service stopped by user")
                break
            except Exception as e:
                self.logger.error(f"Error in sync cycle: {e}")
                # Wait a bit before retrying
                time.sleep(60)
    def run_single_sync(self, max_documents: int = None) -> Dict[str, int]:
        """
        Run a single synchronization cycle.

        Args:
            max_documents: Maximum number of documents to process (for debugging)

        Returns:
            Dictionary with sync statistics
        """
        return self.sync_documents(max_documents)
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| bases | - | - | - |
Parameter Details
__init__: No parameters required. The constructor automatically validates configuration from the Config class, sets up logging, and initializes SharePointGraphClient and FileCloudClient instances using credentials and URLs from Config.
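The constructor succeeds only when Config exposes every setting the source above references. A minimal sketch of that surface, assuming an environment-variable-backed config (the attribute names come from the source code; everything else here, including the defaults, is hypothetical):

# Hypothetical sketch of the Config surface this class depends on.
# Attribute names are taken from the source above; loading them from
# environment variables is an assumption, not the project's actual config.py.
import logging
import os

class Config:
    SHAREPOINT_SITE_URL = os.environ.get('SHAREPOINT_SITE_URL', '')
    AZURE_CLIENT_ID = os.environ.get('AZURE_CLIENT_ID', '')
    AZURE_CLIENT_SECRET = os.environ.get('AZURE_CLIENT_SECRET', '')
    FILECLOUD_SERVER_URL = os.environ.get('FILECLOUD_SERVER_URL', '')
    FILECLOUD_USERNAME = os.environ.get('FILECLOUD_USERNAME', '')
    FILECLOUD_PASSWORD = os.environ.get('FILECLOUD_PASSWORD', '')
    FILECLOUD_BASE_PATH = os.environ.get('FILECLOUD_BASE_PATH', '/sp-sync')
    CREATE_EMPTY_FOLDERS = os.environ.get('CREATE_EMPTY_FOLDERS', 'true') == 'true'
    SKIP_DIRECT_DOWNLOAD = os.environ.get('SKIP_DIRECT_DOWNLOAD', 'false') == 'true'
    SYNC_INTERVAL_MINUTES = int(os.environ.get('SYNC_INTERVAL_MINUTES', '60'))

    @classmethod
    def validate_config(cls):
        # Fail fast if any required credential is missing
        required = ('SHAREPOINT_SITE_URL', 'AZURE_CLIENT_ID', 'AZURE_CLIENT_SECRET',
                    'FILECLOUD_SERVER_URL', 'FILECLOUD_USERNAME', 'FILECLOUD_PASSWORD')
        missing = [name for name in required if not getattr(cls, name)]
        if missing:
            raise ValueError(f"Missing required settings: {', '.join(missing)}")

    @classmethod
    def setup_logging(cls):
        logging.basicConfig(level=logging.INFO)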
Return Value
Instantiation returns a SharePointFileCloudSync object ready to perform synchronization operations. Key method returns: sync_documents() and run_single_sync() return a dictionary with sync statistics including 'total_documents', 'total_folders', 'new_uploads', 'updated_files', 'skipped_files', 'skipped_same_date', 'updated_newer_source', 'empty_folders_created', and 'errors'. run_continuous_sync() does not return (runs indefinitely until interrupted).
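A short, illustrative consumer of that dictionary (key names match the source; the module name assumes it follows the file sync_service.py, and the reporting logic is invented):

# Illustrative handling of the statistics returned by run_single_sync()
from sync_service import SharePointFileCloudSync

sync_service = SharePointFileCloudSync()
stats = sync_service.run_single_sync()
processed = stats['new_uploads'] + stats['updated_files'] + stats['skipped_same_date']
print(f"Processed {processed} of {stats['total_documents']} documents")
if stats['errors']:
    print(f"Completed with {stats['errors']} errors - check the logs for details")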
Class Interface
Methods
__init__(self)
Purpose: Initialize the sync service by validating configuration, setting up logging, and creating SharePoint and FileCloud client instances
Returns: None - initializes instance with sp_client, fc_client, and logger attributes
sync_documents(self, max_documents: int = None) -> Dict[str, int]
Purpose: Perform a full synchronization of documents from SharePoint to FileCloud with detailed statistics tracking
Parameters:
max_documents: Optional integer to limit the number of documents processed, useful for debugging or testing
Returns: Dictionary with keys: 'total_documents', 'total_folders', 'new_uploads', 'updated_files', 'skipped_files', 'skipped_same_date', 'updated_newer_source', 'empty_folders_created', 'errors' - all integer values
_sync_single_document(self, doc: Dict) -> str
Purpose: Sync a single document from SharePoint to FileCloud by comparing dates, downloading content, and uploading if needed
Parameters:
doc: Dictionary containing document information from SharePoint including 'name', 'modified', 'download_url', 'graph_id', 'folder_path', 'relative_path'
Returns: String indicating action taken: 'new_uploads', 'updated_files', or 'skipped_same_date'
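For reference, a representative doc record (the keys match those the source code reads; the values here are invented for illustration):

# Representative (invented) document record consumed by _sync_single_document
doc = {
    'name': 'Q1-report.docx',
    'modified': '2024-03-15T09:30:00Z',              # parsed by _parse_sharepoint_date
    'download_url': 'https://example.invalid/dl',    # short-lived direct download URL
    'graph_id': '01ABCDEF',                          # Graph API item id (fallback path)
    'folder_path': 'Reports/2024',                   # used for empty-folder creation
    'relative_path': 'Reports/2024/Q1-report.docx',  # used to build the FileCloud path
}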
_create_empty_folders(self, documents: List[Dict]) -> int
Purpose: Create empty folder structure in FileCloud based on folder paths extracted from SharePoint documents
Parameters:
documents: List of document dictionaries from SharePoint, each containing 'folder_path' key
Returns: Integer count of empty folders successfully created in FileCloud
_make_download_request_with_retry(self, url, max_retries=3)
Purpose: Make an HTTP GET request to download a file with automatic retry logic for rate limiting (429) and transient errors
Parameters:
url: String URL to download from
max_retries: Integer maximum number of retry attempts (default 3)
Returns: requests.Response object if successful, None if all retries failed
_get_retry_delay(self, response, attempt)
Purpose: Extract retry delay from HTTP response headers (Retry-After) or calculate exponential backoff delay
Parameters:
response: requests.Response object containing headers
attempt: Integer current retry attempt number for exponential backoff calculation
Returns: Integer or float representing seconds to wait before retry, capped at 60 seconds
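The Retry-After header can carry either a number of seconds or an HTTP date. A standalone sketch of the same parsing order the method uses (stdlib only; the function name is ours, not the class's):

# Sketch: mirror of the method's parsing order - integer seconds first,
# then an HTTP date, then exponential backoff; all capped at 60 seconds.
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

def retry_delay(retry_after: str, attempt: int) -> float:
    if retry_after:
        try:
            return min(int(retry_after), 60)   # e.g. "Retry-After: 30"
        except ValueError:
            try:
                # e.g. "Retry-After: Wed, 21 Oct 2025 07:28:00 GMT"
                when = parsedate_to_datetime(retry_after)
                return max(1, min((when - datetime.now(timezone.utc)).total_seconds(), 60))
            except (TypeError, ValueError):
                pass
    return min(2 ** attempt, 60)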
_download_document_content(self, doc: Dict) -> Optional[bytes]
Purpose: Download document content from SharePoint using direct download URL first, falling back to Graph API content endpoint
Parameters:
doc: Dictionary containing document information including 'download_url', 'graph_id', and 'name'
Returns: Bytes object containing file content if successful, None if download failed
_get_filecloud_path(self, doc: Dict) -> str
Purpose: Generate the full FileCloud path for a SharePoint document by combining base path with relative path
Parameters:
doc: Dictionary containing 'relative_path' or 'name' keys
Returns: String representing normalized FileCloud path starting with '/'
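For example, assuming a base path of '/shared/sp-sync' (an illustrative value, not the project's configured one), the normalization collapses duplicate slashes and guarantees a single leading slash:

# Standalone demonstration of the same normalization the method performs
base = '/shared/sp-sync'                       # assumed FILECLOUD_BASE_PATH
relative = 'Reports//2024/Q1-report.docx'      # relative path with a stray double slash
fc_path = '/'.join(filter(None, f"{base}/{relative}".split('/')))
if not fc_path.startswith('/'):
    fc_path = '/' + fc_path
print(fc_path)  # /shared/sp-sync/Reports/2024/Q1-report.docx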
_parse_sharepoint_date(self, date_str: str) -> datetime
Purpose: Parse SharePoint date string into timezone-aware datetime object, handling various formats including ISO, year-only, and date-only
Parameters:
date_str: String date from SharePoint in formats like ISO 8601, 'YYYY', 'YYYY-MM-DD', or 'YYYY-MM-DD HH:MM:SS'
Returns: datetime object with UTC timezone; returns the current UTC time if parsing fails
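Illustrative inputs covering each parsing branch (every call returns a timezone-aware UTC datetime; sync_service is an initialized instance and the inputs are invented):

sync_service._parse_sharepoint_date('2024-03-15T09:30:00Z')   # ISO with Z suffix
sync_service._parse_sharepoint_date('2024-03-15T09:30:00')    # ISO without tz -> assumed UTC
sync_service._parse_sharepoint_date('2024')                   # year only -> Jan 1, 2024 UTC
sync_service._parse_sharepoint_date('2024-03-15 09:30:00')    # space-separated fallback
sync_service._parse_sharepoint_date('2024-03-15')             # date only -> midnight UTC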
run_continuous_sync(self, max_documents: int = None)
Purpose: Run synchronization continuously at intervals specified by Config.SYNC_INTERVAL_MINUTES until interrupted
Parameters:
max_documents: Optional integer to limit documents processed per cycle
Returns: None - runs indefinitely until KeyboardInterrupt or unrecoverable error
run_single_sync(self, max_documents: int = None) -> Dict[str, int]
Purpose: Run a single synchronization cycle (convenience wrapper for sync_documents)
Parameters:
max_documents: Optional integer to limit the number of documents processed
Returns: Dictionary with sync statistics (same as sync_documents)
Attributes
| Name | Type | Description | Scope |
|---|---|---|---|
| logger | logging.Logger | Logger instance for this class, used to log all sync operations, errors, and statistics | instance |
| sp_client | SharePointGraphClient | Client instance for interacting with SharePoint via Microsoft Graph API, initialized with site URL and Azure credentials | instance |
| fc_client | FileCloudClient | Client instance for interacting with FileCloud API, initialized with server URL and user credentials | instance |
Dependencies
sharepoint_graph_client, filecloud_client, config, datetime, logging, time, requests, typing, email
Required Imports
from sharepoint_graph_client import SharePointGraphClient
from filecloud_client import FileCloudClient
from config import Config
from datetime import datetime
import logging
import time
import requests
from typing import Dict, List, Optional
Conditional/Optional Imports
These imports are only needed under specific conditions:
from email.utils import parsedate_to_datetime
Condition: used in the _get_retry_delay method when parsing HTTP date headers from rate-limited responses. Required (conditional).
from datetime import timezone
Condition: used in the _parse_sharepoint_date method for timezone-aware datetime objects. Required (conditional).
Usage Example
# Single synchronization run
from sync_service import SharePointFileCloudSync
# Initialize the sync service (reads from Config)
sync_service = SharePointFileCloudSync()
# Run a single sync cycle
stats = sync_service.run_single_sync()
print(f"Synced {stats['new_uploads']} new files, updated {stats['updated_files']} files")
# Or run with document limit for testing
stats = sync_service.run_single_sync(max_documents=10)
# Run continuous sync (runs indefinitely)
sync_service.run_continuous_sync()
# Or continuous with document limit
sync_service.run_continuous_sync(max_documents=50)
Best Practices
- Always ensure Config is properly set up with all required credentials and URLs before instantiating the class
- Use max_documents parameter during testing/debugging to limit the scope of synchronization
- Monitor the returned statistics dictionary to track sync performance and identify issues (see the sketch after this list)
- The class handles rate limiting automatically with exponential backoff, but be aware of API quotas
- For production use, run_continuous_sync() is preferred as it handles errors gracefully and continues running
- The class creates clients in __init__, so instantiate once and reuse for multiple sync operations
- Empty folder creation is controlled by Config.CREATE_EMPTY_FOLDERS - enable if folder structure preservation is important
- Direct download URLs may expire quickly; the class automatically falls back to Graph API if they fail
- All methods use the logger for detailed operation tracking - configure logging appropriately
- The class is stateless between sync operations - each sync_documents() call is independent
- Handle KeyboardInterrupt to gracefully stop continuous sync mode
- Date comparison uses UTC timezone - ensure SharePoint and FileCloud times are correctly interpreted
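A hypothetical wrapper applying the monitoring advice above: run one cycle, surface the error count, and signal failure to the caller (the module name assumes it follows the file sync_service.py; the exit-code convention is ours):

# Run a single cycle and exit non-zero on errors so schedulers/alerting notice
import sys
from sync_service import SharePointFileCloudSync

sync_service = SharePointFileCloudSync()
stats = sync_service.run_single_sync()
if stats['errors'] > 0:
    print(f"Sync finished with {stats['errors']} errors "
          f"({stats['new_uploads']} new, {stats['updated_files']} updated)")
    sys.exit(1)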
Similar Components
AI-powered semantic similarity - components with related functionality:
- class SharePointClient (74.7% similar)
- class SyncDiagnostics (73.6% similar)
- function main_v10 (72.3% similar)
- function main_v17 (69.7% similar)
- class Config_v2 (68.9% similar)