/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.rocketmq.broker.processor;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.mqtrace.SendMessageContext;
import com.alibaba.rocketmq.broker.mqtrace.SendMessageHook;
import com.alibaba.rocketmq.common.TopicConfig;
import com.alibaba.rocketmq.common.TopicFilterType;
import com.alibaba.rocketmq.common.constant.PermName;
import com.alibaba.rocketmq.common.help.FAQUrl;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageAccessor;
import com.alibaba.rocketmq.common.message.MessageDecoder;
import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import com.alibaba.rocketmq.common.protocol.header.SendMessageResponseHeader;
import com.alibaba.rocketmq.common.sysflag.TopicSysFlag;
import com.alibaba.rocketmq.common.utils.ChannelUtil;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import com.alibaba.rocketmq.store.MessageExtBrokerInner;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for broker request processors that handle "send message" requests.
 *
 * <p>It provides helpers to decode the request header (both header layouts handled by
 * {@link #parseRequestHeader}), validate the request, build the broker-internal message,
 * write the response and invoke the registered {@link SendMessageHook}s.
 */
public abstract class AbstractSendMessageProcessor
implements NettyRequestProcessor {
    protected static final Logger log = LoggerFactory.getLogger("RocketmqBroker");
    protected static final int DLQ_NUMS_PER_GROUP = 1;

    /*
     * The numeric values below are exactly the literals this class has always used.
     * NOTE(review): the names reflect how each value is used in this class and presumably
     * mirror the protocol's RequestCode/ResponseCode constants - confirm against them.
     */
    private static final int REQUEST_CODE_SEND_MESSAGE = 10;
    private static final int REQUEST_CODE_SEND_MESSAGE_V2 = 310;
    private static final int RESPONSE_CODE_SYSTEM_ERROR = 1;
    private static final int RESPONSE_CODE_MESSAGE_ILLEGAL = 13;
    private static final int RESPONSE_CODE_NO_PERMISSION = 16;
    private static final int RESPONSE_CODE_TOPIC_NOT_EXIST = 17;

    /** Longest topic name accepted by {@link #msgContentCheck}. */
    private static final int MAX_TOPIC_LENGTH = 127;
    /** Largest message body, in bytes, accepted by {@link #msgContentCheck}. */
    private static final int MAX_BODY_SIZE = 0x4B40000;
    /** Bit OR-ed into the message sys-flag when the topic filter type is MULTI_TAG. */
    private static final int MULTI_TAG_SYS_FLAG = 2;
    /** Topic-name prefix that gets the retry-topic treatment in {@link #msgCheck}. */
    private static final String RETRY_TOPIC_PREFIX = "%RETRY%";

    protected final BrokerController brokerController;
    protected final Random random = new Random(System.currentTimeMillis());
    protected final SocketAddress storeHost;
    private List<SendMessageHook> sendMessageHookList;

    /**
     * Creates the processor and derives the store host address from the broker's
     * configured IP ({@code getBrokerIP1()}) and the netty server listen port.
     *
     * @param brokerController the owning broker controller
     */
    public AbstractSendMessageProcessor(BrokerController brokerController) {
        this.brokerController = brokerController;
        this.storeHost = new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController.getNettyServerConfig().getListenPort());
    }

    /**
     * Builds the trace context handed to the send-message hooks.
     *
     * @param ctx           channel context of the producer connection
     * @param requestHeader decoded request header
     * @return a populated context, or {@code null} when no hook is registered
     */
    protected SendMessageContext buildMsgContext(ChannelHandlerContext ctx, SendMessageRequestHeader requestHeader) {
        if (!this.hasSendMessageHook()) {
            return null;
        }
        SendMessageContext mqtraceContext = new SendMessageContext();
        mqtraceContext.setProducerGroup(requestHeader.getProducerGroup());
        mqtraceContext.setTopic(requestHeader.getTopic());
        mqtraceContext.setMsgProps(requestHeader.getProperties());
        mqtraceContext.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        mqtraceContext.setBrokerAddr(this.brokerController.getBrokerAddr());
        return mqtraceContext;
    }

    /**
     * Decodes the send-message header of {@code request}.
     *
     * <p>For the V2 request code the V2 header is decoded and converted to the V1 form;
     * for the V1 request code (or when the V2 decode yields {@code null}) the V1 header is
     * decoded directly.
     *
     * @param request the incoming command
     * @return the V1-form header, or {@code null} if the request code is neither of the two
     * @throws RemotingCommandException if the header cannot be decoded
     */
    protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request) throws RemotingCommandException {
        int code = request.getCode();
        if (code != REQUEST_CODE_SEND_MESSAGE && code != REQUEST_CODE_SEND_MESSAGE_V2) {
            return null;
        }
        SendMessageRequestHeaderV2 requestHeaderV2 = null;
        if (code == REQUEST_CODE_SEND_MESSAGE_V2) {
            requestHeaderV2 = (SendMessageRequestHeaderV2)request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
        }
        if (requestHeaderV2 != null) {
            return SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
        }
        return (SendMessageRequestHeader)request.decodeCommandCustomHeader(SendMessageRequestHeader.class);
    }

    /**
     * Builds the broker-internal message from the request header and body.
     *
     * <p>A negative queue id in the header is replaced by a pseudo-random id in
     * {@code [0, writeQueueNums)}.
     *
     * @param ctx           channel context; its remote address becomes the born host
     * @param requestHeader decoded request header
     * @param body          message payload
     * @param topicConfig   configuration of the target topic
     * @return the populated inner message
     */
    protected MessageExtBrokerInner buildInnerMsg(ChannelHandlerContext ctx, SendMessageRequestHeader requestHeader, byte[] body, TopicConfig topicConfig) {
        int queueIdInt = requestHeader.getQueueId();
        if (queueIdInt < 0) {
            // The first modulo keeps the magnitude below Integer.MIN_VALUE so Math.abs cannot overflow.
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
        }
        int sysFlag = requestHeader.getSysFlag();
        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
            sysFlag |= MULTI_TAG_SYS_FLAG;
        }
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag().intValue());
        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        msgInner.setPropertiesString(requestHeader.getProperties());
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(sysFlag);
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp().longValue());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
        return msgInner;
    }

    /**
     * Validates the size of the topic name, the properties string and the body.
     *
     * <p>On a violation the response code is set to the "message illegal" code; the
     * response is returned in every case, so callers must inspect its code.
     *
     * @param ctx           channel context (used only for logging the remote IP)
     * @param requestHeader decoded request header
     * @param request       the incoming command whose body is checked
     * @param response      response to update
     * @return {@code response}
     */
    protected RemotingCommand msgContentCheck(ChannelHandlerContext ctx, SendMessageRequestHeader requestHeader, RemotingCommand request, RemotingCommand response) {
        if (requestHeader.getTopic().length() > MAX_TOPIC_LENGTH) {
            log.warn("putMessage message topic length too long " + requestHeader.getTopic().length());
            response.setCode(RESPONSE_CODE_MESSAGE_ILLEGAL);
            return response;
        }
        if (requestHeader.getProperties() != null && requestHeader.getProperties().length() > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long " + requestHeader.getProperties().length());
            response.setCode(RESPONSE_CODE_MESSAGE_ILLEGAL);
            return response;
        }
        if (request.getBody().length > MAX_BODY_SIZE) {
            log.warn(" topic {}  msg body size {}  from {}", new Object[]{requestHeader.getTopic(), request.getBody().length, ChannelUtil.getRemoteIp(ctx.channel())});
            // The remark reports the limit actually enforced above (it used to claim 64KB).
            response.setRemark("msg body must not exceed " + MAX_BODY_SIZE + " bytes");
            response.setCode(RESPONSE_CODE_MESSAGE_ILLEGAL);
            return response;
        }
        return response;
    }

    /**
     * Checks that the message may be sent to the requested topic and queue.
     *
     * <p>Steps, in order: reject order topics when the broker is not writeable; reject
     * topics the topic-config manager does not allow sending to; look up the topic config
     * and try to create it when missing; finally validate the requested queue id against
     * the larger of the topic's read/write queue counts. On failure the response code and
     * remark are set; the response is returned in every case.
     *
     * @param ctx           channel context of the producer connection
     * @param requestHeader decoded request header
     * @param response      response to update
     * @return {@code response}
     */
    protected RemotingCommand msgCheck(ChannelHandlerContext ctx, SendMessageRequestHeader requestHeader, RemotingCommand response) {
        String topic = requestHeader.getTopic();
        if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission()) && this.brokerController.getTopicConfigManager().isOrderTopic(topic)) {
            response.setCode(RESPONSE_CODE_NO_PERMISSION);
            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
            return response;
        }
        if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(topic)) {
            String errorMsg = "the topic[" + topic + "] is conflict with system reserved words.";
            log.warn(errorMsg);
            response.setCode(RESPONSE_CODE_SYSTEM_ERROR);
            response.setRemark(errorMsg);
            return response;
        }
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
        if (null == topicConfig) {
            boolean retryTopic = topic.startsWith(RETRY_TOPIC_PREFIX);
            int topicSysFlag = 0;
            if (requestHeader.isUnitMode()) {
                topicSysFlag = retryTopic ? TopicSysFlag.buildSysFlag(false, true) : TopicSysFlag.buildSysFlag(true, false);
            }
            log.warn("the topic " + topic + " not exist, producer: " + ctx.channel().remoteAddress());
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(topic, requestHeader.getDefaultTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
            if (null == topicConfig && retryTopic) {
                // NOTE(review): 1 and 6 are presumably the queue count and a read|write permission - confirm.
                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(topic, 1, 6, topicSysFlag);
            }
            if (null == topicConfig) {
                response.setCode(RESPONSE_CODE_TOPIC_NOT_EXIST);
                response.setRemark("topic[" + topic + "] not exist, apply first please!" + FAQUrl.suggestTodo("https://github.com/alibaba/RocketMQ/issues/38"));
                return response;
            }
        }
        int queueIdInt = requestHeader.getQueueId().intValue();
        int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
        if (queueIdInt >= idValid) {
            String errorInfo = String.format("request queueId[%d] is illagal, %s Producer: %s", queueIdInt, topicConfig.toString(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(errorInfo);
            response.setCode(RESPONSE_CODE_SYSTEM_ERROR);
            response.setRemark(errorInfo);
            return response;
        }
        return response;
    }

    /**
     * @return the store host address computed in the constructor
     */
    public SocketAddress getStoreHost() {
        return this.storeHost;
    }

    /**
     * @return {@code true} if a non-empty hook list has been registered
     */
    public boolean hasSendMessageHook() {
        return this.sendMessageHookList != null && !this.sendMessageHookList.isEmpty();
    }

    /**
     * Replaces the list of hooks invoked around message sending.
     *
     * @param sendMessageHookList the hooks to use; the list is stored as-is, not copied
     */
    public void registerSendMessageHook(List<SendMessageHook> sendMessageHookList) {
        this.sendMessageHookList = sendMessageHookList;
    }

    /**
     * Writes {@code response} to the channel unless the request is a one-way RPC.
     * Failures while writing are logged together with the request and response.
     *
     * @param ctx      channel context to write to
     * @param request  the request being answered
     * @param response the response to send
     */
    protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request, RemotingCommand response) {
        if (request.isOnewayRPC()) {
            return;
        }
        try {
            ctx.writeAndFlush(response);
        }
        catch (Throwable e) {
            log.error("SendMessageProcessor process request over, but response failed", e);
            log.error(request.toString());
            log.error(response.toString());
        }
    }

    /**
     * Invokes {@code sendMessageBefore} on every registered hook.
     *
     * <p>For each hook the request header is decoded afresh, the context is refreshed from
     * it, the hook is called, and the context's message properties are written back to
     * that header instance. A failure in one hook is logged and does not stop the others.
     *
     * @param ctx     channel context of the producer connection
     * @param request the incoming command
     * @param context trace context shared by the hooks
     */
    public void executeSendMessageHookBefore(ChannelHandlerContext ctx, RemotingCommand request, SendMessageContext context) {
        if (!this.hasSendMessageHook()) {
            return;
        }
        for (SendMessageHook hook : this.sendMessageHookList) {
            try {
                SendMessageRequestHeader requestHeader = this.decodeHeaderForHook(request);
                context.setProducerGroup(requestHeader.getProducerGroup());
                context.setTopic(requestHeader.getTopic());
                context.setBodyLength(request.getBody().length);
                context.setMsgProps(requestHeader.getProperties());
                context.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
                context.setBrokerAddr(this.brokerController.getBrokerAddr());
                context.setQueueId(requestHeader.getQueueId());
                hook.sendMessageBefore(context);
                requestHeader.setProperties(context.getMsgProps());
            }
            catch (Throwable e) {
                // Hooks are best-effort: record the failure and carry on with the next hook.
                log.warn("executeSendMessageHookBefore failed for hook " + hook, e);
            }
        }
    }

    /**
     * Invokes {@code sendMessageAfter} on every registered hook.
     *
     * <p>When a response is present its code and remark are copied into the context, and
     * msg id, queue id and queue offset are copied from its custom header if it has one.
     * A failure in one hook is logged and does not stop the others.
     *
     * @param response the response produced for the request; may be {@code null}
     * @param context  trace context shared by the hooks
     */
    public void executeSendMessageHookAfter(RemotingCommand response, SendMessageContext context) {
        if (!this.hasSendMessageHook()) {
            return;
        }
        for (SendMessageHook hook : this.sendMessageHookList) {
            try {
                if (response != null) {
                    SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
                    // A response without a custom header still carries a code and remark worth reporting.
                    if (responseHeader != null) {
                        context.setMsgId(responseHeader.getMsgId());
                        context.setQueueId(responseHeader.getQueueId());
                        context.setQueueOffset(responseHeader.getQueueOffset());
                    }
                    context.setCode(response.getCode());
                    context.setErrorMsg(response.getRemark());
                }
                hook.sendMessageAfter(context);
            }
            catch (Throwable e) {
                // Hooks are best-effort: record the failure and carry on with the next hook.
                log.warn("executeSendMessageHookAfter failed for hook " + hook, e);
            }
        }
    }

    /**
     * Decodes the request header for hook execution via {@link #parseRequestHeader}, so
     * V2 requests are converted the same way as on the main send path. Falls back to
     * decoding the V1 header directly when {@code parseRequestHeader} does not recognise
     * the request code.
     */
    private SendMessageRequestHeader decodeHeaderForHook(RemotingCommand request) throws RemotingCommandException {
        SendMessageRequestHeader requestHeader = this.parseRequestHeader(request);
        if (requestHeader == null) {
            requestHeader = (SendMessageRequestHeader)request.decodeCommandCustomHeader(SendMessageRequestHeader.class);
        }
        return requestHeader;
    }
}

