/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.rocketmq.store;

import com.alibaba.rocketmq.common.ServiceThread;
import com.alibaba.rocketmq.common.UtilAll;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageAccessor;
import com.alibaba.rocketmq.common.message.MessageDecoder;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
import com.alibaba.rocketmq.store.AppendMessageCallback;
import com.alibaba.rocketmq.store.AppendMessageResult;
import com.alibaba.rocketmq.store.AppendMessageStatus;
import com.alibaba.rocketmq.store.DefaultMessageStore;
import com.alibaba.rocketmq.store.DispatchRequest;
import com.alibaba.rocketmq.store.MapedFile;
import com.alibaba.rocketmq.store.MapedFileQueue;
import com.alibaba.rocketmq.store.MessageExtBrokerInner;
import com.alibaba.rocketmq.store.PutMessageResult;
import com.alibaba.rocketmq.store.PutMessageStatus;
import com.alibaba.rocketmq.store.SelectMapedBufferResult;
import com.alibaba.rocketmq.store.StoreStatsService;
import com.alibaba.rocketmq.store.config.BrokerRole;
import com.alibaba.rocketmq.store.config.FlushDiskType;
import com.alibaba.rocketmq.store.ha.HAService;
import com.alibaba.rocketmq.store.schedule.ScheduleMessageService;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitLog {
    // Logger shared by this class and its nested services; bound to the "RocketmqStore" logger name.
    private static final Logger log = LoggerFactory.getLogger((String)"RocketmqStore");
    // Magic code written as the second int of every message record (0xDAA320A7 as a signed int).
    public static final int MessageMagicCode = -626843481;
    // Magic code written as the second int of the filler record that pads the end of a file
    // (0xCBD43194 as a signed int).
    private static final int BlankMagicCode = -875286124;
    // Queue of mapped files that physically hold the commit log.
    private final MapedFileQueue mapedFileQueue;
    // Owning store; used here for configuration, checkpoint, dispatching and the HA service.
    private final DefaultMessageStore defaultMessageStore;
    // Flush service chosen in the constructor from the configured flush disk type.
    private final FlushCommitLogService flushCommitLogService;
    // Callback that serializes a message into a mapped file.
    private final AppendMessageCallback appendMessageCallback;
    // Next queue offset per "topic-queueId" key; read and updated by the append callback.
    private HashMap<String, Long> topicQueueTable = new HashMap(1024);

    /**
     * Wires the commit log to its owning store.
     *
     * <p>Builds the mapped file queue from the configured commit log path and file size,
     * selects the flush service from the configured flush disk type, and creates the
     * append callback sized by the configured maximum message size.
     *
     * @param defaultMessageStore the store that owns this commit log
     */
    public CommitLog(DefaultMessageStore defaultMessageStore) {
        this.mapedFileQueue = new MapedFileQueue(
            defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
            defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(),
            defaultMessageStore.getAllocateMapedFileService());
        this.defaultMessageStore = defaultMessageStore;

        // Synchronous flushing uses the group-commit service; everything else flushes in the background.
        if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            this.flushCommitLogService = new GroupCommitService();
        } else {
            this.flushCommitLogService = new FlushRealTimeService();
        }

        this.appendMessageCallback =
            new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
    }

    /**
     * Loads the mapped file queue and logs the outcome.
     *
     * @return whatever {@code MapedFileQueue.load()} reported
     */
    public boolean load() {
        final boolean loaded = this.mapedFileQueue.load();
        final String outcome;
        if (loaded) {
            outcome = "OK";
        } else {
            outcome = "Failed";
        }
        log.info("load commit log " + outcome);
        return loaded;
    }

    /**
     * Starts the flush service selected at construction time.
     */
    public void start() {
        final FlushCommitLogService flushService = this.flushCommitLogService;
        flushService.start();
    }

    /**
     * Shuts down the flush service selected at construction time.
     */
    public void shutdown() {
        final FlushCommitLogService flushService = this.flushCommitLogService;
        flushService.shutdown();
    }

    /**
     * Returns the smallest offset that can currently be read from the commit log.
     *
     * @return the start offset of the first mapped file when it is available, the start
     *         of the following file when it is not, or {@code -1} when there is no file
     */
    public long getMinOffset() {
        final MapedFile first = this.mapedFileQueue.getFirstMapedFileOnLock();
        if (first == null) {
            return -1L;
        }
        if (!first.isAvailable()) {
            // The first file is no longer usable, so report the start of the next one.
            return this.rollNextFile(first.getFileFromOffset());
        }
        return first.getFileFromOffset();
    }

    /**
     * Computes the start offset of the file that follows the one containing {@code offset}.
     *
     * @param offset a physical offset in the commit log
     * @return {@code offset} rounded up to the next multiple of the configured file size
     *         (an offset already on a boundary moves to the next boundary)
     */
    public long rollNextFile(long offset) {
        final long fileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
        final long positionInFile = offset % fileSize;
        return offset + fileSize - positionInFile;
    }

    /**
     * Returns the maximum offset reported by the mapped file queue.
     *
     * @return the result of {@code MapedFileQueue.getMaxOffset()}
     */
    public long getMaxOffset() {
        final long maxOffset = this.mapedFileQueue.getMaxOffset();
        return maxOffset;
    }

    /**
     * Delegates time-based deletion of expired files to the mapped file queue.
     *
     * @param expiredTime         passed through to {@code deleteExpiredFileByTime}
     * @param deleteFilesInterval passed through to {@code deleteExpiredFileByTime}
     * @param intervalForcibly    passed through to {@code deleteExpiredFileByTime}
     * @param cleanImmediately    passed through to {@code deleteExpiredFileByTime}
     * @return the value returned by the mapped file queue
     */
    public int deleteExpiredFile(long expiredTime, int deleteFilesInterval, long intervalForcibly, boolean cleanImmediately) {
        final int deleted = this.mapedFileQueue.deleteExpiredFileByTime(
            expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
        return deleted;
    }

    /**
     * Selects the data starting at {@code offset}, falling back to the first file only
     * when {@code offset} is zero.
     *
     * @param offset physical offset to read from
     * @return the selected buffer, or {@code null} when no mapped file covers the offset
     */
    public SelectMapedBufferResult getData(long offset) {
        final boolean returnFirstOnNotFound = (offset == 0L);
        return this.getData(offset, returnFirstOnNotFound);
    }

    /**
     * Selects the data starting at {@code offset} inside the mapped file that covers it.
     *
     * @param offset                physical offset to read from
     * @param returnFirstOnNotFound forwarded to {@code findMapedFileByOffset}
     * @return the selected buffer, or {@code null} when no mapped file was found
     */
    public SelectMapedBufferResult getData(long offset, boolean returnFirstOnNotFound) {
        final int fileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
        final MapedFile target = this.mapedFileQueue.findMapedFileByOffset(offset, returnFirstOnNotFound);
        if (target == null) {
            return null;
        }
        // Translate the global offset into a position relative to the start of the file.
        final int positionInFile = (int)(offset % (long)fileSize);
        return target.selectMapedBuffer(positionInFile);
    }

    /**
     * Recovers the commit log by re-scanning at most the last three mapped files.
     *
     * <p>Records are validated one by one; scanning stops at the first invalid record or
     * after the last file. The committed position is then set to the end of the last
     * valid record and everything beyond it is truncated.
     */
    public void recoverNormally() {
        final boolean verifyCrc = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MapedFile> files = this.mapedFileQueue.getMapedFiles();
        if (files.isEmpty()) {
            return;
        }

        // Begin three files from the end, or at the first file when there are fewer.
        int fileIndex = Math.max(files.size() - 3, 0);
        MapedFile current = files.get(fileIndex);
        ByteBuffer buffer = current.sliceByteBuffer();
        long fileStartOffset = current.getFileFromOffset();
        long validBytesInFile = 0L;

        boolean scanning = true;
        while (scanning) {
            final int size = this.checkMessageAndReturnSize(buffer, verifyCrc).getMsgSize();
            if (size > 0) {
                // A valid message: account for it and keep reading the same file.
                validBytesInFile += (long)size;
            } else if (size == -1) {
                // An invalid record marks the end of usable data.
                log.info("recover physics file end, " + current.getFileName());
                scanning = false;
            } else if (size == 0) {
                // End-of-file filler: move on to the next file, if there is one.
                fileIndex++;
                if (fileIndex >= files.size()) {
                    log.info("recover last 3 physics file over, last maped file " + current.getFileName());
                    scanning = false;
                } else {
                    current = files.get(fileIndex);
                    buffer = current.sliceByteBuffer();
                    fileStartOffset = current.getFileFromOffset();
                    validBytesInFile = 0L;
                    log.info("recover next physics file, " + current.getFileName());
                }
            }
        }

        final long recoveredOffset = fileStartOffset + validBytesInFile;
        this.mapedFileQueue.setCommittedWhere(recoveredOffset);
        this.mapedFileQueue.truncateDirtyFiles(recoveredOffset);
    }

    /**
     * Validates the next record in {@code byteBuffer}, always reading the message body.
     *
     * @param byteBuffer buffer positioned at the start of a record
     * @param checkCRC   whether the body CRC is verified
     * @return the dispatch request produced by the three-argument overload
     */
    public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, boolean checkCRC) {
        final boolean readBody = true;
        return this.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody);
    }

    /**
     * Decodes and validates the record at the current position of {@code byteBuffer}.
     *
     * <p>The fields are read in the order the append callback writes them. The one-byte
     * topic length and the two-byte properties length are decoded as unsigned values,
     * because the writer stores them with narrowing casts; reading them signed would turn
     * topics longer than 127 bytes (or properties longer than 32767 bytes) into negative
     * lengths and make a valid record look corrupt.
     *
     * <p>Note: this method uses the append callback's scratch array as a temporary
     * buffer, so its contents are overwritten.
     *
     * @param byteBuffer buffer positioned at the start of a record; its position is
     *                   advanced past the record, or moved to the limit on failure
     * @param checkCRC   whether the body CRC is verified (only when the body is read)
     * @param readBody   whether the body bytes are copied out or merely skipped
     * @return a request describing the message; a request of size {@code 0} for the
     *         end-of-file filler; a request of size {@code -1} for an unknown magic code,
     *         a CRC mismatch, or any decoding failure
     */
    public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, boolean checkCRC, boolean readBody) {
        try {
            ByteBuffer byteBufferMessage = ((DefaultAppendMessageCallback)this.appendMessageCallback).getMsgStoreItemMemory();
            byte[] bytesContent = byteBufferMessage.array();

            int totalSize = byteBuffer.getInt();
            int magicCode = byteBuffer.getInt();
            switch (magicCode) {
                case MessageMagicCode: {
                    break;
                }
                case BlankMagicCode: {
                    // Filler record written at the end of a file.
                    return new DispatchRequest(0);
                }
                default: {
                    log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode));
                    return new DispatchRequest(-1);
                }
            }

            int bodyCRC = byteBuffer.getInt();
            int queueId = byteBuffer.getInt();
            byteBuffer.getInt(); // flag: not needed here, consumed to keep the position aligned
            long queueOffset = byteBuffer.getLong();
            long physicOffset = byteBuffer.getLong();
            int sysFlag = byteBuffer.getInt();
            byteBuffer.getLong(); // born timestamp: not needed here
            byteBuffer.get(bytesContent, 0, 8); // born host: skipped
            long storeTimestamp = byteBuffer.getLong();
            byteBuffer.get(bytesContent, 0, 8); // store host: skipped
            byteBuffer.getInt(); // reconsume times: not needed here
            long preparedTransactionOffset = byteBuffer.getLong();

            int bodyLen = byteBuffer.getInt();
            if (bodyLen > 0) {
                if (readBody) {
                    byteBuffer.get(bytesContent, 0, bodyLen);
                    if (checkCRC) {
                        int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
                        if (crc != bodyCRC) {
                            log.warn("CRC check failed " + crc + " " + bodyCRC);
                            return new DispatchRequest(-1);
                        }
                    }
                } else {
                    byteBuffer.position(byteBuffer.position() + bodyLen);
                }
            }

            // The writer stores the topic length with a (byte) cast, so decode it unsigned.
            int topicLen = byteBuffer.get() & 0xFF;
            byteBuffer.get(bytesContent, 0, topicLen);
            String topic = new String(bytesContent, 0, topicLen);

            long tagsCode = 0L;
            String keys = "";

            // The writer stores the properties length with a (short) cast, so decode it unsigned.
            int propertiesLength = byteBuffer.getShort() & 0xFFFF;
            if (propertiesLength > 0) {
                byteBuffer.get(bytesContent, 0, propertiesLength);
                String properties = new String(bytesContent, 0, propertiesLength);
                Map<?, ?> propertiesMap = MessageDecoder.string2messageProperties(properties);
                keys = (String)propertiesMap.get("KEYS");
                String tags = (String)propertiesMap.get("TAGS");
                if (tags != null && tags.length() > 0) {
                    tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
                }
                String t = (String)propertiesMap.get("DELAY");
                if ("SCHEDULE_TOPIC_XXXX".equals(topic) && t != null) {
                    int delayLevel = Integer.parseInt(t);
                    int maxDelayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
                    if (delayLevel > maxDelayLevel) {
                        delayLevel = maxDelayLevel;
                    }
                    if (delayLevel > 0) {
                        // For scheduled messages the tags code carries the delivery timestamp.
                        tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel, storeTimestamp);
                    }
                }
            }
            return new DispatchRequest(topic, queueId, physicOffset, totalSize, tagsCode, storeTimestamp, queueOffset, keys, sysFlag, preparedTransactionOffset);
        }
        catch (BufferUnderflowException e) {
            // Ran off the end of the buffer: expected when the tail holds no complete record.
            byteBuffer.position(byteBuffer.limit());
        }
        catch (Exception e) {
            // Anything else means the record could not be decoded; report it instead of hiding it.
            log.warn("check message failed, treat the record as invalid", e);
            byteBuffer.position(byteBuffer.limit());
        }
        return new DispatchRequest(-1);
    }

    /**
     * Recovers the commit log after an abnormal shutdown.
     *
     * <p>Walks the mapped files from newest to oldest to find one accepted by
     * {@code isMapedFileMatchedRecover} (falling back to the first file), then re-scans
     * forward from there. Every valid message is re-dispatched. Afterwards the committed
     * position is set to the end of the last valid record and both the commit log and the
     * logic files are truncated to it. With no files at all, the committed position is
     * reset to zero and the logic files are destroyed.
     */
    public void recoverAbnormally() {
        final boolean verifyCrc = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MapedFile> files = this.mapedFileQueue.getMapedFiles();
        if (files.isEmpty()) {
            this.mapedFileQueue.setCommittedWhere(0L);
            this.defaultMessageStore.destroyLogics();
            return;
        }

        // Search backwards for the newest file that is safe to start recovery from.
        int fileIndex = files.size() - 1;
        MapedFile current = null;
        while (fileIndex >= 0) {
            current = files.get(fileIndex);
            if (this.isMapedFileMatchedRecover(current)) {
                log.info("recover from this maped file " + current.getFileName());
                break;
            }
            fileIndex--;
        }
        if (fileIndex < 0) {
            // No file qualified: start from the very first one.
            fileIndex = 0;
            current = files.get(fileIndex);
        }

        ByteBuffer buffer = current.sliceByteBuffer();
        long fileStartOffset = current.getFileFromOffset();
        long validBytesInFile = 0L;

        boolean scanning = true;
        while (scanning) {
            final DispatchRequest request = this.checkMessageAndReturnSize(buffer, verifyCrc);
            final int size = request.getMsgSize();
            if (size > 0) {
                // A valid message: account for it and dispatch it again.
                validBytesInFile += (long)size;
                this.defaultMessageStore.putDispatchRequest(request);
            } else if (size == -1) {
                log.info("recover physics file end, " + current.getFileName());
                scanning = false;
            } else if (size == 0) {
                // End-of-file filler: continue with the next file, if any.
                fileIndex++;
                if (fileIndex >= files.size()) {
                    log.info("recover physics file over, last maped file " + current.getFileName());
                    scanning = false;
                } else {
                    current = files.get(fileIndex);
                    buffer = current.sliceByteBuffer();
                    fileStartOffset = current.getFileFromOffset();
                    validBytesInFile = 0L;
                    log.info("recover next physics file, " + current.getFileName());
                }
            }
        }

        final long recoveredOffset = fileStartOffset + validBytesInFile;
        this.mapedFileQueue.setCommittedWhere(recoveredOffset);
        this.mapedFileQueue.truncateDirtyFiles(recoveredOffset);
        this.defaultMessageStore.truncateDirtyLogicFiles(recoveredOffset);
    }

    /**
     * Decides whether abnormal recovery may start from the given mapped file.
     *
     * <p>The file qualifies when its first record carries the message magic code and a
     * non-zero store timestamp that is not later than the relevant checkpoint timestamp.
     * The index-aware checkpoint is used when message indexing is both enabled and
     * configured as safe; otherwise the plain minimum timestamp is used.
     *
     * @param mapedFile candidate file
     * @return {@code true} when recovery can begin at this file
     */
    private boolean isMapedFileMatchedRecover(MapedFile mapedFile) {
        final ByteBuffer firstRecord = mapedFile.sliceByteBuffer();

        // The magic code is the second int of a record.
        if (firstRecord.getInt(4) != MessageMagicCode) {
            return false;
        }

        // The store timestamp sits at byte 56 of a record.
        final long storeTimestamp = firstRecord.getLong(56);
        if (storeTimestamp == 0L) {
            return false;
        }

        final boolean indexSafe = this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
            && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe();
        final long checkpointTimestamp = indexSafe
            ? this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()
            : this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp();

        if (storeTimestamp > checkpointTimestamp) {
            return false;
        }
        log.info("find check timestamp, {} {}", (Object)Long.valueOf(storeTimestamp), (Object)UtilAll.timeMillisToHumanString(storeTimestamp));
        return true;
    }

    /**
     * Appends a message to the commit log and, depending on configuration, waits for it
     * to be flushed to disk and/or transferred to a slave.
     *
     * <p>Steps:
     * <ol>
     *   <li>Stamp the message with a store timestamp and body CRC.</li>
     *   <li>If the message has a delay level (and transaction type 0 or 8), redirect it to
     *       the schedule topic, remembering the real topic and queue id in its properties.</li>
     *   <li>Under this object's monitor, append to the last mapped file (retrying once on
     *       a fresh file when the current one is full) and enqueue a dispatch request.</li>
     *   <li>With synchronous flush, wait for the group-commit service; otherwise just wake
     *       the flush service.</li>
     *   <li>As a synchronous master, wait for the HA service to transfer the message.</li>
     * </ol>
     *
     * <p>The HA wait always uses its own {@code GroupCommitRequest}. Reusing the request
     * that the disk flush already released would make the wait return immediately,
     * without the slave having received anything.
     *
     * @param msg the message to store; its store timestamp, body CRC and possibly its
     *            topic, queue id and properties are modified
     * @return the put result; its status is {@code PUT_OK} unless file creation, the
     *         append itself, the disk flush or the slave transfer failed
     */
    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        msg.setStoreTimestamp(System.currentTimeMillis());
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        AppendMessageResult result = null;
        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
        String topic = msg.getTopic();
        int queueId = msg.getQueueId();
        long tagsCode = msg.getTagsCode();

        // NOTE(review): 0 and 8 are presumably the "not a transaction" and "commit" types —
        // confirm against MessageSysFlag.
        int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if ((tranType == 0 || tranType == 8) && msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            topic = "SCHEDULE_TOPIC_XXXX";
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
            tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(msg.getDelayTimeLevel(), msg.getStoreTimestamp());
            // Keep the original destination so the message can be restored later.
            MessageAccessor.putProperty(msg, "REAL_TOPIC", msg.getTopic());
            MessageAccessor.putProperty(msg, "REAL_QID", String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }

        long eclipseTimeInLock = 0L;
        synchronized (this) {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            // Re-stamp inside the lock so store timestamps follow append order.
            msg.setStoreTimestamp(beginLockTimestamp);
            MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();
            if (null == mapedFile) {
                log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }
            result = mapedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK: {
                    break;
                }
                case END_OF_FILE: {
                    // The current file was padded out; retry on the next file.
                    mapedFile = this.mapedFileQueue.getLastMapedFile();
                    if (null == mapedFile) {
                        log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mapedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                }
                case MESSAGE_SIZE_EXCEEDED: {
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                }
                case UNKNOWN_ERROR: {
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                }
                default: {
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                }
            }
            DispatchRequest dispatchRequest = new DispatchRequest(topic, queueId, result.getWroteOffset(), result.getWroteBytes(), tagsCode, msg.getStoreTimestamp(), result.getLogicsOffset(), msg.getKeys(), msg.getSysFlag(), msg.getPreparedTransactionOffset());
            this.defaultMessageStore.putDispatchRequest(dispatchRequest);
            eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        }
        if (eclipseTimeInLock > 1000L) {
            log.warn("putMessage in lock eclipse time(ms) " + eclipseTimeInLock);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

        // Offset just past the appended message: both waits below complete once it is reached.
        final long nextOffset = result.getWroteOffset() + (long)result.getWroteBytes();

        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            GroupCommitService groupCommitService = (GroupCommitService)this.flushCommitLogService;
            if (msg.isWaitStoreMsgOK()) {
                GroupCommitRequest flushRequest = new GroupCommitRequest(nextOffset);
                groupCommitService.putRequest(flushRequest);
                boolean flushOK = flushRequest.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags() + " client address: " + msg.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
                groupCommitService.wakeup();
            }
        } else {
            this.flushCommitLogService.wakeup();
        }

        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService haService = this.defaultMessageStore.getHaService();
            if (msg.isWaitStoreMsgOK()) {
                if (haService.isSlaveOK(nextOffset)) {
                    // A fresh request is required: a request already released by the disk
                    // flush would not block here at all.
                    GroupCommitRequest transferRequest = new GroupCommitRequest(nextOffset);
                    haService.putRequest(transferRequest);
                    haService.getWaitNotifyObject().wakeupAll();
                    boolean flushOK = transferRequest.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    if (!flushOK) {
                        log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: " + msg.getTags() + " client address: " + msg.getBornHostString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }
                } else {
                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }
        return putMessageResult;
    }

    /**
     * Reads the store timestamp of the message located at {@code offset}.
     *
     * <p>The message stored exactly at the minimum offset is readable too, so the lower
     * bound is inclusive.
     *
     * @param offset physical offset of the message
     * @param size   size of the message in bytes
     * @return the store timestamp (the long at byte 56 of the record), or {@code -1} when
     *         the offset is below the minimum offset or the message cannot be selected
     */
    public long pickupStoretimestamp(long offset, int size) {
        if (offset < this.getMinOffset()) {
            return -1L;
        }
        SelectMapedBufferResult result = this.getMessage(offset, size);
        if (null == result) {
            return -1L;
        }
        try {
            return result.getByteBuffer().getLong(56);
        }
        finally {
            // Always give the selected buffer back, even if reading fails.
            result.release();
        }
    }

    /**
     * Selects {@code size} bytes starting at {@code offset} from the mapped file that
     * covers the offset.
     *
     * @param offset physical offset of the message
     * @param size   number of bytes to select
     * @return the selected buffer, or {@code null} when no mapped file was found
     */
    public SelectMapedBufferResult getMessage(long offset, int size) {
        final int fileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
        // Only offset zero may fall back to the first file when nothing matches.
        final boolean returnFirstOnNotFound = (offset == 0L);
        final MapedFile target = this.mapedFileQueue.findMapedFileByOffset(offset, returnFirstOnNotFound);
        if (target == null) {
            return null;
        }
        final int positionInFile = (int)(offset % (long)fileSize);
        return target.selectMapedBuffer(positionInFile, size);
    }

    /**
     * Returns the live table of next queue offsets keyed by {@code "topic-queueId"}.
     *
     * @return the internal map itself, not a copy
     */
    public HashMap<String, Long> getTopicQueueTable() {
        final HashMap<String, Long> table = this.topicQueueTable;
        return table;
    }

    /**
     * Replaces the table of next queue offsets keyed by {@code "topic-queueId"}.
     *
     * @param topicQueueTable the new table; stored as-is, not copied
     */
    public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) {
        final HashMap<String, Long> replacement = topicQueueTable;
        this.topicQueueTable = replacement;
    }

    /**
     * Destroys the underlying mapped file queue.
     */
    public void destroy() {
        final MapedFileQueue queue = this.mapedFileQueue;
        queue.destroy();
    }

    /**
     * Appends raw bytes to the commit log at the given start offset.
     *
     * <p>Runs under this object's monitor, the same lock {@code putMessage} holds while
     * appending.
     *
     * @param startOffset offset passed to {@code getLastMapedFile} to locate the target file
     * @param data        bytes to append
     * @return {@code false} when no mapped file could be obtained, otherwise the result
     *         of the append
     */
    public boolean appendData(long startOffset, byte[] data) {
        synchronized (this) {
            final MapedFile target = this.mapedFileQueue.getLastMapedFile(startOffset);
            if (target != null) {
                return target.appendMessage(data);
            }
            log.error("appendData getLastMapedFile error  " + startOffset);
            return false;
        }
    }

    /**
     * Delegates a retry of deleting the first file to the mapped file queue.
     *
     * @param intervalForcibly passed through to the mapped file queue
     * @return the value returned by the mapped file queue
     */
    public boolean retryDeleteFirstFile(long intervalForcibly) {
        final boolean deleted = this.mapedFileQueue.retryDeleteFirstFile(intervalForcibly);
        return deleted;
    }

    /**
     * Removes the queue offset entry of one queue from the topic queue table.
     *
     * <p>The removal runs under this object's monitor; logging happens after the lock is
     * released.
     *
     * @param topic   topic of the queue
     * @param queueId id of the queue
     */
    public void removeQueurFromTopicQueueTable(String topic, int queueId) {
        final String tableKey = topic + "-" + queueId;
        synchronized (this) {
            this.topicQueueTable.remove(tableKey);
        }
        log.info("removeQueurFromTopicQueueTable OK Topic: {} QueueId: {}", (Object)topic, (Object)Integer.valueOf(queueId));
    }

    /**
     * Append callback that serializes a {@code MessageExtBrokerInner} into a mapped file.
     *
     * <p>Each call assembles the record in a reusable heap buffer and then copies it into
     * the target buffer. The same scratch buffer is also handed out through
     * {@link #getMsgStoreItemMemory()}, so instances must not be used concurrently.
     */
    class DefaultAppendMessageCallback
    implements AppendMessageCallback {
        // Minimum bytes kept free at the end of a file: room for the filler's length and magic code.
        private static final int END_FILE_MIN_BLANK_LENGTH = 8;
        // Scratch buffer handed to MessageDecoder.createMessageId.
        private final ByteBuffer msgIdMemory = ByteBuffer.allocate(16);
        // Scratch buffer in which a record is assembled before being copied out.
        private final ByteBuffer msgStoreItemMemory;
        // Largest record size accepted by doAppend.
        private final int maxMessageSize;

        /**
         * @param size maximum record size; the scratch buffer is allocated 8 bytes larger
         */
        DefaultAppendMessageCallback(int size) {
            this.msgStoreItemMemory = ByteBuffer.allocate(size + 8);
            this.maxMessageSize = size;
        }

        /**
         * @return the internal scratch buffer itself (not a copy)
         */
        public ByteBuffer getMsgStoreItemMemory() {
            return this.msgStoreItemMemory;
        }

        /**
         * Serializes {@code msg} and writes it at the current position of {@code byteBuffer}.
         *
         * @param fileFromOffset start offset of the file that {@code byteBuffer} belongs to
         * @param byteBuffer     target buffer, positioned where the record is to be written
         * @param maxBlank       bytes still free in the file
         * @param msg            the message; must be a {@code MessageExtBrokerInner}
         * @return {@code MESSAGE_SIZE_EXCEEDED} when the record is larger than the maximum;
         *         {@code END_OF_FILE} (after writing a filler record) when the record plus
         *         8 bytes does not fit into {@code maxBlank}; otherwise {@code PUT_OK}
         */
        @Override
        public AppendMessageResult doAppend(long fileFromOffset, ByteBuffer byteBuffer, int maxBlank, Object msg) {
            MessageExtBrokerInner msgInner = (MessageExtBrokerInner)((Object)msg);
            // Physical offset at which this record will start.
            long wroteOffset = fileFromOffset + (long)byteBuffer.position();
            String msgId = MessageDecoder.createMessageId((ByteBuffer)this.msgIdMemory, (ByteBuffer)msgInner.getStoreHostBytes(), (long)wroteOffset);
            // Look up the next queue offset for this topic/queue, starting at 0 for a new queue.
            String key = msgInner.getTopic() + "-" + msgInner.getQueueId();
            Long queueOffset = (Long)CommitLog.this.topicQueueTable.get(key);
            if (null == queueOffset) {
                queueOffset = 0L;
                CommitLog.this.topicQueueTable.put(key, queueOffset);
            }
            int tranType = MessageSysFlag.getTransactionValue((int)msgInner.getSysFlag());
            switch (tranType) {
                // NOTE(review): 4 and 12 are presumably the prepared and rollback transaction
                // types — confirm against MessageSysFlag. They are stored with queue offset 0.
                case 4: 
                case 12: {
                    queueOffset = 0L;
                    break;
                }
            }
            byte[] propertiesData = msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes();
            int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
            byte[] topicData = msgInner.getTopic().getBytes();
            int topicLength = topicData == null ? 0 : topicData.length;
            int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
            // 88 bytes of fixed fields (assuming the two host fields are 8 bytes each — the
            // reader consumes 8 for each), plus body, 1-byte topic length + topic,
            // 2-byte properties length + properties.
            int msgLen = 88 + bodyLength + 1 + topicLength + 2 + propertiesLength + 0;
            if (msgLen > this.maxMessageSize) {
                log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength + ", maxMessageSize: " + this.maxMessageSize);
                return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
            }
            if (msgLen + 8 > maxBlank) {
                // Not enough room left: fill the rest of the file with a filler record
                // (remaining length followed by the blank magic code) and report END_OF_FILE.
                this.resetMsgStoreItemMemory(maxBlank);
                this.msgStoreItemMemory.putInt(maxBlank);
                this.msgStoreItemMemory.putInt(-875286124);
                byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(), queueOffset);
            }
            // Assemble the record; the field order below is the on-disk format that
            // checkMessageAndReturnSize reads back.
            this.resetMsgStoreItemMemory(msgLen);
            this.msgStoreItemMemory.putInt(msgLen);
            this.msgStoreItemMemory.putInt(-626843481);
            this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
            this.msgStoreItemMemory.putInt(msgInner.getQueueId());
            this.msgStoreItemMemory.putInt(msgInner.getFlag());
            this.msgStoreItemMemory.putLong(queueOffset);
            this.msgStoreItemMemory.putLong(fileFromOffset + (long)byteBuffer.position());
            this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
            this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
            this.msgStoreItemMemory.put(msgInner.getBornHostBytes());
            this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
            this.msgStoreItemMemory.put(msgInner.getStoreHostBytes());
            this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
            this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
            this.msgStoreItemMemory.putInt(bodyLength);
            if (bodyLength > 0) {
                this.msgStoreItemMemory.put(msgInner.getBody());
            }
            // The topic length is narrowed to one byte and the properties length to two bytes.
            this.msgStoreItemMemory.put((byte)topicLength);
            this.msgStoreItemMemory.put(topicData);
            this.msgStoreItemMemory.putShort((short)propertiesLength);
            if (propertiesLength > 0) {
                this.msgStoreItemMemory.put(propertiesData);
            }
            // Copy the assembled record into the target buffer.
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId, msgInner.getStoreTimestamp(), queueOffset);
            switch (tranType) {
                case 4: 
                case 12: {
                    break;
                }
                // Only transaction types 0 and 8 advance the queue offset for the next message.
                case 0: 
                case 8: {
                    queueOffset = queueOffset + 1L;
                    CommitLog.this.topicQueueTable.put(key, queueOffset);
                    break;
                }
            }
            return result;
        }

        /**
         * Rewinds the scratch buffer to position 0 and sets its limit to {@code length}.
         *
         * @param length number of bytes about to be written
         */
        private void resetMsgStoreItemMemory(int length) {
            this.msgStoreItemMemory.flip();
            this.msgStoreItemMemory.limit(length);
        }
    }

    /**
     * Flush service used for synchronous flushing.
     *
     * <p>Producers queue {@code GroupCommitRequest}s on a write list; the service thread
     * swaps the write and read lists and commits until each request's offset is covered,
     * then releases the waiting producer.
     */
    class GroupCommitService
    extends FlushCommitLogService {
        // Requests added by producers since the last swap.
        private volatile List<GroupCommitRequest> requestsWrite;
        // Requests currently being processed by the service thread.
        private volatile List<GroupCommitRequest> requestsRead;

        GroupCommitService() {
            this.requestsWrite = new ArrayList<GroupCommitRequest>();
            this.requestsRead = new ArrayList<GroupCommitRequest>();
        }

        /**
         * Queues a request and wakes the service thread if it has not been notified yet.
         *
         * @param request the request to wait on
         */
        public void putRequest(GroupCommitRequest request) {
            synchronized (this) {
                this.requestsWrite.add(request);
                if (this.hasNotified) {
                    return;
                }
                this.hasNotified = true;
                this.notify();
            }
        }

        /** Exchanges the write and read lists. */
        private void swapRequests() {
            final List<GroupCommitRequest> previousWrite = this.requestsWrite;
            this.requestsWrite = this.requestsRead;
            this.requestsRead = previousWrite;
        }

        /**
         * Serves every request on the read list, or performs a plain commit when the
         * list is empty.
         */
        private void doCommit() {
            if (this.requestsRead.isEmpty()) {
                CommitLog.this.mapedFileQueue.commit(0);
                return;
            }

            for (GroupCommitRequest pending : this.requestsRead) {
                boolean flushed = false;
                // At most two rounds of "check, then commit if not yet covered".
                int attempt = 0;
                while (attempt < 2 && !flushed) {
                    flushed = CommitLog.this.mapedFileQueue.getCommittedWhere() >= pending.getNextOffset();
                    if (!flushed) {
                        CommitLog.this.mapedFileQueue.commit(0);
                    }
                    attempt++;
                }
                pending.wakeupCustomer(flushed);
            }

            final long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0L) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            this.requestsRead.clear();
        }

        /**
         * Service loop: wait, commit, repeat until stopped; then pause briefly, swap the
         * lists one more time and commit whatever is still pending.
         */
        public void run() {
            log.info(this.getServiceName() + " service started");

            while (!this.isStoped()) {
                try {
                    this.waitForRunning(0L);
                    this.doCommit();
                }
                catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", (Throwable)e);
                }
            }

            // Give in-flight producers a moment to enqueue their requests before the final pass.
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException e) {
                log.warn("GroupCommitService Exception, ", (Throwable)e);
            }

            synchronized (this) {
                this.swapRequests();
            }
            this.doCommit();

            log.info(this.getServiceName() + " service end");
        }

        /** Swaps the request lists when a wait ends. */
        protected void onWaitEnd() {
            this.swapRequests();
        }

        public String getServiceName() {
            return GroupCommitService.class.getSimpleName();
        }

        public long getJointime() {
            return 300000L;
        }
    }

    /**
     * A one-shot request to be notified once data up to {@code nextOffset} has
     * been handled by a background service.
     *
     * <p>The submitting thread blocks in {@link #waitForFlush(long)}; the
     * servicing thread releases it with {@link #wakeupCustomer(boolean)} and
     * passes along whether the operation succeeded.
     */
    public class GroupCommitRequest {
        /** Offset that must be reached before the request counts as satisfied. */
        private final long nextOffset;
        /** Released exactly once, by {@link #wakeupCustomer(boolean)}. */
        private final CountDownLatch countDownLatch = new CountDownLatch(1);
        /** Outcome published by the servicing thread; stays false until then. */
        private volatile boolean flushOK = false;

        /**
         * @param nextOffset offset the waiter needs to be reached
         */
        public GroupCommitRequest(long nextOffset) {
            this.nextOffset = nextOffset;
        }

        /**
         * @return the offset passed to the constructor
         */
        public long getNextOffset() {
            return this.nextOffset;
        }

        /**
         * Records the outcome and releases any thread blocked in
         * {@link #waitForFlush(long)}.
         *
         * @param flushOK whether the requested offset was reached
         */
        public void wakeupCustomer(boolean flushOK) {
            // Publish the outcome before opening the latch so the waiter sees it.
            this.flushOK = flushOK;
            this.countDownLatch.countDown();
        }

        /**
         * Waits up to {@code timeout} milliseconds for
         * {@link #wakeupCustomer(boolean)} and reports the recorded outcome.
         *
         * <p>Being woken up is not the same as succeeding: a request released
         * with {@code wakeupCustomer(false)} yields {@code false}. A timeout
         * yields {@code false} as well, because the outcome flag is still at
         * its initial value.
         *
         * @param timeout maximum time to wait, in milliseconds
         * @return {@code true} only if the request was completed successfully;
         *         {@code false} on failure, timeout or interruption
         */
        public boolean waitForFlush(long timeout) {
            try {
                // The latch result is deliberately ignored: the flag alone tells
                // success from failure, and it is false when the wait timed out.
                this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
                return this.flushOK;
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /**
     * Periodic flush service for the commit log.
     *
     * <p>Every pass re-reads the flush settings from the message store
     * configuration, pauses for the configured interval, and then calls
     * {@code mapedFileQueue.commit(...)} with the configured least-pages
     * threshold. Once per "thorough" interval the threshold is forced to 0 for
     * that pass, and on every tenth such pass the flush backlog is logged.
     */
    class FlushRealTimeService extends FlushCommitLogService {
        /** Maximum number of {@code commit(0)} attempts made while shutting down. */
        private static final int RetryTimesOver = 3;
        /** Wall-clock time (ms) at which the last thorough pass was scheduled. */
        private long lastFlushTimestamp = 0L;
        /** Number of thorough passes scheduled so far; drives progress logging. */
        private long printTimes = 0L;

        FlushRealTimeService() {
        }

        /**
         * Thread body: flushes in a loop until {@code isStoped()} is true, then
         * retries {@code commit(0)} up to {@code RetryTimesOver} times, stopping
         * early as soon as it returns true.
         */
        public void run() {
            log.info(this.getServiceName() + " service started");

            while (!this.isStoped()) {
                // Settings are read anew on every pass.
                boolean timedMode = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
                int pauseMillis = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
                int leastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
                int thoroughIntervalMillis = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

                long now = System.currentTimeMillis();
                boolean reportProgress = false;
                boolean thoroughPassDue = now >= this.lastFlushTimestamp + (long)thoroughIntervalMillis;
                if (thoroughPassDue) {
                    this.lastFlushTimestamp = now;
                    // Threshold 0 for this pass (presumably "flush regardless of
                    // how few pages are dirty" - confirm in MapedFileQueue.commit).
                    leastPages = 0;
                    // Log the backlog on every tenth thorough pass, first one included.
                    reportProgress = this.printTimes % 10L == 0L;
                    this.printTimes++;
                }

                try {
                    if (timedMode) {
                        Thread.sleep(pauseMillis);
                    } else {
                        this.waitForRunning(pauseMillis);
                    }

                    if (reportProgress) {
                        this.printFlushProgress();
                    }

                    CommitLog.this.mapedFileQueue.commit(leastPages);

                    long flushedStoreTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
                    if (flushedStoreTimestamp > 0L) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(flushedStoreTimestamp);
                    }
                }
                catch (Exception failure) {
                    // Keep the service alive; log the failure together with the backlog.
                    log.warn(this.getServiceName() + " service has exception. ", failure);
                    this.printFlushProgress();
                }
            }

            // Shutdown: retry commit(0) a bounded number of times until it returns true.
            boolean flushed = false;
            int attempt = 0;
            while (attempt < RetryTimesOver && !flushed) {
                flushed = CommitLog.this.mapedFileQueue.commit(0);
                ++attempt;
                log.info(this.getServiceName() + " service shutdown, retry " + attempt + " times " + (flushed ? "OK" : "Not OK"));
            }

            this.printFlushProgress();
            log.info(this.getServiceName() + " service end");
        }

        /**
         * Returns the simple class name of {@code FlushCommitLogService}.
         *
         * <p>NOTE(review): this is the parent type's name, not
         * {@code FlushRealTimeService} - confirm whether that is intentional.
         */
        public String getServiceName() {
            final Class<?> reportedType = FlushCommitLogService.class;
            return reportedType.getSimpleName();
        }

        /** Logs the value of {@code mapedFileQueue.howMuchFallBehind()}. */
        private void printFlushProgress() {
            log.info("how much disk fall behind memory, " + CommitLog.this.mapedFileQueue.howMuchFallBehind());
        }

        /**
         * Returns the constant 300000.
         *
         * <p>NOTE(review): presumably a join timeout in milliseconds (five
         * minutes) - confirm in ServiceThread.
         */
        public long getJointime() {
            return 5L * 60L * 1000L;
        }
    }

    /**
     * Common base type of the commit-log flush services; it is the declared
     * type of {@code CommitLog}'s {@code flushCommitLogService} field and the
     * superclass of {@code FlushRealTimeService}. It adds no state or behaviour
     * to {@code ServiceThread}.
     */
    abstract class FlushCommitLogService extends ServiceThread {
        FlushCommitLogService() {
            super();
        }
    }
}

