/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.rocketmq.broker.client.net;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.client.ClientChannelInfo;
import com.alibaba.rocketmq.broker.client.ConsumerGroupInfo;
import com.alibaba.rocketmq.broker.pagecache.OneMessageTransfer;
import com.alibaba.rocketmq.common.MQVersion;
import com.alibaba.rocketmq.common.TopicConfig;
import com.alibaba.rocketmq.common.UtilAll;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.body.GetConsumerStatusBody;
import com.alibaba.rocketmq.common.protocol.body.ResetOffsetBody;
import com.alibaba.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
import com.alibaba.rocketmq.remoting.CommandCustomHeader;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import com.alibaba.rocketmq.store.SelectMapedBufferResult;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Broker2Client {
    private static final Logger log = LoggerFactory.getLogger((String)"RocketmqBroker");
    private final BrokerController brokerController;

    public Broker2Client(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    public void checkProducerTransactionState(Channel channel, CheckTransactionStateRequestHeader requestHeader, final SelectMapedBufferResult selectMapedBufferResult) {
        RemotingCommand request = RemotingCommand.createRequestCommand((int)39, (CommandCustomHeader)requestHeader);
        request.markOnewayRPC();
        try {
            OneMessageTransfer fileRegion = new OneMessageTransfer(request.encodeHeader(selectMapedBufferResult.getSize()), selectMapedBufferResult);
            channel.writeAndFlush((Object)fileRegion).addListener((GenericFutureListener)new ChannelFutureListener(){

                public void operationComplete(ChannelFuture future) throws Exception {
                    selectMapedBufferResult.release();
                    if (!future.isSuccess()) {
                        log.error("invokeProducer failed,", future.cause());
                    }
                }
            });
        }
        catch (Throwable e) {
            log.error("invokeProducer exception", e);
            selectMapedBufferResult.release();
        }
    }

    public RemotingCommand callClient(Channel channel, RemotingCommand request) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
        return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000L);
    }

    public void notifyConsumerIdsChanged(Channel channel, String consumerGroup) {
        if (null == consumerGroup) {
            log.error("notifyConsumerIdsChanged consumerGroup is null");
            return;
        }
        NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
        requestHeader.setConsumerGroup(consumerGroup);
        RemotingCommand request = RemotingCommand.createRequestCommand((int)40, (CommandCustomHeader)requestHeader);
        try {
            this.brokerController.getRemotingServer().invokeOneway(channel, request, 10L);
        }
        catch (Exception e) {
            log.error("notifyConsumerIdsChanged exception, " + consumerGroup, (Throwable)e);
        }
    }

    public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce) {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
        if (null == topicConfig) {
            log.error("[reset-offset] reset offset failed, no topic in this broker. topic={}", (Object)topic);
            response.setCode(1);
            response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic);
            return response;
        }
        HashMap<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
        for (int i = 0; i < topicConfig.getWriteQueueNums(); ++i) {
            MessageQueue mq = new MessageQueue();
            mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
            mq.setTopic(topic);
            mq.setQueueId(i);
            long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
            if (-1L == consumerOffset) {
                response.setCode(1);
                response.setRemark(String.format("THe consumer group <%s> not exist", group));
                return response;
            }
            long timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
            if (isForce || timeStampOffset < consumerOffset) {
                offsetTable.put(mq, timeStampOffset);
                continue;
            }
            offsetTable.put(mq, consumerOffset);
        }
        ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setGroup(group);
        requestHeader.setTimestamp(timeStamp);
        RemotingCommand request = RemotingCommand.createRequestCommand((int)220, (CommandCustomHeader)requestHeader);
        ResetOffsetBody body = new ResetOffsetBody();
        body.setOffsetTable(offsetTable);
        request.setBody(body.encode());
        ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
        if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
            ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = consumerGroupInfo.getChannelInfoTable();
            for (Channel channel : channelInfoTable.keySet()) {
                int version = channelInfoTable.get(channel).getVersion();
                if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
                    try {
                        this.brokerController.getRemotingServer().invokeOneway(channel, request, 5000L);
                        log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}", new Object[]{topic, group, channelInfoTable.get(channel).getClientId()});
                    }
                    catch (Exception e) {
                        log.error("[reset-offset] reset offset exception. topic={}, group={}", (Object)new Object[]{topic, group}, (Object)e);
                    }
                    continue;
                }
                response.setCode(1);
                response.setRemark("the client does not support this feature. version=" + MQVersion.getVersionDesc((int)version));
                log.warn("[reset-offset] the client does not support this feature. version={}", (Object)RemotingHelper.parseChannelRemoteAddr((Channel)channel), (Object)MQVersion.getVersionDesc((int)version));
                return response;
            }
        } else {
            String errorInfo = String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d", requestHeader.getGroup(), requestHeader.getTopic(), requestHeader.getTimestamp());
            log.error(errorInfo);
            response.setCode(206);
            response.setRemark(errorInfo);
            return response;
        }
        response.setCode(0);
        ResetOffsetBody resBody = new ResetOffsetBody();
        resBody.setOffsetTable(offsetTable);
        response.setBody(resBody.encode());
        return response;
    }

    public RemotingCommand getConsumeStatus(String topic, String group, String originClientId) {
        RemotingCommand result = RemotingCommand.createResponseCommand(null);
        GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setGroup(group);
        RemotingCommand request = RemotingCommand.createRequestCommand((int)221, (CommandCustomHeader)requestHeader);
        HashMap<String, Map> consumerStatusTable = new HashMap<String, Map>();
        ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable();
        if (null == channelInfoTable || channelInfoTable.isEmpty()) {
            result.setCode(1);
            result.setRemark(String.format("No Any Consumer online in the consumer group: [%s]", group));
            return result;
        }
        for (Channel channel : channelInfoTable.keySet()) {
            int version = channelInfoTable.get(channel).getVersion();
            String clientId = channelInfoTable.get(channel).getClientId();
            if (version < MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
                result.setCode(1);
                result.setRemark("the client does not support this feature. version=" + MQVersion.getVersionDesc((int)version));
                log.warn("[get-consumer-status] the client does not support this feature. version={}", (Object)RemotingHelper.parseChannelRemoteAddr((Channel)channel), (Object)MQVersion.getVersionDesc((int)version));
                return result;
            }
            if (!UtilAll.isBlank((String)originClientId) && !originClientId.equals(clientId)) continue;
            try {
                RemotingCommand response = this.brokerController.getRemotingServer().invokeSync(channel, request, 5000L);
                assert (response != null);
                switch (response.getCode()) {
                    case 0: {
                        if (response.getBody() == null) break;
                        GetConsumerStatusBody body = (GetConsumerStatusBody)GetConsumerStatusBody.decode((byte[])response.getBody(), GetConsumerStatusBody.class);
                        consumerStatusTable.put(clientId, body.getMessageQueueTable());
                        log.info("[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}", new Object[]{topic, group, clientId});
                    }
                }
            }
            catch (Exception e) {
                log.error("[get-consumer-status] get consumer status exception. topic={}, group={}, offset={}", (Object)new Object[]{topic, group}, (Object)e);
            }
            if (UtilAll.isBlank((String)originClientId) || !originClientId.equals(clientId)) continue;
            break;
        }
        result.setCode(0);
        GetConsumerStatusBody resBody = new GetConsumerStatusBody();
        resBody.setConsumerTable(consumerStatusTable);
        result.setBody(resBody.encode());
        return result;
    }
}

