package org.rocketmq.spring.boot.config;

import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.rocketmq.spring.boot.common.AbstractRocketMqConsumer;
import org.rocketmq.spring.boot.common.DefaultRocketMqProducer;
import org.rocketmq.spring.boot.common.RocketMqConsumerMBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.util.CollectionUtils;

/**
 * Spring Boot auto-configuration that creates and starts the RocketMQ client beans.
 *
 * <p>It contributes three beans, each guarded by conditions so that an application can
 * supply its own replacement:
 * <ul>
 *   <li>a started {@link DefaultMQProducer} configured from {@link RocketMqProperties};</li>
 *   <li>a {@link DefaultRocketMqProducer} wrapping that producer;</li>
 *   <li>a {@link RocketMqConsumerMBean} holding every {@link AbstractRocketMqConsumer} bean,
 *       each of which is subscribed and started while the MBean is being created.</li>
 * </ul>
 *
 * <p>Start-up and subscription failures are logged rather than rethrown, so the application
 * context still comes up when a client cannot be started.
 */
@EnableConfigurationProperties({RocketMqProperties.class})
@Configuration
@ConditionalOnClass({DefaultMQPushConsumer.class})
public class RocketMqAutoConfiguration {
    private static final Logger log = LoggerFactory.getLogger(RocketMqAutoConfiguration.class);

    /** Sub-expression used when a topic declares no tags. */
    private static final String ALL_TAGS = "*";

    /** Separator placed between tags when building a subscription sub-expression. */
    private static final String TAG_SEPARATOR = " || ";

    @Resource
    private RocketMqProperties rocketMqProperties;

    /**
     * Builds the producer from {@link RocketMqProperties}, registers a JVM shutdown hook that
     * shuts it down, and starts it.
     *
     * <p>If {@code start()} throws {@link MQClientException} the error is logged and the
     * (unstarted) producer is still returned.
     *
     * @return the configured producer
     */
    @ConditionalOnMissingBean({DefaultMQProducer.class})
    @ConditionalOnClass({DefaultMQProducer.class})
    @Bean
    public DefaultMQProducer mqProducer() {
        final RocketMqProperties props = this.rocketMqProperties;
        final String nameServer = props.getNameServer();
        final String group = props.getProducerGroupName();

        final DefaultMQProducer producer = new DefaultMQProducer();
        producer.setProducerGroup(group);
        producer.setNamesrvAddr(nameServer);
        producer.setSendMsgTimeout(props.getProducerSendMsgTimeout());
        producer.setRetryTimesWhenSendFailed(props.getProducerRetryTimesWhenSendFailed());
        producer.setRetryTimesWhenSendAsyncFailed(props.getProducerRetryTimesWhenSendAsyncFailed());
        producer.setMaxMessageSize(props.getProducerMaxMessageSize());
        producer.setCompressMsgBodyOverHowmuch(props.getProducerCompressMsgBodyOverHowMuch());
        producer.setRetryAnotherBrokerWhenNotStoreOK(props.isProducerRetryAnotherBrokerWhenNotStoreOk());

        // The hook is registered before start() so it is in place even if start-up fails.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("producer shutdown");
            producer.shutdown();
            log.info("producer has shutdown");
        }));

        try {
            producer.start();
            log.info("rocketmq producer started, nameserver:{}, group:{}", nameServer, group);
        } catch (MQClientException e) {
            // Trailing throwable without a placeholder: SLF4J logs it with its stack trace.
            log.error("producer start error, nameserver:{}, group:{}", nameServer, group, e);
        }
        return producer;
    }

    /**
     * Wraps the {@code mqProducer} bean in a {@link DefaultRocketMqProducer}.
     *
     * @param defaultMQProducer the producer bean named {@code mqProducer}
     * @return the wrapper, whose {@code destroy} method is invoked when the bean is destroyed
     */
    @ConditionalOnMissingBean(name = {"defaultRocketMqProducer"})
    @ConditionalOnBean({DefaultMQProducer.class})
    @Bean(destroyMethod = "destroy")
    public DefaultRocketMqProducer defaultRocketMqProducer(@Qualifier("mqProducer") DefaultMQProducer defaultMQProducer) {
        DefaultRocketMqProducer wrapper = new DefaultRocketMqProducer();
        wrapper.setProducer(defaultMQProducer);
        return wrapper;
    }

    /**
     * Subscribes and starts every {@link AbstractRocketMqConsumer} bean, then exposes them
     * through a {@link RocketMqConsumerMBean}.
     *
     * @param list all consumer beans found in the application context
     * @return the MBean holding the given consumers
     */
    @ConditionalOnMissingBean
    @ConditionalOnBean({AbstractRocketMqConsumer.class})
    @Bean
    @Order
    public RocketMqConsumerMBean rocketMqConsumerMBean(List<AbstractRocketMqConsumer> list) {
        list.forEach(this::registerMQConsumer);
        RocketMqConsumerMBean mBean = new RocketMqConsumerMBean();
        mBean.setConsumers(list);
        return mBean;
    }

    /**
     * Points the consumer at the configured name server, applies its topic subscriptions,
     * registers a JVM shutdown hook for it, and starts it.
     *
     * <p>The consumer is marked as started only when {@code start()} succeeds; a failure is
     * logged and otherwise ignored.
     *
     * @param abstractRocketMqConsumer the consumer bean to subscribe and start
     */
    private void registerMQConsumer(AbstractRocketMqConsumer abstractRocketMqConsumer) {
        final String nameServer = this.rocketMqProperties.getNameServer();
        final Map<String, Set<String>> topicTags = abstractRocketMqConsumer.subscribeTopicTags();
        final DefaultMQPushConsumer consumer = abstractRocketMqConsumer.getConsumer();

        consumer.setNamesrvAddr(nameServer);
        subscribeTopics(consumer, topicTags);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("consumer shutdown");
            consumer.shutdown();
            log.info("consumer has shutdown");
        }));

        try {
            consumer.start();
            abstractRocketMqConsumer.setStarted(true);
            log.info("rocketmq consumer started, nameserver:{}, group:{}", nameServer, abstractRocketMqConsumer.getConsumerGroup());
        } catch (MQClientException e) {
            log.error("consumer start error, nameserver:{}, group:{}", nameServer, abstractRocketMqConsumer.getConsumerGroup(), e);
        }
    }

    /**
     * Subscribes the consumer to each topic in the map.
     *
     * <p>A {@code null} or empty map results in no subscriptions and a warning. A failure on
     * one topic is logged and does not prevent the remaining topics from being subscribed.
     *
     * @param consumer  the consumer to subscribe
     * @param topicTags topic name mapped to the tags to receive; may be {@code null}
     */
    private void subscribeTopics(DefaultMQPushConsumer consumer, Map<String, Set<String>> topicTags) {
        if (CollectionUtils.isEmpty(topicTags)) {
            log.warn("consumer declares no topic subscriptions");
            return;
        }
        for (Map.Entry<String, Set<String>> entry : topicTags.entrySet()) {
            String topic = entry.getKey();
            String tags = toTagExpression(entry.getValue());
            try {
                consumer.subscribe(topic, tags);
                log.info("subscribe, topic:{}, tags:{}", topic, tags);
            } catch (MQClientException e) {
                log.error("consumer subscribe error", e);
            }
        }
    }

    /**
     * Builds the subscription sub-expression for a set of tags.
     *
     * @param tags the tags declared for a topic; may be {@code null} or empty
     * @return {@code "*"} when no tags are given, otherwise the tags joined with {@code " || "}
     */
    private static String toTagExpression(Set<String> tags) {
        return CollectionUtils.isEmpty(tags) ? ALL_TAGS : StringUtils.join(tags, TAG_SEPARATOR);
    }
}
