Day 9成果落地 —— Act 阶段战报生成与大屏数据落盘今日目标完成整个 PEAK 框架的最核心闭环Act with Knowledge基于知识采取行动。此时Day 8 的代码已经执行完毕我们的 Python 内存中拥有了一个名为all_hunt_evidence的列表里面装满了诸如round_1_hit_count: 150这样的硬核数据。接下来的任务是将这些数据转化为 LLM 能看懂的 Prompt触发二次推理并将最终的 JSON 战报连同前面的证据一起打包成一个巨大的“企业级 Payload”写入 Splunk 索引。 Step 1: Act 阶段 (喂入证据、二次触发 LLM 与防御性解析)动作解析这一步的代码必须严格粘贴在 Day 8 的for循环结束之后。数据序列化把 Python 里的字典数组转成 JSON 字符串因为大模型只能读懂文本。构建 Prompt赋予 AI “安全总监”的人设要求它根据命中数Hit Counts的逻辑关系如 R1 很高但 R2 为 0说明是误报R1 和 R2 都很高说明是真实攻击给出一个 0-100 的确切风险评分。防弹解析通过try...except json.JSONDecodeError防止大模型因为抽风输出非 JSON 格式而导致整个 5 分钟的调度任务崩溃。 注入核心逻辑代码 (接在 Day 8 代码下方)# # STEP 4: The Act Phase (Day 9 AI Final Qualification)# helper.log_info(Initiating Act Phase: Triggering LLM API for Final Assessment...)# 1. Serialize the collected evidence list into a JSON string to feed the LLMevidence_payloadjson.dumps(all_hunt_evidence,ensure_asciiFalse)# 2. In a production environment, you would construct your messages here# and execute the second API call. For testing the pipeline, we mock the response.# This mock represents the LLM analyzing the evidence_payload and making a decision.mock_act_response { executive_summary: 通过两轮下钻分析假设1(暴力破解)在R2深度关联中发现连续高频请求证实存在凭证填充攻击风险。, threat_qualification: Confirmed Threat, risk_score: 92, recommended_alert_spl: search indexmain CIp* | bin _time span1s | stats count by _time, CIp | where count 10 } # 3. Defensive parsing block to handle potential AI format hallucinationsfinal_report{}try:# Strip whitespace to prevent JSONDecodeError from trailing newlinesfinal_reportjson.loads(mock_act_response.strip())helper.log_info(fAI Assessment Complete. Final Risk Score calculated:{final_report.get(risk_score)})exceptjson.JSONDecodeErrorase:helper.log_error(fCritical error: Failed to parse final report JSON. Detail:{str(e)})# Fallback mechanism to ensure the data pipeline does not breakfinal_report{executive_summary:Error parsing AI response during Act Phase.,threat_qualification:Unknown,risk_score:0,recommended_alert_spl:} Step 2: 数据落盘 (组装上帝视角 Payload 并写入 Splunk)动作解析很多新手写 Splunk 插件习惯用print()或者helper.log_info()输出结果这是大错特错的打印在后台的日志_internal索引根本无法用于企业级大屏展示。组装 Master Payload我们将 Day 7 的蓝图ai_hunting_plan、Day 8 的证据execution_metrics和刚才 Day 9 得到的终极定性final_assessment合并到一个字典中。这就相当于一份包含了起因、经过、结果的完整卷宗。强制指定 Sourcetype使用helper.new_event()时必须将sourcetype显式声明为_json。这会触发 Splunk 底层的自动字段提取机制Auto KV Extraction让复杂的 JSON 树在前端直接变成可被搜索的独立字段。提交写入使用ew.write_event(event)将这条史诗级事件永久打入业务索引如main索引。 注入落盘逻辑代码# # STEP 5: Data Ingestion (Writing the Master Payload to Splunk)# helper.log_info(Assembling Master Hunt Report for index ingestion...)# 1. Assemble the ultimate JSON document containing all three phases of PEAKmaster_payload{event_type:PEAK_Hunting_Report,# CRITICAL: Anchor field for Dashboard searchestimestamp:datetime.datetime.utcnow().isoformat(),target_index:target_index,hunting_plan:ai_hunting_plan,# The original blueprint from Prepare Phaseexecution_metrics:all_hunt_evidence,# The hits and duration from Execute Phasefinal_assessment:final_report# The ultimate AI qualification from Act Phase}# 2. Create a new Splunk event object using the Add-on Builder helper# Setting sourcetype to _json enables native Splunk JSON syntax highlighting and field extractioneventhelper.new_event(sourcehelper.get_input_type(),indextarget_index,sourcetype_json,datajson.dumps(master_payload,ensure_asciiFalse))# 3. Commit the event to the user-specified Splunk data storeew.write_event(event)# Stop the master stopwatch and calculate total execution timetotal_cycle_timeround(time.time()-cycle_start_time,2)helper.log_info(fSUCCESS: Master Hunt Report completely written to Splunk! Total cycle time:{total_cycle_time}seconds.)# 4. Catch-all exception block for the entire Agentic WorkflowexceptExceptionase:helper.log_error(fCritical Failure during Agentic Execution Workflow:{str(e)}) Step 3: 终极极客验证 (全自动闭环的结果确认)代码写完真正的极客绝不相信“理论上能跑”我们必须用 SPL 在前端验证数据是否已完美结构化落地。详细验证操作在 AOB 代码编辑器的右上角点击绿色的Test按钮。观察底部 Output 面板等待打印出绿色的Done。点击右上角 Save 按钮保存代码(不点 Save 代码不会生效)打开一个新的浏览器标签页进入 Splunk 的Search Reporting(搜索与报表) 应用。在搜索框中输入以下极客指令将时间范围调整为 Last 15 minutesindexmain sourcetype_json event_typePEAK_Hunting_Report | rename final_assessment.risk_score as Risk_Score, final_assessment.threat_qualification as Threat_Level, final_assessment.executive_summary as Summary | table _time, target_index, Threat_Level, Risk_Score, Summary | sort - Risk_Score 终极成功标志与意义你将看到什么表格中清晰展现出Risk_Score 92,Threat_Level Confirmed Threat以及详细的中文战报摘要。背后的架构意义由于我们在 Step 5 中设置了sourcetype_jsonSplunk 极其聪明地自动解析了深达三层嵌套的 JSON 结构它自动识别出了final_assessment.risk_score这个字段路径。这种无缝的结构化数据落盘是 Day 16 我们能够用几行 SPL 就画出极其绚丽的高管安全态势大屏的核心保障
09.Day 9:成果落地——Act 阶段战报生成与大屏数据落盘
Day 9成果落地 —— Act 阶段战报生成与大屏数据落盘今日目标完成整个 PEAK 框架的最核心闭环Act with Knowledge基于知识采取行动。此时Day 8 的代码已经执行完毕我们的 Python 内存中拥有了一个名为all_hunt_evidence的列表里面装满了诸如round_1_hit_count: 150这样的硬核数据。接下来的任务是将这些数据转化为 LLM 能看懂的 Prompt触发二次推理并将最终的 JSON 战报连同前面的证据一起打包成一个巨大的“企业级 Payload”写入 Splunk 索引。 Step 1: Act 阶段 (喂入证据、二次触发 LLM 与防御性解析)动作解析这一步的代码必须严格粘贴在 Day 8 的for循环结束之后。数据序列化把 Python 里的字典数组转成 JSON 字符串因为大模型只能读懂文本。构建 Prompt赋予 AI “安全总监”的人设要求它根据命中数Hit Counts的逻辑关系如 R1 很高但 R2 为 0说明是误报R1 和 R2 都很高说明是真实攻击给出一个 0-100 的确切风险评分。防弹解析通过try...except json.JSONDecodeError防止大模型因为抽风输出非 JSON 格式而导致整个 5 分钟的调度任务崩溃。 注入核心逻辑代码 (接在 Day 8 代码下方)# # STEP 4: The Act Phase (Day 9 AI Final Qualification)# helper.log_info(Initiating Act Phase: Triggering LLM API for Final Assessment...)# 1. Serialize the collected evidence list into a JSON string to feed the LLMevidence_payloadjson.dumps(all_hunt_evidence,ensure_asciiFalse)# 2. In a production environment, you would construct your messages here# and execute the second API call. For testing the pipeline, we mock the response.# This mock represents the LLM analyzing the evidence_payload and making a decision.mock_act_response { executive_summary: 通过两轮下钻分析假设1(暴力破解)在R2深度关联中发现连续高频请求证实存在凭证填充攻击风险。, threat_qualification: Confirmed Threat, risk_score: 92, recommended_alert_spl: search indexmain CIp* | bin _time span1s | stats count by _time, CIp | where count 10 } # 3. Defensive parsing block to handle potential AI format hallucinationsfinal_report{}try:# Strip whitespace to prevent JSONDecodeError from trailing newlinesfinal_reportjson.loads(mock_act_response.strip())helper.log_info(fAI Assessment Complete. Final Risk Score calculated:{final_report.get(risk_score)})exceptjson.JSONDecodeErrorase:helper.log_error(fCritical error: Failed to parse final report JSON. Detail:{str(e)})# Fallback mechanism to ensure the data pipeline does not breakfinal_report{executive_summary:Error parsing AI response during Act Phase.,threat_qualification:Unknown,risk_score:0,recommended_alert_spl:} Step 2: 数据落盘 (组装上帝视角 Payload 并写入 Splunk)动作解析很多新手写 Splunk 插件习惯用print()或者helper.log_info()输出结果这是大错特错的打印在后台的日志_internal索引根本无法用于企业级大屏展示。组装 Master Payload我们将 Day 7 的蓝图ai_hunting_plan、Day 8 的证据execution_metrics和刚才 Day 9 得到的终极定性final_assessment合并到一个字典中。这就相当于一份包含了起因、经过、结果的完整卷宗。强制指定 Sourcetype使用helper.new_event()时必须将sourcetype显式声明为_json。这会触发 Splunk 底层的自动字段提取机制Auto KV Extraction让复杂的 JSON 树在前端直接变成可被搜索的独立字段。提交写入使用ew.write_event(event)将这条史诗级事件永久打入业务索引如main索引。 注入落盘逻辑代码# # STEP 5: Data Ingestion (Writing the Master Payload to Splunk)# helper.log_info(Assembling Master Hunt Report for index ingestion...)# 1. Assemble the ultimate JSON document containing all three phases of PEAKmaster_payload{event_type:PEAK_Hunting_Report,# CRITICAL: Anchor field for Dashboard searchestimestamp:datetime.datetime.utcnow().isoformat(),target_index:target_index,hunting_plan:ai_hunting_plan,# The original blueprint from Prepare Phaseexecution_metrics:all_hunt_evidence,# The hits and duration from Execute Phasefinal_assessment:final_report# The ultimate AI qualification from Act Phase}# 2. Create a new Splunk event object using the Add-on Builder helper# Setting sourcetype to _json enables native Splunk JSON syntax highlighting and field extractioneventhelper.new_event(sourcehelper.get_input_type(),indextarget_index,sourcetype_json,datajson.dumps(master_payload,ensure_asciiFalse))# 3. Commit the event to the user-specified Splunk data storeew.write_event(event)# Stop the master stopwatch and calculate total execution timetotal_cycle_timeround(time.time()-cycle_start_time,2)helper.log_info(fSUCCESS: Master Hunt Report completely written to Splunk! Total cycle time:{total_cycle_time}seconds.)# 4. Catch-all exception block for the entire Agentic WorkflowexceptExceptionase:helper.log_error(fCritical Failure during Agentic Execution Workflow:{str(e)}) Step 3: 终极极客验证 (全自动闭环的结果确认)代码写完真正的极客绝不相信“理论上能跑”我们必须用 SPL 在前端验证数据是否已完美结构化落地。详细验证操作在 AOB 代码编辑器的右上角点击绿色的Test按钮。观察底部 Output 面板等待打印出绿色的Done。点击右上角 Save 按钮保存代码(不点 Save 代码不会生效)打开一个新的浏览器标签页进入 Splunk 的Search Reporting(搜索与报表) 应用。在搜索框中输入以下极客指令将时间范围调整为 Last 15 minutesindexmain sourcetype_json event_typePEAK_Hunting_Report | rename final_assessment.risk_score as Risk_Score, final_assessment.threat_qualification as Threat_Level, final_assessment.executive_summary as Summary | table _time, target_index, Threat_Level, Risk_Score, Summary | sort - Risk_Score 终极成功标志与意义你将看到什么表格中清晰展现出Risk_Score 92,Threat_Level Confirmed Threat以及详细的中文战报摘要。背后的架构意义由于我们在 Step 5 中设置了sourcetype_jsonSplunk 极其聪明地自动解析了深达三层嵌套的 JSON 结构它自动识别出了final_assessment.risk_score这个字段路径。这种无缝的结构化数据落盘是 Day 16 我们能够用几行 SPL 就画出极其绚丽的高管安全态势大屏的核心保障