Python实战3分钟极简调用FOFA API实现资产自动化搜索在网络安全和资产测绘领域FOFA作为全球领先的搜索引擎其API接口的调用能力已经成为安全工程师的必备技能。本文将带你用最精简的Python代码快速实现从认证到结果保存的完整流程无需复杂框架只需3分钟即可完成核心功能集成。1. 环境准备与基础配置在开始编写代码前我们需要确保开发环境已经就绪。Python 3.6版本是必须的因为我们会使用到f-string等现代语法特性。打开终端用以下命令检查Python版本python --version接下来安装唯一必需的第三方库requests。虽然标准库中的urllib也能完成HTTP请求但requests的API设计更加人性化pip install requests获取FOFA API凭证是第一步。登录FOFA平台后在个人中心可以找到两个关键信息Email注册时使用的邮箱地址API Key系统生成的唯一身份验证密钥提示API Key具有账户全权限请妥善保管避免泄露。建议在代码中使用环境变量而非硬编码方式存储。2. 核心代码实现解析让我们直接切入最精简的实现方案。以下代码完整实现了从搜索到结果保存的全流程包含详细注释说明每个关键步骤import requests import base64 import sys # 配置区 - 替换为你的实际凭证 EMAIL your_emailexample.com API_KEY your_api_key_here API_URL https://fofa.info/api/v1/search/all def fofa_search(query, size100): 执行FOFA搜索并返回结果列表 # Base64编码查询语句 query_b64 base64.b64encode(query.encode()).decode() # 构造请求参数 params { email: EMAIL, key: API_KEY, qbase64: query_b64, size: size # 控制返回结果数量 } # 发送GET请求并处理响应 response requests.get(API_URL, paramsparams) if response.status_code 200: return response.json().get(results, []) else: print(f请求失败状态码{response.status_code}) return [] if __name__ __main__: # 从命令行参数获取搜索语句 if len(sys.argv) 2: print(Usage: python fofa_search.py 查询语句) sys.exit(1) search_query sys.argv[1] results fofa_search(search_query) # 结果处理与保存 output_file f{search_query.replace( , _)}_results.txt with open(output_file, w) as f: for item in results: f.write(f{item[0]}\n) # 假设每项第一个元素是IP/域名 print(f搜索完成共找到{len(results)}条结果已保存至{output_file})这段代码的核心优势在于参数化设计通过命令行参数接收查询语句提高复用性错误处理基础的状态码检查确保程序健壮性结果持久化自动生成与查询相关的文件名保存结果3. 高级功能扩展基础功能实现后我们可以进一步优化代码的实用性和可靠性。以下是三个值得添加的增强功能3.1 结果分页处理FOFA API默认返回100条结果虽然可以通过size参数调整但超过10000条时需要分页处理。以下是实现分页的代码片段def get_all_results(query, max_results10000): 获取所有分页结果 all_results [] page 1 while True: query_b64 base64.b64encode(f{query} page{page}.encode()).decode() params { email: EMAIL, key: API_KEY, qbase64: query_b64, size: 1000, page: page } response requests.get(API_URL, paramsparams) if response.status_code ! 200: break data response.json() if not data[results]: break all_results.extend(data[results]) if len(all_results) max_results: break page 1 return all_results[:max_results]3.2 结果格式多样化输出除了文本文件我们还可以支持JSON和CSV格式输出方便后续分析import csv import json def save_results(results, filename, formattxt): 按指定格式保存结果 if format json: with open(f{filename}.json, w) as f: json.dump(results, f, indent2) elif format csv: with open(f{filename}.csv, w, newline) as f: writer csv.writer(f) writer.writerow([IP, Port, Protocol]) # 示例表头 for item in results: writer.writerow(item[:3]) # 假设前三个字段是IP、端口、协议 else: # 默认文本格式 with open(f{filename}.txt, w) as f: for item in results: f.write(f{ | .join(map(str, item))}\n)3.3 环境变量配置管理安全最佳实践是避免在代码中硬编码敏感信息。使用python-dotenv管理凭证pip install python-dotenv创建.env文件FOFA_EMAILyour_emailexample.com FOFA_API_KEYyour_api_key_here修改代码配置部分from dotenv import load_dotenv import os load_dotenv() EMAIL os.getenv(FOFA_EMAIL) API_KEY os.getenv(FOFA_API_KEY)4. 实战应用场景示例掌握了基础API调用后让我们看几个实际应用场景中的代码示例4.1 批量查询与聚合以下脚本可以批量查询多个目标并聚合结果queries [ titleApache Tomcat, servernginx, port3306 ] all_results {} for query in queries: results fofa_search(query) all_results[query] results print(f查询 {query} 找到 {len(results)} 条结果) # 保存聚合结果 with open(aggregated_results.json, w) as f: json.dump(all_results, f, indent2)4.2 定时监控与告警结合schedule库实现定时监控新增资产import schedule import time def monitor_new_assets(): baseline set() query domainexample.com def check(): nonlocal baseline current {item[0] for item in fofa_search(query)} new_assets current - baseline if new_assets: print(f发现 {len(new_assets)} 个新增资产:) for asset in new_assets: print(asset) baseline current # 首次运行建立基线 baseline {item[0] for item in fofa_search(query)} print(f初始基线包含 {len(baseline)} 个资产) # 每30分钟检查一次 schedule.every(30).minutes.do(check) while True: schedule.run_pending() time.sleep(1) # 注意实际生产环境应使用更可靠的调度系统如Celery或APScheduler4.3 可视化分析使用pandas和matplotlib对结果进行简单分析import pandas as pd import matplotlib.pyplot as plt def analyze_ports(results): 分析端口分布情况 df pd.DataFrame(results, columns[ip, port, protocol, *[fattr_{i} for i in range(len(results[0])-3)]]) df[port] df[port].astype(int) # 统计前10个最常见端口 top_ports df[port].value_counts().head(10) # 绘制柱状图 plt.figure(figsize(10, 6)) top_ports.plot(kindbar) plt.title(Top 10 Ports Distribution) plt.xlabel(Port Number) plt.ylabel(Count) plt.savefig(port_distribution.png) plt.close() return top_ports # 使用示例 results fofa_search(ip192.168.1.0/24) port_stats analyze_ports(results) print(port_stats)
Python实战:3分钟搞定FOFA API资产搜索(附完整代码)
Python实战3分钟极简调用FOFA API实现资产自动化搜索在网络安全和资产测绘领域FOFA作为全球领先的搜索引擎其API接口的调用能力已经成为安全工程师的必备技能。本文将带你用最精简的Python代码快速实现从认证到结果保存的完整流程无需复杂框架只需3分钟即可完成核心功能集成。1. 环境准备与基础配置在开始编写代码前我们需要确保开发环境已经就绪。Python 3.6版本是必须的因为我们会使用到f-string等现代语法特性。打开终端用以下命令检查Python版本python --version接下来安装唯一必需的第三方库requests。虽然标准库中的urllib也能完成HTTP请求但requests的API设计更加人性化pip install requests获取FOFA API凭证是第一步。登录FOFA平台后在个人中心可以找到两个关键信息Email注册时使用的邮箱地址API Key系统生成的唯一身份验证密钥提示API Key具有账户全权限请妥善保管避免泄露。建议在代码中使用环境变量而非硬编码方式存储。2. 核心代码实现解析让我们直接切入最精简的实现方案。以下代码完整实现了从搜索到结果保存的全流程包含详细注释说明每个关键步骤import requests import base64 import sys # 配置区 - 替换为你的实际凭证 EMAIL your_emailexample.com API_KEY your_api_key_here API_URL https://fofa.info/api/v1/search/all def fofa_search(query, size100): 执行FOFA搜索并返回结果列表 # Base64编码查询语句 query_b64 base64.b64encode(query.encode()).decode() # 构造请求参数 params { email: EMAIL, key: API_KEY, qbase64: query_b64, size: size # 控制返回结果数量 } # 发送GET请求并处理响应 response requests.get(API_URL, paramsparams) if response.status_code 200: return response.json().get(results, []) else: print(f请求失败状态码{response.status_code}) return [] if __name__ __main__: # 从命令行参数获取搜索语句 if len(sys.argv) 2: print(Usage: python fofa_search.py 查询语句) sys.exit(1) search_query sys.argv[1] results fofa_search(search_query) # 结果处理与保存 output_file f{search_query.replace( , _)}_results.txt with open(output_file, w) as f: for item in results: f.write(f{item[0]}\n) # 假设每项第一个元素是IP/域名 print(f搜索完成共找到{len(results)}条结果已保存至{output_file})这段代码的核心优势在于参数化设计通过命令行参数接收查询语句提高复用性错误处理基础的状态码检查确保程序健壮性结果持久化自动生成与查询相关的文件名保存结果3. 高级功能扩展基础功能实现后我们可以进一步优化代码的实用性和可靠性。以下是三个值得添加的增强功能3.1 结果分页处理FOFA API默认返回100条结果虽然可以通过size参数调整但超过10000条时需要分页处理。以下是实现分页的代码片段def get_all_results(query, max_results10000): 获取所有分页结果 all_results [] page 1 while True: query_b64 base64.b64encode(f{query} page{page}.encode()).decode() params { email: EMAIL, key: API_KEY, qbase64: query_b64, size: 1000, page: page } response requests.get(API_URL, paramsparams) if response.status_code ! 200: break data response.json() if not data[results]: break all_results.extend(data[results]) if len(all_results) max_results: break page 1 return all_results[:max_results]3.2 结果格式多样化输出除了文本文件我们还可以支持JSON和CSV格式输出方便后续分析import csv import json def save_results(results, filename, formattxt): 按指定格式保存结果 if format json: with open(f{filename}.json, w) as f: json.dump(results, f, indent2) elif format csv: with open(f{filename}.csv, w, newline) as f: writer csv.writer(f) writer.writerow([IP, Port, Protocol]) # 示例表头 for item in results: writer.writerow(item[:3]) # 假设前三个字段是IP、端口、协议 else: # 默认文本格式 with open(f{filename}.txt, w) as f: for item in results: f.write(f{ | .join(map(str, item))}\n)3.3 环境变量配置管理安全最佳实践是避免在代码中硬编码敏感信息。使用python-dotenv管理凭证pip install python-dotenv创建.env文件FOFA_EMAILyour_emailexample.com FOFA_API_KEYyour_api_key_here修改代码配置部分from dotenv import load_dotenv import os load_dotenv() EMAIL os.getenv(FOFA_EMAIL) API_KEY os.getenv(FOFA_API_KEY)4. 实战应用场景示例掌握了基础API调用后让我们看几个实际应用场景中的代码示例4.1 批量查询与聚合以下脚本可以批量查询多个目标并聚合结果queries [ titleApache Tomcat, servernginx, port3306 ] all_results {} for query in queries: results fofa_search(query) all_results[query] results print(f查询 {query} 找到 {len(results)} 条结果) # 保存聚合结果 with open(aggregated_results.json, w) as f: json.dump(all_results, f, indent2)4.2 定时监控与告警结合schedule库实现定时监控新增资产import schedule import time def monitor_new_assets(): baseline set() query domainexample.com def check(): nonlocal baseline current {item[0] for item in fofa_search(query)} new_assets current - baseline if new_assets: print(f发现 {len(new_assets)} 个新增资产:) for asset in new_assets: print(asset) baseline current # 首次运行建立基线 baseline {item[0] for item in fofa_search(query)} print(f初始基线包含 {len(baseline)} 个资产) # 每30分钟检查一次 schedule.every(30).minutes.do(check) while True: schedule.run_pending() time.sleep(1) # 注意实际生产环境应使用更可靠的调度系统如Celery或APScheduler4.3 可视化分析使用pandas和matplotlib对结果进行简单分析import pandas as pd import matplotlib.pyplot as plt def analyze_ports(results): 分析端口分布情况 df pd.DataFrame(results, columns[ip, port, protocol, *[fattr_{i} for i in range(len(results[0])-3)]]) df[port] df[port].astype(int) # 统计前10个最常见端口 top_ports df[port].value_counts().head(10) # 绘制柱状图 plt.figure(figsize(10, 6)) top_ports.plot(kindbar) plt.title(Top 10 Ports Distribution) plt.xlabel(Port Number) plt.ylabel(Count) plt.savefig(port_distribution.png) plt.close() return top_ports # 使用示例 results fofa_search(ip192.168.1.0/24) port_stats analyze_ports(results) print(port_stats)