package com.alibaba.rocketmq.store;

import com.alibaba.rocketmq.common.ServiceThread;
import com.alibaba.rocketmq.common.SystemClock;
import com.alibaba.rocketmq.common.ThreadFactoryImpl;
import com.alibaba.rocketmq.common.UtilAll;
import com.alibaba.rocketmq.common.message.MessageDecoder;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
import com.alibaba.rocketmq.common.running.RunningStats;
import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
import com.alibaba.rocketmq.store.config.BrokerRole;
import com.alibaba.rocketmq.store.config.MessageStoreConfig;
import com.alibaba.rocketmq.store.config.StorePathConfigHelper;
import com.alibaba.rocketmq.store.ha.HAService;
import com.alibaba.rocketmq.store.index.IndexService;
import com.alibaba.rocketmq.store.index.QueryOffsetResult;
import com.alibaba.rocketmq.store.schedule.ScheduleMessageService;
import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
import java.io.File;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/rocketmq/store/DefaultMessageStore.class */
public class DefaultMessageStore implements MessageStore {
    // Module-wide logger, looked up by the fixed name "RocketmqStore".
    private static final Logger log = LoggerFactory.getLogger("RocketmqStore");
    // Store configuration handed to the constructor.
    private final MessageStoreConfig messageStoreConfig;
    // Created and started in the constructor.
    private final DispatchMessageService dispatchMessageService;
    // Non-null only when the broker role is SLAVE (see the constructor).
    private final ReputMessageService reputMessageService;
    // Non-null for the SLAVE, ASYNC_MASTER and SYNC_MASTER roles; null for any other role (see the constructor).
    private final ScheduleMessageService scheduleMessageService;
    // Assigned in load() after the commit log and consume queues loaded successfully; null until then.
    private StoreCheckpoint storeCheckpoint;
    // Supplied by the caller through the constructor.
    private final BrokerStatsManager brokerStatsManager;
    private final MessageFilter messageFilter = new DefaultMessageFilter();
    // Holds the readable / writeable / disk-full state consulted by putMessage, getMessage and the cleaner.
    private final RunningFlags runningFlags = new RunningFlags();
    // NOTE(review): the constructor argument is presumably the clock precision in milliseconds - confirm against SystemClock.
    private final SystemClock systemClock = new SystemClock(1);
    // Starts out true; start() clears it as its last step and shutdown() sets it again.
    private volatile boolean shutdown = true;
    // Counter used by putMessage to emit its "forbidden" warnings only once every 50000 rejected calls.
    private AtomicLong printTimes = new AtomicLong(0);
    // Single-threaded scheduler that runs the periodic file-cleaning task registered by addScheduleTask().
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
    // Started in the constructor; shut down by load() on failure and by shutdown().
    private final AllocateMapedFileService allocateMapedFileService = new AllocateMapedFileService();
    private final CommitLog commitLog = new CommitLog(this);
    // topic -> (queueId -> ConsumeQueue)
    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> consumeQueueTable = new ConcurrentHashMap<>(32);
    private final FlushConsumeQueueService flushConsumeQueueService = new FlushConsumeQueueService();
    // The two cleaners below are driven by cleanFilesPeriodically() on the scheduler thread.
    private final CleanCommitLogService cleanCommitLogService = new CleanCommitLogService();
    private final CleanConsumeQueueService cleanConsumeQueueService = new CleanConsumeQueueService();
    private final StoreStatsService storeStatsService = new StoreStatsService();
    private final IndexService indexService = new IndexService(this);
    private final HAService haService = new HAService(this);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/rocketmq/store/DefaultMessageStore$CleanCommitLogService.class */
    public class CleanCommitLogService {
        private static final int MaxManualDeleteFileTimes = 20;
        private final double DiskSpaceWarningLevelRatio = Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));
        private final double DiskSpaceCleanForciblyRatio = Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
        private long lastRedeleteTimestamp = 0;
        private volatile int manualDeleteFileSeveralTimes = 0;
        private volatile boolean cleanImmediately = false;

        CleanCommitLogService() {
        }

        public void excuteDeleteFilesManualy() {
            this.manualDeleteFileSeveralTimes = 20;
            DefaultMessageStore.log.info("excuteDeleteFilesManualy was invoked");
        }

        public void run() {
            try {
                deleteExpiredFiles();
                redeleteHangedFile();
            } catch (Exception e) {
                DefaultMessageStore.log.warn(getServiceName() + " service has exception. ", e);
            }
        }

        public String getServiceName() {
            return CleanCommitLogService.class.getSimpleName();
        }

        private void redeleteHangedFile() {
            int redeleteHangedFileInterval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval();
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - this.lastRedeleteTimestamp > redeleteHangedFileInterval) {
                this.lastRedeleteTimestamp = currentTimeMillis;
                if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly())) {
                }
            }
        }

        private void deleteExpiredFiles() {
            long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
            int deleteCommitLogFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
            int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
            boolean isTimeToDelete = isTimeToDelete();
            boolean isSpaceToDelete = isSpaceToDelete();
            boolean z = this.manualDeleteFileSeveralTimes > 0;
            if (isTimeToDelete || isSpaceToDelete || z) {
                if (z) {
                    this.manualDeleteFileSeveralTimes--;
                }
                boolean z2 = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
                DefaultMessageStore.log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", new Object[]{Long.valueOf(fileReservedTime), Boolean.valueOf(isTimeToDelete), Boolean.valueOf(isSpaceToDelete), Integer.valueOf(this.manualDeleteFileSeveralTimes), Boolean.valueOf(z2)});
                if (DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime * 3600000, deleteCommitLogFilesInterval, destroyMapedFileIntervalForcibly, z2) <= 0 && isSpaceToDelete) {
                    DefaultMessageStore.log.warn("disk space will be full soon, but delete file failed.");
                }
            }
        }

        private boolean isSpaceToDelete() {
            double diskMaxUsedSpaceRatio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0d;
            this.cleanImmediately = false;
            double diskPartitionSpaceUsedPercent = UtilAll.getDiskPartitionSpaceUsedPercent(DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog());
            if (diskPartitionSpaceUsedPercent > this.DiskSpaceWarningLevelRatio) {
                if (DefaultMessageStore.this.runningFlags.getAndMakeDiskFull()) {
                    DefaultMessageStore.log.error("physic disk maybe full soon " + diskPartitionSpaceUsedPercent + ", so mark disk full");
                    System.gc();
                }
                this.cleanImmediately = true;
            } else if (diskPartitionSpaceUsedPercent > this.DiskSpaceCleanForciblyRatio) {
                this.cleanImmediately = true;
            } else if (!DefaultMessageStore.this.runningFlags.getAndMakeDiskOK()) {
                DefaultMessageStore.log.info("physic disk space OK " + diskPartitionSpaceUsedPercent + ", so mark disk ok");
            }
            if (diskPartitionSpaceUsedPercent < 0.0d || diskPartitionSpaceUsedPercent > diskMaxUsedSpaceRatio) {
                DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + diskPartitionSpaceUsedPercent);
                return true;
            }
            double diskPartitionSpaceUsedPercent2 = UtilAll.getDiskPartitionSpaceUsedPercent(StorePathConfigHelper.getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir()));
            if (diskPartitionSpaceUsedPercent2 > this.DiskSpaceWarningLevelRatio) {
                if (DefaultMessageStore.this.runningFlags.getAndMakeDiskFull()) {
                    DefaultMessageStore.log.error("logics disk maybe full soon " + diskPartitionSpaceUsedPercent2 + ", so mark disk full");
                    System.gc();
                }
                this.cleanImmediately = true;
            } else if (diskPartitionSpaceUsedPercent2 > this.DiskSpaceCleanForciblyRatio) {
                this.cleanImmediately = true;
            } else if (!DefaultMessageStore.this.runningFlags.getAndMakeDiskOK()) {
                DefaultMessageStore.log.info("logics disk space OK " + diskPartitionSpaceUsedPercent2 + ", so mark disk ok");
            }
            if (diskPartitionSpaceUsedPercent2 >= 0.0d && diskPartitionSpaceUsedPercent2 <= diskMaxUsedSpaceRatio) {
                return false;
            }
            DefaultMessageStore.log.info("logics disk maybe full soon, so reclaim space, " + diskPartitionSpaceUsedPercent2);
            return true;
        }

        private boolean isTimeToDelete() {
            String deleteWhen = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
            if (!UtilAll.isItTimeToDo(deleteWhen)) {
                return false;
            }
            DefaultMessageStore.log.info("it's time to reclaim disk space, " + deleteWhen);
            return true;
        }

        public int getManualDeleteFileSeveralTimes() {
            return this.manualDeleteFileSeveralTimes;
        }

        public void setManualDeleteFileSeveralTimes(int i) {
            this.manualDeleteFileSeveralTimes = i;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/rocketmq/store/DefaultMessageStore$CleanConsumeQueueService.class */
    /**
     * Cleaner for consume queue and index files. It only does work when the commit log's
     * minimum offset has advanced since the previous round.
     */
    public class CleanConsumeQueueService {
        /** Commit log minimum offset seen by the last round that performed a cleanup. */
        private long lastPhysicalMinOffset = 0;

        CleanConsumeQueueService() {
        }

        /**
         * Asks every consume queue, and then the index service, to delete files that lie
         * before the commit log's minimum offset.
         */
        private void deleteExpiredFiles() {
            int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
            long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
            if (minOffset <= this.lastPhysicalMinOffset) {
                return;
            }
            this.lastPhysicalMinOffset = minOffset;

            for (ConcurrentHashMap<Integer, ConsumeQueue> queuesOfTopic : DefaultMessageStore.this.consumeQueueTable.values()) {
                for (ConsumeQueue queue : queuesOfTopic.values()) {
                    // Pause after a queue that actually deleted something, when an interval is configured.
                    if (queue.deleteExpiredFile(minOffset) > 0 && deleteLogicsFilesInterval > 0) {
                        try {
                            Thread.sleep(deleteLogicsFilesInterval);
                        } catch (InterruptedException e) {
                            // Keep cleaning, but do not lose the interrupt request.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }
            DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
        }

        /** Runs one cleaning round; any exception is logged and not propagated. */
        public void run() {
            try {
                deleteExpiredFiles();
            } catch (Exception e) {
                DefaultMessageStore.log.warn(getServiceName() + " service has exception. ", e);
            }
        }

        public String getServiceName() {
            return CleanConsumeQueueService.class.getSimpleName();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/rocketmq/store/DefaultMessageStore$DispatchMessageService.class */
    /**
     * Service thread that forwards dispatch requests to the consume queues and, when
     * enabled, to the index service. Producers append to {@code requestsWrite}; the service
     * thread swaps the two lists and drains {@code requestsRead}.
     */
    public class DispatchMessageService extends ServiceThread {
        private volatile List<DispatchRequest> requestsWrite;
        private volatile List<DispatchRequest> requestsRead;

        /**
         * @param i expected number of buffered requests; both lists are pre-sized to 1.5 times this value
         */
        public DispatchMessageService(int i) {
            int initialCapacity = (int) (i * 1.5d);
            this.requestsWrite = new ArrayList<>(initialCapacity);
            this.requestsRead = new ArrayList<>(initialCapacity);
        }

        /** @return true when either buffer still holds requests */
        public boolean hasRemainMessage() {
            List<DispatchRequest> write = this.requestsWrite;
            if (write != null && !write.isEmpty()) {
                return true;
            }
            List<DispatchRequest> read = this.requestsRead;
            return read != null && !read.isEmpty();
        }

        /**
         * Queues a request and wakes the service thread. When the write buffer has grown
         * beyond the configured high-water mark the caller is delayed by one millisecond.
         */
        public void putRequest(DispatchRequest dispatchRequest) {
            int size;
            int putMsgIndexHightWater = DefaultMessageStore.this.getMessageStoreConfig().getPutMsgIndexHightWater();
            synchronized (this) {
                this.requestsWrite.add(dispatchRequest);
                size = this.requestsWrite.size();
                if (!this.hasNotified) {
                    this.hasNotified = true;
                    notify();
                }
            }
            DefaultMessageStore.this.getStoreStatsService().setDispatchMaxBuffer(size);
            if (size > putMsgIndexHightWater) {
                try {
                    if (DefaultMessageStore.log.isDebugEnabled()) {
                        DefaultMessageStore.log.debug("Message index buffer size " + size + " > high water " + putMsgIndexHightWater);
                    }
                    Thread.sleep(1L);
                } catch (InterruptedException ignored) {
                    // The pause is only back-pressure; an interrupted pause is simply cut short,
                    // exactly as before.
                }
            }
        }

        /** Exchanges the two buffers; every call site in this class runs it on the service thread. */
        private void swapRequests() {
            List<DispatchRequest> tmp = this.requestsWrite;
            this.requestsWrite = this.requestsRead;
            this.requestsRead = tmp;
        }

        /** Drains the read buffer. */
        private void doDispatch() {
            if (this.requestsRead.isEmpty()) {
                return;
            }
            for (DispatchRequest req : this.requestsRead) {
                // NOTE(review): 0 and 8 are presumably the "not transactional" and "commit"
                // transaction values - confirm against MessageSysFlag. Other values are not
                // written to a consume queue.
                switch (MessageSysFlag.getTransactionValue(req.getSysFlag())) {
                    case 0:
                    case 8:
                        DefaultMessageStore.this.putMessagePostionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(), req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
                        break;
                }
            }
            if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {
                DefaultMessageStore.this.indexService.putRequest(this.requestsRead.toArray());
            }
            this.requestsRead.clear();
        }

        /**
         * Dispatches until the service is stopped, then waits five seconds and drains
         * whatever was queued in the meantime.
         */
        public void run() {
            DefaultMessageStore.log.info(getServiceName() + " service started");
            while (!isStoped()) {
                try {
                    waitForRunning(0L);
                    doDispatch();
                } catch (Exception e) {
                    DefaultMessageStore.log.warn(getServiceName() + " service has exception. ", e);
                }
            }
            try {
                Thread.sleep(5000L);
            } catch (InterruptedException e2) {
                DefaultMessageStore.log.warn("DispatchMessageService Exception, ", e2);
            }
            synchronized (this) {
                swapRequests();
            }
            doDispatch();
            DefaultMessageStore.log.info(getServiceName() + " service end");
        }

        // NOTE(review): presumably the ServiceThread hook invoked when waitForRunning returns - confirm.
        protected void onWaitEnd() {
            swapRequests();
        }

        public String getServiceName() {
            return DispatchMessageService.class.getSimpleName();
        }
    }

    /* loaded from: input_file:com/alibaba/rocketmq/store/DefaultMessageStore$FlushConsumeQueueService.class */
    /**
     * Service thread that periodically commits every consume queue to disk and, on a
     * thorough flush, also flushes the store checkpoint.
     */
    class FlushConsumeQueueService extends ServiceThread {
        /** Retry count used for the final flush; passing it to doFlush also forces a thorough flush. */
        private static final int RetryTimesOver = 3;
        private long lastFlushTimestamp = 0;

        FlushConsumeQueueService() {
        }

        /**
         * Commits all consume queues.
         *
         * @param retryTimes maximum commit attempts per queue; attempts stop as soon as
         *                   {@code commit} returns true
         */
        private void doFlush(int retryTimes) {
            int leastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
            if (retryTimes == RetryTimesOver) {
                leastPages = 0;
            }

            long logicsMsgTimestamp = 0;
            int thoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
            long now = System.currentTimeMillis();
            if (now >= this.lastFlushTimestamp + thoroughInterval) {
                // Thorough flush is due: drop the page threshold and capture the checkpoint
                // timestamp before committing so it can be written back afterwards.
                this.lastFlushTimestamp = now;
                leastPages = 0;
                logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
            }

            for (ConcurrentHashMap<Integer, ConsumeQueue> queuesOfTopic : DefaultMessageStore.this.consumeQueueTable.values()) {
                for (ConsumeQueue queue : queuesOfTopic.values()) {
                    boolean committed = false;
                    for (int attempt = 0; attempt < retryTimes && !committed; attempt++) {
                        committed = queue.commit(leastPages);
                    }
                }
            }

            if (leastPages == 0) {
                if (logicsMsgTimestamp > 0) {
                    DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
                }
                DefaultMessageStore.this.getStoreCheckpoint().flush();
            }
        }

        /** Flushes once per configured interval until stopped, then performs a final flush with retries. */
        public void run() {
            DefaultMessageStore.log.info(getServiceName() + " service started");
            while (!isStoped()) {
                try {
                    waitForRunning(DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue());
                    doFlush(1);
                } catch (Exception e) {
                    DefaultMessageStore.log.warn(getServiceName() + " service has exception. ", e);
                }
            }
            doFlush(RetryTimesOver);
            DefaultMessageStore.log.info(getServiceName() + " service end");
        }

        public String getServiceName() {
            return FlushConsumeQueueService.class.getSimpleName();
        }

        // NOTE(review): presumably the time in milliseconds that ServiceThread waits for this thread on shutdown - confirm.
        public long getJointime() {
            return 60000L;
        }
    }

    /* loaded from: input_file:com/alibaba/rocketmq/store/DefaultMessageStore$ReputMessageService.class */
    /**
     * Service thread that re-reads the commit log from {@code reputFromOffset} and turns
     * every message it finds into a dispatch request. The constructor of the enclosing
     * store only creates it for the SLAVE role.
     */
    class ReputMessageService extends ServiceThread {
        private volatile long reputFromOffset = 0;

        ReputMessageService() {
        }

        public long getReputFromOffset() {
            return this.reputFromOffset;
        }

        public void setReputFromOffset(long j) {
            this.reputFromOffset = j;
        }

        /**
         * Scans the commit log from the current offset until no more data is available or
         * a message fails its check.
         */
        private void doReput() {
            boolean doNext = true;
            while (doNext) {
                SelectMapedBufferResult result = DefaultMessageStore.this.commitLog.getData(this.reputFromOffset);
                if (result == null) {
                    break;
                }
                try {
                    this.reputFromOffset = result.getStartOffset();
                    int readSize = 0;
                    while (readSize < result.getSize() && doNext) {
                        DispatchRequest request = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                        int size = request.getMsgSize();
                        if (size > 0) {
                            DefaultMessageStore.this.putDispatchRequest(request);
                            this.reputFromOffset += size;
                            readSize += size;
                            DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(request.getTopic()).incrementAndGet();
                            DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(request.getTopic()).addAndGet(size);
                        } else if (size == 0) {
                            // Nothing more in this file: continue from the start of the next one.
                            this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            readSize = result.getSize();
                        } else {
                            // Any negative size means the message could not be read. Stop here and
                            // retry on the next wake-up. Previously only -1 stopped the scan, so any
                            // other negative value would have spun in this loop forever.
                            doNext = false;
                        }
                    }
                } finally {
                    result.release();
                }
            }
        }

        /** Re-scans the commit log on every wake-up (at most one second apart) until stopped. */
        public void run() {
            DefaultMessageStore.log.info(getServiceName() + " service started");
            while (!isStoped()) {
                try {
                    waitForRunning(1000L);
                    doReput();
                } catch (Exception e) {
                    DefaultMessageStore.log.warn(getServiceName() + " service has exception. ", e);
                }
            }
            DefaultMessageStore.log.info(getServiceName() + " service end");
        }

        public String getServiceName() {
            return ReputMessageService.class.getSimpleName();
        }
    }

    public DefaultMessageStore(MessageStoreConfig messageStoreConfig, BrokerStatsManager brokerStatsManager) throws IOException {
        this.messageStoreConfig = messageStoreConfig;
        this.brokerStatsManager = brokerStatsManager;
        this.dispatchMessageService = new DispatchMessageService(this.messageStoreConfig.getPutMsgIndexHightWater());
        switch (this.messageStoreConfig.getBrokerRole()) {
            case SLAVE:
                this.reputMessageService = new ReputMessageService();
                this.scheduleMessageService = new ScheduleMessageService(this);
                break;
            case ASYNC_MASTER:
            case SYNC_MASTER:
                this.reputMessageService = null;
                this.scheduleMessageService = new ScheduleMessageService(this);
                break;
            default:
                this.reputMessageService = null;
                this.scheduleMessageService = null;
                break;
        }
        this.allocateMapedFileService.start();
        this.dispatchMessageService.start();
        this.indexService.start();
    }

    /**
     * Forwards the truncate request to every consume queue of every topic.
     *
     * @param j offset passed unchanged to each queue's {@code truncateDirtyLogicFiles}
     */
    public void truncateDirtyLogicFiles(long j) {
        for (ConcurrentHashMap<Integer, ConsumeQueue> queuesOfTopic : this.consumeQueueTable.values()) {
            for (ConsumeQueue queue : queuesOfTopic.values()) {
                queue.truncateDirtyLogicFiles(j);
            }
        }
    }

    /**
     * Loads the schedule service (when present), the commit log and the consume queues, in
     * that order and stopping at the first failure. On success the checkpoint is opened,
     * the index is loaded and recovery runs.
     *
     * @return true when everything loaded; false on any failure or exception, in which
     *         case the file allocation service is shut down
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public boolean load() {
        boolean loaded;
        try {
            final boolean lastExitOk = !isTempFileExist();
            log.info("last shutdown {}", lastExitOk ? "normally" : "abnormally");

            loaded = true;
            if (this.scheduleMessageService != null) {
                loaded = this.scheduleMessageService.load();
            }
            loaded = loaded && this.commitLog.load();
            loaded = loaded && loadConsumeQueue();

            if (loaded) {
                String checkpointPath = StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir());
                this.storeCheckpoint = new StoreCheckpoint(checkpointPath);
                this.indexService.load(lastExitOk);
                recover(lastExitOk);
                log.info("load over, and the max phy offset = {}", Long.valueOf(getMaxPhyOffset()));
            }
        } catch (Exception e) {
            log.error("load exception", e);
            loaded = false;
        }

        if (!loaded) {
            this.allocateMapedFileService.shutdown();
        }
        return loaded;
    }

    /**
     * Registers the periodic cleaning task: first run after one minute, then at the
     * configured clean-resource interval (milliseconds).
     */
    private void addScheduleTask() {
        final long initialDelayMillis = 60000L;
        final long periodMillis = this.messageStoreConfig.getCleanResourceInterval();
        final Runnable cleanTask = new Runnable() {
            @Override
            public void run() {
                DefaultMessageStore.this.cleanFilesPeriodically();
            }
        };
        this.scheduledExecutorService.scheduleAtFixedRate(cleanTask, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** One cleaning round: commit log files first, then consume queue and index files. */
    public void cleanFilesPeriodically() {
        final CleanCommitLogService commitLogCleaner = this.cleanCommitLogService;
        final CleanConsumeQueueService consumeQueueCleaner = this.cleanConsumeQueueService;
        commitLogCleaner.run();
        consumeQueueCleaner.run();
    }

    /**
     * Destroys consume queues whose last offset is below the commit log's minimum offset,
     * and removes topics that end up with no queues. The schedule topic is never touched.
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public void cleanExpiredConsumerQueue() {
        final long minCommitLogOffset = this.commitLog.getMinOffset();

        Iterator<Map.Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>>> topicIt = this.consumeQueueTable.entrySet().iterator();
        while (topicIt.hasNext()) {
            Map.Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>> topicEntry = topicIt.next();
            String topic = topicEntry.getKey();
            if (topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) {
                continue;
            }

            ConcurrentHashMap<Integer, ConsumeQueue> queues = topicEntry.getValue();
            Iterator<Map.Entry<Integer, ConsumeQueue>> queueIt = queues.entrySet().iterator();
            while (queueIt.hasNext()) {
                Map.Entry<Integer, ConsumeQueue> queueEntry = queueIt.next();
                ConsumeQueue queue = queueEntry.getValue();
                long maxCLOffsetInConsumeQueue = queue.getLastOffset();

                if (maxCLOffsetInConsumeQueue == -1) {
                    // No last offset available: only report it, never destroy.
                    log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.", new Object[]{queue.getTopic(), Integer.valueOf(queue.getQueueId()), Long.valueOf(queue.getMaxPhysicOffset()), Long.valueOf(queue.getMinLogicOffset())});
                    continue;
                }
                if (maxCLOffsetInConsumeQueue >= minCommitLogOffset) {
                    continue;
                }

                log.info("cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", new Object[]{topic, queueEntry.getKey(), Long.valueOf(minCommitLogOffset), Long.valueOf(maxCLOffsetInConsumeQueue)});
                this.commitLog.removeQueurFromTopicQueueTable(queue.getTopic(), queue.getQueueId());
                queue.destroy();
                queueIt.remove();
            }

            if (queues.isEmpty()) {
                log.info("cleanExpiredConsumerQueue: {},topic destroyed", topic);
                topicIt.remove();
            }
        }
    }

    /**
     * Starts the remaining services in a fixed order and finally marks the store as running.
     * The file allocation, dispatch and index services were already started by the constructor.
     *
     * @throws Exception propagated from any of the services being started
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public void start() throws Exception {
        this.flushConsumeQueueService.start();
        this.commitLog.start();
        this.storeStatsService.start();
        // The schedule service exists on a slave too (see the constructor) but is only started on non-slave roles.
        if (this.scheduleMessageService != null && BrokerRole.SLAVE != this.messageStoreConfig.getBrokerRole()) {
            this.scheduleMessageService.start();
        }
        if (this.reputMessageService != null) {
            // Begin re-putting from the commit log's current maximum offset.
            this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
            this.reputMessageService.start();
        }
        this.haService.start();
        // NOTE(review): presumably creates the abort marker file that shutdown() deletes - confirm in createTempFile().
        createTempFile();
        addScheduleTask();
        // Cleared last, so putMessage/getMessage keep rejecting calls until everything above has started.
        this.shutdown = false;
    }

    /**
     * Stops the store: shuts down the scheduler, waits three seconds, shuts the services
     * down in a fixed order, flushes and closes the checkpoint and finally deletes the
     * abort file. Does nothing when the store is already shut down or was never started.
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public void shutdown() {
        if (this.shutdown) {
            return;
        }
        this.shutdown = true;

        this.scheduledExecutorService.shutdown();
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            // NOTE(review): the interrupt flag is deliberately left cleared, as before. The
            // service shutdowns below may need to wait on their threads - confirm against
            // ServiceThread before changing this.
            log.error("shutdown Exception, ", e);
        }

        if (this.scheduleMessageService != null) {
            this.scheduleMessageService.shutdown();
        }
        this.haService.shutdown();
        this.storeStatsService.shutdown();
        this.dispatchMessageService.shutdown();
        this.indexService.shutdown();
        this.flushConsumeQueueService.shutdown();
        this.commitLog.shutdown();
        this.allocateMapedFileService.shutdown();
        if (this.reputMessageService != null) {
            this.reputMessageService.shutdown();
        }

        // The checkpoint is only created by a successful load(), and start() does not
        // verify that. Without this guard a missing checkpoint threw a
        // NullPointerException here and the abort file below was never removed.
        if (this.storeCheckpoint != null) {
            this.storeCheckpoint.flush();
            this.storeCheckpoint.shutdown();
        }
        deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
    }

    /**
     * Destroys all consume queues, the commit log and the index, then deletes the abort
     * file and the checkpoint file under the store root directory.
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public void destroy() {
        destroyLogics();
        this.commitLog.destroy();
        this.indexService.destroy();

        final String abortFile = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
        deleteFile(abortFile);
        final String checkpointFile = StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir());
        deleteFile(checkpointFile);
    }

    /** Calls {@code destroy()} on every consume queue of every topic; the table itself is not cleared. */
    public void destroyLogics() {
        for (ConcurrentHashMap<Integer, ConsumeQueue> queuesOfTopic : this.consumeQueueTable.values()) {
            for (ConsumeQueue queue : queuesOfTopic.values()) {
                queue.destroy();
            }
        }
    }

    /**
     * Validates the store state and the message, then appends the message to the commit log.
     *
     * @param messageExtBrokerInner message to store
     * @return {@code SERVICE_NOT_AVAILABLE} when the store is shut down, runs as a slave or
     *         is not writeable; {@code MESSAGE_ILLEGAL} when the topic is longer than 127
     *         characters or the properties string longer than 32767; otherwise whatever the
     *         commit log returns
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public PutMessageResult putMessage(MessageExtBrokerInner messageExtBrokerInner) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so putMessage is forbidden");
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            // Warn on the first rejection and then only once every 50000 rejections.
            long rejected = this.printTimes.getAndIncrement();
            if (rejected % 50000 == 0) {
                log.warn("message store is slave mode, so putMessage is forbidden ");
            }
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        if (!this.runningFlags.isWriteable()) {
            long rejected = this.printTimes.getAndIncrement();
            if (rejected % 50000 == 0) {
                log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
            }
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }
        this.printTimes.set(0L);

        final int topicLength = messageExtBrokerInner.getTopic().length();
        if (topicLength > Byte.MAX_VALUE) {
            log.warn("putMessage message topic length too long " + topicLength);
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }

        final String properties = messageExtBrokerInner.getPropertiesString();
        if (properties != null && properties.length() > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long " + properties.length());
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }

        final long beginTime = getSystemClock().now();
        final PutMessageResult result = this.commitLog.putMessage(messageExtBrokerInner);
        final long elapsed = getSystemClock().now() - beginTime;
        if (elapsed > 1000) {
            log.warn("putMessage not in lock eclipse time(ms) " + elapsed);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsed);
        // The topic is read from the message again here, after the commit log call, exactly
        // as the original did.
        this.storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBrokerInner.getTopic()).incrementAndGet();

        if (result == null || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }
        return result;
    }

    /** @return the clock instance owned by this store */
    public SystemClock getSystemClock() {
        final SystemClock clock = this.systemClock;
        return clock;
    }

    /**
     * Reads a batch of messages from one consume queue, starting at a given queue offset.
     *
     * <p>The requested offset is first checked against the queue's minimum and maximum offsets;
     * only an in-range offset leads to a scan of the consume queue index. Each index entry is
     * consumed as 20 bytes: a {@code long} commit-log offset, an {@code int} message size and a
     * {@code long} tags code. At most 16000 bytes of index data (800 entries) are scanned per call.
     *
     * @param str first argument forwarded to the disk-fall-behind statistic (presumably the
     *            consumer group; confirm against callers)
     * @param str2 topic whose consume queue is read
     * @param i queue id
     * @param j queue offset to start reading from
     * @param i2 maximum number of messages wanted by the caller
     * @param subscriptionData subscription handed to the message filter together with each tags code
     * @return the result carrying status, next begin offset and queue min/max offsets, or
     *         {@code null} when the store is shut down or not readable
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public GetMessageResult getMessage(String str, String str2, int i, long j, int i2, SubscriptionData subscriptionData) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so getMessage is forbidden");
            return null;
        }
        if (!this.runningFlags.isReadable()) {
            log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
            return null;
        }
        long beginTime = getSystemClock().now();
        GetMessageStatus status;
        long nextBeginOffset;
        long minOffset = 0;
        long maxOffset = 0;
        GetMessageResult getResult = new GetMessageResult();
        long maxOffsetPy = this.commitLog.getMaxOffset();
        ConsumeQueue consumeQueue = findConsumeQueue(str2, i);
        if (consumeQueue != null) {
            minOffset = consumeQueue.getMinOffsetInQuque();
            maxOffset = consumeQueue.getMaxOffsetInQuque();
            if (maxOffset == 0) {
                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                nextBeginOffset = 0;
            } else if (j < minOffset) {
                status = GetMessageStatus.OFFSET_TOO_SMALL;
                nextBeginOffset = minOffset;
            } else if (j == maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                nextBeginOffset = j;
            } else if (j > maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                nextBeginOffset = 0 == minOffset ? minOffset : maxOffset;
            } else {
                SelectMapedBufferResult indexBuffer = consumeQueue.getIndexBuffer(j);
                if (indexBuffer != null) {
                    try {
                        status = GetMessageStatus.NO_MATCHED_MESSAGE;
                        // While set, entries whose commit-log offset lies below this value are skipped.
                        long nextPhyFileStartOffset = Long.MIN_VALUE;
                        long maxPhyOffsetPulling = 0;
                        int scannedBytes = 0;
                        boolean diskFallRecorded = false;
                        while (scannedBytes < indexBuffer.getSize() && scannedBytes < 16000) {
                            long offsetPy = indexBuffer.getByteBuffer().getLong();
                            int sizePy = indexBuffer.getByteBuffer().getInt();
                            long tagsCode = indexBuffer.getByteBuffer().getLong();
                            maxPhyOffsetPulling = offsetPy;
                            if (nextPhyFileStartOffset == Long.MIN_VALUE || offsetPy >= nextPhyFileStartOffset) {
                                boolean inDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
                                if (isTheBatchFull(sizePy, i2, getResult.getBufferTotalSize(), getResult.getMessageCount(), inDisk)) {
                                    break;
                                }
                                if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
                                    SelectMapedBufferResult message = this.commitLog.getMessage(offsetPy, sizePy);
                                    if (message != null) {
                                        this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
                                        getResult.addMessage(message);
                                        status = GetMessageStatus.FOUND;
                                        nextPhyFileStartOffset = Long.MIN_VALUE;
                                        // Record the fall-behind once per call. The previous guard tested the
                                        // flag for true while it could only ever be false, so nothing was recorded.
                                        if (!diskFallRecorded) {
                                            diskFallRecorded = true;
                                            // Null guard because this path was dead before; the manager may be absent.
                                            if (this.brokerStatsManager != null) {
                                                this.brokerStatsManager.recordDiskFallBehind(str, str2, i, consumeQueue.getMaxPhysicOffset() - offsetPy);
                                            }
                                        }
                                    } else {
                                        if (getResult.getBufferTotalSize() == 0) {
                                            status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                        }
                                        // The commit log no longer serves this offset: skip ahead to the
                                        // offset returned by rollNextFile.
                                        nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                                    }
                                } else {
                                    if (getResult.getBufferTotalSize() == 0) {
                                        status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                    }
                                    if (log.isDebugEnabled()) {
                                        log.debug("message type not matched, client: " + subscriptionData + " server: " + tagsCode);
                                    }
                                }
                            }
                            scannedBytes += 20;
                        }
                        nextBeginOffset = j + (scannedBytes / 20);
                        // Suggest pulling from a slave when the last scanned message lies further behind
                        // the commit log tail than the configured share of physical memory.
                        long fallBehind = getMaxPhyOffset() - maxPhyOffsetPulling;
                        long memory = (long) (((double) StoreUtil.TotalPhysicalMemorySize) * (((double) this.messageStoreConfig.getAccessMessageInMemoryMaxRatio()) / 100.0d));
                        getResult.setSuggestPullingFromSlave(fallBehind > memory);
                    } finally {
                        indexBuffer.release();
                    }
                } else {
                    status = GetMessageStatus.OFFSET_FOUND_NULL;
                    nextBeginOffset = consumeQueue.rollNextFile(j);
                    log.warn("consumer request topic: " + str2 + "offset: " + j + " minOffset: " + minOffset + " maxOffset: " + maxOffset + ", but access logic queue failed.");
                }
            }
        } else {
            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
            nextBeginOffset = 0;
        }
        if (GetMessageStatus.FOUND == status) {
            this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
        } else {
            this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
        }
        this.storeStatsService.setGetMessageEntireTimeMax(getSystemClock().now() - beginTime);
        getResult.setStatus(status);
        getResult.setNextBeginOffset(nextBeginOffset);
        getResult.setMaxOffset(maxOffset);
        getResult.setMinOffset(minOffset);
        return getResult;
    }

    /**
     * Looks up the maximum queue offset of a consume queue.
     *
     * @param str topic
     * @param i queue id
     * @return the queue's maximum offset, or {@code 0} when no consume queue is found
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public long getMaxOffsetInQuque(String str, int i) {
        ConsumeQueue queue = findConsumeQueue(str, i);
        return queue == null ? 0L : queue.getMaxOffsetInQuque();
    }

    /**
     * Looks up the minimum queue offset of a consume queue.
     *
     * @param str topic
     * @param i queue id
     * @return the queue's minimum offset, or {@code -1} when no consume queue is found
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public long getMinOffsetInQuque(String str, int i) {
        ConsumeQueue queue = findConsumeQueue(str, i);
        return queue == null ? -1L : queue.getMinOffsetInQuque();
    }

    /**
     * Asks a consume queue for the queue offset that corresponds to a timestamp.
     *
     * @param str topic
     * @param i queue id
     * @param j timestamp handed to the consume queue lookup
     * @return the offset reported by the consume queue, or {@code 0} when no consume queue is found
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public long getOffsetInQueueByTime(String str, int i, long j) {
        ConsumeQueue queue = findConsumeQueue(str, i);
        return queue == null ? 0L : queue.getOffsetInQueueByTime(j);
    }

    /**
     * Decodes the message stored at a commit-log offset when its size is not known up front.
     *
     * <p>The first 4 bytes at the offset are read as an {@code int} and used as the message size
     * for the two-argument overload.
     *
     * @param j commit-log offset of the message
     * @return the decoded message, or {@code null} when the commit log yields no data at the offset
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public MessageExt lookMessageByOffset(long j) {
        SelectMapedBufferResult sizeHeader = this.commitLog.getMessage(j, 4);
        if (sizeHeader == null) {
            return null;
        }
        try {
            int messageSize = sizeHeader.getByteBuffer().getInt();
            return lookMessageByOffset(j, messageSize);
        } finally {
            sizeHeader.release();
        }
    }

    /**
     * Selects the raw buffer of the message stored at a commit-log offset.
     *
     * <p>The first 4 bytes at the offset are read as an {@code int} and used as the length of the
     * second, full read. The returned buffer is not released here.
     *
     * @param j commit-log offset of the message
     * @return the selected buffer, or {@code null} when the commit log yields no data
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public SelectMapedBufferResult selectOneMessageByOffset(long j) {
        SelectMapedBufferResult sizeHeader = this.commitLog.getMessage(j, 4);
        if (sizeHeader == null) {
            return null;
        }
        try {
            int messageSize = sizeHeader.getByteBuffer().getInt();
            return this.commitLog.getMessage(j, messageSize);
        } finally {
            sizeHeader.release();
        }
    }

    /**
     * Selects the raw buffer of a message whose size is already known.
     *
     * @param j commit-log offset of the message
     * @param i number of bytes to select
     * @return whatever the commit log returns for that offset and size
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public SelectMapedBufferResult selectOneMessageByOffset(long j, int i) {
        return commitLog.getMessage(j, i);
    }

    /**
     * Returns the textual form of the store statistics.
     *
     * @return the result of {@code toString()} on the store stats service
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public String getRunningDataInfo() {
        return storeStatsService.toString();
    }

    /**
     * Collects runtime figures of the store into a key/value map.
     *
     * <p>Starts from the map supplied by the store stats service and adds the disk usage of the
     * commit log and consume queue paths, the schedule service statistics (when that service
     * exists) and the commit log min/max offsets.
     *
     * @return the map obtained from the stats service, with the extra entries added to it
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public HashMap<String, String> getRuntimeInfo() {
        HashMap<String, String> info = this.storeStatsService.getRuntimeInfo();

        String commitLogPath = getMessageStoreConfig().getStorePathCommitLog();
        info.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(UtilAll.getDiskPartitionSpaceUsedPercent(commitLogPath)));

        String consumeQueuePath = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
        info.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(UtilAll.getDiskPartitionSpaceUsedPercent(consumeQueuePath)));

        if (null != this.scheduleMessageService) {
            this.scheduleMessageService.buildRunningStats(info);
        }

        info.put(RunningStats.commitLogMinOffset.name(), String.valueOf(getMinPhyOffset()));
        info.put(RunningStats.commitLogMaxOffset.name(), String.valueOf(getMaxPhyOffset()));
        return info;
    }

    /**
     * Returns the maximum offset of the commit log.
     *
     * @return the value of {@code commitLog.getMaxOffset()}
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public long getMaxPhyOffset() {
        return commitLog.getMaxOffset();
    }

    /**
     * Returns the store timestamp of the first entry in a consume queue.
     *
     * <p>The entry is located at the queue's minimum logic offset divided by 20; its commit-log
     * offset and size are then handed to the commit log to pick up the timestamp.
     *
     * @param str topic
     * @param i queue id
     * @return the store timestamp, or {@code -1} when the queue or entry is missing or the lookup
     *         throws an exception
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public long getEarliestMessageTime(String str, int i) {
        ConsumeQueue queue = findConsumeQueue(str, i);
        if (queue == null) {
            return -1L;
        }
        SelectMapedBufferResult entry = queue.getIndexBuffer(queue.getMinLogicOffset() / 20);
        if (entry == null) {
            return -1L;
        }
        try {
            long phyOffset = entry.getByteBuffer().getLong();
            int size = entry.getByteBuffer().getInt();
            return getCommitLog().pickupStoretimestamp(phyOffset, size);
        } catch (Exception ignored) {
            // Any failure while reading is reported to the caller as "unknown" (-1).
            return -1L;
        } finally {
            entry.release();
        }
    }

    /**
     * Returns the store timestamp of the message at a given queue offset.
     *
     * @param str topic
     * @param i queue id
     * @param j queue offset of the entry to inspect
     * @return the store timestamp, or {@code -1} when the queue or entry is missing or the lookup
     *         throws an exception
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public long getMessageStoreTimeStamp(String str, int i, long j) {
        ConsumeQueue queue = findConsumeQueue(str, i);
        if (queue == null) {
            return -1L;
        }
        SelectMapedBufferResult entry = queue.getIndexBuffer(j);
        if (entry == null) {
            return -1L;
        }
        try {
            long phyOffset = entry.getByteBuffer().getLong();
            int size = entry.getByteBuffer().getInt();
            return getCommitLog().pickupStoretimestamp(phyOffset, size);
        } catch (Exception ignored) {
            // Any failure while reading is reported to the caller as "unknown" (-1).
            return -1L;
        } finally {
            entry.release();
        }
    }

    /**
     * Returns the message total reported by a consume queue.
     *
     * @param str topic
     * @param i queue id
     * @return the queue's message total, or {@code -1} when no consume queue is found
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public long getMessageTotalInQueue(String str, int i) {
        ConsumeQueue queue = findConsumeQueue(str, i);
        return queue == null ? -1L : queue.getMessageTotalInQueue();
    }

    /**
     * Returns commit-log data starting at the given offset.
     *
     * @param j commit-log offset
     * @return the data selected by the commit log, or {@code null} when the store is shut down
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public SelectMapedBufferResult getCommitLogData(long j) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so getPhyQueueData is forbidden");
            return null;
        }
        return this.commitLog.getData(j);
    }

    /**
     * Appends raw bytes to the commit log and wakes the reput service on success.
     *
     * @param j offset passed to the commit log together with the data
     * @param bArr bytes to append
     * @return {@code true} when the commit log accepted the data; {@code false} when the store is
     *         shut down or the append failed
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public boolean appendToCommitLog(long j, byte[] bArr) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
            return false;
        }
        boolean appended = this.commitLog.appendData(j, bArr);
        if (!appended) {
            log.error("appendToPhyQueue failed " + j + " " + bArr.length);
            return false;
        }
        this.reputMessageService.wakeup();
        return true;
    }

    /**
     * Forwards a manual file-deletion request to the commit log cleaning service.
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public void excuteDeleteFilesManualy() {
        cleanCommitLogService.excuteDeleteFilesManualy();
    }

    /**
     * Finds messages by topic and key through the index service.
     *
     * <p>The index is queried up to 3 times. After each round the search stops when at least one
     * message has been collected or when the store timestamp of the first candidate of the round
     * is earlier than the begin time; otherwise that timestamp becomes the end time of the next
     * round.
     *
     * <p>Each candidate offset is verified before being returned: the stored message must carry
     * the requested topic, and one of its space-separated keys must equal the requested key.
     * Candidates failing this check are logged as hash duplicates and skipped.
     *
     * @param str topic to match
     * @param str2 message key to match
     * @param i third argument of the index query (presumably the maximum number of results; confirm)
     * @param j begin time of the query window
     * @param j2 end time of the query window
     * @return the result holding the matched message buffers and the index update markers
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public QueryMessageResult queryMessage(String str, String str2, int i, long j, long j2) {
        QueryMessageResult queryMessageResult = new QueryMessageResult();
        long lastQueryMsgTime = j2;
        for (int round = 0; round < 3; round++) {
            QueryOffsetResult queryOffset = this.indexService.queryOffset(str, str2, i, j, lastQueryMsgTime);
            if (queryOffset.getPhyOffsets().isEmpty()) {
                break;
            }
            Collections.sort(queryOffset.getPhyOffsets());
            queryMessageResult.setIndexLastUpdatePhyoffset(queryOffset.getIndexLastUpdatePhyoffset());
            queryMessageResult.setIndexLastUpdateTimestamp(queryOffset.getIndexLastUpdateTimestamp());
            for (int m = 0; m < queryOffset.getPhyOffsets().size(); m++) {
                long offset = queryOffset.getPhyOffsets().get(m).longValue();
                try {
                    MessageExt msg = lookMessageByOffset(offset);
                    if (msg == null) {
                        // Nothing readable at this offset; skip it instead of failing on a null dereference.
                        log.warn("queryMessage can not read message at physical offset {}", Long.valueOf(offset));
                        continue;
                    }
                    if (0 == m) {
                        lastQueryMsgTime = msg.getStoreTimestamp();
                    }
                    // Must start as false: with a true default the verification below could never
                    // reject a candidate and the "hash duplicate" branch was unreachable.
                    boolean match = false;
                    String keys = msg.getKeys();
                    if (keys != null && str.equals(msg.getTopic())) {
                        for (String candidate : keys.split(" ")) {
                            if (candidate.equals(str2)) {
                                match = true;
                                break;
                            }
                        }
                    }
                    if (match) {
                        SelectMapedBufferResult data = this.commitLog.getData(offset, false);
                        if (data != null) {
                            // The leading int of the buffer is used as the message length to trim to.
                            int size = data.getByteBuffer().getInt(0);
                            data.getByteBuffer().limit(size);
                            data.setSize(size);
                            queryMessageResult.addMessage(data);
                        }
                    } else {
                        log.warn("queryMessage hash duplicate, {} {}", str, str2);
                    }
                } catch (Exception e) {
                    log.error("queryMessage exception", e);
                }
            }
            if (queryMessageResult.getBufferTotalSize() > 0 || lastQueryMsgTime < j) {
                break;
            }
        }
        return queryMessageResult;
    }

    /**
     * Passes a new master address on to the HA service.
     *
     * @param str the master address to hand over
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public void updateHaMasterAddress(String str) {
        haService.updateMasterAddress(str);
    }

    /**
     * Returns the current time as reported by the store's system clock.
     *
     * @return the value of {@code systemClock.now()}
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public long now() {
        return systemClock.now();
    }

    /**
     * Returns the commit log backing this store.
     *
     * @return the {@link CommitLog} instance held by this store
     */
    public CommitLog getCommitLog() {
        return commitLog;
    }

    /**
     * Reads and decodes one message of known size from the commit log.
     *
     * @param j commit-log offset of the message
     * @param i size of the message in bytes
     * @return the decoded message, or {@code null} when the commit log yields no data
     */
    public MessageExt lookMessageByOffset(long j, int i) {
        SelectMapedBufferResult raw = this.commitLog.getMessage(j, i);
        if (raw == null) {
            return null;
        }
        try {
            return MessageDecoder.decode(raw.getByteBuffer(), true, false);
        } finally {
            raw.release();
        }
    }

    /**
     * Returns the consume queue for a topic and queue id, creating and registering it on demand.
     *
     * <p>Both levels of the table are filled with {@code putIfAbsent}; when another entry is
     * already present, that existing entry is the one used and returned.
     *
     * @param str topic
     * @param i queue id
     * @return the registered consume queue for the given topic and queue id
     */
    public ConsumeQueue findConsumeQueue(String str, int i) {
        ConcurrentHashMap<Integer, ConsumeQueue> queuesOfTopic = this.consumeQueueTable.get(str);
        if (queuesOfTopic == null) {
            ConcurrentHashMap<Integer, ConsumeQueue> fresh = new ConcurrentHashMap<>(128);
            ConcurrentHashMap<Integer, ConsumeQueue> present = this.consumeQueueTable.putIfAbsent(str, fresh);
            queuesOfTopic = (present == null) ? fresh : present;
        }

        Integer queueKey = Integer.valueOf(i);
        ConsumeQueue known = queuesOfTopic.get(queueKey);
        if (known != null) {
            return known;
        }

        String storePath = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
        ConsumeQueue created = new ConsumeQueue(str, i, storePath, getMessageStoreConfig().getMapedFileSizeConsumeQueue(), this);
        ConsumeQueue present = queuesOfTopic.putIfAbsent(queueKey, created);
        return (present == null) ? created : present;
    }

    /**
     * Decides whether adding one more message would overfill the current batch.
     *
     * @param i size in bytes of the message about to be added
     * @param i2 maximum number of messages requested
     * @param i3 bytes already collected in the batch
     * @param i4 number of messages already collected in the batch
     * @param z {@code true} to apply the "in disk" transfer limits, {@code false} for the
     *          "in memory" limits
     * @return {@code false} while the batch is still empty; {@code true} once the requested count
     *         is reached or the applicable byte/count limit would be exceeded
     */
    private boolean isTheBatchFull(int i, int i2, int i3, int i4, boolean z) {
        // An empty batch always accepts its first message.
        if (i3 == 0 || i4 == 0) {
            return false;
        }
        if (i4 + 1 >= i2) {
            return true;
        }
        if (z) {
            return i3 + i > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()
                    || i4 + 1 > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk();
        }
        return i3 + i > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()
                || i4 + 1 > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory();
    }

    /**
     * Deletes a file and logs whether the deletion succeeded.
     *
     * @param str path of the file to delete
     */
    private void deleteFile(String str) {
        boolean deleted = new File(str).delete();
        String outcome = deleted ? " delete OK" : " delete Failed";
        log.info(str + outcome);
    }

    /**
     * Creates the abort file under the store root directory and logs the outcome.
     *
     * @throws IOException if the file cannot be created
     */
    private void createTempFile() throws IOException {
        String abortPath = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
        File abortFile = new File(abortPath);
        MapedFile.ensureDirOK(abortFile.getParent());
        boolean created = abortFile.createNewFile();
        log.info(abortPath + (created ? " create OK" : " already exists"));
    }

    /**
     * Tells whether the abort file exists under the store root directory.
     *
     * @return {@code true} when the abort file is present
     */
    private boolean isTempFileExist() {
        String abortPath = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
        File abortFile = new File(abortPath);
        return abortFile.exists();
    }

    /**
     * Loads every consume queue found on disk and registers it in the consume queue table.
     *
     * <p>The consume queue directory is expected to contain one directory per topic, each holding
     * one entry per queue whose name is the numeric queue id. Entries whose name is not a valid
     * integer are skipped with a warning instead of aborting the whole load.
     *
     * @return {@code false} as soon as one consume queue fails to load, {@code true} otherwise
     */
    private boolean loadConsumeQueue() {
        String consumeQueueRoot = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
        File[] topicDirs = new File(consumeQueueRoot).listFiles();
        if (topicDirs != null) {
            for (File topicDir : topicDirs) {
                String topic = topicDir.getName();
                File[] queueDirs = topicDir.listFiles();
                if (queueDirs == null) {
                    continue;
                }
                for (File queueDir : queueDirs) {
                    int queueId;
                    try {
                        queueId = Integer.parseInt(queueDir.getName());
                    } catch (NumberFormatException e) {
                        // A stray file or directory must not prevent the store from loading.
                        log.warn("load logics queue, skip entry with non-numeric name " + queueDir.getPath());
                        continue;
                    }
                    ConsumeQueue consumeQueue = new ConsumeQueue(topic, queueId, consumeQueueRoot, getMessageStoreConfig().getMapedFileSizeConsumeQueue(), this);
                    putConsumeQueue(topic, queueId, consumeQueue);
                    if (!consumeQueue.load()) {
                        return false;
                    }
                }
            }
        }
        log.info("load logics queue all over, OK");
        return true;
    }

    /**
     * Returns the configuration this store was built with.
     *
     * @return the {@link MessageStoreConfig} held by this store
     */
    public MessageStoreConfig getMessageStoreConfig() {
        return messageStoreConfig;
    }

    /**
     * Registers a consume queue in the table, replacing any queue stored under the same key.
     *
     * @param str topic
     * @param i queue id
     * @param consumeQueue the consume queue to register
     */
    private void putConsumeQueue(String str, int i, ConsumeQueue consumeQueue) {
        ConcurrentHashMap<Integer, ConsumeQueue> queuesOfTopic = this.consumeQueueTable.get(str);
        if (queuesOfTopic == null) {
            // First queue seen for this topic: fill the inner map before publishing it.
            queuesOfTopic = new ConcurrentHashMap<>();
            queuesOfTopic.put(Integer.valueOf(i), consumeQueue);
            this.consumeQueueTable.put(str, queuesOfTopic);
        } else {
            queuesOfTopic.put(Integer.valueOf(i), consumeQueue);
        }
    }

    /**
     * Recovers the consume queues and the commit log, then rebuilds the topic queue table.
     *
     * <p>After the commit log recovery the method polls every 500 ms until the dispatch service
     * reports no remaining messages.
     *
     * @param z {@code true} to run the normal commit log recovery, {@code false} for the
     *          abnormal one
     */
    private void recover(boolean z) {
        recoverConsumeQueue();

        if (!z) {
            this.commitLog.recoverAbnormally();
        } else {
            this.commitLog.recoverNormally();
        }

        while (this.dispatchMessageService.hasRemainMessage()) {
            try {
                Thread.sleep(500L);
                log.info("waiting dispatching message over");
            } catch (InterruptedException ignored) {
                // NOTE(review): the interrupt is swallowed and the wait simply continues; confirm
                // this is intended before changing it.
            }
        }

        recoverTopicQueueTable();
    }

    /**
     * Rebuilds the commit log's topic queue table from the loaded consume queues.
     *
     * <p>For every consume queue an entry keyed {@code topic-queueId} is stored with the queue's
     * maximum offset, and the queue's minimum offset is corrected against the commit log's
     * minimum offset.
     */
    private void recoverTopicQueueTable() {
        HashMap<String, Long> maxOffsets = new HashMap<>(1024);
        long minPhyOffset = this.commitLog.getMinOffset();
        for (ConcurrentHashMap<Integer, ConsumeQueue> queuesOfTopic : this.consumeQueueTable.values()) {
            for (ConsumeQueue queue : queuesOfTopic.values()) {
                String key = queue.getTopic() + "-" + queue.getQueueId();
                maxOffsets.put(key, Long.valueOf(queue.getMaxOffsetInQuque()));
                queue.correctMinOffset(minPhyOffset);
            }
        }
        this.commitLog.setTopicQueueTable(maxOffsets);
    }

    /**
     * Runs {@code recover()} on every consume queue currently in the table.
     */
    private void recoverConsumeQueue() {
        for (ConcurrentHashMap<Integer, ConsumeQueue> queuesOfTopic : this.consumeQueueTable.values()) {
            for (ConsumeQueue queue : queuesOfTopic.values()) {
                queue.recover();
            }
        }
    }

    /**
     * Forwards message position information to the consume queue of a topic and queue id.
     *
     * @param str topic
     * @param i queue id
     * @param j first value forwarded to the consume queue wrapper
     * @param i2 second value forwarded to the consume queue wrapper
     * @param j2 third value forwarded to the consume queue wrapper
     * @param j3 fourth value forwarded to the consume queue wrapper
     * @param j4 fifth value forwarded to the consume queue wrapper
     */
    public void putMessagePostionInfo(String str, int i, long j, int i2, long j2, long j3, long j4) {
        ConsumeQueue target = findConsumeQueue(str, i);
        target.putMessagePostionInfoWrapper(j, i2, j2, j3, j4);
    }

    /**
     * Hands a dispatch request over to the dispatch message service.
     *
     * @param dispatchRequest the request to enqueue
     */
    public void putDispatchRequest(DispatchRequest dispatchRequest) {
        dispatchMessageService.putRequest(dispatchRequest);
    }

    /**
     * Returns the dispatch message service of this store.
     *
     * @return the {@link DispatchMessageService} held by this store
     */
    public DispatchMessageService getDispatchMessageService() {
        return dispatchMessageService;
    }

    /**
     * Returns the mapped-file allocation service of this store.
     *
     * @return the {@link AllocateMapedFileService} held by this store
     */
    public AllocateMapedFileService getAllocateMapedFileService() {
        return allocateMapedFileService;
    }

    /**
     * Returns the statistics service of this store.
     *
     * @return the {@link StoreStatsService} held by this store
     */
    public StoreStatsService getStoreStatsService() {
        return storeStatsService;
    }

    /**
     * Returns the running flags of this store (the same object as {@code getRunningFlags()}).
     *
     * @return the {@link RunningFlags} held by this store
     */
    public RunningFlags getAccessRights() {
        return runningFlags;
    }

    /**
     * Returns the live table of consume queues, keyed by topic and then by queue id.
     *
     * @return the internal table itself, not a copy
     */
    public ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> getConsumeQueueTable() {
        return consumeQueueTable;
    }

    /**
     * Returns the checkpoint object of this store.
     *
     * @return the {@link StoreCheckpoint} held by this store
     */
    public StoreCheckpoint getStoreCheckpoint() {
        return storeCheckpoint;
    }

    /**
     * Returns the HA service of this store.
     *
     * @return the {@link HAService} held by this store
     */
    public HAService getHaService() {
        return haService;
    }

    /**
     * Returns the schedule message service of this store.
     *
     * @return the {@link ScheduleMessageService} held by this store
     */
    public ScheduleMessageService getScheduleMessageService() {
        return scheduleMessageService;
    }

    /**
     * Returns the running flags of this store.
     *
     * @return the {@link RunningFlags} held by this store
     */
    public RunningFlags getRunningFlags() {
        return runningFlags;
    }

    /**
     * Returns the commit-log offset stored in the consume queue entry at a given queue offset.
     *
     * @param str topic
     * @param i queue id
     * @param j queue offset of the entry to read
     * @return the leading {@code long} of the entry, or {@code 0} when the queue or entry is missing
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public long getCommitLogOffsetInQueue(String str, int i, long j) {
        ConsumeQueue queue = findConsumeQueue(str, i);
        if (queue == null) {
            return 0L;
        }
        SelectMapedBufferResult entry = queue.getIndexBuffer(j);
        if (entry == null) {
            return 0L;
        }
        try {
            return entry.getByteBuffer().getLong();
        } finally {
            entry.release();
        }
    }

    /**
     * Returns the minimum offset of the commit log.
     *
     * @return the value of {@code commitLog.getMinOffset()}
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public long getMinPhyOffset() {
        return commitLog.getMinOffset();
    }

    /**
     * Computes how far the slave lags behind the commit log.
     *
     * @return the commit log's maximum offset minus the maximum offset pushed to the slave
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public long slaveFallBehindMuch() {
        long masterMaxOffset = this.commitLog.getMaxOffset();
        long pushedToSlave = this.haService.getPush2SlaveMaxOffset().get();
        return masterMaxOffset - pushedToSlave;
    }

    /**
     * Destroys the consume queues of every topic that is not in the given set.
     *
     * <p>The schedule topic is always kept. For each removed topic, every consume queue is
     * destroyed and dropped from the commit log's topic queue table before the topic entry
     * itself is removed.
     *
     * @param set topics that must be kept
     * @return always {@code 0}
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public int cleanUnusedTopic(Set<String> set) {
        Iterator<Map.Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>>> entries = this.consumeQueueTable.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>> entry = entries.next();
            String topic = entry.getKey();
            if (set.contains(topic) || topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) {
                continue;
            }
            for (ConsumeQueue queue : entry.getValue().values()) {
                queue.destroy();
                log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", queue.getTopic(), Integer.valueOf(queue.getQueueId()));
                this.commitLog.removeQueurFromTopicQueueTable(queue.getTopic(), queue.getQueueId());
            }
            entries.remove();
            log.info("cleanUnusedTopic: {},topic destroyed", topic);
        }
        return 0;
    }

    /* JADX WARN: Type inference failed for: r0v0, types: [java.util.Map<java.lang.String, java.lang.Long>, java.util.Map, java.util.HashMap, long] */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public Map<String, Long> getMessageIds(String str, int i, long j, long j2, SocketAddress socketAddress) {
        ?? hashMap = new HashMap();
        if (this.shutdown) {
            return hashMap;
        }
        ConsumeQueue findConsumeQueue = findConsumeQueue(str, i);
        if (findConsumeQueue != null) {
            long max = Math.max(j, findConsumeQueue.getMinOffsetInQuque());
            long min = Math.min(j2, findConsumeQueue.getMaxOffsetInQuque());
            if (min == 0) {
                return hashMap;
            }
            long j3 = max;
            while (j3 < min) {
                SelectMapedBufferResult indexBuffer = findConsumeQueue.getIndexBuffer(j3);
                if (indexBuffer == null) {
                    return hashMap;
                }
                for (int i2 = 0; i2 < indexBuffer.getSize(); i2 += 20) {
                    try {
                        j3++;
                        hashMap.put(MessageDecoder.createMessageId(ByteBuffer.allocate(16), MessageExt.SocketAddress2ByteBuffer(socketAddress), indexBuffer.getByteBuffer().getLong()), Long.valueOf((long) hashMap));
                        if (j3 > min) {
                            return hashMap;
                        }
                    } finally {
                        indexBuffer.release();
                    }
                }
                indexBuffer.release();
            }
        }
        return hashMap;
    }

    /**
     * Tells whether a commit-log offset lies too far behind the tail to count as in memory.
     *
     * @param j commit-log offset being checked
     * @param j2 current maximum commit-log offset
     * @return {@code true} when the distance {@code j2 - j} exceeds the configured percentage of
     *         the total physical memory size
     */
    private boolean checkInDiskByCommitOffset(long j, long j2) {
        double memoryRatio = ((double) this.messageStoreConfig.getAccessMessageInMemoryMaxRatio()) / 100.0d;
        long memoryBudget = (long) (((double) StoreUtil.TotalPhysicalMemorySize) * memoryRatio);
        long distance = j2 - j;
        return distance > memoryBudget;
    }

    /**
     * Tells whether the message at a given queue offset is considered to be on disk.
     *
     * <p>The commit-log offset is read from the first entry of the index buffer and compared
     * against the commit log's maximum offset captured at the start of the call.
     *
     * @param str topic
     * @param i queue id
     * @param j queue offset to check
     * @return {@code false} when the queue or entry is missing or the buffer is empty; otherwise
     *         the result of the commit-offset based check
     */
    @Override // com.alibaba.rocketmq.store.MessageStore
    public boolean checkInDiskByConsumeOffset(String str, int i, long j) {
        long maxOffsetPy = this.commitLog.getMaxOffset();
        ConsumeQueue queue = findConsumeQueue(str, i);
        if (queue == null) {
            return false;
        }
        SelectMapedBufferResult entry = queue.getIndexBuffer(j);
        if (entry == null) {
            return false;
        }
        try {
            if (entry.getSize() <= 0) {
                return false;
            }
            long offsetPy = entry.getByteBuffer().getLong();
            return checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
        } finally {
            entry.release();
        }
    }

    /**
     * Returns the broker statistics manager used by this store.
     *
     * @return the {@link BrokerStatsManager} held by this store
     */
    public BrokerStatsManager getBrokerStatsManager() {
        return brokerStatsManager;
    }
}
