/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.rocketmq.store;

import com.alibaba.rocketmq.common.ServiceThread;
import com.alibaba.rocketmq.common.SystemClock;
import com.alibaba.rocketmq.common.ThreadFactoryImpl;
import com.alibaba.rocketmq.common.UtilAll;
import com.alibaba.rocketmq.common.message.MessageDecoder;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
import com.alibaba.rocketmq.common.running.RunningStats;
import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
import com.alibaba.rocketmq.store.AllocateMapedFileService;
import com.alibaba.rocketmq.store.CommitLog;
import com.alibaba.rocketmq.store.ConsumeQueue;
import com.alibaba.rocketmq.store.DefaultMessageFilter;
import com.alibaba.rocketmq.store.DispatchRequest;
import com.alibaba.rocketmq.store.GetMessageResult;
import com.alibaba.rocketmq.store.GetMessageStatus;
import com.alibaba.rocketmq.store.MapedFile;
import com.alibaba.rocketmq.store.MessageExtBrokerInner;
import com.alibaba.rocketmq.store.MessageFilter;
import com.alibaba.rocketmq.store.MessageStore;
import com.alibaba.rocketmq.store.PutMessageResult;
import com.alibaba.rocketmq.store.PutMessageStatus;
import com.alibaba.rocketmq.store.QueryMessageResult;
import com.alibaba.rocketmq.store.RunningFlags;
import com.alibaba.rocketmq.store.SelectMapedBufferResult;
import com.alibaba.rocketmq.store.StoreCheckpoint;
import com.alibaba.rocketmq.store.StoreStatsService;
import com.alibaba.rocketmq.store.StoreUtil;
import com.alibaba.rocketmq.store.config.BrokerRole;
import com.alibaba.rocketmq.store.config.MessageStoreConfig;
import com.alibaba.rocketmq.store.config.StorePathConfigHelper;
import com.alibaba.rocketmq.store.ha.HAService;
import com.alibaba.rocketmq.store.index.IndexService;
import com.alibaba.rocketmq.store.index.QueryOffsetResult;
import com.alibaba.rocketmq.store.schedule.ScheduleMessageService;
import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
import java.io.File;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultMessageStore
implements MessageStore {
    // Logger obtained by the literal name "RocketmqStore" rather than by this class.
    private static final Logger log = LoggerFactory.getLogger((String)"RocketmqStore");
    // Consulted by getMessage() with the subscription data and each entry's tags code.
    private final MessageFilter messageFilter = new DefaultMessageFilter();
    // Configuration supplied to the constructor.
    private final MessageStoreConfig messageStoreConfig;
    // Physical message log; putMessage()/getMessage() delegate the actual reads and writes to it.
    private final CommitLog commitLog;
    // topic -> (queueId -> ConsumeQueue); entries are created lazily by findConsumeQueue().
    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> consumeQueueTable;
    // Started in start(), stopped in shutdown().
    private final FlushConsumeQueueService flushConsumeQueueService;
    // Both cleaners are run synchronously, one after the other, by cleanFilesPeriodically().
    private final CleanCommitLogService cleanCommitLogService;
    private final CleanConsumeQueueService cleanConsumeQueueService;
    // Started directly from the constructor.
    private final DispatchMessageService dispatchMessageService;
    // Key index used by queryMessage(); started directly from the constructor.
    private final IndexService indexService;
    // Started directly from the constructor; shut down by load() on failure and by shutdown().
    private final AllocateMapedFileService allocateMapedFileService;
    // Non-null only when the broker role is SLAVE (see constructor); always null-check before use.
    private final ReputMessageService reputMessageService;
    private final HAService haService;
    // Null when the broker role is none of SLAVE, ASYNC_MASTER or SYNC_MASTER (see constructor).
    private final ScheduleMessageService scheduleMessageService;
    private final StoreStatsService storeStatsService;
    // Readable/writeable switches checked by getMessage() and putMessage().
    private final RunningFlags runningFlags = new RunningFlags();
    // NOTE(review): the 1L argument is presumably a refresh precision in milliseconds - confirm against SystemClock.
    private final SystemClock systemClock = new SystemClock(1L);
    // True until start() has finished, and again once shutdown() has been entered.
    private volatile boolean shutdown = true;
    // Assigned by load() only when loading succeeded; flushed and closed by shutdown().
    private StoreCheckpoint storeCheckpoint;
    // Counts rejected putMessage() calls so the rejection warning is logged once per 50000 rejections.
    private AtomicLong printTimes = new AtomicLong(0L);
    // Single-threaded scheduler that drives the periodic cleaning task registered by addScheduleTask().
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new ThreadFactoryImpl("StoreScheduledThread"));
    // Statistics sink supplied to the constructor; referenced from getMessage().
    private final BrokerStatsManager brokerStatsManager;

    public DefaultMessageStore(MessageStoreConfig messageStoreConfig, BrokerStatsManager brokerStatsManager) throws IOException {
        this.messageStoreConfig = messageStoreConfig;
        this.brokerStatsManager = brokerStatsManager;
        this.allocateMapedFileService = new AllocateMapedFileService();
        this.commitLog = new CommitLog(this);
        this.consumeQueueTable = new ConcurrentHashMap(32);
        this.flushConsumeQueueService = new FlushConsumeQueueService();
        this.cleanCommitLogService = new CleanCommitLogService();
        this.cleanConsumeQueueService = new CleanConsumeQueueService();
        this.dispatchMessageService = new DispatchMessageService(this.messageStoreConfig.getPutMsgIndexHightWater());
        this.storeStatsService = new StoreStatsService();
        this.indexService = new IndexService(this);
        this.haService = new HAService(this);
        switch (this.messageStoreConfig.getBrokerRole()) {
            case SLAVE: {
                this.reputMessageService = new ReputMessageService();
                this.scheduleMessageService = new ScheduleMessageService(this);
                break;
            }
            case ASYNC_MASTER: 
            case SYNC_MASTER: {
                this.reputMessageService = null;
                this.scheduleMessageService = new ScheduleMessageService(this);
                break;
            }
            default: {
                this.reputMessageService = null;
                this.scheduleMessageService = null;
            }
        }
        this.allocateMapedFileService.start();
        this.dispatchMessageService.start();
        this.indexService.start();
    }

    /**
     * Asks every consume queue of every topic to truncate its dirty logic files relative to the
     * given commit-log offset.
     *
     * @param phyOffset commit-log (physical) offset forwarded unchanged to each queue
     */
    public void truncateDirtyLogicFiles(long phyOffset) {
        for (ConcurrentHashMap<Integer, ConsumeQueue> queuesOfTopic : this.consumeQueueTable.values()) {
            Iterator<ConsumeQueue> queues = queuesOfTopic.values().iterator();
            while (queues.hasNext()) {
                queues.next().truncateDirtyLogicFiles(phyOffset);
            }
        }
    }

    /**
     * Loads persisted state: schedule service (when present), commit log, consume queues, and,
     * if all of those succeed, the checkpoint and the index, followed by recovery.
     *
     * <p>The presence of the abort file decides whether the previous shutdown is treated as
     * normal. Any exception is logged and turns the result into {@code false}. On failure the
     * allocate service, which was started by the constructor, is shut down.
     *
     * @return {@code true} when every load step succeeded
     */
    @Override
    public boolean load() {
        boolean loaded;
        try {
            final boolean exitedCleanly = !this.isTempFileExist();
            log.info("last shutdown {}", (Object)(exitedCleanly ? "normally" : "abnormally"));

            // Each step only runs when all previous steps succeeded.
            loaded = this.scheduleMessageService == null || this.scheduleMessageService.load();
            loaded = loaded && this.commitLog.load();
            loaded = loaded && this.loadConsumeQueue();

            if (loaded) {
                this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
                this.indexService.load(exitedCleanly);
                this.recover(exitedCleanly);
                log.info("load over, and the max phy offset = {}", (Object)this.getMaxPhyOffset());
            }
        }
        catch (Exception e) {
            log.error("load exception", (Throwable)e);
            loaded = false;
        }
        if (!loaded) {
            this.allocateMapedFileService.shutdown();
        }
        return loaded;
    }

    /**
     * Registers the periodic cleaning task on the store's scheduler: first run after 60000 ms,
     * then at the fixed rate given by {@code getCleanResourceInterval()} (milliseconds).
     */
    private void addScheduleTask() {
        final Runnable cleanTask = new Runnable(){

            @Override
            public void run() {
                DefaultMessageStore.this.cleanFilesPeriodically();
            }
        };
        final long periodMillis = this.messageStoreConfig.getCleanResourceInterval();
        this.scheduledExecutorService.scheduleAtFixedRate(cleanTask, 60000L, periodMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Runs the commit-log cleaner and then the consume-queue cleaner, both synchronously on the
     * calling thread (their {@code run()} methods are invoked directly).
     */
    private void cleanFilesPeriodically() {
        final CleanCommitLogService commitLogCleaner = this.cleanCommitLogService;
        final CleanConsumeQueueService consumeQueueCleaner = this.cleanConsumeQueueService;
        commitLogCleaner.run();
        consumeQueueCleaner.run();
    }

    /**
     * Destroys consume queues whose last recorded commit-log offset lies below the commit log's
     * current minimum offset, and drops topics that end up with no queues.
     *
     * <p>The topic {@code SCHEDULE_TOPIC_XXXX} is never touched. A queue reporting a last offset
     * of {@code -1} is only logged and left alone.
     */
    @Override
    public void cleanExpiredConsumerQueue() {
        final long minCommitLogOffset = this.commitLog.getMinOffset();
        final Iterator<Map.Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>>> topicIt = this.consumeQueueTable.entrySet().iterator();
        while (topicIt.hasNext()) {
            final Map.Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>> topicEntry = topicIt.next();
            final String topic = topicEntry.getKey();
            if (!topic.equals("SCHEDULE_TOPIC_XXXX")) {
                final ConcurrentHashMap<Integer, ConsumeQueue> queueTable = topicEntry.getValue();
                final Iterator<Map.Entry<Integer, ConsumeQueue>> queueIt = queueTable.entrySet().iterator();
                while (queueIt.hasNext()) {
                    final Map.Entry<Integer, ConsumeQueue> queueEntry = queueIt.next();
                    final long maxCLOffsetInConsumeQueue = queueEntry.getValue().getLastOffset();
                    if (maxCLOffsetInConsumeQueue == -1L) {
                        log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.", new Object[]{queueEntry.getValue().getTopic(), queueEntry.getValue().getQueueId(), queueEntry.getValue().getMaxPhysicOffset(), queueEntry.getValue().getMinLogicOffset()});
                    } else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) {
                        // Everything this queue points at has already been removed from the commit log.
                        log.info("cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", new Object[]{topic, queueEntry.getKey(), minCommitLogOffset, maxCLOffsetInConsumeQueue});
                        this.commitLog.removeQueurFromTopicQueueTable(queueEntry.getValue().getTopic(), queueEntry.getValue().getQueueId());
                        queueEntry.getValue().destroy();
                        queueIt.remove();
                    }
                }
                if (queueTable.isEmpty()) {
                    log.info("cleanExpiredConsumerQueue: {},topic destroyed", (Object)topic);
                    topicIt.remove();
                }
            }
        }
    }

    /**
     * Starts the runtime services, creates the abort file, registers the periodic cleaning task
     * and finally marks the store as running.
     *
     * <p>The schedule service is started only when it exists and the broker is not a SLAVE. The
     * reput service, when present, is positioned at the commit log's current max offset first.
     *
     * @throws Exception propagated from any of the started services or from creating the abort file
     */
    @Override
    public void start() throws Exception {
        this.flushConsumeQueueService.start();
        this.commitLog.start();
        this.storeStatsService.start();

        final ScheduleMessageService scheduler = this.scheduleMessageService;
        if (scheduler != null) {
            if (this.messageStoreConfig.getBrokerRole() != BrokerRole.SLAVE) {
                scheduler.start();
            }
        }

        final ReputMessageService reput = this.reputMessageService;
        if (reput != null) {
            reput.setReputFromOffset(this.commitLog.getMaxOffset());
            reput.start();
        }

        this.haService.start();
        this.createTempFile();
        this.addScheduleTask();
        // Only now do putMessage()/getMessage() stop rejecting requests.
        this.shutdown = false;
    }

    /**
     * Stops the store: halts the scheduler, waits three seconds, shuts every service down,
     * flushes and closes the checkpoint, and removes the abort file. Calling it on a store that
     * is not running is a no-op.
     *
     * <p>If the calling thread is interrupted during the wait, the interruption is logged and
     * the thread's interrupt status is restored once the shutdown sequence has finished, so that
     * the caller can still observe it without the remaining shutdown steps being cut short.
     */
    @Override
    public void shutdown() {
        if (this.shutdown) {
            return;
        }
        this.shutdown = true;
        this.scheduledExecutorService.shutdown();

        boolean interrupted = false;
        try {
            try {
                Thread.sleep(3000L);
            }
            catch (InterruptedException e) {
                interrupted = true;
                log.error("shutdown Exception, ", (Throwable)e);
            }
            if (this.scheduleMessageService != null) {
                this.scheduleMessageService.shutdown();
            }
            this.haService.shutdown();
            this.storeStatsService.shutdown();
            this.dispatchMessageService.shutdown();
            this.indexService.shutdown();
            this.flushConsumeQueueService.shutdown();
            this.commitLog.shutdown();
            this.allocateMapedFileService.shutdown();
            if (this.reputMessageService != null) {
                this.reputMessageService.shutdown();
            }
            // The checkpoint only exists after a successful load(); start() does not enforce that.
            if (this.storeCheckpoint != null) {
                this.storeCheckpoint.flush();
                this.storeCheckpoint.shutdown();
            }
            this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
        }
        finally {
            if (interrupted) {
                // Restored last so the service shutdowns above are not affected by the flag.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Destroys all consume queues, the commit log and the index, then deletes the abort file and
     * the checkpoint file under the store root directory.
     */
    @Override
    public void destroy() {
        this.destroyLogics();
        this.commitLog.destroy();
        this.indexService.destroy();

        final String abortFile = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
        this.deleteFile(abortFile);
        final String checkpointFile = StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir());
        this.deleteFile(checkpointFile);
    }

    /**
     * Calls {@code destroy()} on every consume queue of every topic. The queue table itself is
     * not modified here.
     */
    public void destroyLogics() {
        for (ConcurrentHashMap<Integer, ConsumeQueue> queuesOfTopic : this.consumeQueueTable.values()) {
            Iterator<ConsumeQueue> queues = queuesOfTopic.values().iterator();
            while (queues.hasNext()) {
                queues.next().destroy();
            }
        }
    }

    /**
     * Validates and appends a message to the commit log.
     *
     * <p>The call is rejected with {@code SERVICE_NOT_AVAILABLE} when the store is shut down,
     * when the broker is a SLAVE, or when the running flags say the store is not writeable
     * (the latter two log a warning only once every 50000 rejections). A topic longer than 127
     * characters or a properties string longer than {@code Short.MAX_VALUE} yields
     * {@code MESSAGE_ILLEGAL}. Otherwise the message is handed to the commit log and timing and
     * failure statistics are updated.
     *
     * @param msg message to store
     * @return the commit log's result, or a rejection result as described above
     */
    @Override
    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so putMessage is forbidden");
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            final long rejected = this.printTimes.getAndIncrement();
            if (rejected % 50000L == 0L) {
                log.warn("message store is slave mode, so putMessage is forbidden ");
            }
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        if (!this.runningFlags.isWriteable()) {
            final long rejected = this.printTimes.getAndIncrement();
            if (rejected % 50000L == 0L) {
                log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
            }
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        // The store is accepting writes again, so restart the warning throttle.
        this.printTimes.set(0L);

        final int topicLength = msg.getTopic().length();
        if (topicLength > 127) {
            log.warn("putMessage message topic length too long " + msg.getTopic().length());
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }
        if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }

        final long startedAt = this.getSystemClock().now();
        final PutMessageResult outcome = this.commitLog.putMessage(msg);
        final long elapsedMillis = this.getSystemClock().now() - startedAt;
        if (elapsedMillis > 1000L) {
            log.warn("putMessage not in lock eclipse time(ms) " + elapsedMillis);
        }

        this.storeStatsService.setPutMessageEntireTimeMax(elapsedMillis);
        this.storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        final boolean succeeded = outcome != null && outcome.isOk();
        if (!succeeded) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }
        return outcome;
    }

    /**
     * @return the clock instance owned by this store
     */
    public SystemClock getSystemClock() {
        final SystemClock clock = this.systemClock;
        return clock;
    }

    /**
     * Pulls messages for one queue starting at a logical queue offset.
     *
     * <p>Consume-queue entries are read as 20-byte records (8-byte commit-log offset, 4-byte
     * size, 8-byte tags code). Entries whose tags code does not match the subscription are
     * skipped; matching ones are fetched from the commit log until {@link #isTheBatchFull}
     * reports the batch is full or 16000 bytes of entries have been scanned.
     *
     * <p>The first time a message is found during a call, the distance between the queue's max
     * physical offset and that message's offset is reported to the broker stats manager, when
     * one was supplied.
     *
     * @param group            consumer group, used only for statistics
     * @param topic            topic to read
     * @param queueId          queue within the topic
     * @param offset           logical queue offset to start from
     * @param maxMsgNums       upper bound on the number of messages returned
     * @param subscriptionData subscription passed to the message filter
     * @return the pull result with status, next offset and min/max offsets filled in, or
     *         {@code null} when the store is shut down or not readable
     */
    @Override
    public GetMessageResult getMessage(String group, String topic, int queueId, long offset, int maxMsgNums, SubscriptionData subscriptionData) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so getMessage is forbidden");
            return null;
        }
        if (!this.runningFlags.isReadable()) {
            log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
            return null;
        }
        long beginTime = this.getSystemClock().now();
        GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
        long nextBeginOffset = offset;
        long minOffset = 0L;
        long maxOffset = 0L;
        GetMessageResult getResult = new GetMessageResult();
        long maxOffsetPy = this.commitLog.getMaxOffset();
        ConsumeQueue consumeQueue = this.findConsumeQueue(topic, queueId);
        if (consumeQueue != null) {
            minOffset = consumeQueue.getMinOffsetInQuque();
            maxOffset = consumeQueue.getMaxOffsetInQuque();
            if (maxOffset == 0L) {
                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                nextBeginOffset = 0L;
            } else if (offset < minOffset) {
                status = GetMessageStatus.OFFSET_TOO_SMALL;
                nextBeginOffset = minOffset;
            } else if (offset == maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                nextBeginOffset = offset;
            } else if (offset > maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                nextBeginOffset = 0L == minOffset ? minOffset : maxOffset;
            } else {
                SelectMapedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
                if (bufferConsumeQueue != null) {
                    try {
                        // Size in bytes of one consume-queue entry: long + int + long.
                        final int entrySize = 20;
                        // Scan cap, compared against the byte index below (16000 / 20 = 800 entries).
                        final int maxScanBytes = 16000;
                        status = GetMessageStatus.NO_MATCHED_MESSAGE;
                        // When set, entries below this commit-log offset are skipped because
                        // a previous read at a lower offset found nothing.
                        long nextPhyFileStartOffset = Long.MIN_VALUE;
                        long maxPhyOffsetPulling = 0L;
                        boolean diskFallRecorded = false;
                        int i;
                        for (i = 0; i < bufferConsumeQueue.getSize() && i < maxScanBytes; i += entrySize) {
                            long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                            long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
                            maxPhyOffsetPulling = offsetPy;
                            if (nextPhyFileStartOffset != Long.MIN_VALUE && offsetPy < nextPhyFileStartOffset) {
                                continue;
                            }
                            boolean isInDisk = this.checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
                            if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) {
                                break;
                            }
                            if (!this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }
                                if (log.isDebugEnabled()) {
                                    log.debug("message type not matched, client: " + subscriptionData + " server: " + tagsCode);
                                }
                                continue;
                            }
                            SelectMapedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                            if (selectResult == null) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                }
                                nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                                continue;
                            }
                            this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
                            getResult.addMessage(selectResult);
                            status = GetMessageStatus.FOUND;
                            nextPhyFileStartOffset = Long.MIN_VALUE;
                            // Report the fall-behind once per call. The flag test used to be inverted
                            // (it skipped while the flag was still false), so nothing was ever recorded.
                            if (!diskFallRecorded && this.brokerStatsManager != null) {
                                diskFallRecorded = true;
                                long fallBehind = consumeQueue.getMaxPhysicOffset() - offsetPy;
                                this.brokerStatsManager.recordDiskFallBehind(group, topic, queueId, fallBehind);
                            }
                        }
                        // i is a byte index into the entry buffer; convert it back to an entry count.
                        nextBeginOffset = offset + (long)(i / entrySize);
                        long diff = this.getMaxPhyOffset() - maxPhyOffsetPulling;
                        long memory = (long)((double)StoreUtil.TotalPhysicalMemorySize * ((double)this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                        getResult.setSuggestPullingFromSlave(diff > memory);
                    }
                    finally {
                        bufferConsumeQueue.release();
                    }
                } else {
                    status = GetMessageStatus.OFFSET_FOUND_NULL;
                    nextBeginOffset = consumeQueue.rollNextFile(offset);
                    log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: " + maxOffset + ", but access logic queue failed.");
                }
            }
        } else {
            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
            nextBeginOffset = 0L;
        }
        if (GetMessageStatus.FOUND == status) {
            this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
        } else {
            this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
        }
        long eclipseTime = this.getSystemClock().now() - beginTime;
        this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);
        getResult.setStatus(status);
        getResult.setNextBeginOffset(nextBeginOffset);
        getResult.setMaxOffset(maxOffset);
        getResult.setMinOffset(minOffset);
        return getResult;
    }

    /**
     * @param topic   topic name
     * @param queueId queue within the topic
     * @return the queue's max offset, or {@code 0} when no queue object is available
     */
    @Override
    public long getMaxOffsetInQuque(String topic, int queueId) {
        final ConsumeQueue queue = this.findConsumeQueue(topic, queueId);
        return queue == null ? 0L : queue.getMaxOffsetInQuque();
    }

    /**
     * @param topic   topic name
     * @param queueId queue within the topic
     * @return the queue's min offset, or {@code -1} when no queue object is available
     */
    @Override
    public long getMinOffsetInQuque(String topic, int queueId) {
        final ConsumeQueue queue = this.findConsumeQueue(topic, queueId);
        return queue == null ? -1L : queue.getMinOffsetInQuque();
    }

    /**
     * Delegates a timestamp-to-offset lookup to the consume queue.
     *
     * @param topic     topic name
     * @param queueId   queue within the topic
     * @param timestamp timestamp forwarded to the queue
     * @return the offset reported by the queue, or {@code 0} when no queue object is available
     */
    @Override
    public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
        final ConsumeQueue queue = this.findConsumeQueue(topic, queueId);
        return queue == null ? 0L : queue.getOffsetInQueueByTime(timestamp);
    }

    /**
     * Decodes the message stored at a commit-log offset. The first four bytes at that offset are
     * read as the message size, which is then used for the full lookup.
     *
     * @param commitLogOffset physical offset of the message
     * @return the decoded message, or {@code null} when nothing can be read at that offset
     */
    @Override
    public MessageExt lookMessageByOffset(long commitLogOffset) {
        final SelectMapedBufferResult sizeHeader = this.commitLog.getMessage(commitLogOffset, 4);
        if (sizeHeader == null) {
            return null;
        }
        try {
            final int totalSize = sizeHeader.getByteBuffer().getInt();
            return this.lookMessageByOffset(commitLogOffset, totalSize);
        }
        finally {
            sizeHeader.release();
        }
    }

    /**
     * Selects the raw bytes of the message stored at a commit-log offset. The first four bytes
     * at that offset are read as the message size, which is then used for the full selection.
     *
     * @param commitLogOffset physical offset of the message
     * @return the selected buffer (the caller is handed it unreleased), or {@code null} when
     *         nothing can be read at that offset
     */
    @Override
    public SelectMapedBufferResult selectOneMessageByOffset(long commitLogOffset) {
        final SelectMapedBufferResult sizeHeader = this.commitLog.getMessage(commitLogOffset, 4);
        if (sizeHeader == null) {
            return null;
        }
        try {
            final int totalSize = sizeHeader.getByteBuffer().getInt();
            return this.commitLog.getMessage(commitLogOffset, totalSize);
        }
        finally {
            // Only the 4-byte header selection is released here, not the returned buffer.
            sizeHeader.release();
        }
    }

    /**
     * Selects the raw bytes of a message whose size is already known.
     *
     * @param commitLogOffset physical offset of the message
     * @param msgSize         number of bytes to select
     * @return whatever the commit log returns for that range
     */
    @Override
    public SelectMapedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) {
        final SelectMapedBufferResult selected = this.commitLog.getMessage(commitLogOffset, msgSize);
        return selected;
    }

    /**
     * @return the string form of the store statistics service
     */
    @Override
    public String getRunningDataInfo() {
        final StoreStatsService stats = this.storeStatsService;
        return stats.toString();
    }

    /**
     * Collects runtime information: the statistics service's map, extended with disk usage
     * ratios for the commit-log and consume-queue paths, schedule-service stats when that
     * service exists, and the commit log's min and max offsets.
     *
     * @return the map obtained from the statistics service, with the extra entries added
     */
    @Override
    public HashMap<String, String> getRuntimeInfo() {
        final HashMap<String, String> info = this.storeStatsService.getRuntimeInfo();

        final String commitLogPath = this.getMessageStoreConfig().getStorePathCommitLog();
        final double commitLogUsed = UtilAll.getDiskPartitionSpaceUsedPercent((String)commitLogPath);
        info.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(commitLogUsed));

        final String consumeQueuePath = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
        final double consumeQueueUsed = UtilAll.getDiskPartitionSpaceUsedPercent((String)consumeQueuePath);
        info.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(consumeQueueUsed));

        if (this.scheduleMessageService != null) {
            this.scheduleMessageService.buildRunningStats(info);
        }

        final String minPhyOffset = String.valueOf(this.getMinPhyOffset());
        info.put(RunningStats.commitLogMinOffset.name(), minPhyOffset);
        final String maxPhyOffset = String.valueOf(this.getMaxPhyOffset());
        info.put(RunningStats.commitLogMaxOffset.name(), maxPhyOffset);
        return info;
    }

    /**
     * @return the commit log's current max offset
     */
    @Override
    public long getMaxPhyOffset() {
        final long maxOffset = this.commitLog.getMaxOffset();
        return maxOffset;
    }

    /**
     * Returns the store timestamp of the message referenced by the queue's first entry.
     *
     * <p>The queue's minimum logic offset is divided by 20 (the byte size of one entry) to get
     * the entry index; the entry's commit-log offset and size are then used to pick up the
     * store timestamp from the commit log.
     *
     * @param topic   topic name
     * @param queueId queue within the topic
     * @return the store timestamp, or {@code -1} when the queue or entry is unavailable or the
     *         lookup fails (the failure is logged)
     */
    @Override
    public long getEarliestMessageTime(String topic, int queueId) {
        ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId);
        if (logicQueue == null) {
            return -1L;
        }
        long minLogicOffset = logicQueue.getMinLogicOffset();
        SelectMapedBufferResult result = logicQueue.getIndexBuffer(minLogicOffset / 20L);
        if (result == null) {
            return -1L;
        }
        try {
            long phyOffset = result.getByteBuffer().getLong();
            int size = result.getByteBuffer().getInt();
            return this.getCommitLog().pickupStoretimestamp(phyOffset, size);
        }
        catch (Exception e) {
            // Best effort: callers still get -1, but the failure is no longer silent.
            log.warn("getEarliestMessageTime failed, topic: " + topic + " queueId: " + queueId, (Throwable)e);
            return -1L;
        }
        finally {
            result.release();
        }
    }

    /**
     * Returns the store timestamp of the message referenced by the queue entry at the given
     * logical offset.
     *
     * @param topic   topic name
     * @param queueId queue within the topic
     * @param offset  logical queue offset of the entry
     * @return the store timestamp, or {@code -1} when the queue or entry is unavailable or the
     *         lookup fails (the failure is logged)
     */
    @Override
    public long getMessageStoreTimeStamp(String topic, int queueId, long offset) {
        ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId);
        if (logicQueue == null) {
            return -1L;
        }
        SelectMapedBufferResult result = logicQueue.getIndexBuffer(offset);
        if (result == null) {
            return -1L;
        }
        try {
            long phyOffset = result.getByteBuffer().getLong();
            int size = result.getByteBuffer().getInt();
            return this.getCommitLog().pickupStoretimestamp(phyOffset, size);
        }
        catch (Exception e) {
            // Best effort: callers still get -1, but the failure is no longer silent.
            log.warn("getMessageStoreTimeStamp failed, topic: " + topic + " queueId: " + queueId + " offset: " + offset, (Throwable)e);
            return -1L;
        }
        finally {
            result.release();
        }
    }

    /**
     * @param topic   topic name
     * @param queueId queue within the topic
     * @return the message total reported by the queue, or {@code -1} when no queue object is
     *         available
     */
    @Override
    public long getMessageTotalInQueue(String topic, int queueId) {
        final ConsumeQueue queue = this.findConsumeQueue(topic, queueId);
        return queue == null ? -1L : queue.getMessageTotalInQueue();
    }

    /**
     * Reads raw commit-log data starting at the given offset.
     *
     * @param offset commit-log offset to read from
     * @return the commit log's data selection, or {@code null} when the store is shut down
     */
    @Override
    public SelectMapedBufferResult getCommitLogData(long offset) {
        if (!this.shutdown) {
            return this.commitLog.getData(offset);
        }
        log.warn("message store has shutdown, so getPhyQueueData is forbidden");
        return null;
    }

    /**
     * Appends raw bytes to the commit log at the given offset and, on success, wakes the reput
     * service so the new data gets processed.
     *
     * <p>The reput service only exists when the broker role is SLAVE, so the wake-up is skipped
     * when it is absent instead of failing with a {@code NullPointerException}.
     *
     * @param startOffset commit-log offset the data starts at
     * @param data        bytes to append
     * @return {@code true} when the commit log accepted the data; {@code false} when the store
     *         is shut down or the append failed
     */
    @Override
    public boolean appendToCommitLog(long startOffset, byte[] data) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
            return false;
        }
        boolean result = this.commitLog.appendData(startOffset, data);
        if (result) {
            if (this.reputMessageService != null) {
                this.reputMessageService.wakeup();
            }
        } else {
            log.error("appendToPhyQueue failed " + startOffset + " " + data.length);
        }
        return result;
    }

    /**
     * Forwards a manual delete request to the commit-log cleaning service.
     */
    @Override
    public void excuteDeleteFilesManualy() {
        final CleanCommitLogService cleaner = this.cleanCommitLogService;
        cleaner.excuteDeleteFilesManualy();
    }

    /**
     * Looks up messages by topic and key through the index service.
     *
     * <p>Up to three index queries are made. After each one the candidate commit-log offsets are
     * sorted and every candidate message is decoded and checked: its topic must equal
     * {@code topic} and one of its space-separated keys must equal {@code key}. Candidates that
     * fail this check are logged as hash duplicates and skipped. Previously the match flag
     * started out {@code true}, which made the check a no-op and returned such candidates.
     * NOTE(review): this assumes the index is built from message keys only - confirm against
     * IndexService before relying on the stricter filtering.
     *
     * <p>The loop stops early once any message has been collected or the store timestamp of the
     * first candidate falls before {@code begin}; otherwise the next query uses that timestamp
     * as its upper bound.
     *
     * @param topic  topic the messages must belong to
     * @param key    key the messages must carry
     * @param maxNum maximum number of index hits requested per query
     * @param begin  lower time bound passed to the index
     * @param end    upper time bound for the first index query
     * @return the collected messages plus the index's last-update offset and timestamp
     */
    @Override
    public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
        QueryMessageResult queryMessageResult = new QueryMessageResult();
        long lastQueryMsgTime = end;
        for (int i = 0; i < 3; ++i) {
            QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
            if (queryOffsetResult.getPhyOffsets().isEmpty()) {
                break;
            }
            Collections.sort(queryOffsetResult.getPhyOffsets());
            queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());
            queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp());
            for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); ++m) {
                long offset = queryOffsetResult.getPhyOffsets().get(m);
                try {
                    MessageExt msg = this.lookMessageByOffset(offset);
                    if (msg == null) {
                        // The index points at data that can no longer be read from the commit log.
                        log.warn("queryMessage can not find message at commit log offset " + offset);
                        continue;
                    }
                    if (0 == m) {
                        lastQueryMsgTime = msg.getStoreTimestamp();
                    }
                    // The candidate only matches when both topic and one of its keys agree.
                    boolean match = false;
                    String keys = msg.getKeys();
                    if (keys != null && topic.equals(msg.getTopic())) {
                        for (String k : keys.split(" ")) {
                            if (k.equals(key)) {
                                match = true;
                                break;
                            }
                        }
                    }
                    if (!match) {
                        log.warn("queryMessage hash duplicate, {} {}", (Object)topic, (Object)key);
                        continue;
                    }
                    SelectMapedBufferResult result = this.commitLog.getData(offset, false);
                    if (result == null) {
                        continue;
                    }
                    // Trim the selection down to exactly one message using its leading size field.
                    int size = result.getByteBuffer().getInt(0);
                    result.getByteBuffer().limit(size);
                    result.setSize(size);
                    queryMessageResult.addMessage(result);
                }
                catch (Exception e) {
                    log.error("queryMessage exception", (Throwable)e);
                }
            }
            if (queryMessageResult.getBufferTotalSize() > 0 || lastQueryMsgTime < begin) {
                break;
            }
        }
        return queryMessageResult;
    }

    /**
     * Passes a new master address on to the HA service.
     *
     * @param newAddr address forwarded unchanged
     */
    @Override
    public void updateHaMasterAddress(String newAddr) {
        final HAService ha = this.haService;
        ha.updateMasterAddress(newAddr);
    }

    /**
     * @return the current time as reported by the store's clock
     */
    @Override
    public long now() {
        final long current = this.systemClock.now();
        return current;
    }

    /**
     * @return the commit log owned by this store
     */
    public CommitLog getCommitLog() {
        final CommitLog owned = this.commitLog;
        return owned;
    }

    /**
     * Decodes the message of known size stored at a commit-log offset.
     *
     * @param commitLogOffset physical offset of the message
     * @param size            number of bytes the message occupies
     * @return the decoded message, or {@code null} when the commit log returns no data
     */
    public MessageExt lookMessageByOffset(long commitLogOffset, int size) {
        final SelectMapedBufferResult selected = this.commitLog.getMessage(commitLogOffset, size);
        if (selected == null) {
            return null;
        }
        try {
            return MessageDecoder.decode((ByteBuffer)selected.getByteBuffer(), (boolean)true, (boolean)false);
        }
        finally {
            selected.release();
        }
    }

    /**
     * Returns the consume queue for a topic/queue pair, creating and registering both the
     * per-topic map and the queue itself when they do not exist yet.
     *
     * <p>Registration uses {@code putIfAbsent}; when another thread registered an entry first,
     * that existing entry is used and the freshly created one is dropped.
     *
     * @param topic   topic name
     * @param queueId queue within the topic
     * @return the registered consume queue (never {@code null})
     */
    public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        ConcurrentHashMap<Integer, ConsumeQueue> queuesOfTopic = this.consumeQueueTable.get(topic);
        if (queuesOfTopic == null) {
            final ConcurrentHashMap<Integer, ConsumeQueue> created = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            final ConcurrentHashMap<Integer, ConsumeQueue> existing = this.consumeQueueTable.putIfAbsent(topic, created);
            queuesOfTopic = existing == null ? created : existing;
        }

        final ConsumeQueue found = queuesOfTopic.get(queueId);
        if (found != null) {
            return found;
        }

        final ConsumeQueue created = new ConsumeQueue(topic, queueId, StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), this);
        final ConsumeQueue existing = queuesOfTopic.putIfAbsent(queueId, created);
        return existing == null ? created : existing;
    }

    /**
     * Decides whether adding one more message would exceed the limits of the current pull batch.
     *
     * <p>The first message is always allowed. After that the batch is full when the message
     * count would exceed {@code maxMsgNums}, or when the byte total or message count would
     * exceed the configured transfer limits for on-disk or in-memory messages respectively.
     *
     * <p>The {@code maxMsgNums} check uses {@code >} like the other count limits. It used to be
     * {@code >=}, which stopped the batch one message short of the requested maximum.
     *
     * @param sizePy       size in bytes of the message about to be added
     * @param maxMsgNums   maximum number of messages the caller asked for
     * @param bufferTotal  bytes already collected
     * @param messageTotal messages already collected
     * @param isInDisk     whether the message is considered to be on disk rather than in memory
     * @return {@code true} when the message must not be added
     */
    private boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int messageTotal, boolean isInDisk) {
        // Nothing collected yet: the first message is never rejected.
        if (0 == bufferTotal || 0 == messageTotal) {
            return false;
        }
        if (messageTotal + 1 > maxMsgNums) {
            return true;
        }
        if (isInDisk) {
            if (bufferTotal + sizePy > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
                return true;
            }
            if (messageTotal + 1 > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk()) {
                return true;
            }
        } else {
            if (bufferTotal + sizePy > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
                return true;
            }
            if (messageTotal + 1 > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Deletes the named file and logs whether the deletion succeeded.
     *
     * @param fileName path of the file to delete
     */
    private void deleteFile(String fileName) {
        final boolean deleted = new File(fileName).delete();
        final String outcome = deleted ? " delete OK" : " delete Failed";
        log.info(fileName + outcome);
    }

    /**
     * Creates the abort file under the store root directory, ensuring its parent directory
     * first, and logs whether the file was newly created or already present.
     *
     * @throws IOException if {@link File#createNewFile()} fails
     */
    private void createTempFile() throws IOException {
        final String abortPath = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
        final File abortFile = new File(abortPath);
        MapedFile.ensureDirOK(abortFile.getParent());
        final String outcome = abortFile.createNewFile() ? " create OK" : " already exists";
        log.info(abortPath + outcome);
    }

    /**
     * @return {@code true} when the abort file exists under the store root directory
     */
    private boolean isTempFileExist() {
        final String abortPath = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
        return new File(abortPath).exists();
    }

    /**
     * Scans the consume queue directory ({@code <topic>/<queueId>}), registers a
     * {@link ConsumeQueue} for every entry found and loads it.
     *
     * <p>Entries under a topic directory whose name is not a decimal integer are skipped
     * with a warning instead of aborting the whole load with a {@link NumberFormatException}.
     *
     * @return {@code false} as soon as one queue fails to load, {@code true} otherwise
     *         (including when the directory does not exist or is empty)
     */
    private boolean loadConsumeQueue() {
        File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
        File[] fileTopicList = dirLogic.listFiles();
        if (fileTopicList != null) {
            for (File fileTopic : fileTopicList) {
                String topic = fileTopic.getName();
                // listFiles() yields null for plain files and on I/O errors; nothing to load then.
                File[] fileQueueIdList = fileTopic.listFiles();
                if (fileQueueIdList == null) {
                    continue;
                }
                for (File fileQueueId : fileQueueIdList) {
                    int queueId;
                    try {
                        queueId = Integer.parseInt(fileQueueId.getName());
                    } catch (NumberFormatException e) {
                        // A stray entry is not a queue directory; ignore it rather than fail startup.
                        log.warn("load logics queue, skip non-numeric entry " + fileQueueId.getPath());
                        continue;
                    }
                    ConsumeQueue logic = new ConsumeQueue(
                            topic,
                            queueId,
                            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                            this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
                            this);
                    this.putConsumeQueue(topic, queueId, logic);
                    if (!logic.load()) {
                        return false;
                    }
                }
            }
        }
        log.info("load logics queue all over, OK");
        return true;
    }

    /**
     * @return the configuration object this store was built with
     */
    public MessageStoreConfig getMessageStoreConfig() {
        return messageStoreConfig;
    }

    /**
     * Registers a consume queue in the topic/queue table, creating the per-topic map when
     * the topic has none yet. An existing entry for the same queue id is overwritten.
     *
     * @param topic        first-level key
     * @param queueId      second-level key
     * @param consumeQueue queue to register
     */
    private void putConsumeQueue(String topic, int queueId, ConsumeQueue consumeQueue) {
        ConcurrentHashMap<Integer, ConsumeQueue> queues = this.consumeQueueTable.get(topic);
        if (queues != null) {
            queues.put(queueId, consumeQueue);
            return;
        }
        queues = new ConcurrentHashMap<Integer, ConsumeQueue>();
        queues.put(queueId, consumeQueue);
        this.consumeQueueTable.put(topic, queues);
    }

    /**
     * Runs the recovery sequence: consume queues first, then the commit log (normal or
     * abnormal path), then waits until the dispatch service reports no remaining messages,
     * and finally rebuilds the topic queue table.
     *
     * @param lastExitOK selects {@code recoverNormally} when {@code true},
     *                   {@code recoverAbnormally} otherwise
     */
    private void recover(boolean lastExitOK) {
        this.recoverConsumeQueue();

        if (!lastExitOK) {
            this.commitLog.recoverAbnormally();
        } else {
            this.commitLog.recoverNormally();
        }

        // Poll every 500 ms until the dispatch service has drained its buffers.
        while (this.dispatchMessageService.hasRemainMessage()) {
            try {
                Thread.sleep(500L);
                log.info("waiting dispatching message over");
            } catch (InterruptedException ignored) {
                // Interruption is ignored on purpose; the loop condition is simply re-checked.
            }
        }

        this.recoverTopicQueueTable();
    }

    /**
     * Builds a {@code "<topic>-<queueId>" -> max queue offset} table from every registered
     * consume queue, corrects each queue's minimum offset against the commit log's minimum
     * offset, and installs the table on the commit log.
     */
    private void recoverTopicQueueTable() {
        final HashMap<String, Long> maxOffsets = new HashMap<String, Long>(1024);
        final long commitLogMinOffset = this.commitLog.getMinOffset();

        for (ConcurrentHashMap<Integer, ConsumeQueue> queuesOfTopic : this.consumeQueueTable.values()) {
            for (ConsumeQueue queue : queuesOfTopic.values()) {
                final String tableKey = queue.getTopic() + "-" + queue.getQueueId();
                maxOffsets.put(tableKey, queue.getMaxOffsetInQuque());
                queue.correctMinOffset(commitLogMinOffset);
            }
        }

        this.commitLog.setTopicQueueTable(maxOffsets);
    }

    /**
     * Invokes {@code recover()} on every consume queue currently registered in the table.
     */
    private void recoverConsumeQueue() {
        for (ConcurrentHashMap<Integer, ConsumeQueue> queuesOfTopic : this.consumeQueueTable.values()) {
            for (ConsumeQueue queue : queuesOfTopic.values()) {
                queue.recover();
            }
        }
    }

    /**
     * Resolves (or creates) the consume queue for the topic/queue pair and passes the
     * remaining arguments unchanged to its {@code putMessagePostionInfoWrapper}.
     *
     * @param topic          topic used to locate the consume queue
     * @param queueId        queue id used to locate the consume queue
     * @param offset         forwarded as the first wrapper argument
     * @param size           forwarded as the second wrapper argument
     * @param tagsCode       forwarded as the third wrapper argument
     * @param storeTimestamp forwarded as the fourth wrapper argument
     * @param logicOffset    forwarded as the fifth wrapper argument
     */
    public void putMessagePostionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp, long logicOffset) {
        this.findConsumeQueue(topic, queueId)
                .putMessagePostionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
    }

    /**
     * Hands a dispatch request to the dispatch message service.
     *
     * @param dispatchRequest request forwarded to {@code dispatchMessageService.putRequest}
     */
    public void putDispatchRequest(DispatchRequest dispatchRequest) {
        dispatchMessageService.putRequest(dispatchRequest);
    }

    /**
     * @return the dispatch message service owned by this store
     */
    public DispatchMessageService getDispatchMessageService() {
        return dispatchMessageService;
    }

    /**
     * @return the mapped-file allocation service owned by this store
     */
    public AllocateMapedFileService getAllocateMapedFileService() {
        return allocateMapedFileService;
    }

    /**
     * @return the statistics service owned by this store
     */
    public StoreStatsService getStoreStatsService() {
        return storeStatsService;
    }

    /**
     * @return the store's running flags; the same object {@code getRunningFlags()} returns
     */
    public RunningFlags getAccessRights() {
        return runningFlags;
    }

    /**
     * @return the live topic -> (queue id -> consume queue) table, not a copy
     */
    public ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> getConsumeQueueTable() {
        return consumeQueueTable;
    }

    /**
     * @return the checkpoint object held by this store
     */
    public StoreCheckpoint getStoreCheckpoint() {
        return storeCheckpoint;
    }

    /**
     * @return the HA service held by this store
     */
    public HAService getHaService() {
        return haService;
    }

    /**
     * @return the schedule message service held by this store
     */
    public ScheduleMessageService getScheduleMessageService() {
        return scheduleMessageService;
    }

    /**
     * @return the store's running flags; the same object {@code getAccessRights()} returns
     */
    public RunningFlags getRunningFlags() {
        return runningFlags;
    }

    /**
     * Reads the first {@code long} of the consume queue index entry at {@code cqOffset}.
     *
     * @param topic    topic used to locate the consume queue
     * @param queueId  queue id used to locate the consume queue
     * @param cqOffset offset passed to {@code ConsumeQueue.getIndexBuffer}
     * @return the value read, or {@code 0} when no queue or no index buffer is available
     */
    @Override
    public long getCommitLogOffsetInQueue(String topic, int queueId, long cqOffset) {
        final ConsumeQueue consumeQueue = this.findConsumeQueue(topic, queueId);
        if (consumeQueue == null) {
            return 0L;
        }
        final SelectMapedBufferResult indexBuffer = consumeQueue.getIndexBuffer(cqOffset);
        if (indexBuffer == null) {
            return 0L;
        }
        try {
            return indexBuffer.getByteBuffer().getLong();
        } finally {
            // The buffer is released on every path once it has been obtained.
            indexBuffer.release();
        }
    }

    /**
     * @return the commit log's minimum offset
     */
    @Override
    public long getMinPhyOffset() {
        return commitLog.getMinOffset();
    }

    /**
     * @return the commit log's maximum offset minus the HA service's
     *         {@code push2SlaveMaxOffset} value
     */
    @Override
    public long slaveFallBehindMuch() {
        final long commitLogMaxOffset = this.commitLog.getMaxOffset();
        final long pushedToSlaveOffset = this.haService.getPush2SlaveMaxOffset().get();
        return commitLogMaxOffset - pushedToSlaveOffset;
    }

    /**
     * Destroys and unregisters the consume queues of every topic that is neither contained
     * in {@code topics} nor equal to {@code "SCHEDULE_TOPIC_XXXX"}.
     *
     * @param topics topics whose consume queues must be kept
     * @return always {@code 0}
     */
    @Override
    public int cleanUnusedTopic(Set<String> topics) {
        final Iterator<Map.Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>>> entries =
                this.consumeQueueTable.entrySet().iterator();
        while (entries.hasNext()) {
            final Map.Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>> entry = entries.next();
            final String topic = entry.getKey();
            final boolean keep = topics.contains(topic) || topic.equals("SCHEDULE_TOPIC_XXXX");
            if (keep) {
                continue;
            }

            for (ConsumeQueue queue : entry.getValue().values()) {
                queue.destroy();
                log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", queue.getTopic(), queue.getQueueId());
                this.commitLog.removeQueurFromTopicQueueTable(queue.getTopic(), queue.getQueueId());
            }

            // Remove through the iterator so the table is not modified behind its back.
            entries.remove();
            log.info("cleanUnusedTopic: {},topic destroyed", topic);
        }
        return 0;
    }

    /**
     * Collects message ids for the consume queue entries in {@code [minOffset, maxOffset]},
     * after clamping the range to the queue's own minimum and maximum offsets.
     *
     * <p>The index buffer is released in a plain {@code finally}. The previous form ended that
     * {@code finally} with {@code continue}, which discarded the pending {@code return} and
     * silently swallowed any exception raised while reading, allowing the outer loop to retry
     * the same offset indefinitely.
     *
     * @param topic     topic used to locate the consume queue
     * @param queueId   queue id used to locate the consume queue
     * @param minOffset lower queue offset requested
     * @param maxOffset upper queue offset requested
     * @param storeHost address encoded into every generated message id
     * @return map of message id to queue offset; empty when the store is shut down, the
     *         clamped upper bound is {@code 0}, or nothing could be read
     */
    @Override
    public Map<String, Long> getMessageIds(String topic, int queueId, long minOffset, long maxOffset, SocketAddress storeHost) {
        HashMap<String, Long> messageIds = new HashMap<String, Long>();
        if (this.shutdown) {
            return messageIds;
        }

        ConsumeQueue consumeQueue = this.findConsumeQueue(topic, queueId);
        if (consumeQueue == null) {
            return messageIds;
        }

        minOffset = Math.max(minOffset, consumeQueue.getMinOffsetInQuque());
        maxOffset = Math.min(maxOffset, consumeQueue.getMaxOffsetInQuque());
        if (maxOffset == 0L) {
            return messageIds;
        }

        long nextOffset = minOffset;
        while (nextOffset < maxOffset) {
            SelectMapedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(nextOffset);
            if (bufferConsumeQueue == null) {
                return messageIds;
            }

            long offsetBeforeBatch = nextOffset;
            try {
                // Steps by 20 per entry - presumably the consume queue unit size; TODO confirm.
                // NOTE(review): getLong() is a relative read (advances 8 bytes) while i advances 20;
                // if getByteBuffer() returns the same buffer each call, later entries are read at
                // the wrong position - verify against SelectMapedBufferResult.
                for (int i = 0; i < bufferConsumeQueue.getSize(); i += 20) {
                    long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                    ByteBuffer msgIdMemory = ByteBuffer.allocate(16);
                    String msgId = MessageDecoder.createMessageId(
                            msgIdMemory, MessageExt.SocketAddress2ByteBuffer(storeHost), offsetPy);
                    messageIds.put(msgId, nextOffset++);
                    if (nextOffset > maxOffset) {
                        return messageIds;
                    }
                }
            } finally {
                bufferConsumeQueue.release();
            }

            // An empty index buffer would never advance nextOffset; stop instead of spinning.
            if (nextOffset == offsetBeforeBatch) {
                return messageIds;
            }
        }
        return messageIds;
    }

    /**
     * Compares the distance between two commit log offsets with a memory budget computed as
     * {@code StoreUtil.TotalPhysicalMemorySize * accessMessageInMemoryMaxRatio / 100}.
     *
     * @param offsetPy    commit log offset of the message
     * @param maxOffsetPy current maximum commit log offset
     * @return {@code true} when {@code maxOffsetPy - offsetPy} exceeds that budget
     */
    private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) {
        final double inMemoryRatio = this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0;
        final long memoryBudget = (long) (StoreUtil.TotalPhysicalMemorySize * inMemoryRatio);
        final long distance = maxOffsetPy - offsetPy;
        return distance > memoryBudget;
    }

    /**
     * Reads the first {@code long} of the consume queue index entry at {@code consumeOffset}
     * and applies {@code checkInDiskByCommitOffset} to it against the commit log's current
     * maximum offset.
     *
     * @param topic         topic used to locate the consume queue
     * @param queueId       queue id used to locate the consume queue
     * @param consumeOffset offset passed to {@code ConsumeQueue.getIndexBuffer}
     * @return the result of the commit-offset check, or {@code false} when no queue, no index
     *         buffer, or an empty index buffer is found
     */
    @Override
    public boolean checkInDiskByConsumeOffset(String topic, int queueId, long consumeOffset) {
        final long maxOffsetPy = this.commitLog.getMaxOffset();

        final ConsumeQueue consumeQueue = this.findConsumeQueue(topic, queueId);
        if (consumeQueue == null) {
            return false;
        }

        final SelectMapedBufferResult indexBuffer = consumeQueue.getIndexBuffer(consumeOffset);
        if (indexBuffer == null) {
            return false;
        }

        try {
            if (indexBuffer.getSize() <= 0) {
                return false;
            }
            // Only the first entry of the buffer is inspected.
            final long offsetPy = indexBuffer.getByteBuffer().getLong();
            return this.checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
        } finally {
            indexBuffer.release();
        }
    }

    /**
     * @return the broker statistics manager held by this store
     */
    public BrokerStatsManager getBrokerStatsManager() {
        return brokerStatsManager;
    }

    /**
     * Service thread that walks the commit log from {@code reputFromOffset} and submits a
     * dispatch request for every message it finds.
     */
    class ReputMessageService
    extends ServiceThread {
        /** Commit log offset the next scan starts from. */
        private volatile long reputFromOffset = 0L;

        ReputMessageService() {
        }

        public long getReputFromOffset() {
            return this.reputFromOffset;
        }

        public void setReputFromOffset(long reputFromOffset) {
            this.reputFromOffset = reputFromOffset;
        }

        /**
         * Repeatedly fetches commit log data at {@code reputFromOffset} until no data is
         * returned or a message check reports size {@code -1}.
         *
         * <p>The fetched buffer is released in a plain {@code finally}. The previous form ended
         * that {@code finally} with {@code continue}, which swallowed every exception thrown
         * while scanning, so failures never reached the handler in {@link #run()} and the same
         * offset could be retried in a tight loop.
         */
        private void doReput() {
            boolean doNext = true;
            while (doNext) {
                SelectMapedBufferResult result = DefaultMessageStore.this.commitLog.getData(this.reputFromOffset);
                if (result == null) {
                    // No data at this offset: nothing more to re-dispatch in this round.
                    break;
                }
                try {
                    this.reputFromOffset = result.getStartOffset();
                    int readSize = 0;
                    while (readSize < result.getSize() && doNext) {
                        DispatchRequest dispatchRequest =
                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                        int size = dispatchRequest.getMsgSize();
                        if (size > 0) {
                            DefaultMessageStore.this.putDispatchRequest(dispatchRequest);
                            this.reputFromOffset += (long)size;
                            readSize += size;
                            DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                            DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).addAndGet(size);
                        } else if (size == -1) {
                            // Size -1 stops both loops.
                            doNext = false;
                        } else if (size == 0) {
                            // Size 0: jump to the next commit log file and end this buffer's scan.
                            this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            readSize = result.getSize();
                        }
                        // Any other negative size: no bookkeeping, the inner loop simply checks again.
                    }
                } finally {
                    result.release();
                }
            }
        }

        public void run() {
            log.info(this.getServiceName() + " service started");
            while (!this.isStoped()) {
                try {
                    this.waitForRunning(1000L);
                    this.doReput();
                }
                catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", (Throwable)e);
                }
            }
            log.info(this.getServiceName() + " service end");
        }

        public String getServiceName() {
            return ReputMessageService.class.getSimpleName();
        }
    }

    /**
     * Service thread that buffers dispatch requests in a write list, swaps it with a read
     * list when woken, and then applies the read list to the consume queues (and optionally
     * the index service).
     */
    class DispatchMessageService
    extends ServiceThread {
        /** List producers append to; guarded by this object's monitor when written. */
        private volatile List<DispatchRequest> requestsWrite;
        /** List drained by {@link #doDispatch()}. */
        private volatile List<DispatchRequest> requestsRead;

        /**
         * @param putMsgIndexHightWater high-water mark; both lists are pre-sized to 1.5 times it
         */
        public DispatchMessageService(int putMsgIndexHightWater) {
            final int initialCapacity = (int)((double)putMsgIndexHightWater * 1.5);
            this.requestsWrite = new ArrayList<DispatchRequest>(initialCapacity);
            this.requestsRead = new ArrayList<DispatchRequest>(initialCapacity);
        }

        /**
         * @return {@code true} when either the write list or the read list is non-empty
         */
        public boolean hasRemainMessage() {
            final List<DispatchRequest> pendingWrite = this.requestsWrite;
            if (pendingWrite != null && !pendingWrite.isEmpty()) {
                return true;
            }
            final List<DispatchRequest> pendingRead = this.requestsRead;
            return pendingRead != null && !pendingRead.isEmpty();
        }

        /**
         * Appends a request to the write list and wakes the service thread if it has not been
         * notified yet. When the write list is above the configured high-water mark the calling
         * thread sleeps for 1 ms.
         *
         * @param dispatchRequest request to enqueue
         */
        public void putRequest(DispatchRequest dispatchRequest) {
            final int highWater = DefaultMessageStore.this.getMessageStoreConfig().getPutMsgIndexHightWater();
            final int queued;
            synchronized (this) {
                this.requestsWrite.add(dispatchRequest);
                queued = this.requestsWrite.size();
                if (!this.hasNotified) {
                    this.hasNotified = true;
                    this.notify();
                }
            }

            DefaultMessageStore.this.getStoreStatsService().setDispatchMaxBuffer(queued);
            if (queued <= highWater) {
                return;
            }

            try {
                if (log.isDebugEnabled()) {
                    log.debug("Message index buffer size " + queued + " > high water " + highWater);
                }
                Thread.sleep(1L);
            }
            catch (InterruptedException ignored) {
                // Interruption of the back-off sleep is deliberately ignored.
            }
        }

        /** Exchanges the write and read lists. */
        private void swapRequests() {
            final List<DispatchRequest> previousWrite = this.requestsWrite;
            this.requestsWrite = this.requestsRead;
            this.requestsRead = previousWrite;
        }

        /**
         * Applies every request in the read list to its consume queue when its transaction
         * value is 0 or 8, forwards the whole list to the index service when message indexing
         * is enabled, then clears the read list.
         */
        private void doDispatch() {
            if (this.requestsRead.isEmpty()) {
                return;
            }

            for (DispatchRequest req : this.requestsRead) {
                final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
                // Only transaction values 0 and 8 are written to the consume queue.
                if (tranType == 0 || tranType == 8) {
                    DefaultMessageStore.this.putMessagePostionInfo(
                            req.getTopic(),
                            req.getQueueId(),
                            req.getCommitLogOffset(),
                            req.getMsgSize(),
                            req.getTagsCode(),
                            req.getStoreTimestamp(),
                            req.getConsumeQueueOffset());
                }
            }

            if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {
                DefaultMessageStore.this.indexService.putRequest(this.requestsRead.toArray());
            }
            this.requestsRead.clear();
        }

        /**
         * Main loop: wait, dispatch, repeat until stopped. After the loop it sleeps 5 seconds,
         * swaps the lists once more under the monitor and dispatches whatever is left.
         */
        public void run() {
            log.info(this.getServiceName() + " service started");
            while (!this.isStoped()) {
                try {
                    this.waitForRunning(0L);
                    this.doDispatch();
                }
                catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", (Throwable)e);
                }
            }

            try {
                Thread.sleep(5000L);
            }
            catch (InterruptedException e) {
                log.warn("DispatchMessageService Exception, ", (Throwable)e);
            }

            synchronized (this) {
                this.swapRequests();
            }
            this.doDispatch();
            log.info(this.getServiceName() + " service end");
        }

        /** Swaps the lists so the requests gathered while waiting become readable. */
        protected void onWaitEnd() {
            this.swapRequests();
        }

        public String getServiceName() {
            return DispatchMessageService.class.getSimpleName();
        }
    }

    /**
     * Service thread that periodically commits every registered consume queue and, on a
     * thorough flush, also flushes the store checkpoint.
     */
    class FlushConsumeQueueService
    extends ServiceThread {
        /** Retry count used for the final flush; also forces a flush regardless of page count. */
        private static final int RetryTimesOver = 3;
        /** Wall-clock time of the last thorough flush. */
        private long lastFlushTimestamp = 0L;

        FlushConsumeQueueService() {
        }

        /**
         * Commits every consume queue, retrying each up to {@code retryTimes} times until
         * {@code commit} reports success.
         *
         * <p>The minimum page count passed to {@code commit} is forced to {@code 0} when
         * {@code retryTimes} equals {@link #RetryTimesOver} or when the thorough-flush interval
         * has elapsed. Whenever it is {@code 0}, the store checkpoint is flushed afterwards.
         *
         * @param retryTimes maximum commit attempts per consume queue
         */
        private void doFlush(int retryTimes) {
            int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
            if (retryTimes == RetryTimesOver) {
                flushConsumeQueueLeastPages = 0;
            }

            long logicsMsgTimestamp = 0L;
            int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis >= this.lastFlushTimestamp + (long)flushConsumeQueueThoroughInterval) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushConsumeQueueLeastPages = 0;
                // Captured before committing and written back after it (see below).
                logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
            }

            for (ConcurrentHashMap<Integer, ConsumeQueue> maps : DefaultMessageStore.this.consumeQueueTable.values()) {
                for (ConsumeQueue cq : maps.values()) {
                    boolean result = false;
                    for (int i = 0; i < retryTimes && !result; ++i) {
                        result = cq.commit(flushConsumeQueueLeastPages);
                    }
                }
            }

            if (0 == flushConsumeQueueLeastPages) {
                if (logicsMsgTimestamp > 0L) {
                    DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
                }
                DefaultMessageStore.this.getStoreCheckpoint().flush();
            }
        }

        /**
         * Flushes once per configured interval until stopped, then performs one final flush
         * with {@link #RetryTimesOver} retries.
         */
        public void run() {
            log.info(this.getServiceName() + " service started");
            while (!this.isStoped()) {
                try {
                    int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
                    this.waitForRunning(interval);
                    this.doFlush(1);
                }
                catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", (Throwable)e);
                }
            }
            this.doFlush(RetryTimesOver);
            log.info(this.getServiceName() + " service end");
        }

        public String getServiceName() {
            return FlushConsumeQueueService.class.getSimpleName();
        }

        /** @return a fixed value of 60000 */
        public long getJointime() {
            return 60000L;
        }
    }

    /**
     * Removes consume queue and index files that lie below the commit log's minimum offset.
     */
    class CleanConsumeQueueService {
        /** Commit log minimum offset seen by the previous cleanup. */
        private long lastPhysicalMinOffset = 0L;

        CleanConsumeQueueService() {
        }

        /**
         * When the commit log's minimum offset has advanced since the last run, asks every
         * consume queue and then the index service to delete files expired by that offset.
         * After a queue actually deleted files, sleeps for the configured interval (if positive)
         * before moving on to the next queue.
         */
        private void deleteExpiredFiles() {
            int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
            long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
            if (minOffset <= this.lastPhysicalMinOffset) {
                return;
            }
            this.lastPhysicalMinOffset = minOffset;

            for (ConcurrentHashMap<Integer, ConsumeQueue> maps : DefaultMessageStore.this.consumeQueueTable.values()) {
                for (ConsumeQueue logic : maps.values()) {
                    int deleteCount = logic.deleteExpiredFile(minOffset);
                    if (deleteCount <= 0 || deleteLogicsFilesInterval <= 0) {
                        continue;
                    }
                    try {
                        Thread.sleep(deleteLogicsFilesInterval);
                    }
                    catch (InterruptedException ignored) {
                        // Interruption only shortens the pause; cleanup carries on.
                    }
                }
            }

            DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
        }

        /** Runs one cleanup pass; any exception is logged and not propagated. */
        public void run() {
            try {
                this.deleteExpiredFiles();
            }
            catch (Exception e) {
                log.warn(this.getServiceName() + " service has exception. ", (Throwable)e);
            }
        }

        public String getServiceName() {
            return CleanConsumeQueueService.class.getSimpleName();
        }
    }

    class CleanCommitLogService {
        private static final int MaxManualDeleteFileTimes = 20;
        private final double DiskSpaceWarningLevelRatio = Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));
        private final double DiskSpaceCleanForciblyRatio = Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
        private long lastRedeleteTimestamp = 0L;
        private volatile int manualDeleteFileSeveralTimes = 0;
        private volatile boolean cleanImmediately = false;

        CleanCommitLogService() {
        }

        public void excuteDeleteFilesManualy() {
            this.manualDeleteFileSeveralTimes = 20;
            log.info("excuteDeleteFilesManualy was invoked");
        }

        public void run() {
            try {
                this.deleteExpiredFiles();
                this.redeleteHangedFile();
            }
            catch (Exception e) {
                log.warn(this.getServiceName() + " service has exception. ", (Throwable)e);
            }
        }

        public String getServiceName() {
            return CleanCommitLogService.class.getSimpleName();
        }

        private void redeleteHangedFile() {
            int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval();
            long currentTimestamp = System.currentTimeMillis();
            if (currentTimestamp - this.lastRedeleteTimestamp > (long)interval) {
                this.lastRedeleteTimestamp = currentTimestamp;
                int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
                if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) {
                    // empty if block
                }
            }
        }

        private void deleteExpiredFiles() {
            boolean manualDelete;
            int deleteCount = 0;
            long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
            int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
            int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
            boolean timeup = this.isTimeToDelete();
            boolean spacefull = this.isSpaceToDelete();
            boolean bl = manualDelete = this.manualDeleteFileSeveralTimes > 0;
            if (timeup || spacefull || manualDelete) {
                if (manualDelete) {
                    --this.manualDeleteFileSeveralTimes;
                }
                boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
                log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", new Object[]{fileReservedTime, timeup, spacefull, this.manualDeleteFileSeveralTimes, cleanAtOnce});
                deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime *= 3600000L, deletePhysicFilesInterval, destroyMapedFileIntervalForcibly, cleanAtOnce);
                if (deleteCount <= 0 && spacefull) {
                    log.warn("disk space will be full soon, but delete file failed.");
                }
            }
        }

        private boolean isSpaceToDelete() {
            boolean diskok;
            double ratio = (double)DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
            this.cleanImmediately = false;
            String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
            double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent((String)storePathPhysic);
            if (physicRatio > this.DiskSpaceWarningLevelRatio) {
                diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
                if (diskok) {
                    log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
                    System.gc();
                }
                this.cleanImmediately = true;
            } else if (physicRatio > this.DiskSpaceCleanForciblyRatio) {
                this.cleanImmediately = true;
            } else {
                diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
                if (!diskok) {
                    log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
                }
            }
            if (physicRatio < 0.0 || physicRatio > ratio) {
                log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
                return true;
            }
            String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
            double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent((String)storePathLogics);
            if (logicsRatio > this.DiskSpaceWarningLevelRatio) {
                diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
                if (diskok) {
                    log.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full");
                    System.gc();
                }
                this.cleanImmediately = true;
            } else if (logicsRatio > this.DiskSpaceCleanForciblyRatio) {
                this.cleanImmediately = true;
            } else {
                diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
                if (!diskok) {
                    log.info("logics disk space OK " + logicsRatio + ", so mark disk ok");
                }
            }
            if (logicsRatio < 0.0 || logicsRatio > ratio) {
                log.info("logics disk maybe full soon, so reclaim space, " + logicsRatio);
                return true;
            }
            return false;
        }

        private boolean isTimeToDelete() {
            String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
            if (UtilAll.isItTimeToDo((String)when)) {
                log.info("it's time to reclaim disk space, " + when);
                return true;
            }
            return false;
        }

        public int getManualDeleteFileSeveralTimes() {
            return this.manualDeleteFileSeveralTimes;
        }

        public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) {
            this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes;
        }
    }
}

