chris@1: /* chris@1: * SONEWS News Server chris@1: * see AUTHORS for the list of contributors chris@1: * chris@1: * This program is free software: you can redistribute it and/or modify chris@1: * it under the terms of the GNU General Public License as published by chris@1: * the Free Software Foundation, either version 3 of the License, or chris@1: * (at your option) any later version. chris@1: * chris@1: * This program is distributed in the hope that it will be useful, chris@1: * but WITHOUT ANY WARRANTY; without even the implied warranty of chris@1: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the chris@1: * GNU General Public License for more details. chris@1: * chris@1: * You should have received a copy of the GNU General Public License chris@1: * along with this program. If not, see . chris@1: */ chris@1: chris@1: package org.sonews.daemon; chris@1: chris@1: import org.sonews.util.Log; chris@1: import java.io.IOException; chris@1: import java.nio.ByteBuffer; chris@1: import java.nio.channels.CancelledKeyException; chris@1: import java.nio.channels.SelectionKey; chris@1: import java.nio.channels.Selector; chris@1: import java.nio.channels.SocketChannel; chris@1: import java.util.Iterator; chris@1: chris@1: /** chris@1: * A Thread task that processes OP_WRITE events for SocketChannels. chris@1: * @author Christian Lins chris@1: * @since sonews/0.5.0 chris@1: */ chris@1: class ChannelWriter extends AbstractDaemon chris@1: { chris@1: cli@37: private static ChannelWriter instance = new ChannelWriter(); chris@1: cli@37: /** cli@37: * @return Returns the active ChannelWriter instance. cli@37: */ cli@37: public static ChannelWriter getInstance() cli@37: { cli@37: return instance; cli@37: } cli@37: private Selector selector = null; chris@1: cli@37: protected ChannelWriter() cli@37: { cli@37: } chris@1: cli@37: /** cli@37: * @return Selector associated with this instance. cli@37: */ cli@37: public Selector getSelector() cli@37: { cli@37: return this.selector; cli@37: } chris@1: cli@37: /** cli@37: * Sets the selector that is used by this ChannelWriter. cli@37: * @param selector cli@37: */ cli@37: public void setSelector(final Selector selector) cli@37: { cli@37: this.selector = selector; cli@37: } chris@1: cli@37: /** cli@37: * Run loop. cli@37: */ cli@37: @Override cli@37: public void run() cli@37: { cli@37: assert selector != null; chris@1: cli@37: while (isRunning()) { cli@37: try { cli@37: SelectionKey selKey = null; cli@37: SocketChannel socketChannel = null; cli@37: NNTPConnection connection = null; chris@1: cli@37: // select() blocks until some SelectableChannels are ready for cli@37: // processing. There is no need to synchronize the selector as we cli@37: // have only one thread per selector. cli@37: selector.select(); // The return value of select can be ignored chris@1: cli@37: // Get list of selection keys with pending OP_WRITE events. cli@37: // The keySET is not thread-safe whereas the keys itself are. cli@37: Iterator it = selector.selectedKeys().iterator(); chris@1: cli@37: while (it.hasNext()) { cli@37: // We remove the first event from the set and store it for cli@37: // later processing. cli@37: selKey = (SelectionKey) it.next(); cli@37: socketChannel = (SocketChannel) selKey.channel(); cli@37: connection = Connections.getInstance().get(socketChannel); cli@37: cli@37: it.remove(); cli@37: if (connection != null) { cli@37: break; cli@37: } else { cli@37: selKey = null; cli@37: } cli@37: } cli@37: cli@37: if (selKey != null) { cli@37: try { cli@37: // Process the selected key. cli@37: // As there is only one OP_WRITE key for a given channel, we need cli@37: // not to synchronize this processing to retain the order. cli@37: processSelectionKey(connection, socketChannel, selKey); cli@37: } catch (IOException ex) { cli@37: Log.get().warning("Error writing to channel: " + ex); cli@37: cli@37: // Cancel write events for this channel cli@37: selKey.cancel(); cli@37: connection.shutdownInput(); cli@37: connection.shutdownOutput(); cli@37: } cli@37: } cli@37: cli@37: // Eventually wait for a register operation cli@37: synchronized (NNTPDaemon.RegisterGate) { /* do nothing */ } cli@37: } catch (CancelledKeyException ex) { cli@37: Log.get().info("ChannelWriter.run(): " + ex); cli@37: } catch (Exception ex) { cli@37: ex.printStackTrace(); cli@37: } cli@37: } // while(isRunning()) cli@37: } cli@37: cli@37: private void processSelectionKey(final NNTPConnection connection, cli@37: final SocketChannel socketChannel, final SelectionKey selKey) cli@37: throws InterruptedException, IOException cli@37: { cli@37: assert connection != null; cli@37: assert socketChannel != null; cli@37: assert selKey != null; cli@37: assert selKey.isWritable(); cli@37: cli@37: // SocketChannel is ready for writing cli@37: if (selKey.isValid()) { cli@37: // Lock the socket channel cli@37: synchronized (socketChannel) { cli@37: // Get next output buffer cli@37: ByteBuffer buf = connection.getOutputBuffer(); cli@37: if (buf == null) { cli@37: // Currently we have nothing to write, so we stop the writeable cli@37: // events until we have something to write to the socket channel cli@37: //selKey.cancel(); cli@37: selKey.interestOps(0); cli@37: // Update activity timestamp to prevent too early disconnects cli@37: // on slow client connections cli@37: connection.setLastActivity(System.currentTimeMillis()); cli@37: return; cli@37: } cli@37: cli@37: while (buf != null) // There is data to be send cli@37: { cli@37: // Write buffer to socket channel; this method does not block cli@37: if (socketChannel.write(buf) <= 0) { cli@37: // Perhaps there is data to be written, but the SocketChannel's cli@37: // buffer is full, so we stop writing to until the next event. cli@37: break; cli@37: } else { cli@37: // Retrieve next buffer if available; method may return the same cli@37: // buffer instance if it still have some bytes remaining cli@37: buf = connection.getOutputBuffer(); cli@37: } cli@37: } cli@37: } cli@37: } else { cli@37: Log.get().warning("Invalid OP_WRITE key: " + selKey); cli@37: cli@37: if (socketChannel.socket().isClosed()) { cli@37: connection.shutdownInput(); cli@37: connection.shutdownOutput(); cli@37: socketChannel.close(); cli@37: Log.get().info("Connection closed."); cli@37: } cli@37: } cli@37: } chris@1: }