/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.rocketmq.broker.longpolling;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.longpolling.ManyPullRequest;
import com.alibaba.rocketmq.broker.longpolling.PullRequest;
import com.alibaba.rocketmq.common.ServiceThread;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Background service that parks pull requests which could not be answered
 * immediately and re-executes them later.
 *
 * <p>Held requests are grouped per topic/queue under the key
 * {@code topic@queueId}. A request is re-executed when the queue's max offset
 * moves past the offset the request wants to pull from, or when the request's
 * suspend time plus timeout has elapsed. Re-evaluation is triggered either by
 * a caller of {@link #notifyMessageArriving(String, int, long)} or by this
 * service's own loop in {@link #run()}.
 */
public class PullRequestHoldService extends ServiceThread {
    private static final Logger log = LoggerFactory.getLogger("RocketmqBroker");
    /** Separator between topic and queue id in the keys of {@link #pullRequestTable}. */
    private static final String TOPIC_QUEUEID_SEPARATOR = "@";
    /** Held requests, keyed by {@code topic@queueId}. */
    private final ConcurrentHashMap<String, ManyPullRequest> pullRequestTable =
            new ConcurrentHashMap<String, ManyPullRequest>(1024);
    private final BrokerController brokerController;

    /**
     * @param brokerController controller used to reach the message store and
     *                         the pull message processor
     */
    public PullRequestHoldService(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    /**
     * Builds the table key for a topic/queue pair.
     *
     * @return {@code topic + "@" + queueId}
     */
    private String buildKey(String topic, int queueId) {
        return topic + TOPIC_QUEUEID_SEPARATOR + queueId;
    }

    /**
     * Parks a pull request until it is re-executed by a later check.
     *
     * @param topic       topic the request pulls from
     * @param queueId     queue id the request pulls from
     * @param pullRequest the request to hold
     */
    public void suspendPullRequest(String topic, int queueId, PullRequest pullRequest) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (mpr == null) {
            mpr = new ManyPullRequest();
            // If another thread registered the entry first, use its instance
            // so that the request is not added to an orphaned holder.
            ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
            if (prev != null) {
                mpr = prev;
            }
        }
        mpr.addPullRequest(pullRequest);
    }

    /**
     * Re-evaluates the held requests of every known topic/queue against the
     * store's current max offset. A failure for one key is logged and does not
     * stop the remaining keys from being checked.
     */
    private void checkHoldRequest() {
        for (String key : this.pullRequestTable.keySet()) {
            String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
            // Keys that do not split into exactly topic + queueId are skipped.
            if (kArray.length != 2) {
                continue;
            }
            String topic = kArray[0];
            try {
                int queueId = Integer.parseInt(kArray[1]);
                long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
                this.notifyMessageArriving(topic, queueId, offset);
            }
            catch (Exception e) {
                log.error("check hold request failed. topic=" + topic + ", queueId=" + kArray[1], e);
            }
        }
    }

    /**
     * Re-evaluates the requests held for one topic/queue.
     *
     * <p>The held list is taken out of the holder via
     * {@code cloneListAndClear()}. Each request is then either re-executed or
     * collected and added back to the holder. A request is re-executed when
     * {@code maxOffset}, or a freshly queried max offset, is greater than the
     * offset it pulls from, or when its timeout has elapsed.
     *
     * @param topic     topic whose held requests are checked
     * @param queueId   queue id whose held requests are checked
     * @param maxOffset max offset of the queue as known by the caller
     */
    public void notifyMessageArriving(String topic, int queueId, long maxOffset) {
        ManyPullRequest mpr = this.pullRequestTable.get(this.buildKey(topic, queueId));
        if (mpr == null) {
            return;
        }
        List<PullRequest> requestList = mpr.cloneListAndClear();
        if (requestList == null) {
            return;
        }
        // Kept as ArrayList so the addPullRequest call below resolves exactly
        // as it did before.
        ArrayList<PullRequest> replayList = new ArrayList<PullRequest>();
        for (PullRequest request : requestList) {
            if (this.shouldWakeUp(topic, queueId, maxOffset, request)) {
                this.wakeUp(request);
            } else {
                replayList.add(request);
            }
        }
        if (!replayList.isEmpty()) {
            mpr.addPullRequest(replayList);
        }
    }

    /**
     * Decides whether a held request should be re-executed now.
     *
     * @return {@code true} if the caller-supplied or freshly queried max offset
     *         is greater than the request's pull offset, or if the current time
     *         has reached suspend timestamp + timeout
     */
    private boolean shouldWakeUp(String topic, int queueId, long maxOffset, PullRequest request) {
        long pullFromThisOffset = request.getPullFromThisOffset();
        if (maxOffset > pullFromThisOffset) {
            return true;
        }
        // The caller's offset may be stale; ask the store for the newest one.
        long newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
        if (newestOffset > pullFromThisOffset) {
            return true;
        }
        return System.currentTimeMillis() >= request.getSuspendTimestamp() + request.getTimeoutMillis();
    }

    /**
     * Re-executes one held request through the pull message processor.
     *
     * <p>Every exception is caught and logged here. The request list has
     * already been removed from its holder at this point, so letting an
     * exception escape would drop all requests that were not processed yet.
     */
    private void wakeUp(PullRequest request) {
        try {
            this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand());
        }
        catch (Exception e) {
            log.error("execute request when wakeup failed.", e);
        }
    }

    /**
     * Service loop: until the service is stopped, calls
     * {@code waitForRunning(1000L)} and then re-checks all held requests.
     */
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStoped()) {
            try {
                this.waitForRunning(1000L);
                this.checkHoldRequest();
            }
            catch (Exception e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
        log.info(this.getServiceName() + " service end");
    }

    /**
     * @return the simple class name of this service
     */
    public String getServiceName() {
        return PullRequestHoldService.class.getSimpleName();
    }
}

