/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.rocketmq.broker.processor;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageContext;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
import com.alibaba.rocketmq.broker.mqtrace.SendMessageContext;
import com.alibaba.rocketmq.broker.mqtrace.SendMessageHook;
import com.alibaba.rocketmq.broker.processor.AbstractSendMessageProcessor;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.TopicConfig;
import com.alibaba.rocketmq.common.TopicFilterType;
import com.alibaba.rocketmq.common.UtilAll;
import com.alibaba.rocketmq.common.constant.PermName;
import com.alibaba.rocketmq.common.help.FAQUrl;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageAccessor;
import com.alibaba.rocketmq.common.message.MessageDecoder;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.SendMessageResponseHeader;
import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
import com.alibaba.rocketmq.common.sysflag.TopicSysFlag;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import com.alibaba.rocketmq.store.MessageExtBrokerInner;
import com.alibaba.rocketmq.store.PutMessageResult;
import com.alibaba.rocketmq.store.config.StorePathConfigHelper;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SendMessageProcessor
extends AbstractSendMessageProcessor
implements NettyRequestProcessor {
    private List<SendMessageHook> sendMessageHookList;
    private List<ConsumeMessageHook> consumeMessageHookList;

    public SendMessageProcessor(BrokerController brokerController) {
        super(brokerController);
    }

    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        SendMessageContext mqtraceContext = null;
        switch (request.getCode()) {
            case 36: {
                return this.consumerSendMsgBack(ctx, request);
            }
        }
        SendMessageRequestHeader requestHeader = this.parseRequestHeader(request);
        if (requestHeader == null) {
            return null;
        }
        mqtraceContext = this.buildMsgContext(ctx, requestHeader);
        this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
        RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
        this.executeSendMessageHookAfter(response, mqtraceContext);
        return response;
    }

    private RemotingCommand consumerSendMsgBack(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        TopicConfig topicConfig;
        SubscriptionGroupConfig subscriptionGroupConfig;
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        ConsumerSendMsgBackRequestHeader requestHeader = (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
        if (this.hasConsumeMessageHook() && !UtilAll.isBlank((String)requestHeader.getOriginMsgId())) {
            ConsumeMessageContext context = new ConsumeMessageContext();
            context.setConsumerGroup(requestHeader.getGroup());
            context.setTopic(requestHeader.getOriginTopic());
            context.setClientHost(RemotingHelper.parseChannelRemoteAddr((Channel)ctx.channel()));
            context.setSuccess(false);
            context.setStatus(ConsumeConcurrentlyStatus.RECONSUME_LATER.toString());
            HashMap<String, Long> messageIds = new HashMap<String, Long>();
            messageIds.put(requestHeader.getOriginMsgId(), requestHeader.getOffset());
            context.setMessageIds(messageIds);
            this.executeConsumeMessageHookAfter(context);
        }
        if (null == (subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup()))) {
            response.setCode(26);
            response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " " + FAQUrl.suggestTodo((String)"https://github.com/alibaba/RocketMQ/issues/42"));
            return response;
        }
        if (!PermName.isWriteable((int)this.brokerController.getBrokerConfig().getBrokerPermission())) {
            response.setCode(16);
            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
            return response;
        }
        if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
            response.setCode(0);
            response.setRemark(null);
            return response;
        }
        String newTopic = MixAll.getRetryTopic((String)requestHeader.getGroup());
        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            topicSysFlag = TopicSysFlag.buildSysFlag((boolean)false, (boolean)true);
        }
        if (null == (topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, subscriptionGroupConfig.getRetryQueueNums(), 6, topicSysFlag))) {
            response.setCode(1);
            response.setRemark("topic[" + newTopic + "] not exist");
            return response;
        }
        if (!PermName.isWriteable((int)topicConfig.getPerm())) {
            response.setCode(16);
            response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
            return response;
        }
        MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset().longValue());
        if (null == msgExt) {
            response.setCode(1);
            response.setRemark("look message by offset failed, " + requestHeader.getOffset());
            return response;
        }
        String retryTopic = msgExt.getProperty("RETRY_TOPIC");
        if (null == retryTopic) {
            MessageAccessor.putProperty((Message)msgExt, (String)"RETRY_TOPIC", (String)msgExt.getTopic());
        }
        msgExt.setWaitStoreMsgOK(false);
        int delayLevel = requestHeader.getDelayLevel();
        if (msgExt.getReconsumeTimes() >= subscriptionGroupConfig.getRetryMaxTimes() || delayLevel < 0) {
            newTopic = MixAll.getDLQTopic((String)requestHeader.getGroup());
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % 1;
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, 1, 2, 0);
            if (null == topicConfig) {
                response.setCode(1);
                response.setRemark("topic[" + newTopic + "] not exist");
                return response;
            }
        } else {
            if (0 == delayLevel) {
                delayLevel = 3 + msgExt.getReconsumeTimes();
            }
            msgExt.setDelayTimeLevel(delayLevel);
        }
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(newTopic);
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties((Message)msgInner, (Map)msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String((Map)msgExt.getProperties()));
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, (String)msgExt.getTags()));
        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
        String originMsgId = MessageAccessor.getOriginMessageId((Message)msgExt);
        MessageAccessor.setOriginMessageId((Message)msgInner, (String)(UtilAll.isBlank((String)originMsgId) ? msgExt.getMsgId() : originMsgId));
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        if (putMessageResult != null) {
            switch (putMessageResult.getPutMessageStatus()) {
                case PUT_OK: {
                    String backTopic = msgExt.getTopic();
                    String correctTopic = msgExt.getProperty("RETRY_TOPIC");
                    if (correctTopic != null) {
                        backTopic = correctTopic;
                    }
                    this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
                    response.setCode(0);
                    response.setRemark(null);
                    return response;
                }
            }
            response.setCode(1);
            response.setRemark(putMessageResult.getPutMessageStatus().name());
            return response;
        }
        response.setCode(1);
        response.setRemark("putMessageResult is null");
        return response;
    }

    private String diskUtil() {
        String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
        double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent((String)storePathPhysic);
        String storePathLogis = StorePathConfigHelper.getStorePathConsumeQueue((String)this.brokerController.getMessageStoreConfig().getStorePathRootDir());
        double logisRatio = UtilAll.getDiskPartitionSpaceUsedPercent((String)storePathLogis);
        String storePathIndex = StorePathConfigHelper.getStorePathIndex((String)this.brokerController.getMessageStoreConfig().getStorePathRootDir());
        double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent((String)storePathIndex);
        return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", physicRatio, logisRatio, indexRatio);
    }

    private RemotingCommand sendMessage(ChannelHandlerContext ctx, RemotingCommand request, SendMessageContext mqtraceContext, SendMessageRequestHeader requestHeader) throws RemotingCommandException {
        String traFlag;
        RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
        SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
        response.setOpaque(request.getOpaque());
        if (log.isDebugEnabled()) {
            log.debug("receive SendMessage request command, " + request);
        }
        response.setCode(-1);
        super.msgCheck(ctx, requestHeader, response);
        if (response.getCode() != -1) {
            return response;
        }
        byte[] body = request.getBody();
        int queueIdInt = requestHeader.getQueueId();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
        if (queueIdInt < 0) {
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
        }
        int sysFlag = requestHeader.getSysFlag();
        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
            sysFlag |= 2;
        }
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag().intValue());
        MessageAccessor.setProperties((Message)msgInner, (Map)MessageDecoder.string2messageProperties((String)requestHeader.getProperties()));
        msgInner.setPropertiesString(requestHeader.getProperties());
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode((TopicFilterType)topicConfig.getTopicFilterType(), (String)msgInner.getTags()));
        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(sysFlag);
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp().longValue());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage() && (traFlag = msgInner.getProperty("TRAN_MSG")) != null) {
            response.setCode(16);
            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
            return response;
        }
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        if (putMessageResult != null) {
            boolean sendOK = false;
            switch (putMessageResult.getPutMessageStatus()) {
                case PUT_OK: {
                    sendOK = true;
                    response.setCode(0);
                    break;
                }
                case FLUSH_DISK_TIMEOUT: {
                    response.setCode(10);
                    sendOK = true;
                    break;
                }
                case FLUSH_SLAVE_TIMEOUT: {
                    response.setCode(12);
                    sendOK = true;
                    break;
                }
                case SLAVE_NOT_AVAILABLE: {
                    response.setCode(11);
                    sendOK = true;
                    break;
                }
                case CREATE_MAPEDFILE_FAILED: {
                    response.setCode(1);
                    response.setRemark("create maped file failed, please make sure OS and JDK both 64bit.");
                    break;
                }
                case MESSAGE_ILLEGAL: {
                    response.setCode(13);
                    response.setRemark("the message is illegal, maybe length not matched.");
                    break;
                }
                case SERVICE_NOT_AVAILABLE: {
                    response.setCode(14);
                    response.setRemark("service not available now, maybe disk full, " + this.diskUtil() + ", maybe your broker machine memory too small.");
                    break;
                }
                case UNKNOWN_ERROR: {
                    response.setCode(1);
                    response.setRemark("UNKNOWN_ERROR");
                    break;
                }
                default: {
                    response.setCode(1);
                    response.setRemark("UNKNOWN_ERROR DEFAULT");
                }
            }
            if (sendOK) {
                this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
                this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
                this.brokerController.getBrokerStatsManager().incBrokerPutNums();
                response.setRemark(null);
                responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
                responseHeader.setQueueId(Integer.valueOf(queueIdInt));
                responseHeader.setQueueOffset(Long.valueOf(putMessageResult.getAppendMessageResult().getLogicsOffset()));
                this.doResponse(ctx, request, response);
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    this.brokerController.getPullRequestHoldService().notifyMessageArriving(requestHeader.getTopic(), queueIdInt, putMessageResult.getAppendMessageResult().getLogicsOffset() + 1L);
                }
                if (this.hasSendMessageHook()) {
                    mqtraceContext.setMsgId(responseHeader.getMsgId());
                    mqtraceContext.setQueueId(responseHeader.getQueueId());
                    mqtraceContext.setQueueOffset(responseHeader.getQueueOffset());
                }
                return null;
            }
        } else {
            response.setCode(1);
            response.setRemark("store putMessage return null");
        }
        return response;
    }

    @Override
    public SocketAddress getStoreHost() {
        return this.storeHost;
    }

    @Override
    public boolean hasSendMessageHook() {
        return this.sendMessageHookList != null && !this.sendMessageHookList.isEmpty();
    }

    @Override
    public void registerSendMessageHook(List<SendMessageHook> sendMessageHookList) {
        this.sendMessageHookList = sendMessageHookList;
    }

    public boolean hasConsumeMessageHook() {
        return this.consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
    }

    public void registerConsumeMessageHook(List<ConsumeMessageHook> consumeMessageHookList) {
        this.consumeMessageHookList = consumeMessageHookList;
    }

    public void executeConsumeMessageHookAfter(ConsumeMessageContext context) {
        if (this.hasConsumeMessageHook()) {
            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
                try {
                    hook.consumeMessageAfter(context);
                }
                catch (Throwable e) {}
            }
        }
    }
}

