/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.rocketmq.broker;

import com.alibaba.rocketmq.broker.BrokerPathConfigHelper;
import com.alibaba.rocketmq.broker.client.ClientHousekeepingService;
import com.alibaba.rocketmq.broker.client.ConsumerIdsChangeListener;
import com.alibaba.rocketmq.broker.client.ConsumerManager;
import com.alibaba.rocketmq.broker.client.DefaultConsumerIdsChangeListener;
import com.alibaba.rocketmq.broker.client.ProducerManager;
import com.alibaba.rocketmq.broker.client.net.Broker2Client;
import com.alibaba.rocketmq.broker.client.rebalance.RebalanceLockManager;
import com.alibaba.rocketmq.broker.filtersrv.FilterServerManager;
import com.alibaba.rocketmq.broker.longpolling.PullRequestHoldService;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
import com.alibaba.rocketmq.broker.mqtrace.SendMessageHook;
import com.alibaba.rocketmq.broker.offset.ConsumerOffsetManager;
import com.alibaba.rocketmq.broker.out.BrokerOuterAPI;
import com.alibaba.rocketmq.broker.processor.AdminBrokerProcessor;
import com.alibaba.rocketmq.broker.processor.ClientManageProcessor;
import com.alibaba.rocketmq.broker.processor.EndTransactionProcessor;
import com.alibaba.rocketmq.broker.processor.PullMessageProcessor;
import com.alibaba.rocketmq.broker.processor.QueryMessageProcessor;
import com.alibaba.rocketmq.broker.processor.SendMessageProcessor;
import com.alibaba.rocketmq.broker.slave.SlaveSynchronize;
import com.alibaba.rocketmq.broker.subscription.SubscriptionGroupManager;
import com.alibaba.rocketmq.broker.topic.TopicConfigManager;
import com.alibaba.rocketmq.common.BrokerConfig;
import com.alibaba.rocketmq.common.DataVersion;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.ThreadFactoryImpl;
import com.alibaba.rocketmq.common.TopicConfig;
import com.alibaba.rocketmq.common.UtilAll;
import com.alibaba.rocketmq.common.constant.PermName;
import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult;
import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import com.alibaba.rocketmq.remoting.ChannelEventListener;
import com.alibaba.rocketmq.remoting.RPCHook;
import com.alibaba.rocketmq.remoting.RemotingServer;
import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
import com.alibaba.rocketmq.remoting.netty.NettyRemotingServer;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
import com.alibaba.rocketmq.store.DefaultMessageStore;
import com.alibaba.rocketmq.store.MessageStore;
import com.alibaba.rocketmq.store.config.BrokerRole;
import com.alibaba.rocketmq.store.config.MessageStoreConfig;
import com.alibaba.rocketmq.store.stats.BrokerStats;
import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Wires together the components of a single broker process and drives their lifecycle.
 *
 * <p>Expected call order, as far as this class shows: construct, {@link #initialize()},
 * {@link #start()}, and finally {@link #shutdown()}. {@code initialize()} creates the message
 * store, the remoting server and the worker thread pools; calling {@code start()} without a
 * successful {@code initialize()} leaves those components {@code null}.
 */
public class BrokerController {
    private static final Logger log = LoggerFactory.getLogger("RocketmqBroker");

    /** Period of the daily broker statistics task: 24 hours in milliseconds. */
    private static final long ONE_DAY_MILLIS = 86400000L;

    // Configuration objects supplied by the caller; updateAllConfig() mutates them in place.
    private final BrokerConfig brokerConfig;
    private final NettyServerConfig nettyServerConfig;
    private final NettyClientConfig nettyClientConfig;
    private final MessageStoreConfig messageStoreConfig;
    // Bumped every time updateAllConfig() applies a new set of properties.
    private final DataVersion configDataVersion = new DataVersion();

    // Components created in the constructor.
    private final ConsumerOffsetManager consumerOffsetManager;
    private final ConsumerManager consumerManager;
    private final ProducerManager producerManager;
    private final ClientHousekeepingService clientHousekeepingService;
    private final PullMessageProcessor pullMessageProcessor;
    private final PullRequestHoldService pullRequestHoldService;
    private final Broker2Client broker2Client;
    private final SubscriptionGroupManager subscriptionGroupManager;
    private final ConsumerIdsChangeListener consumerIdsChangeListener;
    private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
    private final BrokerOuterAPI brokerOuterAPI;
    // Single thread that runs every periodic task scheduled by this class.
    private final ScheduledExecutorService scheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("BrokerControllerScheduledThread"));
    private final SlaveSynchronize slaveSynchronize;

    // Components created lazily by initialize(); null until it succeeds.
    private MessageStore messageStore;
    private RemotingServer remotingServer;
    private TopicConfigManager topicConfigManager;
    private ExecutorService sendMessageExecutor;
    private ExecutorService pullMessageExecutor;
    private ExecutorService adminBrokerExecutor;
    private ExecutorService clientManageExecutor;
    // True when a SLAVE has no usable HA master address configured and must take it from the
    // registerBrokerAll() result instead.
    private boolean updateMasterHAServerAddrPeriodically = false;
    private BrokerStats brokerStats;

    // Bounded work queues backing the send / pull thread pools.
    private final BlockingQueue<Runnable> sendThreadPoolQueue;
    private final BlockingQueue<Runnable> pullThreadPoolQueue;
    private final FilterServerManager filterServerManager;
    private final BrokerStatsManager brokerStatsManager;
    private InetSocketAddress storeHost;

    // Hooks must be registered before initialize(): registerProcessor() hands these lists to
    // the processors at that point.
    private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
    private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();

    /**
     * Creates the controller and all components that do not depend on the message store.
     *
     * @param brokerConfig       broker-level settings
     * @param nettyServerConfig  settings for the remoting server created in {@link #initialize()}
     * @param nettyClientConfig  settings passed to the {@link BrokerOuterAPI}
     * @param messageStoreConfig settings for the message store created in {@link #initialize()}
     */
    public BrokerController(BrokerConfig brokerConfig, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig, MessageStoreConfig messageStoreConfig) {
        this.brokerConfig = brokerConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.nettyClientConfig = nettyClientConfig;
        this.messageStoreConfig = messageStoreConfig;
        // Several components receive `this`; keep the construction order unchanged because
        // their constructors may read fields assigned above.
        this.consumerOffsetManager = new ConsumerOffsetManager(this);
        this.topicConfigManager = new TopicConfigManager(this);
        this.pullMessageProcessor = new PullMessageProcessor(this);
        this.pullRequestHoldService = new PullRequestHoldService(this);
        this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
        this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
        this.producerManager = new ProducerManager();
        this.clientHousekeepingService = new ClientHousekeepingService(this);
        this.broker2Client = new Broker2Client(this);
        this.subscriptionGroupManager = new SubscriptionGroupManager(this);
        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
        this.filterServerManager = new FilterServerManager(this);
        if (this.brokerConfig.getNamesrvAddr() != null) {
            this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
            log.info("user specfied name server address: {}", this.brokerConfig.getNamesrvAddr());
        }
        this.slaveSynchronize = new SlaveSynchronize(this);
        this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
        this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
        this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
        this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
    }

    /**
     * Loads persisted state, creates the message store, remoting server and thread pools,
     * registers the request processors and schedules the periodic background tasks.
     *
     * <p>Loading stops at the first component that reports failure; in that case nothing
     * further is created or scheduled.
     *
     * @return {@code true} when every load step succeeded and the broker is ready for
     *         {@link #start()}; {@code false} otherwise
     */
    public boolean initialize() {
        boolean result = this.topicConfigManager.load();
        result = result && this.consumerOffsetManager.load();
        result = result && this.subscriptionGroupManager.load();
        if (result) {
            try {
                this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager);
            } catch (IOException e) {
                result = false;
                log.error("create DefaultMessageStore failed", e);
            }
        }
        // Short-circuit keeps messageStore.load() from running when the store was not created.
        result = result && this.messageStore.load();
        if (!result) {
            return false;
        }

        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
        this.createWorkerExecutors();
        this.registerProcessor();
        this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
        this.scheduleHousekeepingTasks();
        this.configureNameServerAddress();
        this.scheduleRoleSpecificTasks();
        return true;
    }

    /** Creates the four thread pools that the request processors run on. */
    private void createWorkerExecutors() {
        final int sendThreads = this.brokerConfig.getSendMessageThreadPoolNums();
        final int pullThreads = this.brokerConfig.getPullMessageThreadPoolNums();
        this.sendMessageExecutor = new ThreadPoolExecutor(sendThreads, sendThreads, 60000L, TimeUnit.MILLISECONDS,
            this.sendThreadPoolQueue, new ThreadFactoryImpl("SendMessageThread_"));
        this.pullMessageExecutor = new ThreadPoolExecutor(pullThreads, pullThreads, 60000L, TimeUnit.MILLISECONDS,
            this.pullThreadPoolQueue, new ThreadFactoryImpl("PullMessageThread_"));
        this.adminBrokerExecutor = Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(),
            new ThreadFactoryImpl("AdminBrokerThread_"));
        this.clientManageExecutor = Executors.newFixedThreadPool(this.brokerConfig.getClientManageThreadPoolNums(),
            new ThreadFactoryImpl("ClientManageThread_"));
    }

    /**
     * Schedules the role-independent periodic tasks: daily statistics recording, consumer
     * offset persistence and the scan for unsubscribed topics.
     */
    private void scheduleHousekeepingTasks() {
        // First run at the time returned by computNextMorningTimeMillis(), then once a day.
        final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.getBrokerStats().record();
                } catch (Exception e) {
                    log.error("schedule record error.", e);
                }
            }
        }, initialDelay, ONE_DAY_MILLIS, TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerOffsetManager.persist();
                } catch (Exception e) {
                    log.error("schedule persist consumerOffset error.", e);
                }
            }
        }, 10000L, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerOffsetManager.scanUnsubscribedTopic();
                } catch (Exception e) {
                    log.error("schedule scanUnsubscribedTopic error.", e);
                }
            }
        }, 10L, 60L, TimeUnit.MINUTES);
    }

    /**
     * Applies the configured name server address, or, when none is configured and fetching
     * from the address server is enabled, schedules a refresh every two minutes.
     */
    private void configureNameServerAddress() {
        if (this.brokerConfig.getNamesrvAddr() != null) {
            this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
            return;
        }
        if (!this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
            return;
        }
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 10000L, 120000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Schedules the task that depends on the broker role: a SLAVE synchronises from its master
     * every minute, any other role logs the value reported by
     * {@code MessageStore.slaveFallBehindMuch()} every minute.
     */
    private void scheduleRoleSpecificTasks() {
        if (BrokerRole.SLAVE != this.messageStoreConfig.getBrokerRole()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.printMasterAndSlaveDiff();
                    } catch (Exception e) {
                        log.error("schedule printMasterAndSlaveDiff error.", e);
                    }
                }
            }, 10000L, 60000L, TimeUnit.MILLISECONDS);
            return;
        }

        final String haMasterAddress = this.messageStoreConfig.getHaMasterAddress();
        // An address shorter than 6 characters is treated as "not configured".
        if (haMasterAddress != null && haMasterAddress.length() >= 6) {
            this.messageStore.updateHaMasterAddress(haMasterAddress);
            this.updateMasterHAServerAddrPeriodically = false;
        } else {
            this.updateMasterHAServerAddrPeriodically = true;
        }
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.slaveSynchronize.syncAll();
                } catch (Exception e) {
                    log.error("ScheduledTask syncAll slave exception", e);
                }
            }
        }, 10000L, 60000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Registers every request processor with the remoting server and hands the registered
     * send / consume hooks to the processors that use them.
     *
     * <p>Requires {@code remotingServer} and the worker executors to exist, i.e. it is meant
     * to run from {@link #initialize()}.
     */
    public void registerProcessor() {
        // NOTE(review): the numeric request codes below look like named constants that were
        // inlined at compile time (this file is decompiled output) — confirm each value
        // against the protocol's request-code definitions before changing any of them.
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(this.sendMessageHookList);
        this.remotingServer.registerProcessor(10, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(310, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(36, sendProcessor, this.sendMessageExecutor);

        this.remotingServer.registerProcessor(11, this.pullMessageProcessor, this.pullMessageExecutor);
        this.pullMessageProcessor.registerConsumeMessageHook(this.consumeMessageHookList);

        // Query requests share the pull thread pool.
        NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
        this.remotingServer.registerProcessor(12, queryProcessor, this.pullMessageExecutor);
        this.remotingServer.registerProcessor(33, queryProcessor, this.pullMessageExecutor);

        ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
        clientProcessor.registerConsumeMessageHook(this.consumeMessageHookList);
        this.remotingServer.registerProcessor(34, clientProcessor, this.clientManageExecutor);
        this.remotingServer.registerProcessor(35, clientProcessor, this.clientManageExecutor);
        this.remotingServer.registerProcessor(38, clientProcessor, this.clientManageExecutor);
        this.remotingServer.registerProcessor(15, clientProcessor, this.clientManageExecutor);
        this.remotingServer.registerProcessor(14, clientProcessor, this.clientManageExecutor);

        // End-of-transaction requests share the send thread pool.
        NettyRequestProcessor endTransactionProcessor = new EndTransactionProcessor(this);
        this.remotingServer.registerProcessor(37, endTransactionProcessor, this.sendMessageExecutor);

        // Every request code without an explicit processor goes to the admin processor.
        NettyRequestProcessor adminProcessor = new AdminBrokerProcessor(this);
        this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
    }

    /** @return the broker-to-client helper created in the constructor */
    public Broker2Client getBroker2Client() {
        return this.broker2Client;
    }

    /** @return the broker configuration passed to the constructor */
    public BrokerConfig getBrokerConfig() {
        return this.brokerConfig;
    }

    /** @return the JSON form of the configuration data version */
    public String getConfigDataVersion() {
        return this.configDataVersion.toJson();
    }

    /** @return the consumer manager created in the constructor */
    public ConsumerManager getConsumerManager() {
        return this.consumerManager;
    }

    /** @return the consumer offset manager created in the constructor */
    public ConsumerOffsetManager getConsumerOffsetManager() {
        return this.consumerOffsetManager;
    }

    /** @return the message store, or {@code null} before a successful {@link #initialize()} */
    public MessageStore getMessageStore() {
        return this.messageStore;
    }

    /** @param messageStore replacement message store */
    public void setMessageStore(MessageStore messageStore) {
        this.messageStore = messageStore;
    }

    /** @return the message store configuration passed to the constructor */
    public MessageStoreConfig getMessageStoreConfig() {
        return this.messageStoreConfig;
    }

    /** @return the netty server configuration passed to the constructor */
    public NettyServerConfig getNettyServerConfig() {
        return this.nettyServerConfig;
    }

    /** @return the producer manager created in the constructor */
    public ProducerManager getProducerManager() {
        return this.producerManager;
    }

    /** @return the pull message processor created in the constructor */
    public PullMessageProcessor getPullMessageProcessor() {
        return this.pullMessageProcessor;
    }

    /** @return the pull request hold service created in the constructor */
    public PullRequestHoldService getPullRequestHoldService() {
        return this.pullRequestHoldService;
    }

    /** @return the remoting server, or {@code null} before a successful {@link #initialize()} */
    public RemotingServer getRemotingServer() {
        return this.remotingServer;
    }

    /** @param remotingServer replacement remoting server */
    public void setRemotingServer(RemotingServer remotingServer) {
        this.remotingServer = remotingServer;
    }

    /** @return the subscription group manager created in the constructor */
    public SubscriptionGroupManager getSubscriptionGroupManager() {
        return this.subscriptionGroupManager;
    }

    /**
     * Stops every component and thread pool owned by this controller, unregisters the broker
     * and persists the consumer offsets one last time.
     *
     * <p>If the calling thread is interrupted while waiting for the scheduler to drain, the
     * remaining shutdown steps still run and the interrupt flag is restored afterwards.
     */
    public void shutdown() {
        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.shutdown();
        }
        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.shutdown();
        }
        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.shutdown();
        }
        if (this.remotingServer != null) {
            this.remotingServer.shutdown();
        }
        if (this.messageStore != null) {
            this.messageStore.shutdown();
        }

        this.scheduledExecutorService.shutdown();
        boolean interrupted = false;
        try {
            this.scheduledExecutorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Remember the interrupt but finish shutting down first: re-asserting the flag
            // right away could make the blocking calls below fail.
            interrupted = true;
            log.warn("interrupted while waiting for scheduled tasks to terminate", e);
        }

        try {
            this.unregisterBrokerAll();
            shutdownIfCreated(this.sendMessageExecutor);
            shutdownIfCreated(this.pullMessageExecutor);
            shutdownIfCreated(this.adminBrokerExecutor);
            // Previously missing: without this the client-manage pool was never stopped.
            shutdownIfCreated(this.clientManageExecutor);
            if (this.brokerOuterAPI != null) {
                this.brokerOuterAPI.shutdown();
            }
            this.consumerOffsetManager.persist();
            if (this.filterServerManager != null) {
                this.filterServerManager.shutdown();
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Shuts the executor down when it exists; executors are {@code null} before initialize(). */
    private static void shutdownIfCreated(ExecutorService executor) {
        if (executor != null) {
            executor.shutdown();
        }
    }

    /** Asks the outer API to unregister this broker, identified by cluster, address, name and id. */
    private void unregisterBrokerAll() {
        this.brokerOuterAPI.unregisterBrokerAll(this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId());
    }

    /** @return the broker address as {@code brokerIP1:listenPort} */
    public String getBrokerAddr() {
        return this.brokerConfig.getBrokerIP1() + ":" + this.nettyServerConfig.getListenPort();
    }

    /**
     * Starts every component that exists, registers the broker once immediately and then
     * every 30 seconds, and schedules the one-off unused-topic cleanup.
     *
     * @throws Exception propagated from any component's {@code start()} call
     */
    public void start() throws Exception {
        if (this.messageStore != null) {
            this.messageStore.start();
        }
        if (this.remotingServer != null) {
            this.remotingServer.start();
        }
        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }
        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.start();
        }
        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.start();
        }
        if (this.filterServerManager != null) {
            this.filterServerManager.start();
        }

        this.registerBrokerAll(true, false);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false);
                } catch (Exception e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 10000L, 30000L, TimeUnit.MILLISECONDS);

        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.start();
        }
        this.addDeleteTopicTask();
    }

    /**
     * Registers this broker through the outer API and applies the returned result.
     *
     * <p>When the broker permission is not both readable and writeable, every topic in the
     * registered table is given the broker permission first.
     *
     * @param checkOrderConfig when {@code true}, the KV table of the result is passed to
     *                         {@code TopicConfigManager.updateOrderTopicConfig}
     * @param oneway           forwarded unchanged to {@code BrokerOuterAPI.registerBrokerAll}
     */
    public synchronized void registerBrokerAll(boolean checkOrderConfig, boolean oneway) {
        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

        final int brokerPermission = this.getBrokerConfig().getBrokerPermission();
        if (!PermName.isWriteable(brokerPermission) || !PermName.isReadable(brokerPermission)) {
            // Key type assumed to be the topic name (String) — confirm against
            // TopicConfigSerializeWrapper.
            ConcurrentHashMap<String, TopicConfig> topicConfigTable =
                new ConcurrentHashMap<String, TopicConfig>(topicConfigWrapper.getTopicConfigTable());
            // NOTE(review): the copy above is shallow, so setPerm() mutates the same
            // TopicConfig instances the wrapper was built from. If those are shared with the
            // live topic table this overwrites the stored per-topic permission — verify, and
            // copy each TopicConfig if so.
            for (TopicConfig topicConfig : topicConfigTable.values()) {
                topicConfig.setPerm(brokerPermission);
            }
            topicConfigWrapper.setTopicConfigTable(topicConfigTable);
        }

        RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.getHAServerAddr(),
            topicConfigWrapper,
            this.filterServerManager.buildNewFilterServerList(),
            oneway);
        if (registerBrokerResult == null) {
            return;
        }

        if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
            this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
        }
        this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
        if (checkOrderConfig) {
            this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
        }
    }

    /** @return the topic configuration manager */
    public TopicConfigManager getTopicConfigManager() {
        return this.topicConfigManager;
    }

    /** @param topicConfigManager replacement topic configuration manager */
    public void setTopicConfigManager(TopicConfigManager topicConfigManager) {
        this.topicConfigManager = topicConfigManager;
    }

    /** @return the HA server address as {@code brokerIP2:haListenPort} */
    public String getHAServerAddr() {
        return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort();
    }

    /**
     * Applies the given properties to all four configuration objects, bumps the configuration
     * data version and writes the combined configuration to the broker config file.
     *
     * @param properties values to apply
     */
    public void updateAllConfig(Properties properties) {
        MixAll.properties2Object(properties, this.brokerConfig);
        MixAll.properties2Object(properties, this.nettyServerConfig);
        MixAll.properties2Object(properties, this.nettyClientConfig);
        MixAll.properties2Object(properties, this.messageStoreConfig);
        this.configDataVersion.nextVersion();
        this.flushAllConfig();
    }

    /** Writes the encoded configuration to the broker config path; failures are logged only. */
    private void flushAllConfig() {
        String allConfig = this.encodeAllConfig();
        try {
            MixAll.string2File(allConfig, BrokerPathConfigHelper.getBrokerConfigPath());
            log.info("flush broker config, {} OK", BrokerPathConfigHelper.getBrokerConfigPath());
        } catch (IOException e) {
            // A failed flush means the on-disk config is stale; report it as an error.
            log.error("flush broker config Exception, " + BrokerPathConfigHelper.getBrokerConfigPath(), e);
        }
    }

    /**
     * Serialises the broker, message store, netty server and netty client configuration, in
     * that order, into a single string.
     *
     * @return the concatenated properties text; a configuration object that cannot be
     *         converted is logged and left out
     */
    public String encodeAllConfig() {
        StringBuilder sb = new StringBuilder();
        appendConfig(sb, this.brokerConfig);
        appendConfig(sb, this.messageStoreConfig);
        appendConfig(sb, this.nettyServerConfig);
        appendConfig(sb, this.nettyClientConfig);
        return sb.toString();
    }

    /** Appends the properties form of {@code config} to {@code sb}, logging when conversion fails. */
    private static void appendConfig(StringBuilder sb, Object config) {
        Properties properties = MixAll.object2Properties(config);
        if (properties != null) {
            sb.append(MixAll.properties2String(properties));
        } else {
            log.error("encodeAllConfig object2Properties error");
        }
    }

    /** @return the rebalance lock manager */
    public RebalanceLockManager getRebalanceLockManager() {
        return this.rebalanceLockManager;
    }

    /** @return the slave synchronisation helper created in the constructor */
    public SlaveSynchronize getSlaveSynchronize() {
        return this.slaveSynchronize;
    }

    /** @return the outer API client created in the constructor */
    public BrokerOuterAPI getBrokerOuterAPI() {
        return this.brokerOuterAPI;
    }

    /** @return the pull thread pool, or {@code null} before a successful {@link #initialize()} */
    public ExecutorService getPullMessageExecutor() {
        return this.pullMessageExecutor;
    }

    /** @param pullMessageExecutor replacement pull thread pool */
    public void setPullMessageExecutor(ExecutorService pullMessageExecutor) {
        this.pullMessageExecutor = pullMessageExecutor;
    }

    /** @return the broker statistics, or {@code null} before a successful {@link #initialize()} */
    public BrokerStats getBrokerStats() {
        return this.brokerStats;
    }

    /** @param brokerStats replacement broker statistics */
    public void setBrokerStats(BrokerStats brokerStats) {
        this.brokerStats = brokerStats;
    }

    /** @return the work queue backing the send thread pool */
    public BlockingQueue<Runnable> getSendThreadPoolQueue() {
        return this.sendThreadPoolQueue;
    }

    /** @return the filter server manager created in the constructor */
    public FilterServerManager getFilterServerManager() {
        return this.filterServerManager;
    }

    /** @return the broker statistics manager created in the constructor */
    public BrokerStatsManager getBrokerStatsManager() {
        return this.brokerStatsManager;
    }

    /** Logs the value reported by {@code MessageStore.slaveFallBehindMuch()}. */
    private void printMasterAndSlaveDiff() {
        long diff = this.messageStore.slaveFallBehindMuch();
        log.info("slave fall behind master, how much, {} bytes", diff);
    }

    /**
     * Schedules a one-off task, five minutes from now, that asks the message store to clean
     * topics not present in the topic configuration table.
     */
    public void addDeleteTopicTask() {
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    int removedTopicCnt = BrokerController.this.messageStore.cleanUnusedTopic(
                        BrokerController.this.getTopicConfigManager().getTopicConfigTable().keySet());
                    log.info("addDeleteTopicTask removed topic count {}", removedTopicCnt);
                } catch (Exception e) {
                    // The returned future is discarded, so a failure would otherwise be invisible.
                    log.error("addDeleteTopicTask error.", e);
                }
            }
        }, 5L, TimeUnit.MINUTES);
    }

    /**
     * Adds a send-message hook. Register hooks before {@link #initialize()} so the list
     * handed to the send processor already contains them.
     *
     * @param hook hook to add
     */
    public void registerSendMessageHook(SendMessageHook hook) {
        this.sendMessageHookList.add(hook);
        log.info("register SendMessageHook Hook, {}", hook.hookName());
    }

    /**
     * Adds a consume-message hook. Register hooks before {@link #initialize()} so the list
     * handed to the processors already contains them.
     *
     * @param hook hook to add
     */
    public void registerConsumeMessageHook(ConsumeMessageHook hook) {
        this.consumeMessageHookList.add(hook);
        log.info("register ConsumeMessageHook Hook, {}", hook.hookName());
    }

    /**
     * Registers an RPC hook on the remoting server; requires a successful
     * {@link #initialize()} because the server does not exist before that.
     *
     * @param rpcHook hook to register
     */
    public void registerServerRPCHook(RPCHook rpcHook) {
        this.getRemotingServer().registerRPCHook(rpcHook);
    }

    /**
     * Registers an RPC hook on the outer API client.
     *
     * @param rpcHook hook to register
     */
    public void registerClientRPCHook(RPCHook rpcHook) {
        this.getBrokerOuterAPI().registerRPCHook(rpcHook);
    }

    /** @return the store host address, built from brokerIP1 and the listen port by default */
    public InetSocketAddress getStoreHost() {
        return this.storeHost;
    }

    /** @param storeHost replacement store host address */
    public void setStoreHost(InetSocketAddress storeHost) {
        this.storeHost = storeHost;
    }
}

