日志聚合方案完全指南前言在微服务架构中日志分散在各个服务实例中聚合日志对于问题排查和系统监控至关重要。本文将详细介绍日志聚合方案的设计与实现包括日志收集、存储和查询。一、日志聚合架构┌─────────────────────────────────────────────────────┐ │ Centralized Logging Architecture │ │ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Service A│ │ Service B│ │ Service C│ │ │ └─────┬────┘ └─────┬────┘ └─────┬────┘ │ │ │ │ │ │ │ └─────────────┼─────────────┘ │ │ │ │ │ ┌───────▼───────┐ │ │ │ Fluentd/ │ │ │ │ Filebeat │ │ │ └───────┬───────┘ │ │ │ │ │ ┌───────▼───────┐ │ │ │ Elasticsearch │ │ │ │ (Store) │ │ │ └───────┬───────┘ │ │ │ │ │ ┌───────▼───────┐ │ │ │ Kibana │ │ │ │ (Visualize) │ │ │ └───────────────┘ │ └─────────────────────────────────────────────────────┘二、ELK Stack配置2.1 Elasticsearch配置# docker-compose.yml version: 3.8 services: elasticsearch: image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0 container_name: elasticsearch environment: - node.nameelasticsearch - cluster.namedocker-cluster - bootstrap.memory_locktrue - discovery.typesingle-node - ES_JAVA_OPTS-Xms2g -Xmx2g - xpack.security.enabledtrue - xpack.security.http.ssl.enabledfalse ulimits: memlock: soft: -1 hard: -1 volumes: - es_data:/usr/share/elasticsearch/data ports: - 9200:9200 - 9300:9300 networks: - elastic volumes: es_data: driver: local networks: elastic: driver: bridge2.2 Logstash配置# logstash.conf input { beats { port 5044 host 0.0.0.0 } tcp { port 5000 codec json_lines } } filter { if [fields][service] order-service { grok { match { message %{TIMESTAMP_ISO8601:timestamp} \[%{DATA:thread}\] %{LOGLEVEL:level} %{DATA:class} - %{GREEDYDATA:log_message} } } } date { match [ timestamp, ISO8601 ] target timestamp } mutate { add_field { [metadata][index] microservices } } if [level] ERROR { mutate { add_tag [ error ] } } } output { if [metadata][index] microservices { elasticsearch { hosts [elasticsearch:9200] index microservices-%{YYYY.MM.dd} user elastic password ${ELASTIC_PASSWORD} } } stdout { codec rubydebug } }2.3 Filebeat配置# filebeat.yml filebeat.inputs: - type: log enabled: true paths: - /var/log/microservices/*.log json.keys_under_root: true json.add_error_key: true json.message_key: message fields: service: microservices environment: ${ENVIRONMENT:production} fields_under_root: true processors: - add_host_metadata: when.not.contains.tags: forwarded - add_cloud_metadata: ~ - add_docker_metadata: ~ - add_kubernetes_metadata: ~ output.elasticsearch: hosts: [elasticsearch:9200] username: ${ELASTICSEARCH_USERNAME} password: ${ELASTICSEARCH_PASSWORD} index: microservices-%{yyyy.MM.dd} setup.kibana: host: kibana:5601 setup.ilm.enabled: auto setup.ilm.rollover_alias: microservices setup.ilm.pattern: {now/d}-000001 setup.ilm.policy_name: microservices-policy三、日志收集实现3.1 Logback日志配置?xml version1.0 encodingUTF-8? configuration springProperty scopecontext nameappName sourcespring.application.name/ appender nameJSON_CONSOLE classch.qos.logback.core.ConsoleAppender encoder classnet.logstash.logback.encoder.LogstashEncoder includeMdcKeyNametraceId/includeMdcKeyName includeMdcKeyNamespanId/includeMdcKeyName includeMdcKeyNameuserId/includeMdcKeyName includeMdcKeyNamerequestId/includeMdcKeyName customFields{service:${appName},version:${BUILD_VERSION}}/customFields shortenedLoggerNameLength36/shortenedLoggerNameLength /encoder /appender appender nameJSON_FILE classch.qos.logback.core.rolling.RollingFileAppender file/var/log/microservices/${appName}.log/file rollingPolicy classch.qos.logback.core.rolling.TimeBasedRollingPolicy fileNamePattern/var/log/microservices/${appName}.%d{yyyy-MM-dd}.log/fileNamePattern maxHistory30/maxHistory totalSizeCap10GB/totalSizeCap /rollingPolicy encoder classnet.logstash.logback.encoder.LogstashEncoder includeMdcKeyNametraceId/includeMdcKeyName includeMdcKeyNameuserId/includeMdcKeyName customFields{service:${appName}}/customFields /encoder /appender root levelINFO appender-ref refJSON_CONSOLE/ appender-ref refJSON_FILE/ /root logger namecom.example levelDEBUG/ logger nameorg.springframework levelINFO/ logger nameorg.hibernate levelWARN/ /configuration3.2 MDC追踪Configuration public class MDCLoggingFilter { Bean public FilterRegistrationBeanMDCFilter mdcFilter() { FilterRegistrationBeanMDCFilter registration new FilterRegistrationBean(); registration.setFilter(new MDCFilter()); registration.addUrlPatterns(/api/*); registration.setOrder(Ordered.HIGHEST_PRECEDENCE 1); return registration; } } public class MDCFilter implements Filter { Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { HttpServletRequest httpRequest (HttpServletRequest) request; HttpServletResponse httpResponse (HttpServletResponse) response; String traceId httpRequest.getHeader(X-Trace-Id); if (traceId null) { traceId UUID.randomUUID().toString(); } MDC.put(traceId, traceId); MDC.put(spanId, generateSpanId()); MDC.put(requestUri, httpRequest.getRequestURI()); MDC.put(requestMethod, httpRequest.getMethod()); MDC.put(remoteAddr, httpRequest.getRemoteAddr()); long startTime System.currentTimeMillis(); httpResponse.setHeader(X-Trace-Id, traceId); try { chain.doFilter(request, response); } finally { long duration System.currentTimeMillis() - startTime; MDC.put(duration, String.valueOf(duration)); if (duration 1000) { MDC.put(slowRequest, true); } log.info(Request completed); MDC.clear(); } } }四、Kibana配置4.1 索引模式配置{ attributes: { title: microservices-*, timeFieldName: timestamp, fields: [], runtimeFieldMap: {}, fieldFormatMap: {\duration\:{\id\:\number\,\params\:{\pattern\:\00:00:00.000\,\normalizedTz\:{\tzOffset\:0,\label\:\UTC\}}}}, sourceFilters: [], typeMeta: {} } }4.2 日志查询{ query: { bool: { must: [ { match_phrase: { service: order-service } }, { range: { timestamp: { gte: now-1h, lte: now } } }, { match: { level: ERROR } } ], should: [ { wildcard: { message: *payment* } } ] } } }五、总结日志聚合是微服务可观测性的重要组成部分。通过ELK Stack构建集中式日志系统可以实现统一的日志收集、存储和查询大大提升问题排查效率。
日志聚合方案完全指南
日志聚合方案完全指南前言在微服务架构中日志分散在各个服务实例中聚合日志对于问题排查和系统监控至关重要。本文将详细介绍日志聚合方案的设计与实现包括日志收集、存储和查询。一、日志聚合架构┌─────────────────────────────────────────────────────┐ │ Centralized Logging Architecture │ │ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Service A│ │ Service B│ │ Service C│ │ │ └─────┬────┘ └─────┬────┘ └─────┬────┘ │ │ │ │ │ │ │ └─────────────┼─────────────┘ │ │ │ │ │ ┌───────▼───────┐ │ │ │ Fluentd/ │ │ │ │ Filebeat │ │ │ └───────┬───────┘ │ │ │ │ │ ┌───────▼───────┐ │ │ │ Elasticsearch │ │ │ │ (Store) │ │ │ └───────┬───────┘ │ │ │ │ │ ┌───────▼───────┐ │ │ │ Kibana │ │ │ │ (Visualize) │ │ │ └───────────────┘ │ └─────────────────────────────────────────────────────┘二、ELK Stack配置2.1 Elasticsearch配置# docker-compose.yml version: 3.8 services: elasticsearch: image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0 container_name: elasticsearch environment: - node.nameelasticsearch - cluster.namedocker-cluster - bootstrap.memory_locktrue - discovery.typesingle-node - ES_JAVA_OPTS-Xms2g -Xmx2g - xpack.security.enabledtrue - xpack.security.http.ssl.enabledfalse ulimits: memlock: soft: -1 hard: -1 volumes: - es_data:/usr/share/elasticsearch/data ports: - 9200:9200 - 9300:9300 networks: - elastic volumes: es_data: driver: local networks: elastic: driver: bridge2.2 Logstash配置# logstash.conf input { beats { port 5044 host 0.0.0.0 } tcp { port 5000 codec json_lines } } filter { if [fields][service] order-service { grok { match { message %{TIMESTAMP_ISO8601:timestamp} \[%{DATA:thread}\] %{LOGLEVEL:level} %{DATA:class} - %{GREEDYDATA:log_message} } } } date { match [ timestamp, ISO8601 ] target timestamp } mutate { add_field { [metadata][index] microservices } } if [level] ERROR { mutate { add_tag [ error ] } } } output { if [metadata][index] microservices { elasticsearch { hosts [elasticsearch:9200] index microservices-%{YYYY.MM.dd} user elastic password ${ELASTIC_PASSWORD} } } stdout { codec rubydebug } }2.3 Filebeat配置# filebeat.yml filebeat.inputs: - type: log enabled: true paths: - /var/log/microservices/*.log json.keys_under_root: true json.add_error_key: true json.message_key: message fields: service: microservices environment: ${ENVIRONMENT:production} fields_under_root: true processors: - add_host_metadata: when.not.contains.tags: forwarded - add_cloud_metadata: ~ - add_docker_metadata: ~ - add_kubernetes_metadata: ~ output.elasticsearch: hosts: [elasticsearch:9200] username: ${ELASTICSEARCH_USERNAME} password: ${ELASTICSEARCH_PASSWORD} index: microservices-%{yyyy.MM.dd} setup.kibana: host: kibana:5601 setup.ilm.enabled: auto setup.ilm.rollover_alias: microservices setup.ilm.pattern: {now/d}-000001 setup.ilm.policy_name: microservices-policy三、日志收集实现3.1 Logback日志配置?xml version1.0 encodingUTF-8? configuration springProperty scopecontext nameappName sourcespring.application.name/ appender nameJSON_CONSOLE classch.qos.logback.core.ConsoleAppender encoder classnet.logstash.logback.encoder.LogstashEncoder includeMdcKeyNametraceId/includeMdcKeyName includeMdcKeyNamespanId/includeMdcKeyName includeMdcKeyNameuserId/includeMdcKeyName includeMdcKeyNamerequestId/includeMdcKeyName customFields{service:${appName},version:${BUILD_VERSION}}/customFields shortenedLoggerNameLength36/shortenedLoggerNameLength /encoder /appender appender nameJSON_FILE classch.qos.logback.core.rolling.RollingFileAppender file/var/log/microservices/${appName}.log/file rollingPolicy classch.qos.logback.core.rolling.TimeBasedRollingPolicy fileNamePattern/var/log/microservices/${appName}.%d{yyyy-MM-dd}.log/fileNamePattern maxHistory30/maxHistory totalSizeCap10GB/totalSizeCap /rollingPolicy encoder classnet.logstash.logback.encoder.LogstashEncoder includeMdcKeyNametraceId/includeMdcKeyName includeMdcKeyNameuserId/includeMdcKeyName customFields{service:${appName}}/customFields /encoder /appender root levelINFO appender-ref refJSON_CONSOLE/ appender-ref refJSON_FILE/ /root logger namecom.example levelDEBUG/ logger nameorg.springframework levelINFO/ logger nameorg.hibernate levelWARN/ /configuration3.2 MDC追踪Configuration public class MDCLoggingFilter { Bean public FilterRegistrationBeanMDCFilter mdcFilter() { FilterRegistrationBeanMDCFilter registration new FilterRegistrationBean(); registration.setFilter(new MDCFilter()); registration.addUrlPatterns(/api/*); registration.setOrder(Ordered.HIGHEST_PRECEDENCE 1); return registration; } } public class MDCFilter implements Filter { Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { HttpServletRequest httpRequest (HttpServletRequest) request; HttpServletResponse httpResponse (HttpServletResponse) response; String traceId httpRequest.getHeader(X-Trace-Id); if (traceId null) { traceId UUID.randomUUID().toString(); } MDC.put(traceId, traceId); MDC.put(spanId, generateSpanId()); MDC.put(requestUri, httpRequest.getRequestURI()); MDC.put(requestMethod, httpRequest.getMethod()); MDC.put(remoteAddr, httpRequest.getRemoteAddr()); long startTime System.currentTimeMillis(); httpResponse.setHeader(X-Trace-Id, traceId); try { chain.doFilter(request, response); } finally { long duration System.currentTimeMillis() - startTime; MDC.put(duration, String.valueOf(duration)); if (duration 1000) { MDC.put(slowRequest, true); } log.info(Request completed); MDC.clear(); } } }四、Kibana配置4.1 索引模式配置{ attributes: { title: microservices-*, timeFieldName: timestamp, fields: [], runtimeFieldMap: {}, fieldFormatMap: {\duration\:{\id\:\number\,\params\:{\pattern\:\00:00:00.000\,\normalizedTz\:{\tzOffset\:0,\label\:\UTC\}}}}, sourceFilters: [], typeMeta: {} } }4.2 日志查询{ query: { bool: { must: [ { match_phrase: { service: order-service } }, { range: { timestamp: { gte: now-1h, lte: now } } }, { match: { level: ERROR } } ], should: [ { wildcard: { message: *payment* } } ] } } }五、总结日志聚合是微服务可观测性的重要组成部分。通过ELK Stack构建集中式日志系统可以实现统一的日志收集、存储和查询大大提升问题排查效率。