/*
 * Decompiled with CFR 0.152.
 */
package com.github.ltsopensource.kv.txlog;

import com.github.ltsopensource.core.commons.file.FileUtils;
import com.github.ltsopensource.core.commons.io.UnsafeByteArrayInputStream;
import com.github.ltsopensource.core.commons.io.UnsafeByteArrayOutputStream;
import com.github.ltsopensource.core.json.TypeReference;
import com.github.ltsopensource.kv.CapacityNotEnoughException;
import com.github.ltsopensource.kv.Cursor;
import com.github.ltsopensource.kv.DBException;
import com.github.ltsopensource.kv.EmptyCursor;
import com.github.ltsopensource.kv.Operation;
import com.github.ltsopensource.kv.StoreConfig;
import com.github.ltsopensource.kv.serializer.StoreSerializer;
import com.github.ltsopensource.kv.txlog.StoreTxLog;
import com.github.ltsopensource.kv.txlog.StoreTxLogCursorEntry;
import com.github.ltsopensource.kv.txlog.StoreTxLogEntry;
import com.github.ltsopensource.kv.txlog.StoreTxLogPosition;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

public class StoreTxLogEngine<K, V> {
    private volatile StoreTxLog storeTxLog;
    private StoreSerializer serializer;
    private AtomicBoolean initialed = new AtomicBoolean(false);
    private List<StoreTxLog> storeTxLogs;
    private StoreConfig storeConfig;
    private static final String LOG_FILE_SUFFIX = ".log";
    private File logPath;

    public StoreTxLogEngine(StoreSerializer serializer, StoreConfig storeConfig) {
        this.logPath = storeConfig.getLogPath();
        this.storeConfig = storeConfig;
        this.serializer = serializer;
        this.storeTxLogs = new CopyOnWriteArrayList<StoreTxLog>();
    }

    public void init() throws IOException {
        if (!this.initialed.compareAndSet(false, true)) {
            return;
        }
        FileUtils.createDirIfNotExist(this.logPath);
        String[] logFiles = this.logPath.list(new FilenameFilter(){

            @Override
            public boolean accept(File file, String name) {
                return name.endsWith(StoreTxLogEngine.LOG_FILE_SUFFIX);
            }
        });
        if (logFiles == null) {
            throw new IOException("can't list file in " + this.logPath);
        }
        if (logFiles.length > 0) {
            Arrays.sort(logFiles, new Comparator<String>(){

                @Override
                public int compare(String left, String right) {
                    return left.compareTo(right);
                }
            });
            for (int i = 0; i < logFiles.length; ++i) {
                String logFile = logFiles[i];
                boolean isWritable = ++i == logFiles.length;
                StoreTxLog storeTxLog = new StoreTxLog(this.storeConfig, new File(this.logPath, logFile), !isWritable, false, 0L);
                if (i > 1) {
                    this.storeTxLogs.get(i - 1).setNext(storeTxLog);
                }
                this.storeTxLogs.add(storeTxLog);
                if (!isWritable) continue;
                this.storeTxLog = storeTxLog;
            }
        } else {
            String name = System.currentTimeMillis() + LOG_FILE_SUFFIX;
            this.storeTxLog = new StoreTxLog(this.storeConfig, new File(this.logPath, name), false, true, 0L);
            this.storeTxLogs.add(this.storeTxLog);
        }
    }

    private StoreTxLog nextNewStoreTxLog() throws IOException {
        long firstRecordId = this.storeTxLog.getNextRecordId();
        String name = System.currentTimeMillis() + LOG_FILE_SUFFIX;
        StoreTxLog newStoreTxLog = new StoreTxLog(this.storeConfig, new File(this.logPath, name), false, true, firstRecordId);
        this.storeTxLogs.add(newStoreTxLog);
        this.storeTxLog.setNext(newStoreTxLog);
        this.storeTxLog = newStoreTxLog;
        return this.storeTxLog;
    }

    public StoreTxLogPosition append(Operation op, K key) throws DBException {
        return this.append(op, key, null);
    }

    public StoreTxLogPosition append(Operation op, K key, V value) throws DBException {
        try {
            try {
                return this.append(this.storeTxLog, op, key, value);
            }
            catch (CapacityNotEnoughException notEnough) {
                return this.append(this.nextNewStoreTxLog(), op, key, value);
            }
        }
        catch (Exception e) {
            throw new DBException("append dbLog error:" + e.getMessage(), e);
        }
    }

    private StoreTxLogPosition append(StoreTxLog storeTxLog, Operation op, K key, V value) throws IOException {
        StoreTxLogEntry entry = null;
        long timestamp = System.currentTimeMillis();
        switch (op) {
            case PUT: {
                entry = new StoreTxLogEntry<K, V>(op, key, value, timestamp);
                break;
            }
            case REMOVE: {
                entry = new StoreTxLogEntry(op, key, timestamp);
            }
        }
        UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
        this.serializer.serialize(entry, out);
        byte[] entryBytes = out.toByteArray();
        return storeTxLog.append(entryBytes);
    }

    public Cursor<StoreTxLogCursorEntry<K, V>> cursor(StoreTxLogPosition position) {
        long recordId = position.getRecordId();
        if (this.storeTxLogs.size() == 0) {
            return new EmptyCursor<StoreTxLogCursorEntry<K, V>>();
        }
        StoreTxLog targetTxLog = null;
        for (StoreTxLog txLog : this.storeTxLogs) {
            if (recordId < txLog.getFirstRecordId() || recordId >= txLog.getNextRecordId()) continue;
            targetTxLog = txLog;
            break;
        }
        if (targetTxLog == null) {
            return new EmptyCursor<StoreTxLogCursorEntry<K, V>>();
        }
        return new StoreTxLogCursor(targetTxLog, recordId - targetTxLog.getFirstRecordId());
    }

    private class StoreTxLogCursor
    implements Cursor<StoreTxLogCursorEntry<K, V>> {
        private StoreTxLog currentTxLog;
        private long position;

        public StoreTxLogCursor(StoreTxLog currentTxLog, long position) {
            this.currentTxLog = currentTxLog;
            if (position <= 0L) {
                position = currentTxLog.getHeaderLength();
            }
            this.position = position;
        }

        @Override
        public boolean hasNext() {
            if (this.currentTxLog == null) {
                return false;
            }
            if (this.position < this.currentTxLog.getFileLength()) {
                return true;
            }
            if (this.currentTxLog.next() == null) {
                return false;
            }
            this.currentTxLog = this.currentTxLog.next();
            this.position = this.currentTxLog.getHeaderLength();
            return true;
        }

        @Override
        public StoreTxLogCursorEntry<K, V> next() {
            try {
                byte[] entry = this.currentTxLog.readEntry(this.position);
                int entryLength = entry.length;
                UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream(entry);
                StoreTxLogEntry storeTxLogEntry = (StoreTxLogEntry)StoreTxLogEngine.this.serializer.deserialize(is, new TypeReference<StoreTxLogEntry<K, V>>(){}.getType());
                StoreTxLogCursorEntry storeTxLogCursorEntry = new StoreTxLogCursorEntry();
                storeTxLogCursorEntry.setStoreTxLogEntry(storeTxLogEntry);
                storeTxLogCursorEntry.setPosition(new StoreTxLogPosition(this.currentTxLog.getFirstRecordId() + this.position));
                this.position = this.currentTxLog.nextEntryPosition(this.position, entryLength);
                return storeTxLogCursorEntry;
            }
            catch (IOException e) {
                throw new DBException("Cursor next() error:" + e.getMessage(), e);
            }
        }
    }
}

