/*
 * Decompiled with CFR 0.152.
 */
package com.github.ltsopensource.nio.processor;

import com.github.ltsopensource.core.commons.utils.CollectionUtils;
import com.github.ltsopensource.core.constant.Constants;
import com.github.ltsopensource.core.factory.NamedThreadFactory;
import com.github.ltsopensource.core.logger.Logger;
import com.github.ltsopensource.core.logger.LoggerFactory;
import com.github.ltsopensource.core.support.SystemClock;
import com.github.ltsopensource.nio.NioException;
import com.github.ltsopensource.nio.channel.ChannelInitializer;
import com.github.ltsopensource.nio.channel.NioChannel;
import com.github.ltsopensource.nio.handler.Futures;
import com.github.ltsopensource.nio.handler.NioHandler;
import com.github.ltsopensource.nio.idle.IdleDetector;
import com.github.ltsopensource.nio.loop.NioSelectorLoop;
import com.github.ltsopensource.nio.processor.NioProcessor;
import com.github.ltsopensource.nio.processor.WriteQueue;
import com.github.ltsopensource.nio.processor.WriteRequest;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Skeleton {@link NioProcessor} that performs channel reads, message decoding,
 * message encoding and queued writes on behalf of a {@link NioSelectorLoop}.
 *
 * <p>Every channel returned by {@link #doConnect} or {@link #doAccept} is given
 * its own {@link WriteQueue}. Outgoing messages are encoded on the calling
 * thread, appended to that queue and drained to the socket by a task submitted
 * to the internal executor. Incoming bytes are read on the calling thread and
 * decoded by a task submitted to the same executor.
 *
 * <p>NOTE(review): this class never removes entries from the write-queue map;
 * if channels are short-lived the map grows without bound unless something
 * outside this class clears it — confirm.
 */
public abstract class AbstractNioProcessor
implements NioProcessor {
    protected static final Logger LOGGER = LoggerFactory.getLogger(NioProcessor.class);

    /** Capacity, in bytes, of the buffer allocated for each call to {@link #read(NioChannel)}. */
    private static final int READ_BUFFER_SIZE = 65536;

    private final NioHandler eventHandler;
    protected NioSelectorLoop selectorLoop;
    private final Executor executor;
    /** Pending outbound requests, one queue per channel registered through connect/accept. */
    private final ConcurrentMap<NioChannel, WriteQueue> writeQueues = new ConcurrentHashMap<NioChannel, WriteQueue>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    protected IdleDetector idleDetector;
    protected ChannelInitializer channelInitializer;

    /**
     * Creates the processor, its worker pool, its selector loop and its idle
     * detector. The idle detector is started immediately; the selector loop is
     * only started by {@link #start()}.
     *
     * @param eventHandler       receives decoded messages and exceptions
     * @param channelInitializer stored for use by subclasses
     */
    public AbstractNioProcessor(NioHandler eventHandler, ChannelInitializer channelInitializer) {
        this.eventHandler = eventHandler;
        this.executor = Executors.newFixedThreadPool(Constants.AVAILABLE_PROCESSOR, new NamedThreadFactory("NioProcessorExecutor", true));
        this.selectorLoop = new NioSelectorLoop("AcceptSelectorLoop-I/O", this);
        this.idleDetector = new IdleDetector();
        this.channelInitializer = channelInitializer;
        this.idleDetector.start();
    }

    /**
     * Encodes {@code msg}, queues it for {@code channel} and schedules a flush.
     * Before queuing, the channel's selection key (if present and valid) is
     * switched to write interest.
     *
     * @param channel destination channel
     * @param msg     message to send; {@code null} yields an already-successful future
     * @return future completed when the message has been written or has failed
     * @throws NioException if encoding throws or the channel has no write queue
     */
    @Override
    public Futures.WriteFuture writeAndFlush(NioChannel channel, Object msg) {
        SelectionKey key = channel.socketChannel().keyFor(this.selectorLoop.selector());
        if (key != null && key.isValid()) {
            key.interestOps(SelectionKey.OP_WRITE);
        }
        return this.write(channel, msg, true);
    }

    /**
     * Encodes a message and appends it to the channel's write queue.
     *
     * @param channel destination channel
     * @param msg     message to encode; {@code null} is reported as success without writing
     * @param flush   whether to schedule a flush after queuing
     * @return the future attached to the queued request
     * @throws NioException if the encoder throws, or if no write queue exists for the channel
     */
    private Futures.WriteFuture write(NioChannel channel, Object msg, boolean flush) {
        Futures.WriteFuture future = Futures.newWriteFuture();
        if (msg == null) {
            future.setSuccess(true);
            future.setMsg("msg is null");
            future.notifyListeners();
            return future;
        }
        ByteBuffer buf;
        try {
            buf = channel.getEncoder().encode(channel, msg);
        }
        catch (Exception e) {
            throw new NioException("encode msg " + msg + " error", e);
        }
        if (buf == null) {
            future.setSuccess(false);
            future.setMsg("encode msg error");
            future.notifyListeners();
            return future;
        }
        WriteQueue queue = this.writeQueues.get(channel);
        if (queue == null) {
            // Previously surfaced as a NullPointerException wrapped in a misleading
            // "encode msg ... error"; keep the exception type but say what happened.
            throw new NioException("no write queue for channel " + channel,
                    new IllegalStateException("channel was not registered through connect() or accept()"));
        }
        queue.offer(new WriteRequest(buf, future));
        if (flush) {
            this.doFlush(channel);
        }
        return future;
    }

    /**
     * Schedules a drain of the channel's write queue on the executor.
     *
     * @param channel channel whose pending writes should be sent
     */
    @Override
    public void flush(NioChannel channel) {
        this.doFlush(channel);
    }

    /**
     * Submits a task that drains the channel's write queue to its socket and
     * then updates the selection key's interest set. The task does nothing if
     * the channel has no queue or if another task currently holds the queue lock.
     *
     * <p>NOTE(review): a request offered after the lock holder has left its
     * drain loop but before it unlocks is not picked up by either task and
     * waits for the next flush — confirm whether the selector loop covers this.
     */
    private void doFlush(final NioChannel channel) {
        this.executor().execute(new Runnable(){

            @Override
            public void run() {
                WriteQueue queue = AbstractNioProcessor.this.writeQueues.get(channel);
                if (queue == null) {
                    // Channel was never registered here; nothing can be pending.
                    return;
                }
                if (!queue.tryLock()) {
                    return;
                }
                try {
                    boolean pending = AbstractNioProcessor.this.drain(channel, queue);
                    SelectionKey key = channel.socketChannel().keyFor(AbstractNioProcessor.this.selectorLoop.selector());
                    if (key != null && key.isValid()) {
                        // Keep write interest while a partially written buffer is still
                        // queued, otherwise its tail would sit there until some later write.
                        // NOTE(review): assumes the selector loop calls flush() on
                        // write-readiness — confirm in NioSelectorLoop.
                        key.interestOps(pending ? SelectionKey.OP_READ | SelectionKey.OP_WRITE : SelectionKey.OP_READ);
                    }
                }
                finally {
                    queue.unlock();
                }
            }
        });
    }

    /**
     * Writes queued requests to the channel's socket in order until the queue
     * is empty, a buffer cannot be written completely, or a write fails.
     * Must be called while holding the queue's lock.
     *
     * @param channel channel to write to
     * @param queue   that channel's write queue
     * @return {@code true} if the head request still has unwritten bytes
     */
    private boolean drain(NioChannel channel, WriteQueue queue) {
        while (!queue.isEmpty()) {
            WriteRequest request = queue.peek();
            if (request == null) {
                continue;
            }
            Futures.WriteFuture writeFuture = request.getWriteFuture();
            try {
                ByteBuffer buf = request.getMessage();
                int written = channel.socketChannel().write(buf);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("wrote bytes {}", written);
                }
                channel.setLastWriteTime(SystemClock.now());
                if (buf.hasRemaining()) {
                    // Socket accepted only part of the buffer; leave the request at the head.
                    return true;
                }
                queue.poll();
                writeFuture.setSuccess(true);
                writeFuture.notifyListeners();
            }
            catch (Exception e) {
                LOGGER.error("IOE while writing", e);
                // Drop the failed request so later flushes do not re-send it and
                // re-notify its future. The identity check avoids removing the next
                // request when the failure happened after this one was already polled.
                if (queue.peek() == request) {
                    queue.poll();
                }
                writeFuture.setSuccess(false);
                writeFuture.setCause(e);
                writeFuture.notifyListeners();
                this.eventHandler().exceptionCaught(channel, e);
                return false;
            }
        }
        return false;
    }

    /**
     * Reads once from the channel's socket. A negative read count closes the
     * channel; a positive one hands the bytes to the decoder on the executor.
     * An {@link IOException} is logged and forwarded to the event handler.
     *
     * @param channel channel to read from
     */
    @Override
    public void read(NioChannel channel) {
        try {
            ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
            int readCount = channel.socketChannel().read(readBuffer);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("read {} bytes", readCount);
            }
            if (readCount < 0) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("channel closed by the remote peer");
                }
                channel.close();
            } else if (readCount > 0) {
                readBuffer.flip();
                // The buffer now belongs to the decode task, which runs on another
                // thread. It must not be cleared or reused here: doing so could reset
                // its position/limit while the decoder is reading it.
                this.doMessageReceived(channel, readBuffer);
            }
        }
        catch (IOException e) {
            LOGGER.error("IOE while reading : ", e);
            this.eventHandler().exceptionCaught(channel, e);
        }
    }

    /**
     * Submits a task that decodes {@code message} with the channel's decoder,
     * delivers each decoded object to the event handler and then records the
     * channel's last-read time. Any exception is forwarded to the event handler.
     *
     * @param channel source channel
     * @param message flipped buffer holding the bytes just read
     */
    private void doMessageReceived(final NioChannel channel, final ByteBuffer message) {
        this.executor().execute(new Runnable(){

            @Override
            public void run() {
                try {
                    List<Object> decoded = channel.getDecoder().decode(channel, message);
                    if (CollectionUtils.isNotEmpty(decoded)) {
                        for (Object obj : decoded) {
                            AbstractNioProcessor.this.eventHandler().messageReceived(channel, obj);
                        }
                    }
                    channel.setLastReadTime(SystemClock.now());
                }
                catch (Exception e) {
                    AbstractNioProcessor.this.eventHandler().exceptionCaught(channel, e);
                }
            }
        });
    }

    /**
     * Starts a connection to {@code remoteAddress} via {@link #doConnect} and
     * creates a write queue for the resulting channel.
     *
     * @param remoteAddress address to connect to
     * @return the future passed to {@link #doConnect}
     */
    @Override
    public Futures.ConnectFuture connect(SocketAddress remoteAddress) {
        Futures.ConnectFuture connectFuture = Futures.newConnectFuture();
        NioChannel channel = this.doConnect(remoteAddress, this.selectorLoop, connectFuture);
        // ConcurrentHashMap rejects null keys, so skip registration if no channel was produced.
        if (channel != null) {
            this.writeQueues.putIfAbsent(channel, new WriteQueue());
        }
        return connectFuture;
    }

    /**
     * Accepts a connection via {@link #doAccept} and creates a write queue for
     * the resulting channel.
     *
     * @param key selection key that signalled the accept; not used by this implementation
     */
    @Override
    public void accept(SelectionKey key) {
        NioChannel channel = this.doAccept(this.selectorLoop);
        // ConcurrentHashMap rejects null keys, so skip registration if no channel was produced.
        if (channel != null) {
            this.writeQueues.putIfAbsent(channel, new WriteQueue());
        }
    }

    /**
     * Starts the selector loop. Only the first call has any effect.
     */
    public void start() {
        if (this.started.compareAndSet(false, true)) {
            this.selectorLoop.start();
        }
    }

    /**
     * Accepts a pending connection.
     *
     * @param selectorLoop this processor's selector loop
     * @return the accepted channel
     */
    protected abstract NioChannel doAccept(NioSelectorLoop selectorLoop);

    /**
     * Initiates a connection to a remote address.
     *
     * @param remoteAddress address to connect to
     * @param selectorLoop  this processor's selector loop
     * @param connectFuture future handed back to the caller of {@link #connect}
     * @return the channel being connected
     */
    protected abstract NioChannel doConnect(SocketAddress remoteAddress, NioSelectorLoop selectorLoop, Futures.ConnectFuture connectFuture);

    /**
     * @return the handler given to the constructor
     */
    protected NioHandler eventHandler() {
        return this.eventHandler;
    }

    /**
     * @return the executor on which decode and flush tasks run
     */
    protected Executor executor() {
        return this.executor;
    }
}

