/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.ip.tcp.connection;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.integration.ip.tcp.connection.NoListenerException;
import org.springframework.integration.ip.tcp.connection.TcpConnectionSupport;
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
import org.springframework.integration.util.CompositeExecutor;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

public class TcpNioConnection
extends TcpConnectionSupport {
    // Not referenced elsewhere in this class as decompiled; presumably the source of the
    // 60000L initial value of pipeTimeout below (constant inlined by the compiler) - TODO confirm.
    private static final long DEFAULT_PIPE_TIMEOUT = 60000L;
    // The NIO channel this connection reads from and writes to; also the lock object used by send().
    private final SocketChannel socketChannel;
    // Output stream whose writes go to socketChannel (see ChannelOutputStream.doWrite).
    private final ChannelOutputStream channelOutputStream;
    // In-memory pipe: doRead()/sendToPipe() append raw bytes, getPayload() hands it to the deserializer.
    private final ChannelInputStream channelInputStream = new ChannelInputStream();
    // Buffered wrapper around channelOutputStream, created lazily on the first send().
    private volatile OutputStream bufferedOutputStream;
    // When true, allocate() returns direct buffers instead of heap buffers.
    private volatile boolean usingDirectBuffers;
    // Executor used to run this connection as a message assembler; defaulted lazily in doRead().
    private volatile CompositeExecutor taskExecutor;
    // Buffer that doRead() reads socket data into; allocated lazily with maxMessageSize bytes.
    private volatile ByteBuffer rawBuffer;
    // Capacity used when allocating rawBuffer.
    private volatile int maxMessageSize = 61440;
    // Value stored by setLastRead(); not updated anywhere else in this class.
    private volatile long lastRead;
    // System.currentTimeMillis() captured by the most recent send().
    private volatile long lastSend;
    // Counter manipulated by checkForAssembler() and run() so that a new assembler is only
    // started or continued when the incremented value is <= 1.
    private final AtomicInteger executionControl = new AtomicInteger();
    // True while doRead() is between starting a read and finishing the hand-off to the pipe.
    private volatile boolean writingToPipe;
    // Latch convert() waits on when the pipe is empty; replaced by doRead() and ChannelInputStream.write().
    private volatile CountDownLatch writingLatch;
    // Maximum time, in milliseconds, ChannelInputStream.write() waits for space in its queue.
    private volatile long pipeTimeout = 60000L;

    /**
     * Creates a connection wrapping the supplied NIO channel.
     *
     * @param socketChannel the channel this connection reads from and writes to
     * @param server passed through to the superclass constructor
     * @param lookupHost passed through to the superclass constructor
     * @param applicationEventPublisher passed through to the superclass constructor
     * @param connectionFactoryName passed through to the superclass constructor
     * @throws Exception if the superclass constructor fails or the socket's receive
     * buffer size cannot be queried
     */
    public TcpNioConnection(SocketChannel socketChannel, boolean server, boolean lookupHost, ApplicationEventPublisher applicationEventPublisher, String connectionFactoryName) throws Exception {
        super(socketChannel.socket(), server, lookupHost, applicationEventPublisher, connectionFactoryName);
        this.socketChannel = socketChannel;
        // The resolved size is not consumed below; the query is retained because it can throw.
        int reportedSize = socketChannel.socket().getReceiveBufferSize();
        int receiveBufferSize = reportedSize > 0 ? reportedSize : this.maxMessageSize;
        this.channelOutputStream = new ChannelOutputStream();
    }

    /**
     * Sets how long a write into the internal input pipe may wait for queue space.
     *
     * @param pipeTimeout the wait limit in milliseconds
     */
    public void setPipeTimeout(long pipeTimeout) {
        this.pipeTimeout = pipeTimeout;
    }

    /**
     * Closes this connection, first flagging that read errors caused by the close
     * should not be reported as errors.
     */
    @Override
    public void close() {
        setNoReadErrorOnClose(true);
        doClose();
    }

    /**
     * Closes the input pipe and the socket channel on a best-effort basis, then
     * delegates to the superclass close.
     */
    private void doClose() {
        try {
            channelInputStream.close();
        }
        catch (IOException ignored) {
            // best effort: keep closing the remaining resources
        }
        try {
            socketChannel.close();
        }
        catch (Exception ignored) {
            // best effort: the superclass close below must still run
        }
        super.close();
    }

    /**
     * @return whether the underlying socket channel is open
     */
    @Override
    public boolean isOpen() {
        return socketChannel.isOpen();
    }

    /**
     * Maps the message to an object, serializes it to the channel and flushes it.
     * Sends are serialized by locking on the socket channel.
     *
     * @param message the message to send
     * @throws Exception if mapping, serialization or the write fails; on a
     * serialization/write failure a connection exception event is published and the
     * connection is closed before the exception is rethrown
     */
    @Override
    public void send(Message<?> message) throws Exception {
        synchronized (this.socketChannel) {
            if (this.bufferedOutputStream == null) {
                // Size the buffer from the socket's send buffer, falling back to 8 KiB.
                int writeBufferSize = this.socketChannel.socket().getSendBufferSize();
                this.bufferedOutputStream = new BufferedOutputStream(this.getChannelOutputStream(), writeBufferSize > 0 ? writeBufferSize : 8192);
            }
            Object object = this.getMapper().fromMessage(message);
            this.lastSend = System.currentTimeMillis();
            try {
                this.getSerializer().serialize(object, this.bufferedOutputStream);
                // Without this flush the bytes stay in the BufferedOutputStream until it
                // fills up, so small messages would never reach the channel.
                this.bufferedOutputStream.flush();
            }
            catch (Exception e) {
                this.publishConnectionExceptionEvent(e);
                this.closeConnection(true);
                throw e;
            }
            this.afterSend(message);
        }
    }

    /**
     * Deserializes the next payload from the internal input pipe.
     *
     * @return the object produced by the configured deserializer
     * @throws Exception if deserialization fails
     */
    @Override
    public Object getPayload() throws Exception {
        InputStream pipe = this.channelInputStream;
        return getDeserializer().deserialize(pipe);
    }

    /**
     * @return the port reported by the channel's socket via {@code getPort()}
     */
    @Override
    public int getPort() {
        return socketChannel.socket().getPort();
    }

    /**
     * @return the internal input pipe, which serves as the deserializer state key
     */
    @Override
    public Object getDeserializerStateKey() {
        return channelInputStream;
    }

    /**
     * Allocates a buffer of the requested capacity.
     *
     * @param length the capacity in bytes
     * @return a direct buffer when direct buffers are enabled, otherwise a heap buffer
     */
    protected ByteBuffer allocate(int length) {
        if (this.usingDirectBuffers) {
            return ByteBuffer.allocateDirect(length);
        }
        return ByteBuffer.allocate(length);
    }

    /**
     * Message assembler loop. Each pass assembles at most one message from the input
     * pipe and hands it to the listener; after every pass a final check decides whether
     * this assembler should keep going because more data arrived.
     *
     * <p>The loop ends when a pass asks to stop (no listener on a non-single-use
     * connection, or a read failure) or when the final check finds nothing to do.
     * The final check never jumps out of the {@code finally} block, so a requested
     * stop is always honoured.
     */
    @Override
    public void run() {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace((Object)(this.getConnectionId() + " Nio message assembler running..."));
        }
        boolean moreDataAvailable = true;
        while (moreDataAvailable) {
            boolean keepRunning;
            try {
                keepRunning = this.assembleNextMessage();
            }
            finally {
                // Runs exactly once per pass, including passes that end the assembler.
                moreDataAvailable = this.claimAssemblerIfDataPending();
            }
            if (!keepRunning) {
                return;
            }
        }
    }

    /**
     * Performs one assembler pass: converts pending data to a message, schedules another
     * assembler if more data is already waiting, and delivers the message.
     *
     * @return {@code false} when the assembler must stop (no listener on a connection
     * that is not single use, or a read failure that closed the connection)
     */
    private boolean assembleNextMessage() {
        if (this.getListener() == null && !this.isSingleUse()) {
            this.logger.debug((Object)"TcpListener exiting - no listener and not single use");
            return false;
        }
        try {
            if (!this.dataAvailable()) {
                this.executionControl.decrementAndGet();
                return true;
            }
            Message<?> message = this.convert();
            if (this.dataAvailable()) {
                // More data is waiting: let another assembler build the next message
                // while this one delivers the current message.
                this.executionControl.incrementAndGet();
                try {
                    this.taskExecutor.execute2((Runnable)this);
                }
                catch (RejectedExecutionException e) {
                    this.executionControl.decrementAndGet();
                    if (this.logger.isInfoEnabled()) {
                        this.logger.info((Object)(this.getConnectionId() + " Insufficient threads in the assembler fixed thread pool; consider " + "increasing this task executor pool size; data avail: " + this.channelInputStream.available()));
                    }
                }
            }
            this.executionControl.decrementAndGet();
            if (message != null) {
                this.sendToChannel(message);
            }
            return true;
        }
        catch (Exception e) {
            if (this.logger.isTraceEnabled()) {
                this.logger.error((Object)("Read exception " + this.getConnectionId()), (Throwable)e);
            } else if (!this.isNoReadErrorOnClose()) {
                this.logger.error((Object)("Read exception " + this.getConnectionId() + " " + e.getClass().getSimpleName() + ":" + e.getCause() + ":" + e.getMessage()));
            } else if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Read exception " + this.getConnectionId() + " " + e.getClass().getSimpleName() + ":" + e.getCause() + ":" + e.getMessage()));
            }
            this.closeConnection(true);
            this.sendExceptionToListener(e);
            return false;
        }
    }

    /**
     * Final check after a pass: if data is pending and no other assembler holds the
     * execution slot, claims the slot so the current assembler can continue.
     *
     * @return {@code true} if the slot was claimed and the loop should continue
     */
    private boolean claimAssemblerIfDataPending() {
        boolean claimed = false;
        try {
            if (this.dataAvailable()) {
                synchronized (this.executionControl) {
                    if (this.executionControl.incrementAndGet() <= 1) {
                        // Only continue when no other assembler is already running.
                        this.executionControl.set(1);
                        claimed = true;
                    } else {
                        this.executionControl.decrementAndGet();
                    }
                }
            }
            if (this.logger.isTraceEnabled()) {
                if (claimed) {
                    this.logger.trace((Object)(this.getConnectionId() + " Nio message assembler continuing..."));
                } else {
                    this.logger.trace((Object)(this.getConnectionId() + " Nio message assembler exiting... avail: " + this.channelInputStream.available()));
                }
            }
        }
        catch (IOException e) {
            this.logger.error((Object)"Exception when checking for assembler", (Throwable)e);
        }
        return claimed;
    }

    /**
     * Reports whether the assembler has, or is about to have, data to work on.
     *
     * @return {@code true} if a read is currently feeding the pipe or the pipe
     * already holds bytes
     * @throws IOException if the pipe's available count cannot be obtained
     */
    private boolean dataAvailable() throws IOException {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace((Object)(this.getConnectionId() + " checking data avail: " + this.channelInputStream.available() + " pending: " + this.writingToPipe));
        }
        if (this.writingToPipe) {
            return true;
        }
        return this.channelInputStream.available() > 0;
    }

    /**
     * Builds the next message from the input pipe. If the pipe is empty, waits up to
     * 60 seconds on the writing latch for the reader to deliver data.
     *
     * @return the assembled message, or {@code null} when no data arrived, when a
     * single-use connection timed out, or when the stream ended softly
     * @throws Exception if waiting times out or is interrupted (as {@link IOException}),
     * or if the mapper fails for any other reason; the connection is closed before a
     * mapper failure is rethrown
     */
    private synchronized Message<?> convert() throws Exception {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace((Object)(this.getConnectionId() + " checking data avail (convert): " + this.channelInputStream.available() + " pending: " + this.writingToPipe));
        }
        if (this.channelInputStream.available() <= 0) {
            try {
                if (!this.writingLatch.await(60L, TimeUnit.SECONDS)) {
                    throw new IOException("Timed out waiting for IO");
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // Keep the original interruption as the cause for diagnostics.
                throw new IOException("Interrupted waiting for IO", e);
            }
            if (this.channelInputStream.available() <= 0) {
                // The latch was released without data (for example the read hit end of stream).
                return null;
            }
        }
        try {
            return this.getMapper().toMessage(this);
        }
        catch (Exception e) {
            this.closeConnection(true);
            if (e instanceof SocketTimeoutException && this.isSingleUse()) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug((Object)("Closing single use socket after timeout " + this.getConnectionId()));
                }
            } else if (!(e instanceof SoftEndOfStreamException)) {
                throw e;
            }
            return null;
        }
    }

    /**
     * Delivers an assembled message to the listener and applies the single-use
     * close rules afterwards.
     *
     * <p>A {@code NoListenerException} is treated as an expected condition: on a
     * single-use connection the connection is closed, and nothing is logged at error
     * level. Any other exception from the listener is logged as an error.
     *
     * @param message the message to deliver; {@code null} skips delivery
     */
    private void sendToChannel(Message<?> message) {
        boolean intercepted = false;
        try {
            if (message != null) {
                intercepted = this.getListener().onMessage(message);
            }
        }
        catch (Exception e) {
            if (e instanceof NoListenerException) {
                if (this.isSingleUse()) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug((Object)("Closing single use channel after inbound message " + this.getConnectionId()));
                    }
                    this.closeConnection(true);
                }
            } else {
                // Only unexpected failures are reported as errors.
                this.logger.error((Object)("Exception sending message: " + message), (Throwable)e);
            }
        }
        // Single-use sockets are closed after receipt when this is the client side and the
        // data was not intercepted, or this is the server side and no sender is registered.
        boolean clientDone = !this.isServer() && !intercepted;
        if (this.isSingleUse() && (clientDone || this.isServer() && this.getSender() == null)) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Closing single use channel after inbound message " + this.getConnectionId()));
            }
            this.closeConnection(false);
        }
    }

    /**
     * Reads whatever the socket channel currently offers into the raw buffer and
     * forwards it to the input pipe, making sure an assembler is scheduled first.
     *
     * @throws Exception any failure from the read or the pipe hand-off; a
     * {@link RejectedExecutionException} is rethrown as is, any other exception is
     * first published as a connection exception event
     */
    private void doRead() throws Exception {
        // Allocate the read buffer on first use.
        if (this.rawBuffer == null) {
            this.rawBuffer = this.allocate(this.maxMessageSize);
        }
        // Fresh latch for convert() to wait on, and mark a read as in progress so that
        // dataAvailable() reports true while the pipe is still empty.
        this.writingLatch = new CountDownLatch(1);
        this.writingToPipe = true;
        try {
            int len;
            // No executor configured: fall back to a cached thread pool.
            if (this.taskExecutor == null) {
                ExecutorService executor = Executors.newCachedThreadPool();
                this.taskExecutor = new CompositeExecutor((Executor)executor, (Executor)executor);
            }
            // Start an assembler if none is running.
            this.checkForAssembler();
            if (this.logger.isTraceEnabled()) {
                this.logger.trace((Object)("Before read:" + this.rawBuffer.position() + "/" + this.rawBuffer.limit()));
            }
            // A negative count means end of stream: close the connection. Execution still
            // continues below with an empty buffer, for which the pipe write is a no-op.
            if ((len = this.socketChannel.read(this.rawBuffer)) < 0) {
                this.writingToPipe = false;
                this.closeConnection(true);
            }
            if (this.logger.isTraceEnabled()) {
                this.logger.trace((Object)("After read:" + this.rawBuffer.position() + "/" + this.rawBuffer.limit()));
            }
            // Switch the buffer from filling to draining; limit now equals the bytes read.
            this.rawBuffer.flip();
            if (this.logger.isTraceEnabled()) {
                this.logger.trace((Object)("After flip:" + this.rawBuffer.position() + "/" + this.rawBuffer.limit()));
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Read " + this.rawBuffer.limit() + " into raw buffer"));
            }
            this.sendToPipe(this.rawBuffer);
        }
        catch (RejectedExecutionException e) {
            // Rethrown without publishing an event; readPacket() also passes it on to its caller.
            throw e;
        }
        catch (Exception e) {
            this.publishConnectionExceptionEvent(e);
            throw e;
        }
        finally {
            // Always clear the in-progress flag and release the current latch. Note that the
            // latch field may have been replaced by ChannelInputStream.write() in the meantime.
            this.writingToPipe = false;
            this.writingLatch.countDown();
        }
    }

    /**
     * Copies the bytes {@code [0, limit)} of the buffer into the input pipe and then
     * clears the buffer for the next read.
     *
     * <p>Buffers without an accessible backing array (direct or read-only buffers) are
     * copied through a duplicate view, so they no longer fail in {@code array()}.
     *
     * @param rawBuffer the flipped buffer holding the bytes just read; must not be null
     * @throws IOException if the pipe does not accept the data in time or the wait is interrupted
     */
    protected void sendToPipe(ByteBuffer rawBuffer) throws IOException {
        Assert.notNull((Object)rawBuffer, (String)"rawBuffer cannot be null");
        if (this.logger.isTraceEnabled()) {
            this.logger.trace((Object)(this.getConnectionId() + " Sending " + rawBuffer.limit() + " to pipe"));
        }
        int length = rawBuffer.limit();
        if (rawBuffer.hasArray()) {
            this.channelInputStream.write(rawBuffer.array(), length);
        } else {
            // Read through a duplicate so the caller's position is left untouched until clear().
            byte[] copy = new byte[length];
            ByteBuffer view = rawBuffer.duplicate();
            view.rewind();
            view.get(copy);
            this.channelInputStream.write(copy, length);
        }
        rawBuffer.clear();
    }

    /**
     * Starts an assembler on the task executor unless one already holds the
     * execution slot.
     *
     * @throws RejectedExecutionException if the executor refuses the task; the slot
     * is released before the exception is rethrown
     */
    private void checkForAssembler() {
        synchronized (this.executionControl) {
            if (this.executionControl.incrementAndGet() > 1) {
                // Another assembler is already running; undo the tentative claim.
                this.executionControl.decrementAndGet();
                return;
            }
            this.executionControl.set(1);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)(this.getConnectionId() + " Running an assembler"));
            }
            try {
                this.taskExecutor.execute2((Runnable)this);
            }
            catch (RejectedExecutionException rejected) {
                this.executionControl.decrementAndGet();
                if (this.logger.isInfoEnabled()) {
                    this.logger.info((Object)"Insufficient threads in the assembler fixed thread pool; consider increasing this task executor pool size");
                }
                throw rejected;
            }
        }
    }

    /**
     * Reads the data currently available on the channel. A closed channel or any
     * other read failure closes the connection; only a
     * {@link RejectedExecutionException} is passed on to the caller.
     */
    public void readPacket() {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)(this.getConnectionId() + " Reading..."));
        }
        try {
            doRead();
        }
        catch (ClosedChannelException closed) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)(this.getConnectionId() + " Channel is closed"));
            }
            closeConnection(true);
        }
        catch (RejectedExecutionException rejected) {
            // Must not be swallowed by the generic handler below.
            throw rejected;
        }
        catch (Exception failure) {
            this.logger.error((Object)("Exception on Read " + this.getConnectionId() + " " + failure.getMessage()), (Throwable)failure);
            closeConnection(true);
        }
    }

    /**
     * Closes the connection. The caller is not visible in this file; presumably
     * invoked when the connection is considered timed out.
     */
    void timeout() {
        closeConnection(true);
    }

    /**
     * Sets the executor used to run message assemblers.
     *
     * @param taskExecutor used directly when it already is a {@code CompositeExecutor};
     * otherwise wrapped in one that uses it for both of its executor roles
     */
    public void setTaskExecutor(Executor taskExecutor) {
        if (taskExecutor instanceof CompositeExecutor) {
            this.taskExecutor = (CompositeExecutor)taskExecutor;
        } else {
            this.taskExecutor = new CompositeExecutor(taskExecutor, taskExecutor);
        }
    }

    /**
     * Selects the kind of buffer produced by {@code allocate(int)}.
     *
     * @param usingDirectBuffers {@code true} for direct buffers, {@code false} for heap buffers
     */
    public void setUsingDirectBuffers(boolean usingDirectBuffers) {
        this.usingDirectBuffers = usingDirectBuffers;
    }

    /**
     * @return {@code true} if {@code allocate(int)} produces direct buffers
     */
    protected boolean isUsingDirectBuffers() {
        return usingDirectBuffers;
    }

    /**
     * @return the stream that writes directly to the socket channel
     */
    protected ChannelOutputStream getChannelOutputStream() {
        return channelOutputStream;
    }

    /**
     * @return the value most recently stored through {@code setLastRead(long)}
     */
    public long getLastRead() {
        return lastRead;
    }

    /**
     * Records the last-read marker for this connection.
     *
     * @param lastRead the value to store; returned unchanged by {@code getLastRead()}
     */
    public void setLastRead(long lastRead) {
        this.lastRead = lastRead;
    }

    /**
     * @return the {@code System.currentTimeMillis()} value captured by the most
     * recent call to {@code send}, or 0 if nothing has been sent
     */
    public long getLastSend() {
        return lastSend;
    }

    /**
     * Input stream backed by a bounded queue of byte chunks. The connection's reader
     * appends chunks through {@link #write(byte[], int)}; consumers read them back in
     * order through the {@link InputStream} methods.
     */
    class ChannelInputStream
    extends InputStream {
        private static final int BUFFER_LIMIT = 5;
        // At most BUFFER_LIMIT chunks are queued; write() waits when the queue is full.
        private final BlockingQueue<byte[]> buffers = new LinkedBlockingQueue<byte[]>(BUFFER_LIMIT);
        // Chunk currently being consumed and the index of its next unread byte.
        private volatile byte[] currentBuffer;
        private volatile int currentOffset;
        // Number of written bytes that have not been read yet.
        private final AtomicInteger available = new AtomicInteger();
        private volatile boolean isClosed;

        ChannelInputStream() {
        }

        /**
         * Reads up to {@code len} bytes. Blocks for the first byte, then keeps going
         * only while more bytes are already available.
         *
         * @return the number of bytes read, or -1 if the stream ended before any byte was read
         */
        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            Assert.notNull((Object)b, (String)"byte[] cannot be null");
            if (off < 0 || len < 0 || len > b.length - off) {
                throw new IndexOutOfBoundsException();
            }
            if (len == 0) {
                return 0;
            }
            int count = 0;
            while (count < len && (count == 0 || this.available.get() > 0)) {
                int next = this.read();
                if (next < 0) {
                    return count == 0 ? -1 : count;
                }
                b[off + count] = (byte)next;
                count++;
            }
            return count;
        }

        /**
         * Reads one byte, waiting for a new chunk if the current one is exhausted.
         *
         * @return the byte as an unsigned value, or -1 once the stream is closed and drained
         */
        @Override
        public synchronized int read() throws IOException {
            if (this.isClosed && this.available.get() == 0) {
                return -1;
            }
            byte[] chunk = this.currentBuffer;
            if (chunk == null) {
                chunk = this.getNextBuffer();
                this.currentBuffer = chunk;
                this.currentOffset = 0;
                if (chunk == null) {
                    return -1;
                }
            }
            int index = this.currentOffset;
            this.currentOffset = index + 1;
            int value = chunk[index] & 0xFF;
            this.available.decrementAndGet();
            if (this.currentOffset >= chunk.length) {
                // Chunk fully consumed; the next read fetches a new one.
                this.currentBuffer = null;
            }
            return value;
        }

        /**
         * Polls the queue in one-second slices until a chunk arrives.
         *
         * @return the next chunk, or {@code null} if the stream was closed while waiting
         */
        private byte[] getNextBuffer() throws IOException {
            try {
                while (true) {
                    byte[] next = this.buffers.poll(1L, TimeUnit.SECONDS);
                    if (next != null) {
                        return next;
                    }
                    if (this.isClosed) {
                        return null;
                    }
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for data", e);
            }
        }

        /**
         * Appends a copy of the first {@code bytesToWrite} bytes of {@code array} to the
         * pipe, waiting up to the connection's pipe timeout for queue space.
         *
         * @param array source of the bytes
         * @param bytesToWrite number of bytes to copy; values {@code <= 0} are ignored
         * @throws IOException if no space became available in time or the wait was interrupted
         */
        public void write(byte[] array, int bytesToWrite) throws IOException {
            if (bytesToWrite <= 0) {
                return;
            }
            byte[] chunk = new byte[bytesToWrite];
            System.arraycopy(array, 0, chunk, 0, bytesToWrite);
            this.available.addAndGet(bytesToWrite);
            // Release an assembler that may be waiting for data in convert().
            if (TcpNioConnection.this.writingLatch != null) {
                TcpNioConnection.this.writingLatch.countDown();
            }
            boolean accepted;
            try {
                accepted = this.buffers.offer(chunk, TcpNioConnection.this.pipeTimeout, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for buffer space", e);
            }
            if (!accepted) {
                throw new IOException("Timed out waiting for buffer space");
            }
            TcpNioConnection.this.writingLatch = new CountDownLatch(1);
        }

        /**
         * Marks the stream closed; bytes already written can still be read.
         */
        @Override
        public void close() throws IOException {
            super.close();
            this.isClosed = true;
        }

        /**
         * @return the number of written bytes not yet read
         */
        @Override
        public int available() throws IOException {
            return this.available.get();
        }
    }

    /**
     * Output stream that writes straight to the connection's socket channel. When the
     * channel cannot take all bytes at once, the remainder is written as the channel
     * becomes writable again, waiting up to the socket's SO_TIMEOUT per attempt.
     */
    class ChannelOutputStream
    extends OutputStream {
        // Opened lazily the first time a write cannot complete immediately.
        private Selector selector;
        // Socket SO_TIMEOUT captured when the selector is opened; used as the select timeout.
        private int soTimeout;

        ChannelOutputStream() {
        }

        @Override
        public void write(int b) throws IOException {
            this.doWrite(ByteBuffer.wrap(new byte[]{(byte)b}));
        }

        /**
         * Closes the whole connection, not just this stream.
         */
        @Override
        public void close() throws IOException {
            TcpNioConnection.this.doClose();
        }

        /**
         * No-op: every write goes directly to the channel.
         */
        @Override
        public void flush() throws IOException {
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            this.doWrite(ByteBuffer.wrap(b, off, len));
        }

        @Override
        public void write(byte[] b) throws IOException {
            this.doWrite(ByteBuffer.wrap(b));
        }

        /**
         * Writes the buffer's remaining bytes to the channel, blocking until all of
         * them are written.
         *
         * @param buffer the bytes to write
         * @throws IOException on a channel failure; {@link SocketTimeoutException} if the
         * channel does not become writable within the select timeout
         */
        protected synchronized void doWrite(ByteBuffer buffer) throws IOException {
            if (TcpNioConnection.this.logger.isDebugEnabled()) {
                TcpNioConnection.this.logger.debug((Object)(TcpNioConnection.this.getConnectionId() + " writing " + buffer.remaining()));
            }
            SocketChannel channel = TcpNioConnection.this.socketChannel;
            channel.write(buffer);
            if (!buffer.hasRemaining()) {
                // Common case: everything was accepted by the first write.
                return;
            }
            if (this.selector == null) {
                this.selector = Selector.open();
                this.soTimeout = channel.socket().getSoTimeout();
            }
            channel.register(this.selector, java.nio.channels.SelectionKey.OP_WRITE);
            while (buffer.hasRemaining()) {
                if (this.selector.select(this.soTimeout) == 0) {
                    throw new SocketTimeoutException("Timeout on write");
                }
                this.selector.selectedKeys().clear();
                channel.write(buffer);
            }
        }
    }
}

