Python自动化办公新思路用Microsoft Graph API OAuth2批量处理Outlook邮件附完整代码每天早晨打开Outlook面对堆积如山的未读邮件你是否感到力不从心对于需要处理大量邮件的职场人士来说手动分类、归档和回复邮件不仅耗时耗力还容易出错。幸运的是Python与Microsoft Graph API的结合为我们提供了一种全新的自动化解决方案。想象一下当你还在通勤路上时你的Python脚本已经自动完成了以下工作筛选出重要客户的邮件并优先标记将会议邀请自动同步到日历把带有附件的邮件分类存储甚至根据邮件内容自动生成日报。这一切都可以通过Microsoft Graph API实现。1. Microsoft Graph API超越传统邮件协议的新选择在自动化邮件处理领域开发者通常面临三种选择IMAP、POP3和Microsoft Graph API。让我们通过一个对比表格来了解它们的差异特性IMAP/POP3Microsoft Graph API认证方式基础认证OAuth 2.0功能范围基本邮件操作完整Office 365生态集成安全性较低企业级安全数据访问粒度有限精细控制扩展性受限高度可扩展开发复杂度简单中等Microsoft Graph API的最大优势在于它提供了对Office 365生态系统的完整访问权限。通过这个统一的API端点我们不仅可以处理邮件还能访问日历、联系人、OneDrive等资源实现真正的办公自动化。提示对于企业级应用Microsoft Graph API的OAuth 2.0认证机制比传统的用户名/密码认证更安全也更符合现代应用开发规范。2. OAuth 2.0认证安全访问的基石构建自动化邮件处理工具的第一步是建立安全的认证流程。以下是配置OAuth 2.0的关键步骤Azure应用注册登录Azure门户创建新注册设置重定向URI为https://login.microsoftonline.com/common/oauth2/nativeclient记下应用程序(客户端)ID和目录(租户)ID权限配置添加Mail.Read、Mail.ReadWrite等邮件相关权限根据需求可能还需要User.Read等基础权限客户端密钥创建在证书和密码部分创建新客户端密码妥善保存生成的密钥值只显示一次下面是一个获取访问令牌的Python代码示例import requests import json def get_access_token(client_id, client_secret, tenant_id): token_url fhttps://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token headers {Content-Type: application/x-www-form-urlencoded} data { client_id: client_id, scope: https://graph.microsoft.com/.default, client_secret: client_secret, grant_type: client_credentials } response requests.post(token_url, headersheaders, datadata) return response.json().get(access_token) # 使用示例 access_token get_access_token( client_id你的客户端ID, client_secret你的客户端密钥, tenant_id你的租户ID )注意在实际应用中应该实现令牌刷新机制避免频繁重新认证。访问令牌通常有1小时的有限期可以使用刷新令牌获取新的访问令牌。3. 邮件处理实战从基础到高级应用掌握了认证机制后让我们深入探讨几个实用的邮件处理场景。首先我们需要了解Microsoft Graph API中邮件相关的基本端点获取邮件列表/me/messages获取特定邮件/me/messages/{id}搜索邮件/me/messages?$search...发送邮件/me/sendMail3.1 批量获取并分类邮件以下代码展示了如何获取最近50封未读邮件并按发件人域名分类def categorize_emails(access_token): endpoint https://graph.microsoft.com/v1.0/me/messages headers { Authorization: fBearer {access_token}, Accept: application/json } params { $top: 50, $filter: isRead eq false, $select: sender,subject,receivedDateTime } response requests.get(endpoint, headersheaders, paramsparams) emails response.json().get(value, []) categorized {} for email in emails: sender_email email[sender][emailAddress][address] domain sender_email.split()[-1] if domain not in categorized: categorized[domain] [] categorized[domain].append({ subject: email[subject], received: email[receivedDateTime] }) return categorized3.2 智能附件处理对于带有附件的邮件我们可以自动下载并分类存储附件。以下是一个实现示例import os from datetime import datetime def process_attachments(access_token, output_dirattachments): if not os.path.exists(output_dir): os.makedirs(output_dir) endpoint https://graph.microsoft.com/v1.0/me/messages headers {Authorization: fBearer {access_token}} params { $filter: hasAttachments eq true, $select: id,subject,attachments } response requests.get(endpoint, headersheaders, paramsparams) messages response.json().get(value, []) for message in messages: msg_id message[id] subject message[subject] # 获取附件详情 attachments_endpoint fhttps://graph.microsoft.com/v1.0/me/messages/{msg_id}/attachments attachments requests.get(attachments_endpoint, headersheaders).json().get(value, []) for attachment in attachments: if attachment[odata.type] #microsoft.graph.fileAttachment: file_name attachment[name] file_content requests.get( fhttps://graph.microsoft.com/v1.0/me/messages/{msg_id}/attachments/{attachment[id]}/$value, headersheaders ).content # 按日期和主题创建目录 today datetime.now().strftime(%Y-%m-%d) save_dir os.path.join(output_dir, today, subject) os.makedirs(save_dir, exist_okTrue) # 保存附件 with open(os.path.join(save_dir, file_name), wb) as f: f.write(file_content)4. 构建企业级邮件自动化系统将上述功能组合起来我们可以创建一个完整的邮件自动化处理系统。以下是系统可能包含的模块邮件监控模块定期检查新邮件根据规则自动标记重要邮件实时通知紧急邮件智能分类模块基于机器学习自动分类邮件识别客户咨询、内部沟通、系统通知等自动应用合适的标签和文件夹自动响应模块对常见问题自动生成回复处理会议邀请和日程安排发送定期报告和摘要数据分析模块生成邮件处理效率报告分析沟通模式和响应时间识别工作流程中的瓶颈4.1 系统架构设计一个健壮的企业级邮件自动化系统应该考虑以下方面错误处理网络波动、API限制、认证过期等情况日志记录详细记录系统操作便于排查问题性能优化批量处理、异步操作、缓存机制安全考虑敏感数据加密、最小权限原则下面是一个简单的系统状态监控实现import logging from datetime import datetime, timedelta class EmailAutomationSystem: def __init__(self, client_id, client_secret, tenant_id): self.client_id client_id self.client_secret client_secret self.tenant_id tenant_id self.access_token None self.token_expiry None self.logger self._setup_logger() def _setup_logger(self): logger logging.getLogger(EmailAutomation) logger.setLevel(logging.INFO) handler logging.FileHandler(automation.log) formatter logging.Formatter(%(asctime)s - %(levelname)s - %(message)s) handler.setFormatter(formatter) logger.addHandler(handler) return logger def _refresh_token(self): if self.access_token and datetime.now() self.token_expiry: return self.logger.info(Refreshing access token) self.access_token get_access_token( self.client_id, self.client_secret, self.tenant_id ) self.token_expiry datetime.now() timedelta(minutes50) def run(self): try: self._refresh_token() # 执行邮件处理任务 self.process_emails() self.logger.info(Processing completed successfully) except Exception as e: self.logger.error(fError occurred: {str(e)}, exc_infoTrue) def process_emails(self): # 实现具体的邮件处理逻辑 pass5. 高级技巧与最佳实践在长期使用Microsoft Graph API进行邮件自动化的过程中我总结出以下几点经验API调用优化使用$select只请求需要的字段利用$filter减少传输数据量考虑批量操作减少API调用次数错误处理策略实现指数退避重试机制区分临时错误和永久错误监控429状态码请求过多安全增强措施定期轮换客户端密钥使用Azure Key Vault存储敏感信息实施最小权限原则性能监控跟踪API响应时间记录失败请求设置警报阈值对于需要处理大量邮件的场景可以考虑以下优化代码from concurrent.futures import ThreadPoolExecutor import time def batch_process_emails(access_token, message_ids, workers4): def process_single_email(msg_id): endpoint fhttps://graph.microsoft.com/v1.0/me/messages/{msg_id} headers {Authorization: fBearer {access_token}} for attempt in range(3): try: response requests.patch( endpoint, headersheaders, json{isRead: True} ) if response.status_code 429: retry_after int(response.headers.get(Retry-After, 5)) time.sleep(retry_after) continue response.raise_for_status() return True except Exception as e: time.sleep(2 ** attempt) return False with ThreadPoolExecutor(max_workersworkers) as executor: results list(executor.map(process_single_email, message_ids)) return sum(results) / len(results)在实际项目中我发现合理设置workers参数对性能影响很大。通常4-8个 worker 能在API限制和吞吐量之间取得良好平衡。
Python自动化办公新思路:用Microsoft Graph API + OAuth2批量处理Outlook邮件(附完整代码)
Python自动化办公新思路用Microsoft Graph API OAuth2批量处理Outlook邮件附完整代码每天早晨打开Outlook面对堆积如山的未读邮件你是否感到力不从心对于需要处理大量邮件的职场人士来说手动分类、归档和回复邮件不仅耗时耗力还容易出错。幸运的是Python与Microsoft Graph API的结合为我们提供了一种全新的自动化解决方案。想象一下当你还在通勤路上时你的Python脚本已经自动完成了以下工作筛选出重要客户的邮件并优先标记将会议邀请自动同步到日历把带有附件的邮件分类存储甚至根据邮件内容自动生成日报。这一切都可以通过Microsoft Graph API实现。1. Microsoft Graph API超越传统邮件协议的新选择在自动化邮件处理领域开发者通常面临三种选择IMAP、POP3和Microsoft Graph API。让我们通过一个对比表格来了解它们的差异特性IMAP/POP3Microsoft Graph API认证方式基础认证OAuth 2.0功能范围基本邮件操作完整Office 365生态集成安全性较低企业级安全数据访问粒度有限精细控制扩展性受限高度可扩展开发复杂度简单中等Microsoft Graph API的最大优势在于它提供了对Office 365生态系统的完整访问权限。通过这个统一的API端点我们不仅可以处理邮件还能访问日历、联系人、OneDrive等资源实现真正的办公自动化。提示对于企业级应用Microsoft Graph API的OAuth 2.0认证机制比传统的用户名/密码认证更安全也更符合现代应用开发规范。2. OAuth 2.0认证安全访问的基石构建自动化邮件处理工具的第一步是建立安全的认证流程。以下是配置OAuth 2.0的关键步骤Azure应用注册登录Azure门户创建新注册设置重定向URI为https://login.microsoftonline.com/common/oauth2/nativeclient记下应用程序(客户端)ID和目录(租户)ID权限配置添加Mail.Read、Mail.ReadWrite等邮件相关权限根据需求可能还需要User.Read等基础权限客户端密钥创建在证书和密码部分创建新客户端密码妥善保存生成的密钥值只显示一次下面是一个获取访问令牌的Python代码示例import requests import json def get_access_token(client_id, client_secret, tenant_id): token_url fhttps://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token headers {Content-Type: application/x-www-form-urlencoded} data { client_id: client_id, scope: https://graph.microsoft.com/.default, client_secret: client_secret, grant_type: client_credentials } response requests.post(token_url, headersheaders, datadata) return response.json().get(access_token) # 使用示例 access_token get_access_token( client_id你的客户端ID, client_secret你的客户端密钥, tenant_id你的租户ID )注意在实际应用中应该实现令牌刷新机制避免频繁重新认证。访问令牌通常有1小时的有限期可以使用刷新令牌获取新的访问令牌。3. 邮件处理实战从基础到高级应用掌握了认证机制后让我们深入探讨几个实用的邮件处理场景。首先我们需要了解Microsoft Graph API中邮件相关的基本端点获取邮件列表/me/messages获取特定邮件/me/messages/{id}搜索邮件/me/messages?$search...发送邮件/me/sendMail3.1 批量获取并分类邮件以下代码展示了如何获取最近50封未读邮件并按发件人域名分类def categorize_emails(access_token): endpoint https://graph.microsoft.com/v1.0/me/messages headers { Authorization: fBearer {access_token}, Accept: application/json } params { $top: 50, $filter: isRead eq false, $select: sender,subject,receivedDateTime } response requests.get(endpoint, headersheaders, paramsparams) emails response.json().get(value, []) categorized {} for email in emails: sender_email email[sender][emailAddress][address] domain sender_email.split()[-1] if domain not in categorized: categorized[domain] [] categorized[domain].append({ subject: email[subject], received: email[receivedDateTime] }) return categorized3.2 智能附件处理对于带有附件的邮件我们可以自动下载并分类存储附件。以下是一个实现示例import os from datetime import datetime def process_attachments(access_token, output_dirattachments): if not os.path.exists(output_dir): os.makedirs(output_dir) endpoint https://graph.microsoft.com/v1.0/me/messages headers {Authorization: fBearer {access_token}} params { $filter: hasAttachments eq true, $select: id,subject,attachments } response requests.get(endpoint, headersheaders, paramsparams) messages response.json().get(value, []) for message in messages: msg_id message[id] subject message[subject] # 获取附件详情 attachments_endpoint fhttps://graph.microsoft.com/v1.0/me/messages/{msg_id}/attachments attachments requests.get(attachments_endpoint, headersheaders).json().get(value, []) for attachment in attachments: if attachment[odata.type] #microsoft.graph.fileAttachment: file_name attachment[name] file_content requests.get( fhttps://graph.microsoft.com/v1.0/me/messages/{msg_id}/attachments/{attachment[id]}/$value, headersheaders ).content # 按日期和主题创建目录 today datetime.now().strftime(%Y-%m-%d) save_dir os.path.join(output_dir, today, subject) os.makedirs(save_dir, exist_okTrue) # 保存附件 with open(os.path.join(save_dir, file_name), wb) as f: f.write(file_content)4. 构建企业级邮件自动化系统将上述功能组合起来我们可以创建一个完整的邮件自动化处理系统。以下是系统可能包含的模块邮件监控模块定期检查新邮件根据规则自动标记重要邮件实时通知紧急邮件智能分类模块基于机器学习自动分类邮件识别客户咨询、内部沟通、系统通知等自动应用合适的标签和文件夹自动响应模块对常见问题自动生成回复处理会议邀请和日程安排发送定期报告和摘要数据分析模块生成邮件处理效率报告分析沟通模式和响应时间识别工作流程中的瓶颈4.1 系统架构设计一个健壮的企业级邮件自动化系统应该考虑以下方面错误处理网络波动、API限制、认证过期等情况日志记录详细记录系统操作便于排查问题性能优化批量处理、异步操作、缓存机制安全考虑敏感数据加密、最小权限原则下面是一个简单的系统状态监控实现import logging from datetime import datetime, timedelta class EmailAutomationSystem: def __init__(self, client_id, client_secret, tenant_id): self.client_id client_id self.client_secret client_secret self.tenant_id tenant_id self.access_token None self.token_expiry None self.logger self._setup_logger() def _setup_logger(self): logger logging.getLogger(EmailAutomation) logger.setLevel(logging.INFO) handler logging.FileHandler(automation.log) formatter logging.Formatter(%(asctime)s - %(levelname)s - %(message)s) handler.setFormatter(formatter) logger.addHandler(handler) return logger def _refresh_token(self): if self.access_token and datetime.now() self.token_expiry: return self.logger.info(Refreshing access token) self.access_token get_access_token( self.client_id, self.client_secret, self.tenant_id ) self.token_expiry datetime.now() timedelta(minutes50) def run(self): try: self._refresh_token() # 执行邮件处理任务 self.process_emails() self.logger.info(Processing completed successfully) except Exception as e: self.logger.error(fError occurred: {str(e)}, exc_infoTrue) def process_emails(self): # 实现具体的邮件处理逻辑 pass5. 高级技巧与最佳实践在长期使用Microsoft Graph API进行邮件自动化的过程中我总结出以下几点经验API调用优化使用$select只请求需要的字段利用$filter减少传输数据量考虑批量操作减少API调用次数错误处理策略实现指数退避重试机制区分临时错误和永久错误监控429状态码请求过多安全增强措施定期轮换客户端密钥使用Azure Key Vault存储敏感信息实施最小权限原则性能监控跟踪API响应时间记录失败请求设置警报阈值对于需要处理大量邮件的场景可以考虑以下优化代码from concurrent.futures import ThreadPoolExecutor import time def batch_process_emails(access_token, message_ids, workers4): def process_single_email(msg_id): endpoint fhttps://graph.microsoft.com/v1.0/me/messages/{msg_id} headers {Authorization: fBearer {access_token}} for attempt in range(3): try: response requests.patch( endpoint, headersheaders, json{isRead: True} ) if response.status_code 429: retry_after int(response.headers.get(Retry-After, 5)) time.sleep(retry_after) continue response.raise_for_status() return True except Exception as e: time.sleep(2 ** attempt) return False with ThreadPoolExecutor(max_workersworkers) as executor: results list(executor.map(process_single_email, message_ids)) return sum(results) / len(results)在实际项目中我发现合理设置workers参数对性能影响很大。通常4-8个 worker 能在API限制和吞吐量之间取得良好平衡。