/*
 * Decompiled with CFR 0.152.
 */
package com.github.ltsopensource.core.remoting;

import com.github.ltsopensource.core.AppContext;
import com.github.ltsopensource.core.cluster.Node;
import com.github.ltsopensource.core.cluster.NodeType;
import com.github.ltsopensource.core.commons.utils.CollectionUtils;
import com.github.ltsopensource.core.factory.NamedThreadFactory;
import com.github.ltsopensource.core.logger.Logger;
import com.github.ltsopensource.core.logger.LoggerFactory;
import com.github.ltsopensource.core.protocol.JobProtos;
import com.github.ltsopensource.core.protocol.command.HeartBeatRequest;
import com.github.ltsopensource.core.remoting.RemotingClientDelegate;
import com.github.ltsopensource.ec.EventInfo;
import com.github.ltsopensource.ec.EventSubscriber;
import com.github.ltsopensource.ec.Observer;
import com.github.ltsopensource.remoting.protocol.RemotingCommand;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class HeartBeatMonitor {
    private static final Logger LOGGER = LoggerFactory.getLogger(HeartBeatMonitor.class.getSimpleName());
    private final ScheduledExecutorService PING_EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1, new NamedThreadFactory("LTS-HeartBeat-Ping", true));
    private ScheduledFuture<?> pingScheduledFuture;
    private final ScheduledExecutorService FAST_PING_EXECUTOR = Executors.newScheduledThreadPool(1, new NamedThreadFactory("LTS-HeartBeat-Fast-Ping", true));
    private ScheduledFuture<?> fastPingScheduledFuture;
    private RemotingClientDelegate remotingClient;
    private AppContext appContext;
    private EventSubscriber jobTrackerUnavailableEventSubscriber;
    private AtomicBoolean pingStart = new AtomicBoolean(false);
    private AtomicBoolean fastPingStart = new AtomicBoolean(false);
    private AtomicBoolean running = new AtomicBoolean(false);

    public HeartBeatMonitor(RemotingClientDelegate remotingClient, AppContext appContext) {
        this.remotingClient = remotingClient;
        this.appContext = appContext;
        this.jobTrackerUnavailableEventSubscriber = new EventSubscriber(HeartBeatMonitor.class.getName() + "_PING_" + appContext.getConfig().getIdentity(), new Observer(){

            @Override
            public void onObserved(EventInfo eventInfo) {
                HeartBeatMonitor.this.startFastPing();
                HeartBeatMonitor.this.stopPing();
            }
        });
        appContext.getEventCenter().subscribe(new EventSubscriber(HeartBeatMonitor.class.getName() + "_NODE_ADD_" + appContext.getConfig().getIdentity(), new Observer(){

            @Override
            public void onObserved(EventInfo eventInfo) {
                Node node = (Node)eventInfo.getParam("node");
                if (node == null || NodeType.JOB_TRACKER != node.getNodeType()) {
                    return;
                }
                try {
                    HeartBeatMonitor.this.check(node);
                }
                catch (Throwable throwable) {
                    // empty catch block
                }
            }
        }), "NODE_ADD");
    }

    public void start() {
        this.startFastPing();
    }

    public void stop() {
        this.stopPing();
        this.stopFastPing();
    }

    private void startPing() {
        try {
            if (this.pingStart.compareAndSet(false, true)) {
                this.appContext.getEventCenter().subscribe(this.jobTrackerUnavailableEventSubscriber, "NO_JOB_TRACKER_AVAILABLE");
                if (this.pingScheduledFuture == null) {
                    this.pingScheduledFuture = this.PING_EXECUTOR_SERVICE.scheduleWithFixedDelay(new Runnable(){

                        @Override
                        public void run() {
                            if (HeartBeatMonitor.this.pingStart.get()) {
                                HeartBeatMonitor.this.ping();
                            }
                        }
                    }, 30L, 30L, TimeUnit.SECONDS);
                }
                LOGGER.debug("Start slow ping success.");
            }
        }
        catch (Throwable t) {
            LOGGER.error("Start slow ping failed.", t);
        }
    }

    private void stopPing() {
        try {
            if (this.pingStart.compareAndSet(true, false)) {
                this.appContext.getEventCenter().unSubscribe("NO_JOB_TRACKER_AVAILABLE", this.jobTrackerUnavailableEventSubscriber);
                LOGGER.debug("Stop slow ping success.");
            }
        }
        catch (Throwable t) {
            LOGGER.error("Stop slow ping failed.", t);
        }
    }

    private void startFastPing() {
        if (this.fastPingStart.compareAndSet(false, true)) {
            try {
                if (this.fastPingScheduledFuture == null) {
                    this.fastPingScheduledFuture = this.FAST_PING_EXECUTOR.scheduleWithFixedDelay(new Runnable(){

                        @Override
                        public void run() {
                            if (HeartBeatMonitor.this.fastPingStart.get()) {
                                HeartBeatMonitor.this.ping();
                            }
                        }
                    }, 500L, 500L, TimeUnit.MILLISECONDS);
                }
                LOGGER.debug("Start fast ping success.");
            }
            catch (Throwable t) {
                LOGGER.error("Start fast ping failed.", t);
            }
        }
    }

    private void stopFastPing() {
        try {
            if (this.fastPingStart.compareAndSet(true, false)) {
                LOGGER.debug("Stop fast ping success.");
            }
        }
        catch (Throwable t) {
            LOGGER.error("Stop fast ping failed.", t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void ping() {
        block5: {
            try {
                if (!this.running.compareAndSet(false, true)) break block5;
                try {
                    this.check();
                }
                finally {
                    this.running.compareAndSet(true, false);
                }
            }
            catch (Throwable t) {
                LOGGER.error("Ping JobTracker error", t);
            }
        }
    }

    private void check() {
        List<Node> jobTrackers = this.appContext.getSubscribedNodeManager().getNodeList(NodeType.JOB_TRACKER);
        if (CollectionUtils.isEmpty(jobTrackers)) {
            return;
        }
        for (Node jobTracker : jobTrackers) {
            this.check(jobTracker);
        }
    }

    private void check(Node jobTracker) {
        if (this.beat(this.remotingClient, jobTracker.getAddress())) {
            this.remotingClient.addJobTracker(jobTracker);
            if (!this.remotingClient.isServerEnable()) {
                this.remotingClient.setServerEnable(true);
                this.appContext.getEventCenter().publishAsync(new EventInfo("JOB_TRACKER_AVAILABLE"));
            } else {
                this.remotingClient.setServerEnable(true);
            }
            this.stopFastPing();
            this.startPing();
        } else {
            this.remotingClient.removeJobTracker(jobTracker);
        }
    }

    private boolean beat(RemotingClientDelegate remotingClient, String addr) {
        HeartBeatRequest commandBody = this.appContext.getCommandBodyWrapper().wrapper(new HeartBeatRequest());
        RemotingCommand request = RemotingCommand.createRequestCommand(JobProtos.RequestCode.HEART_BEAT.code(), commandBody);
        try {
            RemotingCommand response = remotingClient.getRemotingClient().invokeSync(addr, request, 5000L);
            if (response != null && JobProtos.ResponseCode.HEART_BEAT_SUCCESS == JobProtos.ResponseCode.valueOf(response.getCode())) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("heart beat success. ");
                }
                return true;
            }
        }
        catch (Exception e) {
            LOGGER.warn(e.getMessage());
        }
        return false;
    }
}

