apache/incubator-seata

optimize:优化seata-server日志采集配置

Open

#4648 opened on May 28, 2022

View on GitHub
 (1 comment) (2 reactions) (0 assignees)Java (25,960 stars) (8,878 forks)batch import
good first issue

Description

Why you need it?

Is your feature request related to a problem? Please describe in detail. 目前将seata-server日志采集到Kafka或者logstash需要手动修改logback-spring.xml,当需要采集到kafka时,kafka生产者的一些配置也需要在kafka-appender.xml中修改,配置较为分散,不好统一管理,可以考虑将配置统一收到logging.extend下管理。

How it could be?

A clear and concise description of what you want to happen. You can explain more about input of the feature, and output of it. 配置统一管理后,如果需要开启或关闭日志采集功能,不需要再手动修改logback-spring.xml,只需修改application.yml重新打包即可,也可以通过java -jar -Dxxx 开启或关闭日志上报功能,如果接入配置中心,只需修改配置中心配置,再重启seata-server服务即可,配置管理将十分方便。

Other related information

Add any other context or screenshots about the feature request here. 可以参照spring boot整合logback原理实现,伪代码如下:

package io.seata.server.logging.logback;

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
import com.github.danielwegener.logback.kafka.KafkaAppender;
import io.seata.common.util.StringUtils;
import io.seata.server.logging.logback.appender.EnhancedLogstashEncoder;
import net.logstash.logback.appender.LogstashTcpSocketAppender;
import org.slf4j.ILoggerFactory;
import org.slf4j.impl.StaticLoggerBinder;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.context.event.ApplicationEnvironmentPreparedEvent;
import org.springframework.boot.context.logging.LoggingApplicationListener;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.GenericApplicationListener;
import org.springframework.core.ResolvableType;
import org.springframework.core.env.ConfigurableEnvironment;

import java.util.Objects;

/**
 * Application listener that attaches optional Logstash and Kafka appenders to the Logback
 * root logger, driven by {@code logging.extend.*} properties read from the Spring environment.
 *
 * <p>The listener reacts to {@link ApplicationEnvironmentPreparedEvent}. An appender is only
 * created when its required properties are present and non-empty:
 * <ul>
 *   <li>Logstash: {@code logging.extend.logstash-appender.destination}</li>
 *   <li>Kafka: {@code logging.extend.kafka-appender.bootstrap-servers} and
 *       {@code logging.extend.kafka-appender.topic}</li>
 * </ul>
 *
 * @author wlx
 * @date 2022/5/27 11:18 PM
 */
public class SeataLogbackLoggingExtendListener implements GenericApplicationListener {

    /** Name used to look up the root logger in the Logback context. */
    private static final String ROOT = "ROOT";

    /** Environment property holding the Kafka topic that log events are sent to. */
    private static final String KAFKA_TOPIC = "logging.extend.kafka-appender.topic";

    /** Environment property holding the Kafka {@code bootstrap.servers} value. */
    private static final String KAFKA_BOOTSTRAP_SERVERS = "logging.extend.kafka-appender.bootstrap-servers";

    /** Environment property holding the Logstash TCP destination. */
    private static final String LOGSTASH_DESTINATION = "logging.extend.logstash-appender.destination";

    private static final String APP_NAME = "spring.application.name";

    private static final String DEFAULT_APP_NAME = "seata-server";

    /** Name given to the Logstash appender. */
    private static final String LOGSTASH = "LOGSTASH";

    /** Name given to the Kafka appender. */
    private static final String KAFKA = "KAFKA";

    /**
     * JSON-shaped pattern used by the Kafka appender's encoder.
     *
     * NOTE(review): the ${APPLICATION_NAME:-...} and ${RPC_PORT:-...} placeholders look like
     * Logback XML variable references; confirm they are substituted when the pattern is set
     * programmatically rather than through the XML configurator.
     */
    private static final String KAFKA_PATTERN = "{\n" +
            "    \"@timestamp\": \"%d{yyyy-MM-dd HH:mm:ss.SSS}\",\n" +
            "    \"level\":\"%p\",\n" +
            "    \"app_name\":\"${APPLICATION_NAME:-seata-server}\",\n" +
            "    \"PORT\": ${RPC_PORT:-0},\n" +
            "    \"thread_name\": \"%t\",\n" +
            "    \"logger_name\": \"%logger\",\n" +
            "    \"X-TX-XID\": \"%X{X-TX-XID:-}\",\n" +
            "    \"X-TX-BRANCH-ID\": \"%X{X-TX-BRANCH-ID:-}\",\n" +
            "    \"message\": \"%m\",\n" +
            "    \"stack_trace\": \"%wex\"\n" +
            "}";

    /**
     * Accepts events whose source is a {@link SpringApplication} or an {@link ApplicationContext}.
     *
     * @param sourceType the event source type, may be {@code null}
     * @return {@code true} if the source type is supported, {@code false} otherwise
     */
    @Override
    public boolean supportsSourceType(Class<?> sourceType) {
        if (Objects.isNull(sourceType)) {
            return false;
        }
        return SpringApplication.class.isAssignableFrom(sourceType)
                || ApplicationContext.class.isAssignableFrom(sourceType);
    }

    /**
     * Accepts only {@link ApplicationEnvironmentPreparedEvent} (and subtypes).
     *
     * @param eventType the resolvable type of the event
     * @return {@code true} if the event type is supported, {@code false} otherwise
     */
    @Override
    public boolean supportsEventType(ResolvableType eventType) {
        Class<?> typeRawClass = eventType.getRawClass();
        if (Objects.isNull(typeRawClass)) {
            return false;
        }
        return ApplicationEnvironmentPreparedEvent.class.isAssignableFrom(typeRawClass);
    }

    /**
     * Adds the Logstash and Kafka appenders (when configured) once the environment is prepared.
     *
     * @param event the published application event; anything other than an
     *              {@link ApplicationEnvironmentPreparedEvent} is ignored
     */
    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (!(event instanceof ApplicationEnvironmentPreparedEvent)) {
            return;
        }
        ILoggerFactory loggerFactory = StaticLoggerBinder.getSingleton().getLoggerFactory();
        if (!(loggerFactory instanceof LoggerContext)) {
            // The bound SLF4J factory is not a Logback context, so there is nothing to attach
            // appenders to; skip instead of failing startup with a ClassCastException.
            return;
        }
        LoggerContext loggerContext = (LoggerContext) loggerFactory;
        ConfigurableEnvironment environment = ((ApplicationEnvironmentPreparedEvent) event).getEnvironment();

        // Add the Logstash appender
        appendLogstashAppender(loggerContext, environment);
        // Add the Kafka appender
        appendKafkaAppender(loggerContext, environment);
    }

    /**
     * Orders this listener two positions after {@link LoggingApplicationListener#DEFAULT_ORDER}.
     *
     * @return the listener order
     */
    @Override
    public int getOrder() {
        return LoggingApplicationListener.DEFAULT_ORDER + 2;
    }

    /**
     * Creates, starts and registers a Kafka appender on the root logger.
     * Does nothing unless both the bootstrap servers and the topic are configured.
     *
     * @param loggerContext the Logback context that owns the root logger
     * @param environment   the environment the Kafka settings are read from
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void appendKafkaAppender(LoggerContext loggerContext, ConfigurableEnvironment environment) {
        String kafkaBootstrapServer = environment.getProperty(KAFKA_BOOTSTRAP_SERVERS);
        String kafkaTopic = environment.getProperty(KAFKA_TOPIC);
        if (StringUtils.isNullOrEmpty(kafkaBootstrapServer) || StringUtils.isNullOrEmpty(kafkaTopic)) {
            return;
        }
        Logger root = loggerContext.getLogger(ROOT);

        KafkaAppender kafkaAppender = new KafkaAppender();
        kafkaAppender.setContext(loggerContext);
        kafkaAppender.setName(KAFKA);
        kafkaAppender.setTopic(kafkaTopic);
        kafkaAppender.setKeyingStrategy(new com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy());
        kafkaAppender.setDeliveryStrategy(new com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy());
        // Pass the resolved server list, not the name of the property it was read from.
        kafkaAppender.addProducerConfigValue("bootstrap.servers", kafkaBootstrapServer);
        kafkaAppender.addProducerConfigValue("acks", 0);
        kafkaAppender.addProducerConfigValue("linger.ms", 1000);
        kafkaAppender.addProducerConfigValue("max.block.ms", 0);

        PatternLayoutEncoder encoder = new PatternLayoutEncoder();
        encoder.setPattern(KAFKA_PATTERN);
        encoder.setContext(loggerContext);
        kafkaAppender.setEncoder(encoder);
        encoder.start();
        kafkaAppender.start();

        root.addAppender(kafkaAppender);
    }

    /**
     * Creates, starts and registers a Logstash TCP appender on the root logger.
     * Does nothing unless the Logstash destination is configured.
     *
     * @param loggerContext the Logback context that owns the root logger
     * @param environment   the environment the Logstash settings are read from
     */
    private void appendLogstashAppender(LoggerContext loggerContext, ConfigurableEnvironment environment) {
        String logstashDestination = environment.getProperty(LOGSTASH_DESTINATION);
        if (StringUtils.isNullOrEmpty(logstashDestination)) {
            return;
        }
        Logger root = loggerContext.getLogger(ROOT);
        LogstashTcpSocketAppender logstashTcpSocketAppender = new LogstashTcpSocketAppender();
        logstashTcpSocketAppender.setName(LOGSTASH);
        logstashTcpSocketAppender.addDestination(logstashDestination);
        logstashTcpSocketAppender.setContext(loggerContext);

        EnhancedLogstashEncoder encoder = new EnhancedLogstashEncoder();
        String appName = environment.getProperty(APP_NAME, DEFAULT_APP_NAME);
        // NOTE(review): the bare application name is passed as the custom fields value; confirm
        // whether the encoder expects a JSON object such as {"app_name":"..."} here.
        encoder.setCustomFields(appName);
        encoder.setExcludeProvider("net.logstash.logback.composite.LogstashVersionJsonProvider");
        encoder.setExcludeProvider("net.logstash.logback.composite.loggingevent.JsonMessageJsonProvider");
        encoder.setExcludeProvider("net.logstash.logback.composite.loggingevent.TagsJsonProvider");
        encoder.setExcludeProvider("net.logstash.logback.composite.loggingevent.LogstashMarkersJsonProvider");
        encoder.setExcludeProvider("net.logstash.logback.composite.loggingevent.ArgumentsJsonProvider");
        encoder.setContext(loggerContext);
        encoder.start();
        logstashTcpSocketAppender.setEncoder(encoder);

        logstashTcpSocketAppender.start();
        root.addAppender(logstashTcpSocketAppender);
    }

}

Contributor guide