/*
 * Decompiled with CFR 0.152.
 */
package com.github.ltsopensource.core.support;

import com.github.ltsopensource.core.AppContext;
import com.github.ltsopensource.core.cluster.Node;
import com.github.ltsopensource.core.commons.utils.CollectionUtils;
import com.github.ltsopensource.core.commons.utils.GenericsUtils;
import com.github.ltsopensource.core.domain.Pair;
import com.github.ltsopensource.core.factory.NamedThreadFactory;
import com.github.ltsopensource.core.failstore.AbstractFailStore;
import com.github.ltsopensource.core.failstore.FailStore;
import com.github.ltsopensource.core.failstore.FailStoreException;
import com.github.ltsopensource.core.failstore.FailStoreFactory;
import com.github.ltsopensource.core.json.JSON;
import com.github.ltsopensource.core.logger.Logger;
import com.github.ltsopensource.core.logger.LoggerFactory;
import com.github.ltsopensource.core.spi.ServiceLoader;
import com.github.ltsopensource.ec.EventInfo;
import com.github.ltsopensource.ec.EventSubscriber;
import com.github.ltsopensource.ec.Observer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Stores values that could not be delivered in a local {@link FailStore} and
 * periodically tries to deliver them again through {@link #retry(List)}.
 *
 * <p>Two periodic tasks are managed:
 * <ul>
 *   <li>the <em>self check</em> (started by {@link #start()}), which drains this
 *       instance's own fail store in batches;</li>
 *   <li>the <em>master check</em>, which runs only while this node is the master
 *       (according to the {@code MASTER_CHANGED} event and the master elector) and
 *       drains the stores returned by {@link AbstractFailStore#getDeadFailStores()}
 *       (presumably stores left behind by nodes that are gone; confirm against
 *       the fail store implementation).</li>
 * </ul>
 *
 * <p>Once {@link #stop()} has been called the executors are shut down and the
 * instance cannot be started again.
 *
 * @param <T> type of the values kept for retry
 */
public abstract class RetryScheduler<T> {
    public static final Logger LOGGER = LoggerFactory.getLogger(RetryScheduler.class);

    /** Batch size used when the caller does not supply one. */
    private static final int DEFAULT_BATCH_SIZE = 5;

    /** Value type handed to {@link FailStore#fetchTop} so stored entries can be decoded. */
    private final Class<?> type = GenericsUtils.getSuperClassGenericType(this.getClass());
    private final ScheduledExecutorService retryExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LTS-RetryScheduler-retry", true));
    private final ScheduledExecutorService masterRetryExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LTS-RetryScheduler-master-retry", true));
    // The futures are written by whichever thread starts a check and read by
    // whichever thread stops it, hence volatile.
    private volatile ScheduledFuture<?> masterScheduledFuture;
    private volatile ScheduledFuture<?> scheduledFuture;
    private final AtomicBoolean selfCheckStart = new AtomicBoolean(false);
    private final AtomicBoolean masterCheckStart = new AtomicBoolean(false);
    private final FailStore failStore;
    private final String name;
    private final int batchSize;
    private final ReentrantLock lock = new ReentrantLock();
    private final AppContext appContext;

    /**
     * Creates a scheduler with the default batch size.
     *
     * @param name       label used in log messages
     * @param appContext application context supplying config, event center and master elector
     * @param storePath  path handed to the {@link FailStoreFactory}
     * @throws RuntimeException if the fail store cannot be opened
     */
    public RetryScheduler(String name, AppContext appContext, String storePath) {
        this(name, appContext, storePath, DEFAULT_BATCH_SIZE);
    }

    /**
     * Creates a scheduler, opens its fail store, subscribes to master changes and
     * starts the master check right away if this node currently is the master.
     *
     * @param name       label used in log messages
     * @param appContext application context supplying config, event center and master elector
     * @param storePath  path handed to the {@link FailStoreFactory}
     * @param batchSize  maximum number of entries fetched from a store per retry attempt
     * @throws RuntimeException if the fail store cannot be opened
     */
    public RetryScheduler(String name, final AppContext appContext, String storePath, int batchSize) {
        this.name = name;
        this.appContext = appContext;
        // Assigned before anything is scheduled so the periodic tasks never observe a stale value.
        this.batchSize = batchSize;

        FailStoreFactory failStoreFactory = ServiceLoader.load(FailStoreFactory.class, appContext.getConfig());
        this.failStore = failStoreFactory.getFailStore(appContext.getConfig(), storePath);
        try {
            this.failStore.open();
        } catch (FailStoreException e) {
            throw new RuntimeException(e);
        }

        String subscriberId = RetryScheduler.class.getSimpleName().concat(appContext.getConfig().getIdentity());
        EventSubscriber subscriber = new EventSubscriber(subscriberId, new Observer() {
            @Override
            public void onObserved(EventInfo eventInfo) {
                Node masterNode = (Node) eventInfo.getParam("master");
                boolean selfIsMaster = masterNode != null
                        && masterNode.getIdentity().equals(appContext.getConfig().getIdentity());
                if (selfIsMaster) {
                    startMasterCheck();
                } else {
                    stopMasterCheck();
                }
            }
        });
        appContext.getEventCenter().subscribe(subscriber, "MASTER_CHANGED");

        if (appContext.getMasterElector().isCurrentMaster()) {
            startMasterCheck();
        }
    }

    /**
     * Starts the periodic self check (first run after 10 seconds, then 30 seconds
     * after each run completes). Calling it while already started is a no-op.
     * Failures are logged, not thrown.
     */
    public void start() {
        if (!this.selfCheckStart.compareAndSet(false, true)) {
            return;
        }
        try {
            this.scheduledFuture = this.retryExecutor.scheduleWithFixedDelay(new CheckSelfRunner(), 10L, 30L, TimeUnit.SECONDS);
            LOGGER.info("Start {} RetryScheduler success, identity=[{}]", this.name, identity());
        } catch (Throwable t) {
            // Nothing was scheduled: clear the flag so the state matches reality.
            this.selfCheckStart.set(false);
            LOGGER.error("Start {} RetryScheduler failed, identity=[{}]", this.name, identity(), t);
        }
    }

    /**
     * Starts the periodic master check (first run after 30 seconds, then 60 seconds
     * after each run completes). No-op when it is already running.
     */
    private void startMasterCheck() {
        if (!this.masterCheckStart.compareAndSet(false, true)) {
            return;
        }
        try {
            this.masterScheduledFuture = this.masterRetryExecutor.scheduleWithFixedDelay(new CheckDeadFailStoreRunner(), 30L, 60L, TimeUnit.SECONDS);
            LOGGER.info("Start {} master RetryScheduler success, identity=[{}]", this.name, identity());
        } catch (Throwable t) {
            // Scheduling failed (e.g. executor already shut down by stop()): allow a later attempt.
            this.masterCheckStart.set(false);
            LOGGER.error("Start {} master RetryScheduler failed, identity=[{}]", this.name, identity(), t);
        }
    }

    /**
     * Cancels the master check if it is running. The executor is intentionally kept
     * alive so that the check can be started again when this node is re-elected
     * master; it is shut down in {@link #stop()}.
     */
    private void stopMasterCheck() {
        try {
            if (this.masterCheckStart.compareAndSet(true, false)) {
                ScheduledFuture<?> future = this.masterScheduledFuture;
                if (future != null) {
                    future.cancel(true);
                }
                LOGGER.info("Stop {} master RetryScheduler success, identity=[{}]", this.name, identity());
            }
        } catch (Throwable t) {
            LOGGER.error("Stop {} master RetryScheduler failed, identity=[{}]", this.name, identity(), t);
        }
    }

    /**
     * Stops both periodic checks, closes the fail store (if the self check had been
     * started) and shuts down the executors. Failures are logged, not thrown.
     */
    public void stop() {
        try {
            if (this.selfCheckStart.compareAndSet(true, false)) {
                ScheduledFuture<?> future = this.scheduledFuture;
                if (future != null) {
                    future.cancel(true);
                    try {
                        this.failStore.close();
                    } finally {
                        this.retryExecutor.shutdown();
                    }
                }
                LOGGER.info("Stop {} RetryScheduler success, identity=[{}]", this.name, identity());
            }
        } catch (Throwable t) {
            LOGGER.error("Stop {} RetryScheduler failed, identity=[{}]", this.name, identity(), t);
        } finally {
            // Always release the master side, even if closing the store failed above.
            stopMasterCheck();
            this.masterRetryExecutor.shutdown();
        }
    }

    /**
     * Stops the scheduler and destroys its fail store. A failure to destroy the
     * store is logged, not thrown.
     */
    public void destroy() {
        try {
            this.stop();
            this.failStore.destroy();
        } catch (FailStoreException e) {
            LOGGER.error("destroy {} RetryScheduler failed, identity=[{}]", this.name, identity(), e);
        }
    }

    /**
     * Saves a value in the local fail store so that the self check retries it later.
     * A store failure is logged, not thrown.
     *
     * @param key   key under which the value is stored
     * @param value value to retry later
     */
    public void inSchedule(String key, T value) {
        // Best effort and non-blocking: the write goes ahead even when the lock is
        // currently held by the self check, so callers are never stalled here.
        boolean locked = this.lock.tryLock();
        try {
            this.failStore.put(key, value);
            LOGGER.info("{} RetryScheduler, local files save success, identity=[{}], {}", this.name, identity(), JSON.toJSONString(value));
        } catch (FailStoreException e) {
            // The throwable goes last so the placeholders line up and the stack trace is logged.
            LOGGER.error("{} RetryScheduler in schedule error, identity=[{}]", this.name, identity(), e);
        } finally {
            if (locked) {
                this.lock.unlock();
            }
        }
    }

    /**
     * @return {@code true} when retries may currently be attempted; both periodic
     *         checks return immediately while this is {@code false}
     */
    protected abstract boolean isRemotingEnable();

    /**
     * Attempts to deliver one batch of stored values.
     *
     * @param var1 values fetched from a fail store
     * @return {@code true} if the batch was delivered and may be deleted from the store
     */
    protected abstract boolean retry(List<T> var1);

    /** Identity of this node, as used in every log line. */
    private String identity() {
        return this.appContext.getConfig().getIdentity();
    }

    /**
     * Retries one batch and, on success, logs it and deletes the batch from the store.
     *
     * @param store         store the batch was fetched from
     * @param pairs         non-empty batch of key/value pairs
     * @param successFormat log format used on success (name, identity, size, values)
     * @return {@code true} if the batch was delivered and deleted, {@code false} if
     *         {@link #retry(List)} reported failure (the store is left untouched)
     */
    private boolean retryAndDelete(FailStore store, List<Pair<String, T>> pairs, String successFormat) throws FailStoreException {
        List<String> keys = new ArrayList<String>(pairs.size());
        List<T> values = new ArrayList<T>(pairs.size());
        for (Pair<String, T> pair : pairs) {
            keys.add(pair.getKey());
            values.add(pair.getValue());
        }
        if (!retry(values)) {
            return false;
        }
        LOGGER.info(successFormat, this.name, identity(), values.size(), JSON.toJSONString(values));
        store.delete(keys);
        return true;
    }

    /**
     * Master check: drains every dead fail store reported by the own store and
     * destroys each one once it is empty.
     */
    private class CheckDeadFailStoreRunner
    implements Runnable {
        @Override
        public void run() {
            try {
                if (!isRemotingEnable()) {
                    return;
                }
                List<FailStore> deadStores = null;
                if (failStore instanceof AbstractFailStore) {
                    deadStores = ((AbstractFailStore) failStore).getDeadFailStores();
                }
                if (CollectionUtils.isEmpty(deadStores)) {
                    return;
                }
                for (FailStore store : deadStores) {
                    // Honour cancel(true): stop between stores as well as between batches.
                    if (Thread.currentThread().isInterrupted() || !drain(store)) {
                        return;
                    }
                }
            } catch (Throwable e) {
                LOGGER.error("Run {} master RetryScheduler error, identity=[{}] ", name, identity(), e);
            }
        }

        /**
         * Opens a dead store and retries its content batch by batch, pausing 500 ms
         * after each delivered batch. The store is destroyed when it turns out to be
         * empty and closed in every other case.
         *
         * @return {@code false} if the thread was interrupted and the whole run
         *         should end, {@code true} if the next store may be processed
         */
        private boolean drain(FailStore store) throws FailStoreException {
            store.open();
            boolean needsClose = true;
            try {
                while (true) {
                    List<Pair<String, T>> pairs = store.fetchTop(batchSize, type);
                    if (CollectionUtils.isEmpty(pairs)) {
                        store.destroy();
                        needsClose = false;
                        LOGGER.info("{} RetryScheduler, delete store dir[{}] success, identity=[{}] ", name, store.getPath(), identity());
                        return true;
                    }
                    if (!retryAndDelete(store, pairs, "{} RetryScheduler, dead local files send success, identity=[{}], size: {}, {}")) {
                        // Delivery failed: give up on this store until the next run.
                        return true;
                    }
                    try {
                        Thread.sleep(500L);
                    } catch (InterruptedException e) {
                        // Cancelled: keep the interrupt flag set and end the run.
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
            } finally {
                if (needsClose) {
                    closeQuietly(store);
                }
            }
        }

        /** Closes a store, logging instead of throwing so an earlier exception is not masked. */
        private void closeQuietly(FailStore store) {
            try {
                store.close();
            } catch (Throwable t) {
                LOGGER.error("{} RetryScheduler, close store dir[{}] failed, identity=[{}]", name, store.getPath(), identity(), t);
            }
        }
    }

    /**
     * Self check: drains this instance's own fail store batch by batch until it is
     * empty or a retry fails.
     */
    private class CheckSelfRunner
    implements Runnable {
        @Override
        public void run() {
            try {
                if (!isRemotingEnable()) {
                    return;
                }
                boolean more = true;
                while (more) {
                    more = retryNextBatch();
                }
            } catch (Throwable e) {
                LOGGER.error("Run {} RetryScheduler error , identity=[{}]", name, identity(), e);
            }
        }

        /**
         * Fetches and retries one batch from the own store.
         *
         * @return {@code true} if a batch was delivered and deleted (more may follow),
         *         {@code false} if the store is empty or the retry failed
         */
        private boolean retryNextBatch() throws FailStoreException, InterruptedException {
            // Best effort: wait up to one second for the lock, then proceed either way.
            boolean locked = lock.tryLock(1000L, TimeUnit.MILLISECONDS);
            try {
                List<Pair<String, T>> pairs = failStore.fetchTop(batchSize, type);
                if (CollectionUtils.isEmpty(pairs)) {
                    return false;
                }
                return retryAndDelete(failStore, pairs, "{} RetryScheduler, local files send success, identity=[{}], size: {}, {}");
            } finally {
                if (locked) {
                    lock.unlock();
                }
            }
        }
    }
}

