12.Day12:动态上下文提纯——优化 Prompt 与防护机制

12.Day12:动态上下文提纯——优化 Prompt 与防护机制 Day 12: 动态上下文提纯 —— 优化 Prompt 与防护机制今日目标针对真实企业环境中“超大异常日志”撑爆大模型内存Context Window的痛点编写 Python 数据清洗与截断逻辑。同时在 API 请求中引入硬件级的max_tokens限制并在 System Prompt 中加入“极简表述”指令彻底榨干大模型输出中的水分保障系统的绝对稳定与高性价比 架构大纲今天我们要加装哪些“防弹装甲”输入端字符级物理截断 (Payload Truncation)在把rare_logs_payload喂给大模型之前强制设定最大字符数如 6000 字符约 1500 Tokens。一旦超过直接一刀切断并补上[TRUNCATED]提示防止 API 报 400 错。控制端Prompt 极致压榨 (Prompt Distillation)修改大模型的 System Prompt强令其“极度克制Extremely concise”并“限制字数”不准讲废话。输出端硬件级 Token 熔断 (max_tokensBarrier)在请求大模型 API 时强行加上max_tokens参数。即使大模型失控想长篇大论网关也会在阈值处如 800 Tokens强行掐断保护你的 API 余额。 终极实战Day 12 性能优化版全量代码请打开 Add-on Builder 的Define Test编辑器用以下代码覆盖原有代码。请注意观察代码中打有[DAY 12 ...]标记的地方importosimportsysimporttimeimportdatetimeimportjsonimportuuidimportrequestsimportsplunklib.clientasclientimportsplunklib.resultsasresults# # HELPER 1: Execute AI Generated SPL# defexecute_ai_spl(helper,service,spl_query): Execute SPL generated by AI and return the raw result data. spl_queryspl_query.strip()# Force the search prefix to prevent syntax errorsifnotspl_query.startswith(search)andnotspl_query.startswith(|):spl_querysearch spl_query kwargs_oneshot{output_mode:json}helper.log_info(f[Agentic Engine] Executing SPL:{spl_query})try:search_resultsservice.jobs.oneshot(spl_query,**kwargs_oneshot)readerresults.JSONResultsReader(search_results)result_data[resforresinreaderifisinstance(res,dict)]helper.log_info(f[Agentic Engine] SUCCESS: Found{len(result_data)}events.)returnresult_dataexceptExceptionase:helper.log_error(f[Agentic Engine] FAILED execution:{str(e)})return[]# # HELPER 2: Fetch Real Logs (M-ATH Concept)# deffetch_rare_logs(helper,service,target_index): Fetch the most recent rare/anomalous logs from the target index to feed the AI. helper.log_info(Fetching real rare logs for analysis...)# Fetching fresh data. Use cluster only if CPU permits, otherwise use head.splfsearch index{target_index}| head 5 | table _rawtry:results_dataexecute_ai_spl(helper,service,spl)ifnotresults_data:returnNone# Extract the _raw strings and join them into a single text payloadraw_logs[item.get(_raw,)foriteminresults_dataif_rawinitem]payload\n.join(raw_logs)# # [DAY 12 NEW]: Context Distillation (Payload Truncation)# Prevents massive Splunk logs from blowing up the LLM Context Window# MAX_CHARS6000# Roughly equals 1500 Tokensiflen(payload)MAX_CHARS:helper.log_info(fPayload too large ({len(payload)}chars). Truncating to{MAX_CHARS}...)# Slice the string and append a clear signal for the LLMpayloadpayload[:MAX_CHARS]\n\n...[TRUNCATED DUE TO CONTEXT LIMITS. ANALYZE AVAILABLE DATA ONLY.]...# returnpayloadexceptExceptionase:helper.log_error(fFailed to fetch rare logs:{str(e)})returnNone# # HELPER 3: The LLM API Connector# # [DAY 12 MODIFIED]: Added dynamic max_tokens parameter to function signaturedefcall_llm_api(helper,api_key,base_url,model,system_prompt,user_prompt,max_tokens): Establish real HTTP connection to the LLM API and return the JSON response. headers{Authorization:fBearer{api_key},Content-Type:application/json}payload{model:model,messages:[{role:system,content:system_prompt},{role:user,content:user_prompt}],# Mandatory flag for modern LLMs to strictly output JSONresponse_format:{type:json_object},# # [DAY 12 NEW]: Hardware-level output boundary (Token Circuit Breaker)# max_tokens:max_tokens}# Ensure URL formatting is correctendpointbase_urlifbase_url.endswith(/chat/completions)elsef{base_url.rstrip(/)}/chat/completionstry:helper.log_info(fInitiating network request to LLM API:{endpoint}(Max Tokens:{max_tokens}))# 120s timeout ensures deep-thinking models (CoT) have enough timeresponserequests.post(endpoint,headersheaders,jsonpayload,timeout120)response.raise_for_status()response_jsonresponse.json()llm_contentresponse_json[choices][0][message][content]# Extract Token usage for FinOps/Cost Dashboardstotal_tokensresponse_json.get(usage,{}).get(total_tokens,0)helper.log_info(fAPI Call Success. Consumed{total_tokens}tokens.)returnllm_content,total_tokensexceptrequests.exceptions.RequestExceptionase:helper.log_error(fNetwork error during API call:{str(e)})raise# # MAIN WORKFLOW: The Autonomous Agent# defcollect_events(helper,ew): Day 11 Day 12: The Ultimate Live Workflow. Features: Real API Integration, Unix Epoch Time injection, Anti-Hallucination, and Truncation. helper.log_info(PEAK AI Hunter: LIVE MODE INITIALIZED.)cycle_start_timetime.time()# Generate a unique Session ID to stitch the flattened logs togetherhunt_session_idstr(uuid.uuid4())try:# 1. Acquire Splunk Service Sessionsession_keygetattr(helper,session_key,None)orgetattr(helper._input_definition,metadata,{}).get(session_key)ifnotsession_key:raiseValueError(Failed to acquire session_key.)serviceclient.Service(tokensession_key)# 2. Acquire Global Setup Configurations (API credentials)api_keyhelper.get_global_setting(api_key)base_urlhelper.get_global_setting(base_url)model_namehelper.get_global_setting(model_name)target_indexhelper.get_output_index()ormainifnotapi_keyornotbase_url:raiseValueError(API Key or Base URL is missing in Global Settings.)# # PHASE 1: PREPARE (Real LLM Call for Blueprint)# rare_logs_payloadfetch_rare_logs(helper,service,target_index)ifnotrare_logs_payload:helper.log_info(No anomalous logs found to analyze. Terminating cycle early gracefully.)return# # [DAY 12 MODIFIED]: Prompt Distillation - Forcing extreme conciseness# sys_prompt_prepareYou are a Senior Threat Hunter. You MUST reply in JSON format. Be extremely concise. No pleasantries. Schema requires: analysis (string) and hypotheses (array of objects). Each hypothesis must have hypothesis_id, ABLE (Actor, Behavior, Location, Evidence), spl_round_1_validation, and spl_round_2_drilldown.# ANTI-HALLUCINATION FIX (Day 11): Forcing the LLM to strictly use {target_index} parameterusr_prompt_preparefAnalyze these real, rare logs from our environment:\n{rare_logs_payload}\n\nGenerate exactly 2 hunting hypotheses. CRITICAL: For spl_round_1_validation and spl_round_2_drilldown, you MUST strictly start your queries with search index{{target_index}}. Do NOT guess or use real index names! Output ONLY JSON format.helper.log_info(Triggering LLM for Prepare Phase...)# [DAY 12 MODIFIED]: Pass max_tokens1500 for generating SPLsblueprint_text,prep_tokenscall_llm_api(helper,api_key,base_url,model_name,sys_prompt_prepare,usr_prompt_prepare,max_tokens1500)ai_hunting_planjson.loads(blueprint_text.strip())hypothesesai_hunting_plan.get(hypotheses,[])# Write Plan to Splunk IMMEDIATELY (Injecting dynamic Unix Time)ew.write_event(helper.new_event(sourcehelper.get_input_type(),indextarget_index,sourcetype_json,timetime.time(),# THE ULTIMATE TIMEZONE FIX (Day 11)datajson.dumps({session_id:hunt_session_id,event_type:PEAK_Plan,timestamp:round(time.time(),3),content:ai_hunting_plan},ensure_asciiFalse)))# # PHASE 2: EXECUTE (Agentic Splunk Query Loop)# all_hunt_evidence[]fori,hypinenumerate(hypotheses):hyp_starttime.time()spl_r1hyp.get(spl_round_1_validation,).replace({target_index},target_index)spl_r2hyp.get(spl_round_2_drilldown,).replace({target_index},target_index)r1_hitslen(execute_ai_spl(helper,service,spl_r1))r2_hitslen(execute_ai_spl(helper,service,spl_r2))all_hunt_evidence.append({hypothesis_id:hyp.get(hypothesis_id,i1),threat_behavior:hyp.get(ABLE,{}).get(Behavior,Unknown),round_1_hit_count:r1_hits,round_2_hit_count:r2_hits,execution_duration_sec:round(time.time()-hyp_start,2)})# Write Evidence to Splunk IMMEDIATELY (Injecting dynamic Unix Time)ew.write_event(helper.new_event(sourcehelper.get_input_type(),indextarget_index,sourcetype_json,timetime.time(),# THE ULTIMATE TIMEZONE FIX (Day 11)datajson.dumps({session_id:hunt_session_id,event_type:PEAK_Evidence,timestamp:round(time.time(),3),content:all_hunt_evidence},ensure_asciiFalse)))# # PHASE 3: ACT (Real LLM Call for Final Report)# # # [DAY 12 MODIFIED]: Concise prompt for Act Phase (Limits summary length)# sys_prompt_actYou are a Security Director. Output ONLY valid JSON. Keep summaries under 30 words. Keys: executive_summary, threat_qualification (Benign/Suspicious/Confirmed), risk_score (0-100), recommended_alert_spl.usr_prompt_actfHere is the quantitative execution evidence collected by our agent:\n{json.dumps(all_hunt_evidence)}\n\nBased on these hit counts, qualify the threat, assign a risk score, and generate an alert SPL. Reply in JSON format.helper.log_info(Triggering LLM for Act Phase...)# [DAY 12 MODIFIED]: Pass max_tokens800 since this is just a short summaryreport_text,act_tokenscall_llm_api(helper,api_key,base_url,model_name,sys_prompt_act,usr_prompt_act,max_tokens800)try:final_reportjson.loads(report_text.strip())exceptjson.JSONDecodeErrorase:helper.log_error(JSON Truncation in Act Phase. Engaging fallback.)final_report{executive_summary:LLM output truncated.,risk_score:-1,raw:report_text}# Write Final Report to Splunk (Injecting dynamic Unix Time)ew.write_event(helper.new_event(sourcehelper.get_input_type(),indextarget_index,sourcetype_json,timetime.time(),# THE ULTIMATE TIMEZONE FIX (Day 11)datajson.dumps({session_id:hunt_session_id,event_type:PEAK_Final_Report,timestamp:round(time.time(),3),total_tokens_used:prep_tokensact_tokens,content:final_report},ensure_asciiFalse)))helper.log_info(fLIVE CYCLE COMPLETE. Time:{round(time.time()-cycle_start_time,2)}s. Session ID:{hunt_session_id})exceptExceptionase:helper.log_error(fFATAL Pipeline Crash:{str(e)}) 极客验证见证“瘦身”与提速的奇迹今天我们不再纠结于跑通因为我们在 Day 11 已经完美跑通了我们要看疗效、算细账操作步骤与验证将以上代码贴入 AOB 并点击Save。点击Test。在此之前你可以故意造一条几万字的长日志写入到你的main索引里。观察底部的输出日志。你会看到一句极为优雅的安全防线拦截提示[INFO] Payload too large (XXXX chars). Truncating to 6000...切回 Splunk 的 Search 界面执行你的 Dashboard 专属核心 SPLindexmain sourcetype_json event_typePEAK_Plan OR event_typePEAK_Evidence OR event_typePEAK_Final_Report | spath | stats min(timestamp) as Start_Time_Epoch, max(timestamp) as End_Time_Epoch, latest(content.risk_score) as Risk_Score, latest(content.executive_summary) as Summary, sum(content{}.round_1_hit_count) as Total_R1_Hits, sum(content{}.round_2_hit_count) as Total_R2_Hits, sum(total_tokens_used) as Total_Tokens by session_id | eval Execution_Time_Sec round(End_Time_Epoch - Start_Time_Epoch, 2) | eval Start_Time strftime(Start_Time_Epoch, %Y-%m-%d %H:%M:%S) | sort - Start_Time_Epoch | table Start_Time, session_id, Risk_Score, Total_R1_Hits, Total_R2_Hits, Execution_Time_Sec, Total_Tokens, Summary 你的验收指标看看Execution_Time_Sec(执行耗时)以前大模型发散性思考可能需要很长时间现在因为我们强制要求Keep summaries under 30 words并且用max_tokens卡死了输出上限你的平均执行耗时将会变得更加紧凑。看看Total_Tokens(开销成本)由于我们在输入端一刀砍掉了超长日志的废话并且限制了输出字数这个数字将被牢牢控制在最高效的区间。即使线上发生了千万级报错轰炸你也绝不会收到破产账单至此这台引擎不仅能跑而且省油、抗压、永不爆缸赶紧点火测试一下吧