package com.alibaba.rocketmq.filtersrv.processor;

import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullCallback;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.PullStatus;
import com.alibaba.rocketmq.common.UtilAll;
import com.alibaba.rocketmq.common.message.MessageDecoder;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.header.PullMessageRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.PullMessageResponseHeader;
import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader;
import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
import com.alibaba.rocketmq.filtersrv.FiltersrvController;
import com.alibaba.rocketmq.filtersrv.filter.FilterClassInfo;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/rocketmq/filtersrv/processor/DefaultRequestProcessor.class */
public class DefaultRequestProcessor implements NettyRequestProcessor {
    private static final Logger log = LoggerFactory.getLogger("RocketmqFiltersrv");
    private final FiltersrvController filtersrvController;

    /* renamed from: com.alibaba.rocketmq.filtersrv.processor.DefaultRequestProcessor$3, reason: invalid class name */
    /* loaded from: input_file:com/alibaba/rocketmq/filtersrv/processor/DefaultRequestProcessor$3.class */
    static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$com$alibaba$rocketmq$client$consumer$PullStatus = new int[PullStatus.values().length];

        static {
            try {
                $SwitchMap$com$alibaba$rocketmq$client$consumer$PullStatus[PullStatus.FOUND.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$client$consumer$PullStatus[PullStatus.NO_MATCHED_MSG.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$client$consumer$PullStatus[PullStatus.NO_NEW_MSG.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$client$consumer$PullStatus[PullStatus.OFFSET_ILLEGAL.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public DefaultRequestProcessor(FiltersrvController filtersrvController) {
        this.filtersrvController = filtersrvController;
    }

    public RemotingCommand processRequest(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("receive request, {} {} {}", new Object[]{Integer.valueOf(remotingCommand.getCode()), RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()), remotingCommand});
        }
        switch (remotingCommand.getCode()) {
            case 11:
                return pullMessageForward(channelHandlerContext, remotingCommand);
            case 302:
                return registerMessageFilterClass(channelHandlerContext, remotingCommand);
            default:
                return null;
        }
    }

    private ByteBuffer messageToByteBuffer(MessageExt messageExt) throws IOException {
        byte[] compress;
        int clearCompressedFlag = MessageSysFlag.clearCompressedFlag(messageExt.getSysFlag());
        if (messageExt.getBody() != null && messageExt.getBody().length >= this.filtersrvController.getFiltersrvConfig().getCompressMsgBodyOverHowmuch() && (compress = UtilAll.compress(messageExt.getBody(), this.filtersrvController.getFiltersrvConfig().getZipCompressLevel())) != null) {
            messageExt.setBody(compress);
            clearCompressedFlag |= 1;
        }
        int length = messageExt.getBody() != null ? messageExt.getBody().length : 0;
        byte[] bytes = messageExt.getTopic().getBytes("UTF-8");
        int length2 = bytes.length;
        byte[] bytes2 = MessageDecoder.messageProperties2String(messageExt.getProperties()).getBytes("UTF-8");
        int length3 = bytes2.length;
        int i = 88 + length + 1 + length2 + 2 + length3 + 0;
        ByteBuffer allocate = ByteBuffer.allocate(i);
        allocate.putInt(i);
        allocate.putInt(-626843481);
        allocate.putInt(UtilAll.crc32(messageExt.getBody()));
        allocate.putInt(messageExt.getQueueId());
        allocate.putInt(messageExt.getFlag());
        allocate.putLong(messageExt.getQueueOffset());
        allocate.putLong(messageExt.getCommitLogOffset());
        allocate.putInt(clearCompressedFlag);
        allocate.putLong(messageExt.getBornTimestamp());
        allocate.put(messageExt.getBornHostBytes());
        allocate.putLong(messageExt.getStoreTimestamp());
        allocate.put(messageExt.getStoreHostBytes());
        allocate.putInt(messageExt.getReconsumeTimes());
        allocate.putLong(messageExt.getPreparedTransactionOffset());
        allocate.putInt(length);
        if (length > 0) {
            allocate.put(messageExt.getBody());
        }
        allocate.put((byte) length2);
        allocate.put(bytes);
        allocate.putShort((short) length3);
        if (length3 > 0) {
            allocate.put(bytes2);
        }
        return allocate;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void returnResponse(String str, String str2, ChannelHandlerContext channelHandlerContext, final RemotingCommand remotingCommand, List<MessageExt> list) {
        if (null != list) {
            ByteBuffer[] byteBufferArr = new ByteBuffer[list.size()];
            int i = 0;
            for (int i2 = 0; i2 < list.size(); i2++) {
                try {
                    byteBufferArr[i2] = messageToByteBuffer(list.get(i2));
                    i += byteBufferArr[i2].capacity();
                } catch (Exception e) {
                    log.error("messageToByteBuffer UnsupportedEncodingException", e);
                }
            }
            ByteBuffer allocate = ByteBuffer.allocate(i);
            for (ByteBuffer byteBuffer : byteBufferArr) {
                byteBuffer.flip();
                allocate.put(byteBuffer);
            }
            remotingCommand.setBody(allocate.array());
            this.filtersrvController.getFilterServerStatsManager().incGroupGetNums(str, str2, list.size());
            this.filtersrvController.getFilterServerStatsManager().incGroupGetSize(str, str2, i);
        }
        try {
            channelHandlerContext.writeAndFlush(remotingCommand).addListener(new ChannelFutureListener() { // from class: com.alibaba.rocketmq.filtersrv.processor.DefaultRequestProcessor.1
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (channelFuture.isSuccess()) {
                        return;
                    }
                    DefaultRequestProcessor.log.error("FilterServer response to " + channelFuture.channel().remoteAddress() + " failed", channelFuture.cause());
                    DefaultRequestProcessor.log.error(remotingCommand.toString());
                }
            });
        } catch (Throwable th) {
            log.error("FilterServer process request over, but response failed", th);
            log.error(remotingCommand.toString());
        }
    }

    private RemotingCommand pullMessageForward(final ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws Exception {
        final RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
        final PullMessageResponseHeader readCustomHeader = createResponseCommand.readCustomHeader();
        final PullMessageRequestHeader decodeCommandCustomHeader = remotingCommand.decodeCommandCustomHeader(PullMessageRequestHeader.class);
        createResponseCommand.setOpaque(remotingCommand.getOpaque());
        DefaultMQPullConsumer defaultMQPullConsumer = this.filtersrvController.getDefaultMQPullConsumer();
        final FilterClassInfo findFilterClass = this.filtersrvController.getFilterClassManager().findFilterClass(decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic());
        if (null == findFilterClass) {
            createResponseCommand.setCode(1);
            createResponseCommand.setRemark("Find Filter class failed, not registered");
            return createResponseCommand;
        }
        if (null == findFilterClass.getMessageFilter()) {
            createResponseCommand.setCode(1);
            createResponseCommand.setRemark("Find Filter class failed, registered but no class");
            return createResponseCommand;
        }
        readCustomHeader.setSuggestWhichBrokerId(0L);
        MessageQueue messageQueue = new MessageQueue();
        messageQueue.setTopic(decodeCommandCustomHeader.getTopic());
        messageQueue.setQueueId(decodeCommandCustomHeader.getQueueId().intValue());
        messageQueue.setBrokerName(this.filtersrvController.getBrokerName());
        defaultMQPullConsumer.pullBlockIfNotFound(messageQueue, (String) null, decodeCommandCustomHeader.getQueueOffset().longValue(), decodeCommandCustomHeader.getMaxMsgNums().intValue(), new PullCallback() { // from class: com.alibaba.rocketmq.filtersrv.processor.DefaultRequestProcessor.2
            public void onSuccess(PullResult pullResult) {
                readCustomHeader.setMaxOffset(Long.valueOf(pullResult.getMaxOffset()));
                readCustomHeader.setMinOffset(Long.valueOf(pullResult.getMinOffset()));
                readCustomHeader.setNextBeginOffset(Long.valueOf(pullResult.getNextBeginOffset()));
                createResponseCommand.setRemark((String) null);
                switch (AnonymousClass3.$SwitchMap$com$alibaba$rocketmq$client$consumer$PullStatus[pullResult.getPullStatus().ordinal()]) {
                    case 1:
                        createResponseCommand.setCode(0);
                        ArrayList arrayList = new ArrayList();
                        try {
                            for (MessageExt messageExt : pullResult.getMsgFoundList()) {
                                if (findFilterClass.getMessageFilter().match(messageExt)) {
                                    arrayList.add(messageExt);
                                }
                            }
                            if (arrayList.isEmpty()) {
                                createResponseCommand.setCode(20);
                                break;
                            } else {
                                DefaultRequestProcessor.this.returnResponse(decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), channelHandlerContext, createResponseCommand, arrayList);
                                return;
                            }
                        } catch (Throwable th) {
                            String format = String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ", decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic());
                            DefaultRequestProcessor.log.error(format, th);
                            createResponseCommand.setCode(1);
                            createResponseCommand.setRemark(format + RemotingHelper.exceptionSimpleDesc(th));
                            DefaultRequestProcessor.this.returnResponse(decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), channelHandlerContext, createResponseCommand, null);
                            return;
                        }
                    case 2:
                        createResponseCommand.setCode(20);
                        break;
                    case 3:
                        createResponseCommand.setCode(19);
                        break;
                    case 4:
                        createResponseCommand.setCode(21);
                        break;
                }
                DefaultRequestProcessor.this.returnResponse(decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), channelHandlerContext, createResponseCommand, null);
            }

            public void onException(Throwable th) {
                createResponseCommand.setCode(1);
                createResponseCommand.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(th));
                DefaultRequestProcessor.this.returnResponse(decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), channelHandlerContext, createResponseCommand, null);
            }
        });
        return null;
    }

    private RemotingCommand registerMessageFilterClass(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand((Class) null);
        RegisterMessageFilterClassRequestHeader decodeCommandCustomHeader = remotingCommand.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class);
        try {
            if (!this.filtersrvController.getFilterClassManager().registerFilterClass(decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getClassName(), decodeCommandCustomHeader.getClassCRC().intValue(), remotingCommand.getBody())) {
                throw new Exception("registerFilterClass error");
            }
            createResponseCommand.setCode(0);
            createResponseCommand.setRemark((String) null);
            return createResponseCommand;
        } catch (Exception e) {
            createResponseCommand.setCode(1);
            createResponseCommand.setRemark(RemotingHelper.exceptionSimpleDesc(e));
            return createResponseCommand;
        }
    }
}
