chris@1: /*
chris@1:  *   SONEWS News Server
chris@1:  *   see AUTHORS for the list of contributors
chris@1:  *
chris@1:  *   This program is free software: you can redistribute it and/or modify
chris@1:  *   it under the terms of the GNU General Public License as published by
chris@1:  *   the Free Software Foundation, either version 3 of the License, or
chris@1:  *   (at your option) any later version.
chris@1:  *
chris@1:  *   This program is distributed in the hope that it will be useful,
chris@1:  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1:  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1:  *   GNU General Public License for more details.
chris@1:  *
chris@1:  *   You should have received a copy of the GNU General Public License
chris@1:  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1:  */
chris@1: 
chris@1: package org.sonews.daemon;
chris@1: 
chris@1: import org.sonews.util.Log;
chris@1: import java.io.IOException;
chris@1: import java.nio.ByteBuffer;
chris@1: import java.nio.channels.CancelledKeyException;
chris@1: import java.nio.channels.SelectionKey;
chris@1: import java.nio.channels.Selector;
chris@1: import java.nio.channels.SocketChannel;
chris@1: import java.util.Iterator;
chris@1: 
chris@1: /**
chris@1:  * A Thread task that processes OP_WRITE events for SocketChannels.
chris@1:  * @author Christian Lins
chris@1:  * @since sonews/0.5.0
chris@1:  */
chris@1: class ChannelWriter extends AbstractDaemon
chris@1: {
chris@1: 
cli@37: 	private static ChannelWriter instance = new ChannelWriter();
chris@1: 
cli@37: 	/**
cli@37: 	 * @return Returns the active ChannelWriter instance.
cli@37: 	 */
cli@37: 	public static ChannelWriter getInstance()
cli@37: 	{
cli@37: 		return instance;
cli@37: 	}
cli@37: 	private Selector selector = null;
chris@1: 
cli@37: 	protected ChannelWriter()
cli@37: 	{
cli@37: 	}
chris@1: 
cli@37: 	/**
cli@37: 	 * @return Selector associated with this instance.
cli@37: 	 */
cli@37: 	public Selector getSelector()
cli@37: 	{
cli@37: 		return this.selector;
cli@37: 	}
chris@1: 
cli@37: 	/**
cli@37: 	 * Sets the selector that is used by this ChannelWriter.
cli@37: 	 * @param selector
cli@37: 	 */
cli@37: 	public void setSelector(final Selector selector)
cli@37: 	{
cli@37: 		this.selector = selector;
cli@37: 	}
chris@1: 
cli@37: 	/**
cli@37: 	 * Run loop.
cli@37: 	 */
cli@37: 	@Override
cli@37: 	public void run()
cli@37: 	{
cli@37: 		assert selector != null;
chris@1: 
cli@37: 		while (isRunning()) {
cli@37: 			try {
cli@37: 				SelectionKey selKey = null;
cli@37: 				SocketChannel socketChannel = null;
cli@37: 				NNTPConnection connection = null;
chris@1: 
cli@37: 				// select() blocks until some SelectableChannels are ready for
cli@37: 				// processing. There is no need to synchronize the selector as we
cli@37: 				// have only one thread per selector.
cli@37: 				selector.select(); // The return value of select can be ignored
chris@1: 
cli@37: 				// Get list of selection keys with pending OP_WRITE events.
cli@37: 				// The keySET is not thread-safe whereas the keys itself are.
cli@37: 				Iterator it = selector.selectedKeys().iterator();
chris@1: 
cli@37: 				while (it.hasNext()) {
cli@37: 					// We remove the first event from the set and store it for
cli@37: 					// later processing.
cli@37: 					selKey = (SelectionKey) it.next();
cli@37: 					socketChannel = (SocketChannel) selKey.channel();
cli@37: 					connection = Connections.getInstance().get(socketChannel);
cli@37: 
cli@37: 					it.remove();
cli@37: 					if (connection != null) {
cli@37: 						break;
cli@37: 					} else {
cli@37: 						selKey = null;
cli@37: 					}
cli@37: 				}
cli@37: 
cli@37: 				if (selKey != null) {
cli@37: 					try {
cli@37: 						// Process the selected key.
cli@37: 						// As there is only one OP_WRITE key for a given channel, we need
cli@37: 						// not to synchronize this processing to retain the order.
cli@37: 						processSelectionKey(connection, socketChannel, selKey);
cli@37: 					} catch (IOException ex) {
cli@37: 						Log.get().warning("Error writing to channel: " + ex);
cli@37: 
cli@37: 						// Cancel write events for this channel
cli@37: 						selKey.cancel();
cli@37: 						connection.shutdownInput();
cli@37: 						connection.shutdownOutput();
cli@37: 					}
cli@37: 				}
cli@37: 
cli@37: 				// Eventually wait for a register operation
cli@37: 				synchronized (NNTPDaemon.RegisterGate) { /* do nothing */ }
cli@37: 			} catch (CancelledKeyException ex) {
cli@37: 				Log.get().info("ChannelWriter.run(): " + ex);
cli@37: 			} catch (Exception ex) {
cli@37: 				ex.printStackTrace();
cli@37: 			}
cli@37: 		} // while(isRunning())
cli@37: 	}
cli@37: 
cli@37: 	private void processSelectionKey(final NNTPConnection connection,
cli@37: 		final SocketChannel socketChannel, final SelectionKey selKey)
cli@37: 		throws InterruptedException, IOException
cli@37: 	{
cli@37: 		assert connection != null;
cli@37: 		assert socketChannel != null;
cli@37: 		assert selKey != null;
cli@37: 		assert selKey.isWritable();
cli@37: 
cli@37: 		// SocketChannel is ready for writing
cli@37: 		if (selKey.isValid()) {
cli@37: 			// Lock the socket channel
cli@37: 			synchronized (socketChannel) {
cli@37: 				// Get next output buffer
cli@37: 				ByteBuffer buf = connection.getOutputBuffer();
cli@37: 				if (buf == null) {
cli@37: 					// Currently we have nothing to write, so we stop the writeable
cli@37: 					// events until we have something to write to the socket channel
cli@37: 					//selKey.cancel();
cli@37: 					selKey.interestOps(0);
cli@37: 					// Update activity timestamp to prevent too early disconnects
cli@37: 					// on slow client connections
cli@37: 					connection.setLastActivity(System.currentTimeMillis());
cli@37: 					return;
cli@37: 				}
cli@37: 
cli@37: 				while (buf != null) // There is data to be send
cli@37: 				{
cli@37: 					// Write buffer to socket channel; this method does not block
cli@37: 					if (socketChannel.write(buf) <= 0) {
cli@37: 						// Perhaps there is data to be written, but the SocketChannel's
cli@37: 						// buffer is full, so we stop writing to until the next event.
cli@37: 						break;
cli@37: 					} else {
cli@37: 						// Retrieve next buffer if available; method may return the same
cli@37: 						// buffer instance if it still have some bytes remaining
cli@37: 						buf = connection.getOutputBuffer();
cli@37: 					}
cli@37: 				}
cli@37: 			}
cli@37: 		} else {
cli@37: 			Log.get().warning("Invalid OP_WRITE key: " + selKey);
cli@37: 
cli@37: 			if (socketChannel.socket().isClosed()) {
cli@37: 				connection.shutdownInput();
cli@37: 				connection.shutdownOutput();
cli@37: 				socketChannel.close();
cli@37: 				Log.get().info("Connection closed.");
cli@37: 			}
cli@37: 		}
cli@37: 	}
chris@1: }