/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.rocketmq.broker.processor;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.longpolling.PullRequest;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageContext;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
import com.alibaba.rocketmq.broker.pagecache.ManyMessageTransfer;
import com.alibaba.rocketmq.common.TopicFilterType;
import com.alibaba.rocketmq.common.constant.PermName;
import com.alibaba.rocketmq.common.filter.FilterAPI;
import com.alibaba.rocketmq.common.help.FAQUrl;
import com.alibaba.rocketmq.common.message.MessageDecoder;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.header.PullMessageRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.PullMessageResponseHeader;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.alibaba.rocketmq.common.protocol.topic.OffsetMovedEvent;
import com.alibaba.rocketmq.common.sysflag.PullSysFlag;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.common.RemotingUtil;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import com.alibaba.rocketmq.store.GetMessageStatus;
import com.alibaba.rocketmq.store.MessageExtBrokerInner;
import com.alibaba.rocketmq.store.PutMessageResult;
import com.alibaba.rocketmq.store.config.BrokerRole;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PullMessageProcessor
implements NettyRequestProcessor {
    private static final Logger log = LoggerFactory.getLogger((String)"RocketmqBroker");
    private final BrokerController brokerController;
    private List<ConsumeMessageHook> consumeMessageHookList;

    /**
     * Creates a pull-message processor bound to the given broker controller.
     *
     * @param brokerController controller kept by reference and consulted by every
     *                         request this processor handles
     */
    public PullMessageProcessor(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    /**
     * Entry point for a pull request arriving from the network. Delegates to the
     * channel-based overload with broker-side suspension allowed.
     *
     * @param ctx     handler context whose channel the request arrived on
     * @param request the pull request command
     * @return whatever the channel-based overload returns, which may be {@code null}
     * @throws RemotingCommandException propagated from the channel-based overload
     */
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final Channel requestChannel = ctx.channel();
        final boolean allowSuspend = true;
        return this.processRequest(requestChannel, request, allowSuspend);
    }

    /**
     * Re-processes a pull request on the broker's pull-message executor.
     * <p>
     * The submitted task runs the request with suspension disallowed. If a response
     * is produced, the task stamps it with the request's opaque value, marks it as a
     * response and writes it to the channel. Write failures are logged together with
     * the request and the response.
     *
     * @param channel channel the response is written to
     * @param request the pull request to process again
     * @throws RemotingCommandException declared on the signature; inside the submitted
     *                                  task this exception is caught and logged
     */
    public void excuteRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException {
        final Runnable wakeupTask = new Runnable() {

            @Override
            public void run() {
                try {
                    final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
                    if (response == null) {
                        // Nothing to send back from this task.
                        return;
                    }

                    response.setOpaque(request.getOpaque());
                    response.markResponseType();
                    try {
                        final ChannelFutureListener writeListener = new ChannelFutureListener() {

                            public void operationComplete(ChannelFuture future) throws Exception {
                                if (future.isSuccess()) {
                                    return;
                                }
                                log.error("processRequestWrapper response to " + future.channel().remoteAddress() + " failed", future.cause());
                                log.error(request.toString());
                                log.error(response.toString());
                            }
                        };
                        channel.writeAndFlush(response).addListener(writeListener);
                    }
                    catch (Throwable e) {
                        log.error("processRequestWrapper process request over, but response failed", e);
                        log.error(request.toString());
                        log.error(response.toString());
                    }
                }
                catch (RemotingCommandException e1) {
                    log.error("excuteRequestWhenWakeup run", e1);
                }
            }
        };
        this.brokerController.getPullMessageExecutor().submit(wakeupTask);
    }

    /**
     * Publishes the given offset-moved event as a message on the
     * {@code OFFSET_MOVED_EVENT} topic, queue 0, using the consumer group as both tag
     * and key and the encoded event as the body.
     * <p>
     * Any exception raised while building or storing the message is logged at warn
     * level and not rethrown. The result of the store call is not inspected.
     *
     * @param event the event to publish
     */
    private void generateOffsetMovedEvent(OffsetMovedEvent event) {
        try {
            final String consumerGroup = event.getConsumerGroup();
            final MessageExtBrokerInner eventMessage = new MessageExtBrokerInner();

            // Tags, delay level and keys are set before the properties string is built from them.
            eventMessage.setTopic("OFFSET_MOVED_EVENT");
            eventMessage.setTags(consumerGroup);
            eventMessage.setDelayTimeLevel(0);
            eventMessage.setKeys(consumerGroup);
            eventMessage.setBody(event.encode());
            eventMessage.setFlag(0);
            eventMessage.setPropertiesString(MessageDecoder.messageProperties2String(eventMessage.getProperties()));
            eventMessage.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(TopicFilterType.SINGLE_TAG, eventMessage.getTags()));

            // Routing and bookkeeping fields.
            eventMessage.setQueueId(0);
            eventMessage.setSysFlag(0);
            eventMessage.setBornTimestamp(System.currentTimeMillis());
            eventMessage.setBornHost(RemotingUtil.string2SocketAddress(this.brokerController.getBrokerAddr()));
            eventMessage.setStoreHost(eventMessage.getBornHost());
            eventMessage.setReconsumeTimes(0);

            this.brokerController.getMessageStore().putMessage(eventMessage);
        }
        catch (Exception e) {
            final String failure = String.format("generateOffsetMovedEvent Exception, %s", event.toString());
            log.warn(failure, (Throwable) e);
        }
    }

    /**
     * Core pull-message handler.
     * <p>
     * CFR was unable to fully structure this method, so the body is raw decompiler
     * output: locals are used without declarations and a "** GOTO" jump remains.
     * It is not compilable Java as written.
     * <p>
     * Flow, as visible in the body:
     * <ol>
     *   <li>Create the response, decode the PullMessageRequestHeader, and reject the
     *       request early when the broker permission, subscription group, topic or
     *       queue id check fails (each rejection sets a code and remark and returns).</li>
     *   <li>Resolve the subscription data, either by parsing the subscription carried
     *       in the request or from the consumer group info held by the broker.</li>
     *   <li>Read messages from the message store and map the returned
     *       GetMessageStatus to a response code.</li>
     *   <li>Act on that response code: write the messages to the channel, suspend the
     *       request, or publish an offset-moved event.</li>
     *   <li>Commit the offset carried in the request when allowed.</li>
     * </ol>
     * Response codes appear as inlined integers; their symbolic names are not visible
     * in this file.
     *
     * @param channel            channel the request arrived on; the message payload is
     *                           written to it directly when messages are found
     * @param request            the pull request command
     * @param brokerAllowSuspend whether this invocation may suspend the request; it is
     *                           also a precondition for committing the request's offset
     * @return the response to send, or null when the payload was already written to the
     *         channel or the request was suspended
     * @throws RemotingCommandException declared on the signature; presumably raised when
     *                                  the request header cannot be decoded (confirm)
     */
    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) throws RemotingCommandException {
        block43: {
            block42: {
                response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
                responseHeader = (PullMessageResponseHeader)response.readCustomHeader();
                requestHeader = (PullMessageRequestHeader)request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
                response.setOpaque(request.getOpaque());
                if (PullMessageProcessor.log.isDebugEnabled()) {
                    PullMessageProcessor.log.debug("receive PullMessage request command, " + request);
                }
                // Broker-level permission check.
                if (!PermName.isReadable((int)this.brokerController.getBrokerConfig().getBrokerPermission())) {
                    response.setCode(16);
                    response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] pulling message is forbidden");
                    return response;
                }
                // The consumer group must have a subscription group config and be enabled for consuming.
                subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
                if (null == subscriptionGroupConfig) {
                    response.setCode(26);
                    response.setRemark("subscription group not exist, " + requestHeader.getConsumerGroup() + " " + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/42"));
                    return response;
                }
                if (!subscriptionGroupConfig.isConsumeEnable()) {
                    response.setCode(16);
                    response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
                    return response;
                }
                // Decode the request's system flags; the suspend timeout is only honoured when the suspend flag is set.
                hasSuspendFlag = PullSysFlag.hasSuspendFlag((int)requestHeader.getSysFlag());
                hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag((int)requestHeader.getSysFlag());
                hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag((int)requestHeader.getSysFlag());
                suspendTimeoutMillisLong = hasSuspendFlag != false ? requestHeader.getSuspendTimeoutMillis() : 0L;
                // Topic must exist, be readable, and the queue id must be within [0, readQueueNums).
                topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
                if (null == topicConfig) {
                    PullMessageProcessor.log.error("the topic " + requestHeader.getTopic() + " not exist, consumer: " + RemotingHelper.parseChannelRemoteAddr((Channel)channel));
                    response.setCode(17);
                    response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/38"));
                    return response;
                }
                if (!PermName.isReadable((int)topicConfig.getPerm())) {
                    response.setCode(16);
                    response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
                    return response;
                }
                if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
                    errorInfo = "queueId[" + requestHeader.getQueueId() + "] is illagal,Topic :" + requestHeader.getTopic() + " topicConfig.readQueueNums: " + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress();
                    PullMessageProcessor.log.warn(errorInfo);
                    response.setCode(1);
                    response.setRemark(errorInfo);
                    return response;
                }
                // Resolve the subscription: parsed from the request when it carries one,
                // otherwise looked up from the consumer group info registered on the broker.
                subscriptionData = null;
                if (hasSubscriptionFlag) {
                    try {
                        subscriptionData = FilterAPI.buildSubscriptionData((String)requestHeader.getConsumerGroup(), (String)requestHeader.getTopic(), (String)requestHeader.getSubscription());
                    }
                    catch (Exception e) {
                        PullMessageProcessor.log.warn("parse the consumer's subscription[{}] failed, group: {}", (Object)requestHeader.getSubscription(), (Object)requestHeader.getConsumerGroup());
                        response.setCode(23);
                        response.setRemark("parse the consumer's subscription failed");
                        return response;
                    }
                } else {
                    consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
                    if (null == consumerGroupInfo) {
                        PullMessageProcessor.log.warn("the consumer's group info not exist, group: {}", (Object)requestHeader.getConsumerGroup());
                        response.setCode(24);
                        response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/46"));
                        return response;
                    }
                    // Broadcasting consumers are rejected when the group config does not enable broadcast consumption.
                    if (!subscriptionGroupConfig.isConsumeBroadcastEnable() && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
                        response.setCode(16);
                        response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
                        return response;
                    }
                    subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
                    if (null == subscriptionData) {
                        PullMessageProcessor.log.warn("the consumer's subscription not exist, group: {}", (Object)requestHeader.getConsumerGroup());
                        response.setCode(24);
                        response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/46"));
                        return response;
                    }
                    // The broker's copy of the subscription must not be older than the version the consumer sent.
                    if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
                        PullMessageProcessor.log.warn("the broker's subscription is not latest, group: {} {}", (Object)requestHeader.getConsumerGroup(), (Object)subscriptionData.getSubString());
                        response.setCode(25);
                        response.setRemark("the consumer's subscription not latest");
                        return response;
                    }
                }
                // Read from the store; a null result leaves block42 and is reported with code 1 below.
                getMessageResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId().intValue(), requestHeader.getQueueOffset().longValue(), requestHeader.getMaxMsgNums().intValue(), subscriptionData);
                if (getMessageResult == null) break block42;
                response.setRemark(getMessageResult.getStatus().name());
                responseHeader.setNextBeginOffset(Long.valueOf(getMessageResult.getNextBeginOffset()));
                responseHeader.setMinOffset(Long.valueOf(getMessageResult.getMinOffset()));
                responseHeader.setMaxOffset(Long.valueOf(getMessageResult.getMaxOffset()));
                // Tell the consumer which broker id to pull from next.
                if (getMessageResult.isSuggestPullingFromSlave()) {
                    responseHeader.setSuggestWhichBrokerId(Long.valueOf(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()));
                } else {
                    responseHeader.setSuggestWhichBrokerId(Long.valueOf(subscriptionGroupConfig.getBrokerId()));
                }
                // Map the store status to a response code. The case labels are the values
                // assigned per GetMessageStatus constant in the nested switch-map class "3".
                switch (3.$SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus[getMessageResult.getStatus().ordinal()]) {
                    // FOUND
                    case 1: {
                        response.setCode(0);
                        if (!this.hasConsumeMessageHook()) break;
                        // Build the hook context only when hooks are registered.
                        context = new ConsumeMessageContext();
                        context.setConsumerGroup(requestHeader.getConsumerGroup());
                        context.setTopic(requestHeader.getTopic());
                        context.setClientHost(RemotingHelper.parseChannelRemoteAddr((Channel)channel));
                        context.setStoreHost(this.brokerController.getBrokerAddr());
                        context.setQueueId(requestHeader.getQueueId());
                        storeHost = new InetSocketAddress(this.brokerController.getBrokerConfig().getBrokerIP1(), this.brokerController.getNettyServerConfig().getListenPort());
                        messageIds = this.brokerController.getMessageStore().getMessageIds(requestHeader.getTopic(), requestHeader.getQueueId().intValue(), requestHeader.getQueueOffset().longValue(), requestHeader.getQueueOffset() + (long)getMessageResult.getMessageCount(), (SocketAddress)storeHost);
                        context.setMessageIds(messageIds);
                        // Body length is the total buffer size divided by the message count.
                        // NOTE(review): divides by getMessageCount() without a zero check; presumably FOUND implies at least one message - confirm.
                        context.setBodyLength(getMessageResult.getBufferTotalSize() / getMessageResult.getMessageCount());
                        this.executeConsumeMessageHookBefore(context);
                        break;
                    }
                    // MESSAGE_WAS_REMOVING
                    case 2: {
                        response.setCode(20);
                        break;
                    }
                    // NO_MATCHED_LOGIC_QUEUE, NO_MESSAGE_IN_QUEUE
                    case 3: 
                    case 4: {
                        // A non-zero request offset is answered with code 21 and logged; offset 0 gets code 19.
                        if (0L != requestHeader.getQueueOffset()) {
                            response.setCode(21);
                            PullMessageProcessor.log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", new Object[]{requestHeader.getQueueOffset(), getMessageResult.getNextBeginOffset(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getConsumerGroup()});
                            break;
                        }
                        response.setCode(19);
                        break;
                    }
                    // NO_MATCHED_MESSAGE
                    case 5: {
                        response.setCode(20);
                        break;
                    }
                    // OFFSET_FOUND_NULL
                    case 6: {
                        response.setCode(19);
                        break;
                    }
                    // OFFSET_OVERFLOW_BADLY
                    case 7: {
                        response.setCode(21);
                        PullMessageProcessor.log.info("the request offset: " + requestHeader.getQueueOffset() + " over flow badly, broker max offset: " + getMessageResult.getMaxOffset() + ", consumer: " + channel.remoteAddress());
                        break;
                    }
                    // OFFSET_OVERFLOW_ONE
                    case 8: {
                        response.setCode(19);
                        break;
                    }
                    // OFFSET_TOO_SMALL
                    case 9: {
                        response.setCode(21);
                        PullMessageProcessor.log.info("the request offset: " + requestHeader.getQueueOffset() + " too small, broker min offset: " + getMessageResult.getMinOffset() + ", consumer: " + channel.remoteAddress());
                        break;
                    }
                    // Any other status: decompiled form of an assertion failure; no code is set when assertions are disabled.
                    default: {
                        if (!PullMessageProcessor.$assertionsDisabled) {
                            throw new AssertionError();
                        }
                        break;
                    }
                }
                // Act on the response code chosen above. Every case leaves block43, so the
                // "store getMessage return null" fallback after block42 is skipped.
                switch (response.getCode()) {
                    case 0: {
                        // Messages found: update statistics, then write header and payload to the channel here.
                        this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(), getMessageResult.getMessageCount());
                        this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(), getMessageResult.getBufferTotalSize());
                        this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
                        try {
                            fileRegion = new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
                            channel.writeAndFlush((Object)fileRegion).addListener((GenericFutureListener)new ChannelFutureListener(){

                                public void operationComplete(ChannelFuture future) throws Exception {
                                    // Release the store result once the write has completed, whether or not it succeeded.
                                    getMessageResult.release();
                                    if (!future.isSuccess()) {
                                        log.error("transfer many message by pagecache failed, " + channel.remoteAddress(), future.cause());
                                    }
                                }
                            });
                        }
                        catch (Throwable e) {
                            PullMessageProcessor.log.error("", e);
                            // The listener was not attached or the write call failed, so release here.
                            getMessageResult.release();
                        }
                        // Null response: nothing further is returned for the caller to send.
                        response = null;
                        break block43;
                    }
                    case 19: {
                        // Suspend only when this invocation allows it and the request carries the suspend flag;
                        // otherwise jump to lbl152, which lands on case 20 and returns the response unchanged.
                        if (!brokerAllowSuspend || !hasSuspendFlag) ** GOTO lbl152
                        pollingTimeMills = suspendTimeoutMillisLong;
                        // With long polling disabled, the configured short polling time replaces the requested timeout.
                        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                        }
                        pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), requestHeader.getQueueOffset());
                        this.brokerController.getPullRequestHoldService().suspendPullRequest(requestHeader.getTopic(), requestHeader.getQueueId(), pullRequest);
                        response = null;
                        break block43;
                    }
lbl152:
                    // 2 sources

                    case 20: {
                        break block43;
                    }
                    case 21: {
                        // Unless this broker is a slave without offset checking enabled, publish an
                        // offset-moved event; otherwise redirect the consumer and downgrade the code to 20.
                        if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE || this.brokerController.getBrokerConfig().isOffsetCheckInSlave()) {
                            mq = new MessageQueue();
                            mq.setTopic(requestHeader.getTopic());
                            mq.setQueueId(requestHeader.getQueueId().intValue());
                            mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
                            event = new OffsetMovedEvent();
                            event.setConsumerGroup(requestHeader.getConsumerGroup());
                            event.setMessageQueue(mq);
                            event.setOffsetRequest(requestHeader.getQueueOffset().longValue());
                            event.setOffsetNew(getMessageResult.getNextBeginOffset());
                            this.generateOffsetMovedEvent(event);
                        } else {
                            responseHeader.setSuggestWhichBrokerId(Long.valueOf(subscriptionGroupConfig.getBrokerId()));
                            response.setCode(20);
                        }
                        // NOTE(review): the format string has five {} placeholders but only four arguments are
                        // supplied (nothing for clientId), so offset and suggestBrokerId are logged one slot early.
                        PullMessageProcessor.log.warn("PULL_OFFSET_MOVED:topic={}, groupId={}, clientId={}, offset={}, suggestBrokerId={}", new Object[]{requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(), responseHeader.getSuggestWhichBrokerId()});
                        break block43;
                    }
                    default: {
                        if (!PullMessageProcessor.$assertionsDisabled) {
                            throw new AssertionError();
                        }
                        break block43;
                    }
                }
            }
            // Reached only via "break block42", i.e. when the store returned null.
            response.setCode(1);
            response.setRemark("store getMessage return null");
        }
        // Commit the offset carried in the request only when suspension is allowed for this call,
        // the request has the commit-offset flag, and this broker is not a slave.
        storeOffsetEnable = brokerAllowSuspend;
        storeOffsetEnable = storeOffsetEnable != false && hasCommitOffsetFlag != false;
        v0 = storeOffsetEnable = storeOffsetEnable != false && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
        if (storeOffsetEnable) {
            this.brokerController.getConsumerOffsetManager().commitOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
        }
        return response;
    }

    /**
     * Reports whether at least one consume-message hook is registered.
     *
     * @return {@code true} when the hook list is non-null and non-empty
     */
    public boolean hasConsumeMessageHook() {
        if (this.consumeMessageHookList == null) {
            return false;
        }
        return !this.consumeMessageHookList.isEmpty();
    }

    /**
     * Replaces the registered consume-message hooks with the given list.
     * The list is stored by reference, not copied.
     *
     * @param sendMessageHookList the consume-message hooks to use from now on
     *                            (despite the name, these are consume hooks)
     */
    public void registerConsumeMessageHook(final List<ConsumeMessageHook> sendMessageHookList) {
        this.consumeMessageHookList = sendMessageHookList;
    }

    /**
     * Invokes {@code consumeMessageBefore} on every registered hook, in list order.
     * <p>
     * Hooks are best-effort: a hook that throws is logged at warn level and skipped,
     * so the remaining hooks still run and the failure is not propagated to the caller.
     *
     * @param context the consume context handed to each hook
     */
    public void executeConsumeMessageHookBefore(ConsumeMessageContext context) {
        if (!this.hasConsumeMessageHook()) {
            return;
        }
        for (ConsumeMessageHook hook : this.consumeMessageHookList) {
            try {
                hook.consumeMessageBefore(context);
            }
            catch (Throwable e) {
                // Previously swallowed silently; keep going, but leave a trace of the failure.
                log.warn("consumeMessageBefore hook threw an exception, ignored", e);
            }
        }
    }

    /*
     * Synthetic switch map, shown as CFR emitted it (the numeric class name "3" is
     * not a legal identifier in Java source).
     *
     * The array is indexed by GetMessageStatus ordinal and holds the case label
     * (1-9) used by the status switch in processRequest. Each assignment is wrapped
     * in a try/catch that ignores NoSuchFieldError; an entry that is never assigned
     * keeps the int array default of 0, which matches no case label there.
     */
    static class 3 {
        static final /* synthetic */ int[] $SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus;

        static {
            // One slot per GetMessageStatus constant.
            $SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus = new int[GetMessageStatus.values().length];
            try {
                3.$SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus[GetMessageStatus.FOUND.ordinal()] = 1;
            }
            catch (NoSuchFieldError ex) {
                // empty catch block
            }
            try {
                3.$SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus[GetMessageStatus.MESSAGE_WAS_REMOVING.ordinal()] = 2;
            }
            catch (NoSuchFieldError ex) {
                // empty catch block
            }
            try {
                3.$SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus[GetMessageStatus.NO_MATCHED_LOGIC_QUEUE.ordinal()] = 3;
            }
            catch (NoSuchFieldError ex) {
                // empty catch block
            }
            try {
                3.$SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus[GetMessageStatus.NO_MESSAGE_IN_QUEUE.ordinal()] = 4;
            }
            catch (NoSuchFieldError ex) {
                // empty catch block
            }
            try {
                3.$SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus[GetMessageStatus.NO_MATCHED_MESSAGE.ordinal()] = 5;
            }
            catch (NoSuchFieldError ex) {
                // empty catch block
            }
            try {
                3.$SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus[GetMessageStatus.OFFSET_FOUND_NULL.ordinal()] = 6;
            }
            catch (NoSuchFieldError ex) {
                // empty catch block
            }
            try {
                3.$SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus[GetMessageStatus.OFFSET_OVERFLOW_BADLY.ordinal()] = 7;
            }
            catch (NoSuchFieldError ex) {
                // empty catch block
            }
            try {
                3.$SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus[GetMessageStatus.OFFSET_OVERFLOW_ONE.ordinal()] = 8;
            }
            catch (NoSuchFieldError ex) {
                // empty catch block
            }
            try {
                3.$SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus[GetMessageStatus.OFFSET_TOO_SMALL.ordinal()] = 9;
            }
            catch (NoSuchFieldError noSuchFieldError) {
                // empty catch block
            }
        }
    }
}

