/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.rocketmq.broker.processor;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.client.ClientChannelInfo;
import com.alibaba.rocketmq.broker.client.ConsumerGroupInfo;
import com.alibaba.rocketmq.common.MQVersion;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.TopicConfig;
import com.alibaba.rocketmq.common.UtilAll;
import com.alibaba.rocketmq.common.admin.ConsumeStats;
import com.alibaba.rocketmq.common.admin.OffsetWrapper;
import com.alibaba.rocketmq.common.admin.TopicOffset;
import com.alibaba.rocketmq.common.admin.TopicStatsTable;
import com.alibaba.rocketmq.common.message.MessageDecoder;
import com.alibaba.rocketmq.common.message.MessageId;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.body.BrokerStatsData;
import com.alibaba.rocketmq.common.protocol.body.BrokerStatsItem;
import com.alibaba.rocketmq.common.protocol.body.Connection;
import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection;
import com.alibaba.rocketmq.common.protocol.body.GroupList;
import com.alibaba.rocketmq.common.protocol.body.KVTable;
import com.alibaba.rocketmq.common.protocol.body.LockBatchRequestBody;
import com.alibaba.rocketmq.common.protocol.body.LockBatchResponseBody;
import com.alibaba.rocketmq.common.protocol.body.ProducerConnection;
import com.alibaba.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
import com.alibaba.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
import com.alibaba.rocketmq.common.protocol.body.QueueTimeSpan;
import com.alibaba.rocketmq.common.protocol.body.TopicList;
import com.alibaba.rocketmq.common.protocol.body.UnlockBatchRequestBody;
import com.alibaba.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.CreateTopicRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.GetAllTopicConfigResponseHeader;
import com.alibaba.rocketmq.common.protocol.header.GetBrokerConfigResponseHeader;
import com.alibaba.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
import com.alibaba.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
import com.alibaba.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
import com.alibaba.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader;
import com.alibaba.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import com.alibaba.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
import com.alibaba.rocketmq.common.stats.StatsItem;
import com.alibaba.rocketmq.common.stats.StatsSnapshot;
import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
import com.alibaba.rocketmq.store.DefaultMessageStore;
import com.alibaba.rocketmq.store.SelectMapedBufferResult;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.io.UnsupportedEncodingException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AdminBrokerProcessor
implements NettyRequestProcessor {
    private static final Logger log = LoggerFactory.getLogger((String)"RocketmqBroker");
    private final BrokerController brokerController;

    /**
     * Creates a processor bound to the given broker controller.
     *
     * @param brokerController controller whose managers and message store the handlers in this class use
     */
    public AdminBrokerProcessor(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    /**
     * Routes an incoming command to the handler registered for its request code.
     *
     * @param ctx     channel context the request arrived on
     * @param request command to handle
     * @return the selected handler's response, or {@code null} when the code is not handled by this class
     * @throws RemotingCommandException propagated from the selected handler
     */
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final int code = request.getCode();
        // Cases are listed in ascending numeric order; each one returns directly.
        switch (code) {
            case 17:
                return this.updateAndCreateTopic(ctx, request);
            case 21:
                return this.getAllTopicConfig(ctx, request);
            case 25:
                return this.updateBrokerConfig(ctx, request);
            case 26:
                return this.getBrokerConfig(ctx, request);
            case 28:
                return this.getBrokerRuntimeInfo(ctx, request);
            case 29:
                return this.searchOffsetByTimestamp(ctx, request);
            case 30:
                return this.getMaxOffset(ctx, request);
            case 31:
                return this.getMinOffset(ctx, request);
            case 32:
                return this.getEarliestMsgStoretime(ctx, request);
            case 41:
                return this.lockBatchMQ(ctx, request);
            case 42:
                return this.unlockBatchMQ(ctx, request);
            case 43:
                return this.getAllConsumerOffset(ctx, request);
            case 45:
                return this.getAllDelayOffset(ctx, request);
            case 200:
                return this.updateAndCreateSubscriptionGroup(ctx, request);
            case 201:
                return this.getAllSubscriptionGroup(ctx, request);
            case 202:
                return this.getTopicStatsInfo(ctx, request);
            case 203:
                return this.getConsumerConnectionList(ctx, request);
            case 204:
                return this.getProducerConnectionList(ctx, request);
            case 207:
                return this.deleteSubscriptionGroup(ctx, request);
            case 208:
                return this.getConsumeStats(ctx, request);
            case 215:
                return this.deleteTopic(ctx, request);
            case 222:
                return this.resetOffset(ctx, request);
            case 223:
                return this.getConsumerStatus(ctx, request);
            case 300:
                return this.queryTopicConsumeByWho(ctx, request);
            case 301:
                return this.registerFilterServer(ctx, request);
            case 303:
                return this.queryConsumeTimeSpan(ctx, request);
            case 305:
                return this.getSystemTopicListFromBroker(ctx, request);
            case 306:
                return this.cleanExpiredConsumeQueue();
            case 307:
                return this.getConsumerRunningInfo(ctx, request);
            case 308:
                return this.queryCorrectionOffset(ctx, request);
            case 309:
                return this.consumeMessageDirectly(ctx, request);
            case 314:
                return this.cloneGroupOffset(ctx, request);
            case 315:
                return this.ViewBrokerStatsData(ctx, request);
        }
        // Unrecognised request code: nothing to answer with.
        return null;
    }

    /**
     * Answers with the minute, hour and day snapshots of the stats item named in the request header.
     *
     * @param ctx     channel context (not used by this handler)
     * @param request command carrying a {@link ViewBrokerStatsDataRequestHeader}
     * @return code 0 with an encoded {@link BrokerStatsData} body, or code 1 when the stats item does not exist
     * @throws RemotingCommandException if thrown while decoding the request header
     */
    private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        ViewBrokerStatsDataRequestHeader header = (ViewBrokerStatsDataRequestHeader)request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class);
        RemotingCommand reply = RemotingCommand.createResponseCommand(null);
        DefaultMessageStore store = (DefaultMessageStore)this.brokerController.getMessageStore();
        StatsItem item = store.getBrokerStatsManager().getStatsItem(header.getStatsName(), header.getStatsKey());
        if (item == null) {
            reply.setCode(1);
            reply.setRemark(String.format("The stats <%s> <%s> not exist", header.getStatsName(), header.getStatsKey()));
            return reply;
        }

        // Snapshots are taken in the order minute, hour, day.
        BrokerStatsData data = new BrokerStatsData();
        data.setStatsMinute(toBrokerStatsItem(item.getStatsDataInMinute()));
        data.setStatsHour(toBrokerStatsItem(item.getStatsDataInHour()));
        data.setStatsDay(toBrokerStatsItem(item.getStatsDataInDay()));

        reply.setBody(data.encode());
        reply.setCode(0);
        reply.setRemark(null);
        return reply;
    }

    /**
     * Copies sum, tps and avgpt from a snapshot into a freshly created {@link BrokerStatsItem}.
     */
    private static BrokerStatsItem toBrokerStatsItem(StatsSnapshot snapshot) {
        BrokerStatsItem converted = new BrokerStatsItem();
        converted.setSum(snapshot.getSum());
        converted.setTps(snapshot.getTps());
        converted.setAvgpt(snapshot.getAvgpt());
        return converted;
    }

    /**
     * Forwards a request to one consumer client and returns that client's answer.
     *
     * <p>The forwarded command reuses the ext fields and body of {@code request} but is created with
     * {@code requestCode}.
     *
     * @param requestCode   code of the command sent to the client
     * @param request       original command whose ext fields and body are copied
     * @param consumerGroup group used to look up the client channel
     * @param clientId      client id used to look up the client channel
     * @return the client's response; otherwise code 1 when the client is not online, its version is below
     *         {@code V3_1_8_SNAPSHOT}, or the call fails, and code 207 when the call times out
     * @throws RemotingCommandException declared by the signature; failures of the client call are converted
     *         into error responses instead
     */
    private RemotingCommand callConsumer(int requestCode, RemotingCommand request, String consumerGroup, String clientId) throws RemotingCommandException {
        RemotingCommand failure = RemotingCommand.createResponseCommand(null);

        ClientChannelInfo target = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId);
        if (target == null) {
            failure.setCode(1);
            failure.setRemark(String.format("The Consumer <%s> <%s> not online", consumerGroup, clientId));
            return failure;
        }

        // Clients older than V3_1_8_SNAPSHOT are rejected up front.
        boolean tooOld = target.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal();
        if (tooOld) {
            failure.setCode(1);
            failure.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT", clientId, MQVersion.getVersionDesc((int)target.getVersion())));
            return failure;
        }

        try {
            RemotingCommand forwarded = RemotingCommand.createRequestCommand(requestCode, null);
            forwarded.setExtFields(request.getExtFields());
            forwarded.setBody(request.getBody());
            return this.brokerController.getBroker2Client().callClient(target.getChannel(), forwarded);
        }
        catch (RemotingTimeoutException timeout) {
            failure.setCode(207);
            failure.setRemark(String.format("consumer <%s> <%s> Timeout: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc((Throwable)timeout)));
            return failure;
        }
        catch (Exception error) {
            failure.setCode(1);
            failure.setRemark(String.format("invoke consumer <%s> <%s> Exception: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc((Throwable)error)));
            return failure;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Loads the message named by the request header's message id from the local store, attaches its bytes as
     * the request body, and forwards the request (code 309) to the target consumer via {@code callConsumer}.
     *
     * <p>Loading the message is best-effort: if the message id cannot be decoded or the store returns no
     * result, a warning is logged and the request is forwarded with its body left unchanged. Previously a
     * missing store result caused a {@link NullPointerException}, and a decode failure was swallowed silently.
     *
     * @param ctx     channel context (not used by this handler)
     * @param request command carrying a {@link ConsumeMessageDirectlyResultRequestHeader}
     * @return whatever {@code callConsumer} returns for the addressed consumer
     * @throws RemotingCommandException if thrown while decoding the request header or by {@code callConsumer}
     */
    private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader)request.decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
        request.getExtFields().put("brokerName", this.brokerController.getBrokerConfig().getBrokerName());
        SelectMapedBufferResult selectMapedBufferResult = null;
        try {
            MessageId messageId = MessageDecoder.decodeMessageId((String)requestHeader.getMsgId());
            selectMapedBufferResult = this.brokerController.getMessageStore().selectOneMessageByOffset(messageId.getOffset());
            if (selectMapedBufferResult != null) {
                byte[] body = new byte[selectMapedBufferResult.getSize()];
                selectMapedBufferResult.getByteBuffer().get(body);
                request.setBody(body);
            } else {
                // NOTE(review): assumes the store signals "not found" with null - confirm against MessageStore.
                log.warn("consumeMessageDirectly, message not found in store, msgId: {}", (Object)requestHeader.getMsgId());
            }
        }
        catch (UnknownHostException e) {
            // Keep forwarding as before, but leave a trace instead of swallowing the failure.
            log.warn("consumeMessageDirectly, decode msgId failed, msgId: " + requestHeader.getMsgId(), (Throwable)e);
        }
        finally {
            // Release the store buffer whether or not the copy succeeded.
            if (selectMapedBufferResult != null) {
                selectMapedBufferResult.release();
            }
        }
        return this.callConsumer(309, request, requestHeader.getConsumerGroup(), requestHeader.getClientId());
    }

    /**
     * Forwards a running-info query (code 307) to the consumer named in the request header.
     *
     * @param ctx     channel context (not used by this handler)
     * @param request command carrying a {@link GetConsumerRunningInfoRequestHeader}
     * @return whatever {@code callConsumer} returns for the addressed consumer
     * @throws RemotingCommandException if thrown while decoding the request header or by {@code callConsumer}
     */
    private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        GetConsumerRunningInfoRequestHeader header = (GetConsumerRunningInfoRequestHeader)request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
        String group = header.getConsumerGroup();
        String client = header.getClientId();
        return this.callConsumer(307, request, group, client);
    }

    /**
     * Asks the message store to clean expired consume queues, logging a warning before and after.
     *
     * @return a response with code 0 and no remark
     */
    public RemotingCommand cleanExpiredConsumeQueue() {
        log.warn("invoke cleanExpiredConsumeQueue start.");
        final RemotingCommand ack = RemotingCommand.createResponseCommand(null);

        this.brokerController.getMessageStore().cleanExpiredConsumerQueue();
        log.warn("invoke cleanExpiredConsumeQueue end.");

        ack.setCode(0);
        ack.setRemark(null);
        return ack;
    }

    /**
     * Registers the filter server address from the request header against the calling channel and answers
     * with this broker's id and name.
     *
     * @param ctx     channel context; its channel is handed to the filter server manager
     * @param request command carrying a {@link RegisterFilterServerRequestHeader}
     * @return code 0 response whose {@link RegisterFilterServerResponseHeader} holds broker id and broker name
     * @throws RemotingCommandException if thrown while decoding the request header
     */
    private RemotingCommand registerFilterServer(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand reply = RemotingCommand.createResponseCommand(RegisterFilterServerResponseHeader.class);
        RegisterFilterServerResponseHeader replyHeader = (RegisterFilterServerResponseHeader)reply.readCustomHeader();
        RegisterFilterServerRequestHeader incoming = (RegisterFilterServerRequestHeader)request.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class);

        String filterServerAddr = incoming.getFilterServerAddr();
        this.brokerController.getFilterServerManager().registerFilterServer(ctx.channel(), filterServerAddr);

        replyHeader.setBrokerId(this.brokerController.getBrokerConfig().getBrokerId());
        replyHeader.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());

        reply.setCode(0);
        reply.setRemark(null);
        return reply;
    }

    /**
     * Builds per-queue consume statistics for a consumer group.
     *
     * <p>When the request names no topic, every topic returned by the consumer offset manager for the group
     * is reported; otherwise only the named topic. Topics without a topic config are skipped, as are topics
     * the group has no subscription data for while it has subscription data for other topics. Negative
     * broker or consumer offsets are reported as 0.
     *
     * @param ctx     channel context (not used by this handler)
     * @param request command carrying a {@link GetConsumeStatsRequestHeader}
     * @return code 0 response whose body is the encoded {@link ConsumeStats}
     * @throws RemotingCommandException if thrown while decoding the request header
     */
    private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        GetConsumeStatsRequestHeader requestHeader = (GetConsumeStatsRequestHeader)request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);
        ConsumeStats stats = new ConsumeStats();

        Set<String> topicNames = new HashSet<String>();
        if (!UtilAll.isBlank(requestHeader.getTopic())) {
            topicNames.add(requestHeader.getTopic());
        } else {
            topicNames = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup());
        }

        for (String topicName : topicNames) {
            TopicConfig config = this.brokerController.getTopicConfigManager().selectTopicConfig(topicName);
            if (config == null) {
                log.warn("consumeStats, topic config not exist, {}", (Object)topicName);
                continue;
            }

            SubscriptionData subscription = this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topicName);
            if (subscription == null) {
                // The subscription count is only consulted when this topic has no subscription data.
                int known = this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup());
                if (known > 0) {
                    log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", (Object)requestHeader.getConsumerGroup(), (Object)topicName);
                    continue;
                }
            }

            for (int queueId = 0; queueId < config.getWriteQueueNums(); queueId++) {
                MessageQueue queue = new MessageQueue();
                queue.setTopic(topicName);
                queue.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
                queue.setQueueId(queueId);

                OffsetWrapper offsets = new OffsetWrapper();

                // Clamp negative offsets to zero.
                long brokerOffset = Math.max(0L, this.brokerController.getMessageStore().getMaxOffsetInQuque(topicName, queueId));
                long consumerOffset = Math.max(0L, this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), topicName, queueId));
                offsets.setBrokerOffset(brokerOffset);
                offsets.setConsumerOffset(consumerOffset);

                // The store timestamp is looked up one position before the consumer offset.
                long previousOffset = consumerOffset - 1L;
                if (previousOffset >= 0L) {
                    long storedAt = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topicName, queueId, previousOffset);
                    if (storedAt > 0L) {
                        offsets.setLastTimestamp(storedAt);
                    }
                }

                stats.getOffsetTable().put(queue, offsets);
            }

            // Accumulate the group's TPS over all reported topics.
            long tps = (long)this.brokerController.getBrokerStatsManager().tpsGroupGetNums(requestHeader.getConsumerGroup(), topicName);
            tps += stats.getConsumeTps();
            stats.setConsumeTps(tps);
        }

        response.setBody(stats.encode());
        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Lists the client connections registered for a producer group.
     *
     * @param ctx     channel context (not used by this handler)
     * @param request command carrying a {@link GetProducerConnectionListRequestHeader}
     * @return code 0 with an encoded {@link ProducerConnection} body, or code 1 when the group has no channel table
     * @throws RemotingCommandException if thrown while decoding the request header
     */
    private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        GetProducerConnectionListRequestHeader requestHeader = (GetProducerConnectionListRequestHeader)request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class);
        ProducerConnection result = new ProducerConnection();

        HashMap<Channel, ClientChannelInfo> channels = this.brokerController.getProducerManager().getGroupChannelTable().get(requestHeader.getProducerGroup());
        if (channels == null) {
            response.setCode(1);
            response.setRemark("the producer group[" + requestHeader.getProducerGroup() + "] not exist");
            return response;
        }

        // One Connection entry per registered client channel.
        for (ClientChannelInfo client : channels.values()) {
            Connection conn = new Connection();
            conn.setClientId(client.getClientId());
            conn.setLanguage(client.getLanguage());
            conn.setVersion(client.getVersion());
            conn.setClientAddr(RemotingHelper.parseChannelRemoteAddr(client.getChannel()));
            result.getConnectionSet().add(conn);
        }

        response.setBody(result.encode());
        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Lists the client connections and subscription settings of a consumer group.
     *
     * @param ctx     channel context (not used by this handler)
     * @param request command carrying a {@link GetConsumerConnectionListRequestHeader}
     * @return code 0 with an encoded {@link ConsumerConnection} body, or code 206 when no group info exists
     * @throws RemotingCommandException if thrown while decoding the request header
     */
    private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        GetConsumerConnectionListRequestHeader requestHeader = (GetConsumerConnectionListRequestHeader)request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class);

        ConsumerGroupInfo group = this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
        if (group == null) {
            response.setCode(206);
            response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] not online");
            return response;
        }

        // Group-level settings first, then one Connection entry per client channel.
        ConsumerConnection result = new ConsumerConnection();
        result.setConsumeFromWhere(group.getConsumeFromWhere());
        result.setConsumeType(group.getConsumeType());
        result.setMessageModel(group.getMessageModel());
        result.getSubscriptionTable().putAll(group.getSubscriptionTable());

        for (ClientChannelInfo client : group.getChannelInfoTable().values()) {
            Connection conn = new Connection();
            conn.setClientId(client.getClientId());
            conn.setLanguage(client.getLanguage());
            conn.setVersion(client.getVersion());
            conn.setClientAddr(RemotingHelper.parseChannelRemoteAddr(client.getChannel()));
            result.getConnectionSet().add(conn);
        }

        response.setBody(result.encode());
        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Reports min offset, max offset and last-update timestamp for every write queue of a topic.
     *
     * <p>Negative offsets are reported as 0. The timestamp is read at {@code max - 1} and stays 0 when the
     * max offset is not positive.
     *
     * @param ctx     channel context (not used by this handler)
     * @param request command carrying a {@link GetTopicStatsInfoRequestHeader}
     * @return code 0 with an encoded {@link TopicStatsTable} body, or code 17 when the topic has no config
     * @throws RemotingCommandException if thrown while decoding the request header
     */
    private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        GetTopicStatsInfoRequestHeader requestHeader = (GetTopicStatsInfoRequestHeader)request.decodeCommandCustomHeader(GetTopicStatsInfoRequestHeader.class);
        String topic = requestHeader.getTopic();

        TopicConfig config = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
        if (config == null) {
            response.setCode(17);
            response.setRemark("topic[" + topic + "] not exist");
            return response;
        }

        TopicStatsTable table = new TopicStatsTable();
        for (int queueId = 0; queueId < config.getWriteQueueNums(); queueId++) {
            MessageQueue queue = new MessageQueue();
            queue.setTopic(topic);
            queue.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
            queue.setQueueId(queueId);

            TopicOffset offsets = new TopicOffset();

            // Clamp negative offsets to zero.
            long minOffset = Math.max(0L, this.brokerController.getMessageStore().getMinOffsetInQuque(topic, queueId));
            long maxOffset = Math.max(0L, this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId));

            long lastUpdate = maxOffset > 0L
                ? this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, queueId, maxOffset - 1L)
                : 0L;

            offsets.setMinOffset(minOffset);
            offsets.setMaxOffset(maxOffset);
            offsets.setLastUpdateTimestamp(lastUpdate);
            table.getOffsetTable().put(queue, offsets);
        }

        response.setBody(table.encode());
        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Decodes a {@link SubscriptionGroupConfig} from the request body and, when decoding yields a config,
     * hands it to the subscription group manager.
     *
     * @param ctx     channel context; its remote address is logged
     * @param request command whose body is the serialized subscription group config
     * @return a response with code 0, whether or not a config was decoded
     * @throws RemotingCommandException declared by the handler signature
     */
    private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand ack = RemotingCommand.createResponseCommand(null);
        String caller = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.info("updateAndCreateSubscriptionGroup called by {}", (Object)caller);

        SubscriptionGroupConfig groupConfig = (SubscriptionGroupConfig)RemotingSerializable.decode(request.getBody(), SubscriptionGroupConfig.class);
        if (null != groupConfig) {
            this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfig(groupConfig);
        }

        ack.setCode(0);
        ack.setRemark(null);
        return ack;
    }

    /**
     * Returns the subscription group manager's encoded content as a UTF-8 body.
     *
     * @param ctx     channel context; its remote address is logged when there is nothing to return
     * @param request incoming command (not inspected)
     * @return code 0 with the encoded content, or code 1 when the content is empty or cannot be encoded
     * @throws RemotingCommandException declared by the handler signature
     */
    private RemotingCommand getAllSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        String encoded = this.brokerController.getSubscriptionGroupManager().encode();

        // Nothing to send back: report it as an error.
        if (encoded == null || encoded.length() <= 0) {
            log.error("No subscription group in this broker, client: " + ctx.channel().remoteAddress());
            response.setCode(1);
            response.setRemark("No subscription group in this broker");
            return response;
        }

        try {
            byte[] payload = encoded.getBytes("UTF-8");
            response.setBody(payload);
        }
        catch (UnsupportedEncodingException e) {
            log.error("", (Throwable)e);
            response.setCode(1);
            response.setRemark("UnsupportedEncodingException " + e);
            return response;
        }

        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Tries to lock the message queues listed in the request body for the given consumer group and client.
     *
     * @param ctx     channel context (not used by this handler)
     * @param request command whose body is a serialized {@link LockBatchRequestBody}
     * @return code 0 response whose body is an encoded {@link LockBatchResponseBody} holding the locked queues
     * @throws RemotingCommandException declared by the handler signature
     */
    private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        LockBatchRequestBody lockRequest = (LockBatchRequestBody)LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);

        Set<MessageQueue> locked = this.brokerController.getRebalanceLockManager().tryLockBatch(
            lockRequest.getConsumerGroup(),
            lockRequest.getMqSet(),
            lockRequest.getClientId());

        LockBatchResponseBody lockResult = new LockBatchResponseBody();
        lockResult.setLockOKMQSet(locked);

        response.setBody(lockResult.encode());
        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Unlocks the message queues listed in the request body for the given consumer group and client.
     *
     * @param ctx     channel context (not used by this handler)
     * @param request command whose body is a serialized {@link UnlockBatchRequestBody}
     * @return a response with code 0 and no remark
     * @throws RemotingCommandException declared by the handler signature
     */
    private RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand ack = RemotingCommand.createResponseCommand(null);
        UnlockBatchRequestBody unlockRequest = (UnlockBatchRequestBody)UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class);

        this.brokerController.getRebalanceLockManager().unlockBatch(
            unlockRequest.getConsumerGroup(),
            unlockRequest.getMqSet(),
            unlockRequest.getClientId());

        ack.setCode(0);
        ack.setRemark(null);
        return ack;
    }

    /**
     * Creates or updates a topic config from the request header and re-registers the broker.
     *
     * <p>A topic whose name equals the broker cluster name is rejected. A missing topic sys flag is stored
     * as 0.
     *
     * @param ctx     channel context; its remote address is logged
     * @param request command carrying a {@link CreateTopicRequestHeader}
     * @return code 0 on success, or code 1 when the topic name equals the broker cluster name
     * @throws RemotingCommandException if thrown while decoding the request header
     */
    private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        CreateTopicRequestHeader header = (CreateTopicRequestHeader)request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
        log.info("updateAndCreateTopic called by {}", (Object)RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

        // The cluster name is reserved and cannot be used as a topic name.
        boolean reserved = header.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName());
        if (reserved) {
            String message = "the topic[" + header.getTopic() + "] is conflict with system reserved words.";
            log.warn(message);
            response.setCode(1);
            response.setRemark(message);
            return response;
        }

        TopicConfig config = new TopicConfig(header.getTopic());
        config.setReadQueueNums(header.getReadQueueNums().intValue());
        config.setWriteQueueNums(header.getWriteQueueNums().intValue());
        config.setTopicFilterType(header.getTopicFilterTypeEnum());
        config.setPerm(header.getPerm().intValue());
        if (header.getTopicSysFlag() == null) {
            config.setTopicSysFlag(0);
        } else {
            config.setTopicSysFlag(header.getTopicSysFlag());
        }

        this.brokerController.getTopicConfigManager().updateTopicConfig(config);
        this.brokerController.registerBrokerAll(false, true);

        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Deletes the topic config named in the request header and schedules the broker's delete-topic task.
     *
     * @param ctx     channel context; its remote address is logged
     * @param request command carrying a {@link DeleteTopicRequestHeader}
     * @return a response with code 0 and no remark
     * @throws RemotingCommandException if thrown while decoding the request header
     */
    private RemotingCommand deleteTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand ack = RemotingCommand.createResponseCommand(null);
        DeleteTopicRequestHeader header = (DeleteTopicRequestHeader)request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);

        String caller = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.info("deleteTopic called by {}", (Object)caller);

        this.brokerController.getTopicConfigManager().deleteTopicConfig(header.getTopic());
        this.brokerController.addDeleteTopicTask();

        ack.setCode(0);
        ack.setRemark(null);
        return ack;
    }

    /**
     * Returns the topic config manager's encoded content as a UTF-8 body.
     *
     * @param ctx     channel context; its remote address is logged when there is nothing to return
     * @param request incoming command (not inspected)
     * @return code 0 with the encoded content, or code 1 when the content is empty or cannot be encoded
     */
    private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(GetAllTopicConfigResponseHeader.class);
        String encoded = this.brokerController.getTopicConfigManager().encode();

        // Nothing to send back: report it as an error.
        if (encoded == null || encoded.length() <= 0) {
            log.error("No topic in this broker, client: " + ctx.channel().remoteAddress());
            response.setCode(1);
            response.setRemark("No topic in this broker");
            return response;
        }

        try {
            byte[] payload = encoded.getBytes("UTF-8");
            response.setBody(payload);
        }
        catch (UnsupportedEncodingException e) {
            log.error("", (Throwable)e);
            response.setCode(1);
            response.setRemark("UnsupportedEncodingException " + e);
            return response;
        }

        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Applies broker configuration properties sent as a UTF-8 body.
     *
     * <p>A request without a body changes nothing and still answers with code 0.
     *
     * @param ctx     channel context; its remote address is logged
     * @param request command whose optional body is the properties text
     * @return code 0 on success, or code 1 when the body cannot be decoded or parsed into properties
     */
    private RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        log.info("updateBrokerConfig called by {}", (Object)RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

        byte[] payload = request.getBody();
        if (payload != null) {
            Properties parsed;
            try {
                String text = new String(payload, "UTF-8");
                parsed = MixAll.string2Properties(text);
            }
            catch (UnsupportedEncodingException e) {
                log.error("", (Throwable)e);
                response.setCode(1);
                response.setRemark("UnsupportedEncodingException " + e);
                return response;
            }

            if (parsed == null) {
                log.error("string2Properties error");
                response.setCode(1);
                response.setRemark("string2Properties error");
                return response;
            }

            log.info("updateBrokerConfig, new config: " + parsed + " client: " + ctx.channel().remoteAddress());
            this.brokerController.updateAllConfig(parsed);
        }

        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Returns the broker's encoded configuration together with its data version.
     *
     * <p>When {@code encodeAllConfig()} yields a non-empty string it is sent as the
     * UTF-8 body; an empty or null string simply leaves the body unset. The config
     * data version is always written to the response header on the success path.
     * An encoding failure produces code 1 with a remark.
     *
     * @param ctx     channel context (unused)
     * @param request the incoming command (not inspected here)
     * @return the populated response command
     */
    private RemotingCommand getBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerConfigResponseHeader.class);
        GetBrokerConfigResponseHeader responseHeader = (GetBrokerConfigResponseHeader)response.readCustomHeader();

        String encodedConfig = this.brokerController.encodeAllConfig();
        boolean hasConfig = encodedConfig != null && encodedConfig.length() > 0;
        if (hasConfig) {
            try {
                response.setBody(encodedConfig.getBytes("UTF-8"));
            }
            catch (UnsupportedEncodingException e) {
                log.error("", e);
                response.setCode(1);
                response.setRemark("UnsupportedEncodingException " + e);
                return response;
            }
        }

        responseHeader.setVersion(this.brokerController.getConfigDataVersion());
        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Looks up the queue offset that corresponds to a timestamp.
     *
     * <p>Topic, queue id and timestamp come from the request header; the lookup is
     * delegated to the message store's {@code getOffsetInQueueByTime} and the result
     * is written to the response header.
     *
     * @param ctx     channel context (unused)
     * @param request command carrying a {@code SearchOffsetRequestHeader}
     * @return response with code 0 and the offset in its header
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
        SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader)response.readCustomHeader();
        SearchOffsetRequestHeader requestHeader = (SearchOffsetRequestHeader)request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);

        String topic = requestHeader.getTopic();
        int queueId = requestHeader.getQueueId().intValue();
        long timestamp = requestHeader.getTimestamp().longValue();
        long foundOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, queueId, timestamp);

        responseHeader.setOffset(Long.valueOf(foundOffset));
        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Reports the maximum offset of a topic queue.
     *
     * <p>Delegates to the message store's {@code getMaxOffsetInQuque} using the
     * topic and queue id from the request header.
     *
     * @param ctx     channel context (unused)
     * @param request command carrying a {@code GetMaxOffsetRequestHeader}
     * @return response with code 0 and the offset in its header
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand getMaxOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
        GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader)response.readCustomHeader();
        GetMaxOffsetRequestHeader requestHeader = (GetMaxOffsetRequestHeader)request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);

        String topic = requestHeader.getTopic();
        int queueId = requestHeader.getQueueId().intValue();
        long maxOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);

        responseHeader.setOffset(Long.valueOf(maxOffset));
        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Reports the minimum offset of a topic queue.
     *
     * <p>Delegates to the message store's {@code getMinOffsetInQuque} using the
     * topic and queue id from the request header.
     *
     * @param ctx     channel context (unused)
     * @param request command carrying a {@code GetMinOffsetRequestHeader}
     * @return response with code 0 and the offset in its header
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand getMinOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
        GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader)response.readCustomHeader();
        GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader)request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);

        String topic = requestHeader.getTopic();
        int queueId = requestHeader.getQueueId().intValue();
        long minOffset = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, queueId);

        responseHeader.setOffset(Long.valueOf(minOffset));
        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Reports the store time of the earliest message in a topic queue.
     *
     * <p>Delegates to the message store's {@code getEarliestMessageTime} using the
     * topic and queue id from the request header.
     *
     * @param ctx     channel context (unused)
     * @param request command carrying a {@code GetEarliestMsgStoretimeRequestHeader}
     * @return response with code 0 and the timestamp in its header
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand getEarliestMsgStoretime(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class);
        GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader)response.readCustomHeader();
        GetEarliestMsgStoretimeRequestHeader requestHeader = (GetEarliestMsgStoretimeRequestHeader)request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class);

        String topic = requestHeader.getTopic();
        int queueId = requestHeader.getQueueId().intValue();
        long earliestStoreTime = this.brokerController.getMessageStore().getEarliestMessageTime(topic, queueId);

        responseHeader.setTimestamp(Long.valueOf(earliestStoreTime));
        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Builds the key/value runtime snapshot reported by the broker.
     *
     * <p>Starts from the map returned by the message store's {@code getRuntimeInfo()}
     * and adds broker-level entries to that same map instance: version description
     * and number, message put/get totals from the broker stats, and the send thread
     * pool queue's current size and configured capacity.
     *
     * @return the message store's runtime-info map, augmented with broker entries
     */
    private HashMap<String, String> prepareRuntimeInfo() {
        // Declared with type arguments (the decompiled code used a raw HashMap) so the
        // puts below and the return value are type-checked.
        HashMap<String, String> runtimeInfo = this.brokerController.getMessageStore().getRuntimeInfo();

        // Broker version.
        runtimeInfo.put("brokerVersionDesc", MQVersion.getVersionDesc((int)MQVersion.CurrentVersion));
        runtimeInfo.put("brokerVersion", String.valueOf(MQVersion.CurrentVersion));

        // Message put totals.
        runtimeInfo.put("msgPutTotalYesterdayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalYesterdayMorning()));
        runtimeInfo.put("msgPutTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayMorning()));
        runtimeInfo.put("msgPutTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayNow()));

        // Message get totals.
        runtimeInfo.put("msgGetTotalYesterdayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalYesterdayMorning()));
        runtimeInfo.put("msgGetTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayMorning()));
        runtimeInfo.put("msgGetTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayNow()));

        // Send thread pool backlog.
        runtimeInfo.put("sendThreadPoolQueueSize", String.valueOf(this.brokerController.getSendThreadPoolQueue().size()));
        runtimeInfo.put("sendThreadPoolQueueCapacity", String.valueOf(this.brokerController.getBrokerConfig().getSendThreadPoolQueueCapacity()));
        return runtimeInfo;
    }

    /**
     * Returns the broker runtime information as an encoded {@code KVTable} body.
     *
     * @param ctx     channel context (unused)
     * @param request the incoming command (not inspected here)
     * @return response with code 0 and the encoded table as body
     */
    private RemotingCommand getBrokerRuntimeInfo(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);

        // Wrap the runtime snapshot in a KVTable for the wire format.
        KVTable table = new KVTable();
        table.setTable(this.prepareRuntimeInfo());
        response.setBody(table.encode());

        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Returns the encoded consumer offsets of this broker as the response body.
     *
     * <p>The string produced by the consumer offset manager's {@code encode()} is
     * sent as UTF-8 bytes. A null or empty string, or an encoding failure, yields
     * code 1 with a remark; otherwise code 0 with a null remark.
     *
     * @param ctx     channel context; only used to log the remote address on failure
     * @param request the incoming command (not inspected here)
     * @return the populated response command
     */
    private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        String encodedOffsets = this.brokerController.getConsumerOffsetManager().encode();

        // Nothing to send back: report it as an error.
        if (encodedOffsets == null || encodedOffsets.length() == 0) {
            log.error("No consumer offset in this broker, client: " + ctx.channel().remoteAddress());
            response.setCode(1);
            response.setRemark("No consumer offset in this broker");
            return response;
        }

        try {
            response.setBody(encodedOffsets.getBytes("UTF-8"));
        }
        catch (UnsupportedEncodingException e) {
            log.error("get all consumer offset from master error.", e);
            response.setCode(1);
            response.setRemark("UnsupportedEncodingException " + e);
            return response;
        }

        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Returns the encoded delay (schedule) offsets of this broker as the response body.
     *
     * <p>The message store is cast to {@code DefaultMessageStore} to reach its
     * schedule message service, whose {@code encode()} output is sent as UTF-8 bytes.
     * A null or empty string, or an encoding failure, yields code 1 with a remark.
     * NOTE(review): the cast assumes the configured store is a DefaultMessageStore -
     * confirm no other implementation is used.
     *
     * @param ctx     channel context; only used to log the remote address on failure
     * @param request the incoming command (not inspected here)
     * @return the populated response command
     */
    private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        DefaultMessageStore store = (DefaultMessageStore)this.brokerController.getMessageStore();
        String encodedDelayOffsets = store.getScheduleMessageService().encode();

        // Nothing to send back: report it as an error.
        if (encodedDelayOffsets == null || encodedDelayOffsets.length() == 0) {
            log.error("No delay offset in this broker, client: " + ctx.channel().remoteAddress());
            response.setCode(1);
            response.setRemark("No delay offset in this broker");
            return response;
        }

        try {
            response.setBody(encodedDelayOffsets.getBytes("UTF-8"));
        }
        catch (UnsupportedEncodingException e) {
            log.error("get all delay offset from master error.", e);
            response.setCode(1);
            response.setRemark("UnsupportedEncodingException " + e);
            return response;
        }

        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Deletes the subscription group configuration named in the request header.
     *
     * @param ctx     channel context, used for logging the caller's address
     * @param request command carrying a {@code DeleteSubscriptionGroupRequestHeader}
     * @return response with code 0 and a null remark
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        DeleteSubscriptionGroupRequestHeader requestHeader = (DeleteSubscriptionGroupRequestHeader)request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class);

        String caller = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.info("deleteSubscriptionGroup called by {}", (Object)caller);

        String groupName = requestHeader.getGroupName();
        this.brokerController.getSubscriptionGroupManager().deleteSubscriptionGroupConfig(groupName);

        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Logs the request and forwards an offset reset to the broker-to-client component.
     *
     * @param ctx     channel context, used for logging the caller's address
     * @param request command carrying a {@code ResetOffsetRequestHeader}
     * @return whatever {@code Broker2Client.resetOffset} returns
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        ResetOffsetRequestHeader requestHeader = (ResetOffsetRequestHeader)request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);

        String caller = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        Object[] logArgs = new Object[]{caller, requestHeader.getTopic(), requestHeader.getGroup(), requestHeader.getTimestamp(), requestHeader.isForce()};
        log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}", logArgs);

        return this.brokerController.getBroker2Client().resetOffset(
            requestHeader.getTopic(),
            requestHeader.getGroup(),
            requestHeader.getTimestamp(),
            requestHeader.isForce());
    }

    /**
     * Logs the request and forwards a consumer status query to the broker-to-client component.
     *
     * @param ctx     channel context, used for logging the caller's address
     * @param request command carrying a {@code GetConsumerStatusRequestHeader}
     * @return whatever {@code Broker2Client.getConsumeStatus} returns
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    public RemotingCommand getConsumerStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        GetConsumerStatusRequestHeader requestHeader = (GetConsumerStatusRequestHeader)request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);

        String caller = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        Object[] logArgs = new Object[]{caller, requestHeader.getTopic(), requestHeader.getGroup()};
        log.info("[get-consumer-status] get consumer status by {}. topic={}, group={}", logArgs);

        return this.brokerController.getBroker2Client().getConsumeStatus(
            requestHeader.getTopic(),
            requestHeader.getGroup(),
            requestHeader.getClientAddr());
    }

    /**
     * Lists the consumer groups associated with a topic.
     *
     * <p>The result is the union of the groups reported by the consumer manager's
     * {@code queryTopicConsumeByWho} and those reported by the consumer offset
     * manager's {@code whichGroupByTopic}, returned as an encoded {@code GroupList}.
     *
     * @param ctx     channel context (unused)
     * @param request command carrying a {@code QueryTopicConsumeByWhoRequestHeader}
     * @return response with code 0 and the encoded group list as body
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand queryTopicConsumeByWho(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        QueryTopicConsumeByWhoRequestHeader requestHeader = (QueryTopicConsumeByWhoRequestHeader)request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class);

        HashSet<String> consumerGroups = this.brokerController.getConsumerManager().queryTopicConsumeByWho(requestHeader.getTopic());
        Set<String> groupsWithOffsets = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(requestHeader.getTopic());
        boolean hasOffsetGroups = groupsWithOffsets != null && !groupsWithOffsets.isEmpty();
        if (hasOffsetGroups) {
            consumerGroups.addAll(groupsWithOffsets);
        }

        GroupList result = new GroupList();
        result.setGroupList(consumerGroups);
        response.setBody(result.encode());

        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Computes, per write queue of a topic, the time span covered by stored messages
     * and the store time at the group's consume offset.
     *
     * <p>For every queue id in {@code [0, writeQueueNums)} a {@code QueueTimeSpan} is
     * filled with: the earliest message time, the store time of the message at
     * {@code maxOffset - 1}, and the store time at the group's consume offset (falling
     * back to the earliest message time when that offset is not positive). If the
     * topic has no configuration the response carries code 17 and a "not exist" remark
     * (17 is presumably the inlined topic-not-exist code - confirm against ResponseCode).
     *
     * @param ctx     channel context (unused)
     * @param request command carrying a {@code QueryConsumeTimeSpanRequestHeader}
     * @return response with the encoded {@code QueryConsumeTimeSpanBody}, or an error code
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        QueryConsumeTimeSpanRequestHeader requestHeader = (QueryConsumeTimeSpanRequestHeader)request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class);
        String topic = requestHeader.getTopic();

        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
        if (topicConfig == null) {
            response.setCode(17);
            response.setRemark("topic[" + topic + "] not exist");
            return response;
        }

        HashSet<QueueTimeSpan> spans = new HashSet<QueueTimeSpan>();
        for (int queueId = 0; queueId < topicConfig.getWriteQueueNums(); ++queueId) {
            QueueTimeSpan span = new QueueTimeSpan();

            // Identify the queue this span describes.
            MessageQueue queue = new MessageQueue();
            queue.setTopic(topic);
            queue.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
            queue.setQueueId(queueId);
            span.setMessageQueue(queue);

            // Lower bound: earliest stored message.
            long earliestTime = this.brokerController.getMessageStore().getEarliestMessageTime(topic, queueId);
            span.setMinTimeStamp(earliestTime);

            // Upper bound: message just below the max offset.
            long maxOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
            long latestTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, queueId, maxOffset - 1L);
            span.setMaxTimeStamp(latestTime);

            // Consume position of the group; fall back to the earliest time when there is no positive offset.
            long groupOffset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getGroup(), topic, queueId);
            long consumeTime;
            if (groupOffset > 0L) {
                consumeTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, queueId, groupOffset);
            } else {
                consumeTime = earliestTime;
            }
            span.setConsumeTimeStamp(consumeTime);

            spans.add(span);
        }

        QueryConsumeTimeSpanBody responseBody = new QueryConsumeTimeSpanBody();
        responseBody.setConsumeTimeSpanSet(spans);
        response.setBody(responseBody.encode());
        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Returns the set of system topics known to the topic config manager as an
     * encoded {@code TopicList} body.
     *
     * @param ctx     channel context (unused)
     * @param request the incoming command (not inspected here)
     * @return response with code 0 and the encoded topic list as body
     * @throws RemotingCommandException declared by the signature; not thrown by the visible code
     */
    private RemotingCommand getSystemTopicListFromBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);

        Set<String> systemTopics = this.brokerController.getTopicConfigManager().getSystemTopic();
        TopicList result = new TopicList();
        result.setTopicList(systemTopics);
        byte[] encoded = result.encode();
        response.setBody(encoded);

        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Computes per-queue correction offsets for a topic.
     *
     * <p>Starts from the map returned by the consumer offset manager's
     * {@code queryMinOffsetInAllGroup(topic, filterGroups)}. For each queue that the
     * compare group has an offset for, the correction offset is replaced with
     * {@code Long.MAX_VALUE} when it is greater than the compare group's offset;
     * otherwise it is left as is. Queues for which either side has no value are
     * skipped rather than failing on null unboxing.
     *
     * @param ctx     channel context (unused)
     * @param request command carrying a {@code QueryCorrectionOffsetHeader}
     * @return response with code 0 and the encoded {@code QueryCorrectionOffsetBody}
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        QueryCorrectionOffsetHeader requestHeader = (QueryCorrectionOffsetHeader)request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);
        Map<Integer, Long> correctionOffset = this.brokerController.getConsumerOffsetManager().queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups());
        Map<Integer, Long> compareOffset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup());
        if (compareOffset != null && !compareOffset.isEmpty()) {
            for (Map.Entry<Integer, Long> entry : compareOffset.entrySet()) {
                Integer queueId = entry.getKey();
                Long minOffset = correctionOffset.get(queueId);
                Long comparedOffset = entry.getValue();
                // The compare group may know queues the correction map lacks; unboxing
                // a missing value would throw a NullPointerException.
                if (minOffset == null || comparedOffset == null) {
                    continue;
                }
                // Only overwrite when the value actually changes.
                if (minOffset.longValue() > comparedOffset.longValue()) {
                    correctionOffset.put(queueId, Long.MAX_VALUE);
                }
            }
        }
        QueryCorrectionOffsetBody body = new QueryCorrectionOffsetBody();
        body.setCorrectionOffsets(correctionOffset);
        response.setBody(body.encode());
        response.setCode(0);
        response.setRemark(null);
        return response;
    }

    /**
     * Clones consume offsets from a source group to a destination group.
     *
     * <p>If the request names a topic, only that topic is processed; otherwise every
     * topic the consumer offset manager's {@code whichTopicByConsumer} reports for the
     * source group is processed. A topic is skipped (with a warning) when it has no
     * topic configuration, or - for non-offline requests - when the source group has
     * subscriptions but none for that topic.
     *
     * @param ctx     channel context (unused)
     * @param request command carrying a {@code CloneGroupOffsetRequestHeader}
     * @return response with code 0 and a null remark
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        CloneGroupOffsetRequestHeader requestHeader = (CloneGroupOffsetRequestHeader)request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class);

        Set<String> topics;
        if (UtilAll.isBlank(requestHeader.getTopic())) {
            topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getSrcGroup());
        } else {
            topics = new HashSet<String>();
            topics.add(requestHeader.getTopic());
        }

        for (String topic : topics) {
            TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
            if (null == topicConfig) {
                log.warn("[cloneGroupOffset], topic config not exist, {}", (Object)topic);
                continue;
            }
            if (!requestHeader.isOffline()) {
                SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getSrcGroup(), topic);
                if (this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getSrcGroup()) > 0 && findSubscriptionData == null) {
                    log.warn("[cloneGroupOffset], the consumer group[{}], topic[{}] not exist", (Object)requestHeader.getSrcGroup(), (Object)topic);
                    continue;
                }
            }
            // Clone the topic being iterated. The request's topic is blank whenever the
            // topic set was discovered from the source group, so passing it here would
            // never clone the discovered topics.
            this.brokerController.getConsumerOffsetManager().cloneOffset(requestHeader.getSrcGroup(), requestHeader.getDestGroup(), topic);
        }

        response.setCode(0);
        response.setRemark(null);
        return response;
    }
}

