从零到自动化手把手教你用PythonRequests库构建Redfish服务器巡检脚本当你面对机房中成排的服务器时是否曾想过如何高效完成日常健康检查传统的人工巡检不仅耗时耗力还容易遗漏关键指标。本文将带你用Python的Requests库构建一个生产级Redfish巡检脚本实现从手动测试到自动化运维的跨越。Redfish作为现代服务器管理的RESTful接口标准提供了对硬件状态的全面监控能力。与IPMI等传统协议相比它的JSON数据格式和HTTP协议让自动化集成变得异常简单。我们将从实战角度出发设计一个可扩展的脚本框架涵盖会话管理、异常处理和报告生成等关键环节。1. 环境准备与基础配置在开始编码前我们需要准备好开发环境和必要的依赖库。建议使用Python 3.8版本它能提供最佳的库兼容性和性能表现。核心依赖库安装pip install requests2.28.1 # 稳定版的HTTP客户端库 pip install rich13.3.1 # 终端美化输出 pip install python-dotenv1.0.0 # 环境变量管理创建项目目录结构/redfish-monitor ├── config/ │ └── .env # 存储敏感信息 ├── libs/ │ ├── auth.py # 认证模块 │ └── utils.py # 工具函数 ├── reports/ # 巡检报告输出目录 └── monitor.py # 主程序入口提示使用dotenv管理凭证信息避免将密码硬编码在脚本中。在.env文件中配置REDFISH_HOST192.168.1.100 REDFISH_USERadmin REDFISH_PASSYourSecurePassword2. Redfish会话管理与认证Redfish API采用基于Token的认证机制我们需要妥善处理会话生命周期。以下代码展示了如何封装认证过程# libs/auth.py import os from dotenv import load_dotenv import requests from requests.exceptions import RequestException load_dotenv() class RedfishSession: def __init__(self): self.base_url fhttps://{os.getenv(REDFISH_HOST)} self.session requests.Session() self.token None def login(self): auth_url f{self.base_url}/redfish/v1/SessionService/Sessions try: response self.session.post( auth_url, json{ UserName: os.getenv(REDFISH_USER), Password: os.getenv(REDFISH_PASS) }, headers{Content-Type: application/json}, verifyFalse # 生产环境应使用有效证书 ) response.raise_for_status() self.token response.headers.get(X-Auth-Token) return True except RequestException as e: print(f认证失败: {str(e)}) return False def get(self, endpoint): if not self.token: raise ValueError(请先调用login()方法获取认证token) return self.session.get( f{self.base_url}{endpoint}, headers{X-Auth-Token: self.token} )关键点说明使用会话对象保持TCP连接复用正确处理HTTPS证书验证演示环境禁用验证集中管理认证Token完善的错误处理机制3. 核心巡检功能实现3.1 资产信息收集服务器硬件信息是巡检的基础数据以下代码展示了如何获取并解析系统信息# monitor.py from libs.auth import RedfishSession from rich.console import Console from rich.table import Table console Console() def get_system_info(session): response session.get(/redfish/v1/Systems/1) data response.json() table Table(title服务器资产信息) table.add_column(属性, stylecyan) table.add_column(值, stylemagenta) for key in [Manufacturer, Model, SerialNumber, PowerState]: table.add_row(key, str(data.get(key, N/A))) console.print(table) return data典型输出示例┏━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┓ ┃ 属性 ┃ 值 ┃ ┡━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━┩ │ Manufacturer │ Dell Inc. │ │ Model │ PowerEdge R740 │ │ SerialNumber │ ABC1234567 │ │ PowerState │ On │ └───────────────┴───────────────────┘3.2 健康状态检查服务器健康状态需要检查多个子系统我们设计一个综合检查函数def check_health_status(session): endpoints { 温度传感器: /redfish/v1/Chassis/1/Thermal, 电源状态: /redfish/v1/Chassis/1/Power, 存储健康: /redfish/v1/Systems/1/Storage, 日志状态: /redfish/v1/Systems/1/LogServices } health_report {} for name, endpoint in endpoints.items(): try: resp session.get(endpoint) health_report[name] { status: 正常 if resp.status_code 200 else 异常, data: resp.json() } except Exception as e: health_report[name] {status: 检查失败, error: str(e)} return health_report注意实际生产环境中应该为每个检查项设置更精细的状态判断逻辑而不仅仅是HTTP状态码。4. 异常处理与重试机制网络请求不可避免地会遇到各种异常我们需要构建健壮的错误处理系统# libs/utils.py import time from functools import wraps def retry(max_attempts3, delay1): def decorator(func): wraps(func) def wrapper(*args, **kwargs): last_error None for attempt in range(max_attempts): try: return func(*args, **kwargs) except Exception as e: last_error e if attempt max_attempts - 1: time.sleep(delay * (attempt 1)) raise last_error return wrapper return decorator应用重试机制的API调用示例retry(max_attempts3, delay2) def safe_api_call(session, endpoint): response session.get(endpoint) response.raise_for_status() return response.json()常见异常处理策略网络超时指数退避重试认证失效自动重新登录速率限制动态调整请求间隔数据校验响应内容格式验证5. 报告生成与输出自动化巡检的价值在于能够生成可操作的报告我们使用Python的json和csv模块实现多格式输出import json import csv from datetime import datetime def generate_report(data, formatconsole): timestamp datetime.now().strftime(%Y%m%d_%H%M%S) filename freports/server_check_{timestamp} if format json: with open(f{filename}.json, w) as f: json.dump(data, f, indent2) elif format csv: with open(f{filename}.csv, w, newline) as f: writer csv.writer(f) writer.writerow([检查项, 状态, 详情]) for item, result in data.items(): writer.writerow([item, result[status], str(result.get(data, ))]) else: console.print([bold]巡检结果摘要:[/bold]) for item, result in data.items(): status_style green if result[status] 正常 else red console.print(f{item}: [{status_style}]{result[status]}[/{status_style}])报告输出选项对比表格式类型适用场景优点缺点控制台即时查看实时反馈彩色高亮不便存档JSON后续程序处理结构完整保留全部信息文件体积较大CSV导入Excel或BI工具表格友好兼容性强嵌套数据需要扁平化6. 完整脚本集成将各模块组合成可执行的主程序# monitor.py def main(): # 初始化会话 session RedfishSession() if not session.login(): console.print([red]认证失败请检查凭证和网络连接[/red]) return # 执行巡检任务 console.print([bold blue]开始服务器巡检...[/bold blue]) system_info get_system_info(session) health_status check_health_status(session) # 生成报告 console.print(\n[bold green]巡检完成[/bold green]) generate_report(health_status) # 清理会话 session.session.close() if __name__ __main__: main()典型执行流程加载环境配置建立Redfish会话收集系统信息检查各子系统状态生成可视化报告安全关闭会话7. 进阶优化方向7.1 性能优化技巧当需要监控大量服务器时可以考虑以下优化手段# 使用连接池提高性能 adapter requests.adapters.HTTPAdapter( pool_connections10, pool_maxsize10, max_retries3 ) session.mount(https://, adapter)并发检查实现方案from concurrent.futures import ThreadPoolExecutor def batch_check(servers): with ThreadPoolExecutor(max_workers5) as executor: futures [] for server in servers: futures.append(executor.submit(check_single_server, server)) results [] for future in as_completed(futures): try: results.append(future.result()) except Exception as e: print(f检查失败: {str(e)}) return results7.2 可观测性增强添加监控指标和日志记录import logging from prometheus_client import Counter, Gauge # 指标定义 REQUESTS_TOTAL Counter(redfish_requests_total, API请求总数, [endpoint]) ERRORS_TOTAL Counter(redfish_errors_total, 错误请求数) RESPONSE_TIME Gauge(redfish_response_seconds, API响应时间) # 日志配置 logging.basicConfig( filenameredfish_monitor.log, levellogging.INFO, format%(asctime)s - %(levelname)s - %(message)s ) def instrumented_request(session, endpoint): start_time time.time() try: response session.get(endpoint) REQUESTS_TOTAL.labels(endpointendpoint).inc() return response except Exception as e: ERRORS_TOTAL.inc() logging.error(f请求{endpoint}失败: {str(e)}) raise finally: RESPONSE_TIME.set(time.time() - start_time)7.3 容器化部署使用Docker封装巡检脚本# Dockerfile FROM python:3.8-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . CMD [python, monitor.py]对应的docker-compose配置version: 3.8 services: redfish-monitor: build: . environment: - REDFISH_HOST${REDFISH_HOST} - REDFISH_USER${REDFISH_USER} - REDFISH_PASS${REDFISH_PASS} volumes: - ./reports:/app/reports restart: unless-stopped
从零到自动化:手把手教你用Python+Requests库构建Redfish服务器巡检脚本
从零到自动化手把手教你用PythonRequests库构建Redfish服务器巡检脚本当你面对机房中成排的服务器时是否曾想过如何高效完成日常健康检查传统的人工巡检不仅耗时耗力还容易遗漏关键指标。本文将带你用Python的Requests库构建一个生产级Redfish巡检脚本实现从手动测试到自动化运维的跨越。Redfish作为现代服务器管理的RESTful接口标准提供了对硬件状态的全面监控能力。与IPMI等传统协议相比它的JSON数据格式和HTTP协议让自动化集成变得异常简单。我们将从实战角度出发设计一个可扩展的脚本框架涵盖会话管理、异常处理和报告生成等关键环节。1. 环境准备与基础配置在开始编码前我们需要准备好开发环境和必要的依赖库。建议使用Python 3.8版本它能提供最佳的库兼容性和性能表现。核心依赖库安装pip install requests2.28.1 # 稳定版的HTTP客户端库 pip install rich13.3.1 # 终端美化输出 pip install python-dotenv1.0.0 # 环境变量管理创建项目目录结构/redfish-monitor ├── config/ │ └── .env # 存储敏感信息 ├── libs/ │ ├── auth.py # 认证模块 │ └── utils.py # 工具函数 ├── reports/ # 巡检报告输出目录 └── monitor.py # 主程序入口提示使用dotenv管理凭证信息避免将密码硬编码在脚本中。在.env文件中配置REDFISH_HOST192.168.1.100 REDFISH_USERadmin REDFISH_PASSYourSecurePassword2. Redfish会话管理与认证Redfish API采用基于Token的认证机制我们需要妥善处理会话生命周期。以下代码展示了如何封装认证过程# libs/auth.py import os from dotenv import load_dotenv import requests from requests.exceptions import RequestException load_dotenv() class RedfishSession: def __init__(self): self.base_url fhttps://{os.getenv(REDFISH_HOST)} self.session requests.Session() self.token None def login(self): auth_url f{self.base_url}/redfish/v1/SessionService/Sessions try: response self.session.post( auth_url, json{ UserName: os.getenv(REDFISH_USER), Password: os.getenv(REDFISH_PASS) }, headers{Content-Type: application/json}, verifyFalse # 生产环境应使用有效证书 ) response.raise_for_status() self.token response.headers.get(X-Auth-Token) return True except RequestException as e: print(f认证失败: {str(e)}) return False def get(self, endpoint): if not self.token: raise ValueError(请先调用login()方法获取认证token) return self.session.get( f{self.base_url}{endpoint}, headers{X-Auth-Token: self.token} )关键点说明使用会话对象保持TCP连接复用正确处理HTTPS证书验证演示环境禁用验证集中管理认证Token完善的错误处理机制3. 核心巡检功能实现3.1 资产信息收集服务器硬件信息是巡检的基础数据以下代码展示了如何获取并解析系统信息# monitor.py from libs.auth import RedfishSession from rich.console import Console from rich.table import Table console Console() def get_system_info(session): response session.get(/redfish/v1/Systems/1) data response.json() table Table(title服务器资产信息) table.add_column(属性, stylecyan) table.add_column(值, stylemagenta) for key in [Manufacturer, Model, SerialNumber, PowerState]: table.add_row(key, str(data.get(key, N/A))) console.print(table) return data典型输出示例┏━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┓ ┃ 属性 ┃ 值 ┃ ┡━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━┩ │ Manufacturer │ Dell Inc. │ │ Model │ PowerEdge R740 │ │ SerialNumber │ ABC1234567 │ │ PowerState │ On │ └───────────────┴───────────────────┘3.2 健康状态检查服务器健康状态需要检查多个子系统我们设计一个综合检查函数def check_health_status(session): endpoints { 温度传感器: /redfish/v1/Chassis/1/Thermal, 电源状态: /redfish/v1/Chassis/1/Power, 存储健康: /redfish/v1/Systems/1/Storage, 日志状态: /redfish/v1/Systems/1/LogServices } health_report {} for name, endpoint in endpoints.items(): try: resp session.get(endpoint) health_report[name] { status: 正常 if resp.status_code 200 else 异常, data: resp.json() } except Exception as e: health_report[name] {status: 检查失败, error: str(e)} return health_report注意实际生产环境中应该为每个检查项设置更精细的状态判断逻辑而不仅仅是HTTP状态码。4. 异常处理与重试机制网络请求不可避免地会遇到各种异常我们需要构建健壮的错误处理系统# libs/utils.py import time from functools import wraps def retry(max_attempts3, delay1): def decorator(func): wraps(func) def wrapper(*args, **kwargs): last_error None for attempt in range(max_attempts): try: return func(*args, **kwargs) except Exception as e: last_error e if attempt max_attempts - 1: time.sleep(delay * (attempt 1)) raise last_error return wrapper return decorator应用重试机制的API调用示例retry(max_attempts3, delay2) def safe_api_call(session, endpoint): response session.get(endpoint) response.raise_for_status() return response.json()常见异常处理策略网络超时指数退避重试认证失效自动重新登录速率限制动态调整请求间隔数据校验响应内容格式验证5. 报告生成与输出自动化巡检的价值在于能够生成可操作的报告我们使用Python的json和csv模块实现多格式输出import json import csv from datetime import datetime def generate_report(data, formatconsole): timestamp datetime.now().strftime(%Y%m%d_%H%M%S) filename freports/server_check_{timestamp} if format json: with open(f{filename}.json, w) as f: json.dump(data, f, indent2) elif format csv: with open(f{filename}.csv, w, newline) as f: writer csv.writer(f) writer.writerow([检查项, 状态, 详情]) for item, result in data.items(): writer.writerow([item, result[status], str(result.get(data, ))]) else: console.print([bold]巡检结果摘要:[/bold]) for item, result in data.items(): status_style green if result[status] 正常 else red console.print(f{item}: [{status_style}]{result[status]}[/{status_style}])报告输出选项对比表格式类型适用场景优点缺点控制台即时查看实时反馈彩色高亮不便存档JSON后续程序处理结构完整保留全部信息文件体积较大CSV导入Excel或BI工具表格友好兼容性强嵌套数据需要扁平化6. 完整脚本集成将各模块组合成可执行的主程序# monitor.py def main(): # 初始化会话 session RedfishSession() if not session.login(): console.print([red]认证失败请检查凭证和网络连接[/red]) return # 执行巡检任务 console.print([bold blue]开始服务器巡检...[/bold blue]) system_info get_system_info(session) health_status check_health_status(session) # 生成报告 console.print(\n[bold green]巡检完成[/bold green]) generate_report(health_status) # 清理会话 session.session.close() if __name__ __main__: main()典型执行流程加载环境配置建立Redfish会话收集系统信息检查各子系统状态生成可视化报告安全关闭会话7. 进阶优化方向7.1 性能优化技巧当需要监控大量服务器时可以考虑以下优化手段# 使用连接池提高性能 adapter requests.adapters.HTTPAdapter( pool_connections10, pool_maxsize10, max_retries3 ) session.mount(https://, adapter)并发检查实现方案from concurrent.futures import ThreadPoolExecutor def batch_check(servers): with ThreadPoolExecutor(max_workers5) as executor: futures [] for server in servers: futures.append(executor.submit(check_single_server, server)) results [] for future in as_completed(futures): try: results.append(future.result()) except Exception as e: print(f检查失败: {str(e)}) return results7.2 可观测性增强添加监控指标和日志记录import logging from prometheus_client import Counter, Gauge # 指标定义 REQUESTS_TOTAL Counter(redfish_requests_total, API请求总数, [endpoint]) ERRORS_TOTAL Counter(redfish_errors_total, 错误请求数) RESPONSE_TIME Gauge(redfish_response_seconds, API响应时间) # 日志配置 logging.basicConfig( filenameredfish_monitor.log, levellogging.INFO, format%(asctime)s - %(levelname)s - %(message)s ) def instrumented_request(session, endpoint): start_time time.time() try: response session.get(endpoint) REQUESTS_TOTAL.labels(endpointendpoint).inc() return response except Exception as e: ERRORS_TOTAL.inc() logging.error(f请求{endpoint}失败: {str(e)}) raise finally: RESPONSE_TIME.set(time.time() - start_time)7.3 容器化部署使用Docker封装巡检脚本# Dockerfile FROM python:3.8-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . CMD [python, monitor.py]对应的docker-compose配置version: 3.8 services: redfish-monitor: build: . environment: - REDFISH_HOST${REDFISH_HOST} - REDFISH_USER${REDFISH_USER} - REDFISH_PASS${REDFISH_PASS} volumes: - ./reports:/app/reports restart: unless-stopped