package com.alibaba.rocketmq.filtersrv;

import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.ThreadFactoryImpl;
import com.alibaba.rocketmq.common.UtilAll;
import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
import com.alibaba.rocketmq.filtersrv.filter.FilterClassManager;
import com.alibaba.rocketmq.filtersrv.processor.DefaultRequestProcessor;
import com.alibaba.rocketmq.filtersrv.stats.FilterServerStatsManager;
import com.alibaba.rocketmq.remoting.RemotingServer;
import com.alibaba.rocketmq.remoting.netty.NettyRemotingServer;
import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/rocketmq/filtersrv/FiltersrvController.class */
/**
 * Wires together the components of a filter server process and drives their lifecycle.
 *
 * <p>The controller owns the remoting server and the executor that runs its request processor,
 * the {@link FilterServerOuterAPI} used to register with a broker, a pull consumer, the filter
 * class manager, the stats manager, and a single-threaded scheduler that periodically
 * re-registers this server with the broker.
 *
 * <p>Call order: {@link #initialize()} creates the remoting server and executor that
 * {@link #start()} relies on; {@link #shutdown()} releases everything again.
 */
public class FiltersrvController {
    private static final Logger log = LoggerFactory.getLogger("RocketmqFiltersrv");

    private final FiltersrvConfig filtersrvConfig;
    private final NettyServerConfig nettyServerConfig;

    /** Created by {@link #initialize()}; {@code null} before that unless set through the setter. */
    private RemotingServer remotingServer;

    /** Created by {@link #initialize()}; {@code null} before that unless set through the setter. */
    private ExecutorService remotingExecutor;

    private final FilterServerOuterAPI filterServerOuterAPI = new FilterServerOuterAPI();
    private final DefaultMQPullConsumer defaultMQPullConsumer =
            new DefaultMQPullConsumer("FILTERSRV_CONSUMER");

    /** Broker name taken from the first successful registration response; {@code null} until then. */
    private volatile String brokerName = null;

    private final ScheduledExecutorService scheduledExecutorService =
            Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSScheduledThread"));
    private final FilterServerStatsManager filterServerStatsManager = new FilterServerStatsManager();

    // Keep this initializer last: the manager receives this controller while it is being built.
    private final FilterClassManager filterClassManager = new FilterClassManager(this);

    /**
     * Creates a controller for the given configuration. No component is started here.
     *
     * @param filtersrvConfig filter server settings (name server address, broker to connect to, ...)
     * @param nettyServerConfig settings handed to the remoting server created in {@link #initialize()}
     */
    public FiltersrvController(FiltersrvConfig filtersrvConfig, NettyServerConfig nettyServerConfig) {
        this.filtersrvConfig = filtersrvConfig;
        this.nettyServerConfig = nettyServerConfig;
    }

    /**
     * Builds the remoting server and its executor, registers the request processor, schedules
     * the periodic broker registration and configures the pull consumer.
     *
     * <p>The registration task first runs after 3 seconds and then every 10 seconds. The pull
     * consumer's broker-suspend and consumer-timeout-when-suspend values are each lowered by
     * 1000 ms relative to their current values.
     *
     * @return always {@code true}
     */
    public boolean initialize() {
        MixAll.printObjectProperties(log, this.filtersrvConfig);

        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig);
        this.remotingExecutor = Executors.newFixedThreadPool(
                this.nettyServerConfig.getServerWorkerThreads(),
                new ThreadFactoryImpl("RemotingExecutorThread_"));
        registerProcessor();

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                FiltersrvController.this.registerFilterServerToBroker();
            }
        }, 3L, 10L, TimeUnit.SECONDS);

        this.defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(
                this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis() - 1000);
        this.defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(
                this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() - 1000);
        this.defaultMQPullConsumer.setNamesrvAddr(this.filtersrvConfig.getNamesrvAddr());
        this.defaultMQPullConsumer.setInstanceName(String.valueOf(UtilAll.getPid()));
        return true;
    }

    /**
     * Returns this server's address as {@code ip:port}, combining the configured filter server IP
     * with the port the remoting server is listening on.
     *
     * <p>Requires the remoting server to exist, i.e. {@link #initialize()} must have run.
     *
     * @return the address string in {@code "%s:%d"} form
     */
    public String localAddr() {
        return String.format("%s:%d",
                this.filtersrvConfig.getFilterServerIP(),
                this.remotingServer.localListenPort());
    }

    /**
     * Registers this filter server with the configured broker and applies the response.
     *
     * <p>On success the returned broker id becomes the pull API wrapper's default broker id, and
     * the broker name is remembered if none has been recorded yet. On any {@link Exception} the
     * failure is logged and the JVM is terminated with exit status {@code -1}.
     */
    public void registerFilterServerToBroker() {
        try {
            RegisterFilterServerResponseHeader response =
                    this.filterServerOuterAPI.registerFilterServerToBroker(
                            this.filtersrvConfig.getConnectWhichBroker(), localAddr());

            this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
                    .setDefaultBrokerId(response.getBrokerId());

            // Only the first successful registration decides the broker name.
            if (null == this.brokerName) {
                this.brokerName = response.getBrokerName();
            }

            // Arguments are passed as an explicit array, exactly as the original call did.
            log.info("register filter server<{}> to broker<{}> OK, Return: {} {}", new Object[] {
                localAddr(),
                this.filtersrvConfig.getConnectWhichBroker(),
                response.getBrokerName(),
                response.getBrokerId()
            });
        } catch (Exception e) {
            log.warn("register filter server Exception", e);
            log.warn("access broker failed, kill oneself");
            System.exit(-1);
        }
    }

    /** Installs the default request processor on the remoting server, backed by the remoting executor. */
    private void registerProcessor() {
        this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
    }

    /**
     * Starts the pull consumer, the remoting server and the outer API, marks the pull API wrapper
     * as connect-broker-by-user, and then starts the filter class manager and the stats manager.
     *
     * @throws Exception if any of the started components fails to start
     */
    public void start() throws Exception {
        this.defaultMQPullConsumer.start();
        this.remotingServer.start();
        this.filterServerOuterAPI.start();
        this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
                .setConnectBrokerByUser(true);
        this.filterClassManager.start();
        this.filterServerStatsManager.start();
    }

    /**
     * Shuts down every owned component, in the same order as before: remoting server, remoting
     * executor, scheduler, pull consumer, outer API, filter class manager, stats manager.
     *
     * <p>The remoting server and executor are skipped when they are {@code null} (for example
     * when {@link #initialize()} never ran), so the remaining components are still shut down
     * instead of the call aborting with a {@link NullPointerException}.
     */
    public void shutdown() {
        final RemotingServer server = this.remotingServer;
        if (server != null) {
            server.shutdown();
        }

        final ExecutorService executor = this.remotingExecutor;
        if (executor != null) {
            executor.shutdown();
        }

        this.scheduledExecutorService.shutdown();
        this.defaultMQPullConsumer.shutdown();
        this.filterServerOuterAPI.shutdown();
        this.filterClassManager.shutdown();
        this.filterServerStatsManager.shutdown();
    }

    /** @return the remoting server, or {@code null} if none has been created or set yet */
    public RemotingServer getRemotingServer() {
        return this.remotingServer;
    }

    /** @param remotingServer the remoting server this controller should use from now on */
    public void setRemotingServer(RemotingServer remotingServer) {
        this.remotingServer = remotingServer;
    }

    /** @return the executor backing the request processor, or {@code null} if none exists yet */
    public ExecutorService getRemotingExecutor() {
        return this.remotingExecutor;
    }

    /** @param executorService the executor this controller should use for request processing */
    public void setRemotingExecutor(ExecutorService executorService) {
        this.remotingExecutor = executorService;
    }

    /** @return the filter server configuration passed to the constructor */
    public FiltersrvConfig getFiltersrvConfig() {
        return this.filtersrvConfig;
    }

    /** @return the netty server configuration passed to the constructor */
    public NettyServerConfig getNettyServerConfig() {
        return this.nettyServerConfig;
    }

    /** @return the single-threaded scheduler that runs the periodic broker registration */
    public ScheduledExecutorService getScheduledExecutorService() {
        return this.scheduledExecutorService;
    }

    /** @return the outer API instance used for broker registration */
    public FilterServerOuterAPI getFilterServerOuterAPI() {
        return this.filterServerOuterAPI;
    }

    /** @return the filter class manager owned by this controller */
    public FilterClassManager getFilterClassManager() {
        return this.filterClassManager;
    }

    /** @return the pull consumer owned by this controller */
    public DefaultMQPullConsumer getDefaultMQPullConsumer() {
        return this.defaultMQPullConsumer;
    }

    /** @return the recorded broker name, or {@code null} if no registration has succeeded yet */
    public String getBrokerName() {
        return this.brokerName;
    }

    /** @param str the broker name to record; a non-null value stops later registrations from replacing it */
    public void setBrokerName(String str) {
        this.brokerName = str;
    }

    /** @return the stats manager owned by this controller */
    public FilterServerStatsManager getFilterServerStatsManager() {
        return this.filterServerStatsManager;
    }
}
