chris@1: /* chris@1: * SONEWS News Server chris@1: * see AUTHORS for the list of contributors chris@1: * chris@1: * This program is free software: you can redistribute it and/or modify chris@1: * it under the terms of the GNU General Public License as published by chris@1: * the Free Software Foundation, either version 3 of the License, or chris@1: * (at your option) any later version. chris@1: * chris@1: * This program is distributed in the hope that it will be useful, chris@1: * but WITHOUT ANY WARRANTY; without even the implied warranty of chris@1: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the chris@1: * GNU General Public License for more details. chris@1: * chris@1: * You should have received a copy of the GNU General Public License chris@1: * along with this program. If not, see . chris@1: */ chris@1: chris@1: package org.sonews.daemon; chris@1: chris@1: import org.sonews.util.Log; chris@1: import java.io.IOException; chris@1: import java.nio.ByteBuffer; chris@1: import java.nio.channels.CancelledKeyException; chris@1: import java.nio.channels.SelectionKey; chris@1: import java.nio.channels.Selector; chris@1: import java.nio.channels.SocketChannel; chris@1: import java.util.Iterator; chris@1: chris@1: /** chris@1: * A Thread task that processes OP_WRITE events for SocketChannels. chris@1: * @author Christian Lins chris@1: * @since sonews/0.5.0 chris@1: */ chris@1: class ChannelWriter extends AbstractDaemon chris@1: { chris@1: chris@1: private static ChannelWriter instance = new ChannelWriter(); chris@1: chris@1: /** chris@1: * @return Returns the active ChannelWriter instance. chris@1: */ chris@1: public static ChannelWriter getInstance() chris@1: { chris@1: return instance; chris@1: } chris@1: chris@1: private Selector selector = null; chris@1: chris@1: protected ChannelWriter() chris@1: { chris@1: } chris@1: chris@1: /** chris@1: * @return Selector associated with this instance. chris@1: */ chris@1: public Selector getSelector() chris@1: { chris@1: return this.selector; chris@1: } chris@1: chris@1: /** chris@1: * Sets the selector that is used by this ChannelWriter. chris@1: * @param selector chris@1: */ chris@1: public void setSelector(final Selector selector) chris@1: { chris@1: this.selector = selector; chris@1: } chris@1: chris@1: /** chris@1: * Run loop. chris@1: */ chris@1: @Override chris@1: public void run() chris@1: { chris@1: assert selector != null; chris@1: chris@1: while(isRunning()) chris@1: { chris@1: try chris@1: { chris@1: SelectionKey selKey = null; chris@1: SocketChannel socketChannel = null; chris@1: NNTPConnection connection = null; chris@1: chris@1: // select() blocks until some SelectableChannels are ready for chris@1: // processing. There is no need to synchronize the selector as we chris@1: // have only one thread per selector. chris@1: selector.select(); // The return value of select can be ignored chris@1: chris@1: // Get list of selection keys with pending OP_WRITE events. chris@1: // The keySET is not thread-safe whereas the keys itself are. chris@1: Iterator it = selector.selectedKeys().iterator(); chris@1: chris@1: while (it.hasNext()) chris@1: { chris@1: // We remove the first event from the set and store it for chris@1: // later processing. chris@1: selKey = (SelectionKey) it.next(); chris@1: socketChannel = (SocketChannel) selKey.channel(); chris@1: connection = Connections.getInstance().get(socketChannel); chris@1: chris@1: it.remove(); chris@1: if (connection != null) chris@1: { chris@1: break; chris@1: } chris@1: else chris@1: { chris@1: selKey = null; chris@1: } chris@1: } chris@1: chris@1: if (selKey != null) chris@1: { chris@1: try chris@1: { chris@1: // Process the selected key. chris@1: // As there is only one OP_WRITE key for a given channel, we need chris@1: // not to synchronize this processing to retain the order. chris@1: processSelectionKey(connection, socketChannel, selKey); chris@1: } chris@1: catch (IOException ex) chris@1: { cli@15: Log.get().warning("Error writing to channel: " + ex); chris@1: chris@1: // Cancel write events for this channel chris@1: selKey.cancel(); chris@1: connection.shutdownInput(); chris@1: connection.shutdownOutput(); chris@1: } chris@1: } chris@1: chris@1: // Eventually wait for a register operation chris@1: synchronized(NNTPDaemon.RegisterGate) { /* do nothing */ } chris@1: } chris@1: catch(CancelledKeyException ex) chris@1: { cli@15: Log.get().info("ChannelWriter.run(): " + ex); chris@1: } chris@1: catch(Exception ex) chris@1: { chris@1: ex.printStackTrace(); chris@1: } chris@1: } // while(isRunning()) chris@1: } chris@1: chris@1: private void processSelectionKey(final NNTPConnection connection, chris@1: final SocketChannel socketChannel, final SelectionKey selKey) chris@1: throws InterruptedException, IOException chris@1: { chris@1: assert connection != null; chris@1: assert socketChannel != null; chris@1: assert selKey != null; chris@1: assert selKey.isWritable(); chris@1: chris@1: // SocketChannel is ready for writing chris@1: if(selKey.isValid()) chris@1: { chris@1: // Lock the socket channel chris@1: synchronized(socketChannel) chris@1: { chris@1: // Get next output buffer chris@1: ByteBuffer buf = connection.getOutputBuffer(); chris@1: if(buf == null) chris@1: { chris@1: // Currently we have nothing to write, so we stop the writeable chris@1: // events until we have something to write to the socket channel chris@1: //selKey.cancel(); chris@1: selKey.interestOps(0); cli@25: // Update activity timestamp to prevent too early disconnects cli@25: // on slow client connections cli@25: connection.setLastActivity(System.currentTimeMillis()); chris@1: return; chris@1: } chris@1: chris@1: while(buf != null) // There is data to be send chris@1: { chris@1: // Write buffer to socket channel; this method does not block chris@1: if(socketChannel.write(buf) <= 0) chris@1: { chris@1: // Perhaps there is data to be written, but the SocketChannel's chris@1: // buffer is full, so we stop writing to until the next event. chris@1: break; chris@1: } chris@1: else chris@1: { chris@1: // Retrieve next buffer if available; method may return the same chris@1: // buffer instance if it still have some bytes remaining chris@1: buf = connection.getOutputBuffer(); chris@1: } chris@1: } chris@1: } chris@1: } chris@1: else chris@1: { cli@15: Log.get().warning("Invalid OP_WRITE key: " + selKey); chris@1: cli@15: if(socketChannel.socket().isClosed()) chris@1: { chris@1: connection.shutdownInput(); chris@1: connection.shutdownOutput(); chris@1: socketChannel.close(); cli@15: Log.get().info("Connection closed."); chris@1: } chris@1: } chris@1: } chris@1: chris@1: }