Log Aggregation with the ELK Stack: Building a Unified Log Management Platform

## 1. Log Management Overview

### 1.1 Why Log Aggregation?

In a microservice architecture, log management faces several challenges:

- **Scattered storage**: logs are spread across many services and containers
- **Inconsistent formats**: every service logs in its own format
- **Hard to query**: there is no way to search logs across services
- **Performance impact**: synchronous log writing can slow down the application
- **Storage cost**: large log volumes are expensive to retain

### 1.2 ELK Stack Architecture

```
┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐
│ Service A │ │ Service B │ │ Service C │ │ Service D │
│   logs    │ │   logs    │ │   logs    │ │   logs    │
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘
      └─────────────┴───────┬─────┴──────────────┘
                            ▼
                 ┌────────────────────┐
                 │  Logstash / Beats  │  (collection)
                 └─────────┬──────────┘
                           ▼
                 ┌────────────────────┐
                 │   Elasticsearch    │  (search & storage)
                 └─────────┬──────────┘
                           ▼
                 ┌────────────────────┐
                 │       Kibana       │  (visualization)
                 └────────────────────┘
```

### 1.3 Log Types

| Type | Description | Examples |
|------|-------------|----------|
| Application logs | Business events | order creation, user login |
| System logs | Framework output | Spring startup, Hibernate |
| Access logs | HTTP requests | request path, response time |
| Error logs | Exception details | stack traces |
| Audit logs | Security events | logins, permission changes |

## 2. Logback Configuration

### 2.1 Basic Logback Configuration

```xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="30 seconds">
    <!-- Properties -->
    <property name="LOG_PATH" value="/var/log/myapp"/>
    <property name="APP_NAME" value="user-service"/>

    <!-- Console appender -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- Rolling file appender -->
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_PATH}/${APP_NAME}.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_PATH}/${APP_NAME}.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <maxHistory>30</maxHistory>
            <totalSizeCap>10GB</totalSizeCap>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- Async appender -->
    <appender name="ASYNC_FILE" class="ch.qos.logback.classic.AsyncAppender">
        <appender-ref ref="FILE"/>
        <queueSize>512</queueSize>
        <discardingThreshold>0</discardingThreshold>
        <includeCallerData>false</includeCallerData>
    </appender>

    <!-- Development profile -->
    <springProfile name="dev">
        <root level="DEBUG">
            <appender-ref ref="CONSOLE"/>
        </root>
        <logger name="com.example" level="DEBUG"/>
    </springProfile>

    <!-- Production profile -->
    <springProfile name="prod">
        <root level="INFO">
            <appender-ref ref="ASYNC_FILE"/>
        </root>
        <logger name="com.example" level="INFO"/>
        <logger name="org.springframework" level="WARN"/>
        <logger name="org.hibernate" level="WARN"/>
    </springProfile>
</configuration>
```
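To see what lines this pattern produces, here is a plain-Java sketch (it does not use Logback itself, and `PatternSketch` is a hypothetical name) that renders a line the way `%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n` would, minus Logback's logger-name abbreviation:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class PatternSketch {
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Simplified stand-in for the Logback pattern above:
    // %-5s left-pads the level to 5 chars, %n is the platform newline.
    // Real Logback additionally abbreviates %logger{36} package names.
    public static String format(LocalDateTime ts, String thread,
                                String level, String logger, String msg) {
        return String.format("%s [%s] %-5s %s - %s%n",
                TS.format(ts), thread, level, logger, msg);
    }

    public static void main(String[] args) {
        System.out.print(format(
                LocalDateTime.of(2024, 1, 15, 10, 30, 0, 123_000_000),
                "main", "INFO", "com.example.UserService", "User created"));
        // 2024-01-15 10:30:00.123 [main] INFO  com.example.UserService - User created
    }
}
```

A stable, single-line format like this is what makes the Logstash and Filebeat parsing below reliable.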
### 2.2 JSON Output Configuration

```xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="JSON_CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
            <layout class="ch.qos.logback.contrib.json.classic.JsonLayout">
                <timestampFormat>yyyy-MM-dd'T'HH:mm:ss.SSSZ</timestampFormat>
                <timestampFormatTimezoneId>UTC</timestampFormatTimezoneId>
                <appendLineSeparator>true</appendLineSeparator>
                <jsonFormatter class="ch.qos.logback.contrib.jackson.JacksonJsonFormatter">
                    <prettyPrint>false</prettyPrint>
                </jsonFormatter>
            </layout>
        </encoder>
    </appender>

    <!-- Custom JSON layout -->
    <appender name="CUSTOM_JSON" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
            <layout class="com.example.log.CustomJsonLayout">
                <appName>user-service</appName>
                <environment>${ENVIRONMENT:-unknown}</environment>
            </layout>
        </encoder>
    </appender>
</configuration>
```

### 2.3 Custom JSON Layout

```java
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.contrib.json.classic.JsonLayout;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;

public class CustomJsonLayout extends JsonLayout {

    private String appName;
    private String environment;

    // Setters let Logback inject <appName> and <environment> from the XML config
    public void setAppName(String appName) { this.appName = appName; }
    public void setEnvironment(String environment) { this.environment = environment; }

    @Override
    protected void addCustomDataToJsonMap(Map<String, Object> map, ILoggingEvent event) {
        map.put("appName", appName);
        map.put("environment", environment);
        map.put("host", getHostName());
        map.put("thread", event.getThreadName());
        map.put("level", event.getLevel().toString());
        map.put("logger", event.getLoggerName());
        map.put("message", event.getFormattedMessage());

        if (event.getThrowableProxy() != null) {
            // formatThrowable(...) is a helper assumed to be defined elsewhere in this class
            map.put("exception", formatThrowable(event.getThrowableProxy()));
        }

        Map<String, String> mdc = event.getMDCPropertyMap();
        if (mdc != null && !mdc.isEmpty()) {
            map.put("mdc", mdc);
        }
    }

    private String getHostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return "unknown";
        }
    }
}
```

## 3. Logstash Configuration

### 3.1 Pipeline Configuration

```
input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
  tcp {
    port => 5000
    codec => json_lines
  }
  redis {
    host => "redis"
    port => 6379
    data_type => "list"
    key => "logstash"
  }
}

filter {
  if [logger_name] =~ /^com\.example\./ {
    json {
      source => "message"
      target => "parsed"
    }
    date {
      match => ["timestamp", "ISO8601"]
      target => "@timestamp"
    }
    mutate {
      remove_field => ["message", "timestamp"]
    }
    if [parsed][traceId] {
      mutate { add_field => { "traceId" => "%{[parsed][traceId]}" } }
    }
    if [parsed][spanId] {
      mutate { add_field => { "spanId" => "%{[parsed][spanId]}" } }
    }
  }

  if [level] == "ERROR" {
    mutate { add_tag => ["error"] }
  }

  geoip {
    source => "clientIp"
    target => "geoip"
  }
  useragent {
    source => "userAgent"
    target => "ua"
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "app-logs-%{+YYYY.MM.dd}"
    ilm_enabled => true
    ilm_rollover_alias => "app-logs"
    ilm_pattern => "000001"
    ilm_policy => "logs-policy"
    template_name => "app-logs"
    template_overwrite => true
  }
  stdout { codec => rubydebug }
}
```

### 3.2 Multi-Service Log Processing

```
input {
  beats {
    port => 5044
    host => "0.0.0.0"
    type => "app-logs"
  }
}

filter {
  # Route by log file path
  if [log][file][path] =~ /user-service/ {
    mutate { add_field => { "service" => "user-service" } }
  } else if [log][file][path] =~ /order-service/ {
    mutate { add_field => { "service" => "order-service" } }
  } else if [log][file][path] =~ /payment-service/ {
    mutate { add_field => { "service" => "payment-service" } }
  }

  # Parse the JSON payload
  json {
    source => "message"
    target => "parsed"
    skip_on_invalid_json => true
  }

  # Extract request info
  if [parsed][requestId] {
    mutate {
      add_field => {
        "requestId" => "%{[parsed][requestId]}"
        "userId"    => "%{[parsed][userId]}"
      }
    }
  }

  # Error log handling
  if [level] == "ERROR" or [parsed][level] == "ERROR" {
    mutate { add_tag => ["error"] }
    if [parsed][exception] {
      mutate {
        add_field => {
          "exceptionType"    => "%{[parsed][exception][type]}"
          "exceptionMessage" => "%{[parsed][exception][message]}"
        }
      }
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[service]}-logs-%{+YYYY.MM.dd}"
  }
}
```
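The pipelines above assume the services emit JSON. For a service that still writes the plain-text pattern from section 2.1, a grok filter can extract the same fields (a sketch; the field names `thread`, `logger`, and `msg` are illustrative, while `TIMESTAMP_ISO8601`, `LOGLEVEL`, `DATA`, and `GREEDYDATA` are grok's built-in patterns):

```
filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{DATA:thread}\] %{LOGLEVEL:level}\s+%{DATA:logger} - %{GREEDYDATA:msg}"
    }
  }
}
```

Events that fail to match are tagged `_grokparsefailure` by default, which makes format drift easy to spot in Kibana.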
## 4. Elasticsearch Configuration

### 4.1 Cluster Configuration

```yaml
# docker-compose.yml
version: "3.8"

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    environment:
      - node.name=es01
      - cluster.name=docker-cluster
      - discovery.seed_hosts=es02,es03
      - bootstrap.memory_lock=true
      - xpack.security.enabled=true
      - xpack.security.enrollment.enabled=true
      - ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
      - "ES_JAVA_OPTS=-Xms2g -Xmx2g"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - es_data01:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
      - "9300:9300"
    networks:
      - elastic

  elasticsearch2:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    environment:
      - node.name=es02
      - cluster.name=docker-cluster
      - discovery.seed_hosts=es01,es03
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms2g -Xmx2g"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - es_data02:/usr/share/elasticsearch/data
    networks:
      - elastic

volumes:
  es_data01:
  es_data02:

networks:
  elastic:
    driver: bridge
```

### 4.2 Index Lifecycle Management

```json
PUT _ilm/policy/logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_primary_shard_size": "50gb",
            "max_age": "1d"
          },
          "set_priority": { "priority": 100 }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": { "number_of_shards": 1 },
          "forcemerge": { "max_num_segments": 1 },
          "set_priority": { "priority": 50 }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "set_priority": { "priority": 0 },
          "freeze": {}
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": { "delete": {} }
      }
    }
  }
}
```

### 4.3 Index Template

```json
PUT _index_template/app-logs
{
  "index_patterns": ["app-logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 3,
      "number_of_replicas": 1,
      "index.lifecycle.name": "logs-policy",
      "index.lifecycle.rollover_alias": "app-logs"
    },
    "mappings": {
      "properties": {
        "timestamp": { "type": "date" },
        "level":     { "type": "keyword" },
        "service":   { "type": "keyword" },
        "logger":    { "type": "keyword" },
        "message": {
          "type": "text",
          "fields": {
            "keyword": { "type": "keyword", "ignore_above": 256 }
          }
        },
        "traceId":   { "type": "keyword" },
        "spanId":    { "type": "keyword" },
        "requestId": { "type": "keyword" },
        "userId":    { "type": "keyword" },
        "exception": {
          "type": "object",
          "properties": {
            "type":       { "type": "keyword" },
            "message":    { "type": "text" },
            "stackTrace": { "type": "text" }
          }
        },
        "geoip": {
          "type": "object",
          "properties": {
            "city_name":    { "type": "keyword" },
            "country_name": { "type": "keyword" },
            "location":     { "type": "geo_point" }
          }
        }
      }
    }
  },
  "priority": 100
}
```

## 5. Kibana Configuration

### 5.1 Index Patterns

Create index patterns in Kibana via Settings → Index Patterns → Create index pattern. Patterns to support:

- `app-logs-*`
- `user-service-logs-*`
- `order-service-logs-*`

### 5.2 Dashboards

```json
// Log level distribution (pie chart)
{
  "title": "Log Level Distribution",
  "type": "pie",
  "params": { "addLegend": true, "addTooltip": true, "isDonut": true },
  "aggs": [
    { "id": "1", "type": "count", "schema": "metric" },
    {
      "id": "2",
      "type": "terms",
      "schema": "segment",
      "params": { "field": "level", "orderBy": "1", "order": "desc", "size": 10 }
    }
  ]
}

// Error log trend (line chart)
{
  "title": "Error Log Trend",
  "type": "line",
  "params": {
    "addLegend": true,
    "addTooltip": true,
    "categoryAxes": [{ "id": "CategoryAxis-1", "type": "category", "position": "bottom" }],
    "valueAxes": [{ "id": "ValueAxis-1", "name": "LeftAxis-1", "type": "value", "position": "left" }]
  },
  "aggs": [
    { "id": "1", "type": "count", "schema": "metric" },
    {
      "id": "2",
      "type": "date_histogram",
      "schema": "segment",
      "params": { "field": "timestamp", "interval": "auto" }
    }
  ],
  "filter": [{ "query": { "term": { "level": "ERROR" } } }]
}
```

### 5.3 Alerting Rules

```json
PUT _watcher/watch/log-error-alert
{
  "trigger": { "schedule": { "interval": "5m" } },
  "input": {
    "search": {
      "request": {
        "indices": ["app-logs-*"],
        "body": {
          "query": {
            "bool": {
              "must": [
                { "range": { "timestamp": { "gte": "now-5m" } } },
                { "term": { "level": "ERROR" } }
              ]
            }
          },
          "aggs": {
            "by_service": { "terms": { "field": "service", "size": 10 } }
          }
        }
      }
    }
  },
  "condition": {
    "compare": { "ctx.payload.hits.total.value": { "gt": 10 } }
  },
  "actions": {
    "log_error_alert": {
      "logging": {
        "level": "warn",
        "text": "High error rate detected: {{ctx.payload.hits.total.value}} errors in the last 5 minutes"
      }
    },
    "slack_notification": {
      "slack": {
        "message": {
          "to": ["#alerts"],
          "text": "Error Alert: {{ctx.payload.hits.total.value}} errors detected",
          "attachments": [
            {
              "color": "danger",
              "fields": [
                { "title": "Error Count", "value": "{{ctx.payload.hits.total.value}}", "short": true },
                { "title": "Time Range", "value": "Last 5 minutes", "short": true }
              ]
            }
          ]
        }
      }
    }
  }
}
```

## 6. Log Correlation Analysis

### 6.1 Trace Correlation

```java
import java.io.IOException;

import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
// Tracer/Span here are the Spring Cloud Sleuth types; Micrometer Tracing
// (io.micrometer.tracing) exposes the same currentSpan()/context() API.
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;

@Component
public class TraceLoggingFilter extends OncePerRequestFilter {

    @Autowired
    private Tracer tracer;

    @Override
    protected void doFilterInternal(HttpServletRequest request,
                                    HttpServletResponse response,
                                    FilterChain chain) throws ServletException, IOException {
        Span currentSpan = tracer.currentSpan();
        if (currentSpan != null) {
            MDC.put("traceId", currentSpan.context().traceId());
            MDC.put("spanId", currentSpan.context().spanId());
        }
        try {
            chain.doFilter(request, response);
        } finally {
            MDC.remove("traceId");
            MDC.remove("spanId");
        }
    }
}
```

### 6.2 MDC Configuration

```java
@Component
public class MdcLoggingFilter extends OncePerRequestFilter {

    @Override
    protected void doFilterInternal(HttpServletRequest request,
                                    HttpServletResponse response,
                                    FilterChain chain) throws ServletException, IOException {
        MDC.put("requestId", UUID.randomUUID().toString());
        MDC.put("remoteAddr", request.getRemoteAddr());
        MDC.put("userAgent", request.getHeader("User-Agent"));
        try {
            chain.doFilter(request, response);
        } finally {
            MDC.clear();
        }
    }
}
```

### 6.3 Cross-Service Queries

```json
// Fetch all logs for one trace across services
GET app-logs-*/_search
{
  "query": {
    "bool": {
      "must": [
        { "term": { "traceId": "abc123def456" } }
      ]
    }
  },
  "sort": [{ "timestamp": "asc" }],
  "aggs": {
    "by_service": {
      "terms": { "field": "service" },
      "aggs": {
        "errors": { "filter": { "term": { "level": "ERROR" } } }
      }
    }
  }
}
```

## 7. Filebeat Configuration

### 7.1 Collection Configuration

```yaml
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/myapp/*.log
    json:
      keys_under_root: true
      add_error_key: true
      message_key: message
    fields:
      service: user-service
      environment: ${ENVIRONMENT:unknown}
    fields_under_root: true

  - type: container
    enabled: true
    paths:
      - /var/lib/docker/containers/*/*.log
    processors:
      - add_kubernetes_metadata:
          host: ${NODE_NAME}
          matchers:
            - logs_path:
                logs_path: /var/lib/docker/containers/

processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - decode_json_fields:
      fields: ["message"]
      target: ""
      overwrite_keys: true
      add_error_key: true

output.logstash:
  hosts: ["logstash:5044"]

logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0644
```
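One detail worth adding to a real deployment: Java stack traces span many lines, and without multiline handling each line becomes a separate event. For plain-text files read by the `log` input, Filebeat's multiline options can stitch them back together (a sketch, keyed to the date-prefixed pattern from section 2.1):

```yaml
filebeat.inputs:
  - type: log
    paths:
      - /var/log/myapp/*.log
    # A line that does NOT start with a date begins no new event;
    # it is appended to the previous one (negate + match: after),
    # so a full stack trace stays in a single log document.
    multiline.pattern: '^\d{4}-\d{2}-\d{2}'
    multiline.negate: true
    multiline.match: after
```

Services that already emit one JSON object per line (section 2.2) do not need this, since the layout serializes the stack trace into a single field.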
### 7.2 Kubernetes Deployment

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat-config
data:
  filebeat.yml: |
    filebeat.inputs:
      - type: container
        enabled: true
        paths:
          - /var/log/containers/*.log
        processors:
          - add_kubernetes_metadata:
              host: ${NODE_NAME}
              matchers:
                - logs_path:
                    logs_path: /var/log/containers/
          - add_docker_metadata:
              host: unix:///var/run/docker.sock

    output.elasticsearch:
      hosts: ["${ELASTICSEARCH_HOST:elasticsearch}:9200"]
      index: "app-logs-%{+yyyy.MM.dd}"

    setup.kibana:
      host: "${KIBANA_HOST:kibana}:5601"
```

## 8. Best Practices

### 8.1 Using Log Levels

| Level | When to Use | Examples |
|-------|-------------|----------|
| ERROR | Failures that affect functionality | exceptions, connection failures |
| WARN  | Potential problems | retries, timeouts |
| INFO  | Important business events | login, order placed, payment |
| DEBUG | Development diagnostics | parameters, return values |
| TRACE | Fine-grained tracing | method entry, loop iterations |

### 8.2 Logging Conventions

```java
// Good logging practice
@Service
@Slf4j
public class UserService {

    public User createUser(CreateUserRequest request) {
        log.info("Creating user: username={}, email={}",
                request.getUsername(), request.getEmail());
        try {
            // toEntity(...) is a mapping helper assumed to exist elsewhere
            User user = userRepository.save(toEntity(request));
            log.info("User created successfully: userId={}", user.getId());
            return user;
        } catch (DuplicateKeyException e) {
            log.warn("User creation failed - duplicate: username={}", request.getUsername());
            throw new UserAlreadyExistsException(request.getUsername());
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is logged
            log.error("User creation failed: username={}", request.getUsername(), e);
            throw new UserCreationException(e);
        }
    }
}
```

### 8.3 Handling Sensitive Data

```java
@Component
@Slf4j
public class SensitiveDataFilter extends OncePerRequestFilter {

    private static final Set<String> SENSITIVE_KEYS = Set.of(
            "password", "token", "secret", "apiKey", "creditCard");

    @Override
    protected void doFilterInternal(HttpServletRequest request,
                                    HttpServletResponse response,
                                    FilterChain chain) throws ServletException, IOException {
        ContentCachingRequestWrapper wrappedRequest = new ContentCachingRequestWrapper(request);
        chain.doFilter(wrappedRequest, response);

        byte[] content = wrappedRequest.getContentAsByteArray();
        String requestBody = new String(content, StandardCharsets.UTF_8);
        String maskedBody = maskSensitiveData(requestBody);
        log.debug("Request body: {}", maskedBody);
    }

    private String maskSensitiveData(String data) {
        for (String key : SENSITIVE_KEYS) {
            // Replace "key" : "value" with "key": "***"
            data = data.replaceAll(
                    "\"" + key + "\"\\s*:\\s*\"[^\"]*\"",
                    "\"" + key + "\": \"***\"");
        }
        return data;
    }
}
```

## 9. Exception Log Collection

### 9.1 Global Exception Handler

```java
@RestControllerAdvice
@Slf4j
public class GlobalExceptionHandler {

    @ExceptionHandler(Exception.class)
    public ResponseEntity<ErrorResponse> handleException(Exception e, HttpServletRequest request) {
        ErrorResponse error = ErrorResponse.builder()
                .timestamp(LocalDateTime.now())
                .status(HttpStatus.INTERNAL_SERVER_ERROR.value())
                .error("Internal Server Error")
                .message(e.getMessage())
                .path(request.getRequestURI())
                .traceId(getCurrentTraceId())
                .build();

        log.error("Exception occurred: path={}, traceId={}",
                request.getRequestURI(), error.getTraceId(), e);

        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(error);
    }

    @ExceptionHandler(BusinessException.class)
    public ResponseEntity<ErrorResponse> handleBusinessException(
            BusinessException e, HttpServletRequest request) {
        log.warn("Business exception: code={}, message={}, path={}",
                e.getCode(), e.getMessage(), request.getRequestURI());

        ErrorResponse error = ErrorResponse.builder()
                .timestamp(LocalDateTime.now())
                .status(HttpStatus.BAD_REQUEST.value())
                .error("Business Error")
                .code(e.getCode())
                .message(e.getMessage())
                .path(request.getRequestURI())
                .traceId(getCurrentTraceId())
                .build();

        return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(error);
    }
}
```

### 9.2 Async Log Collection

```java
@Component
@Slf4j
public class AsyncLogCollector {

    private final BlockingQueue<LogEvent> logQueue = new LinkedBlockingQueue<>(10000);

    @PostConstruct
    public void init() {
        Executors.newSingleThreadExecutor().submit(this::processLogs);
    }

    public void collect(LogEvent event) {
        // offer() never blocks; when the queue is full, drop the event
        if (!logQueue.offer(event)) {
            log.warn("Log queue full, dropping log event");
        }
    }

    private void processLogs() {
        while (true) {
            try {
                LogEvent event = logQueue.take();
                sendToElasticsearch(event); // shipping logic omitted
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}
```

## 10. Summary

Log aggregation is a cornerstone of an observable system. This article covered:

- **Overview**: why log aggregation is needed and the ELK Stack architecture
- **Logback**: basic configuration, JSON output, custom layouts
- **Logstash**: pipeline configuration and multi-service log processing
- **Elasticsearch**: cluster setup, index lifecycle management, index templates
- **Kibana**: index patterns, dashboards, alerting rules
- **Log correlation**: trace correlation, MDC, cross-service queries
- **Filebeat**: log collection and Kubernetes deployment
- **Best practices**: log levels, logging conventions, sensitive-data handling

A well-built log management platform helps you locate problems quickly, monitor system health, and meet audit and compliance requirements.
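As a closing aside, the drop-on-full policy from section 9.2 is easy to exercise in isolation. This standalone sketch (`DropOnFullDemo` is a hypothetical name) models the same back-pressure choice: a bounded queue plus a non-blocking `offer()` that counts drops instead of stalling the caller.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DropOnFullDemo {

    // Try to enqueue `events` items; return how many were dropped
    // because the bounded queue was already full.
    public static int offerAll(BlockingQueue<String> queue, int events) {
        int dropped = 0;
        for (int i = 0; i < events; i++) {
            if (!queue.offer("event-" + i)) {
                dropped++; // queue full: drop rather than block the hot path
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(3);
        System.out.println(offerAll(queue, 5)); // prints 2: capacity 3, 5 events
    }
}
```

Dropping under load trades completeness for latency; the `log.warn` in the collector makes that trade-off visible so the queue can be resized if drops become frequent.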