chris@1: /*
chris@1:  *   SONEWS News Server
chris@1:  *   see AUTHORS for the list of contributors
chris@1:  *
chris@1:  *   This program is free software: you can redistribute it and/or modify
chris@1:  *   it under the terms of the GNU General Public License as published by
chris@1:  *   the Free Software Foundation, either version 3 of the License, or
chris@1:  *   (at your option) any later version.
chris@1:  *
chris@1:  *   This program is distributed in the hope that it will be useful,
chris@1:  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1:  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1:  *   GNU General Public License for more details.
chris@1:  *
chris@1:  *   You should have received a copy of the GNU General Public License
chris@1:  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1:  */
chris@1: 
chris@1: package org.sonews.daemon;
chris@1: 
chris@1: import java.io.IOException;
chris@1: import java.nio.ByteBuffer;
chris@1: import java.nio.channels.CancelledKeyException;
chris@1: import java.nio.channels.SelectionKey;
chris@1: import java.nio.channels.Selector;
chris@1: import java.nio.channels.SocketChannel;
chris@1: import java.util.Iterator;
chris@1: import java.util.Set;
cli@15: import java.util.logging.Level;
chris@3: import org.sonews.util.Log;
chris@1: 
chris@1: /**
chris@1:  * A Thread task listening for OP_READ events from SocketChannels.
chris@1:  * @author Christian Lins
chris@1:  * @since sonews/0.5.0
chris@1:  */
chris@1: class ChannelReader extends AbstractDaemon
chris@1: {
chris@1: 
cli@37: 	private static ChannelReader instance = new ChannelReader();
chris@1: 
cli@37: 	/**
cli@37: 	 * @return Active ChannelReader instance.
cli@37: 	 */
cli@37: 	public static ChannelReader getInstance()
cli@37: 	{
cli@37: 		return instance;
cli@37: 	}
cli@37: 	private Selector selector = null;
chris@1: 
cli@37: 	protected ChannelReader()
cli@37: 	{
cli@37: 	}
chris@1: 
cli@37: 	/**
cli@37: 	 * Sets the selector which is used by this reader to determine the channel
cli@37: 	 * to read from.
cli@37: 	 * @param selector
cli@37: 	 */
cli@37: 	public void setSelector(final Selector selector)
cli@37: 	{
cli@37: 		this.selector = selector;
cli@37: 	}
chris@1: 
cli@37: 	/**
cli@37: 	 * Run loop. Blocks until some data is available in a channel.
cli@37: 	 */
cli@37: 	@Override
cli@37: 	public void run()
cli@37: 	{
cli@37: 		assert selector != null;
chris@1: 
cli@37: 		while (isRunning()) {
cli@37: 			try {
cli@37: 				// select() blocks until some SelectableChannels are ready for
cli@37: 				// processing. There is no need to lock the selector as we have only
cli@37: 				// one thread per selector.
cli@37: 				selector.select();
chris@1: 
cli@37: 				// Get list of selection keys with pending events.
cli@37: 				// Note: the selected key set is not thread-safe
cli@37: 				SocketChannel channel = null;
cli@37: 				NNTPConnection conn = null;
cli@37: 				final Set<SelectionKey> selKeys = selector.selectedKeys();
cli@37: 				SelectionKey selKey = null;
chris@1: 
cli@37: 				synchronized (selKeys) {
cli@37: 					Iterator it = selKeys.iterator();
chris@1: 
cli@37: 					// Process the first pending event
cli@37: 					while (it.hasNext()) {
cli@37: 						selKey = (SelectionKey) it.next();
cli@37: 						channel = (SocketChannel) selKey.channel();
cli@37: 						conn = Connections.getInstance().get(channel);
cli@37: 
cli@37: 						// Because we cannot lock the selKey as that would cause a deadlock
cli@37: 						// we lock the connection. To preserve the order of the received
cli@37: 						// byte blocks a selection key for a connection that has pending
cli@37: 						// read events is skipped.
cli@37: 						if (conn == null || conn.tryReadLock()) {
cli@37: 							// Remove from set to indicate that it's being processed
cli@37: 							it.remove();
cli@37: 							if (conn != null) {
cli@37: 								break; // End while loop
cli@37: 							}
cli@37: 						} else {
cli@37: 							selKey = null;
cli@37: 							channel = null;
cli@37: 							conn = null;
cli@37: 						}
cli@37: 					}
cli@37: 				}
cli@37: 
cli@37: 				// Do not lock the selKeys while processing because this causes
cli@37: 				// a deadlock in sun.nio.ch.SelectorImpl.lockAndDoSelect()
cli@37: 				if (selKey != null && channel != null && conn != null) {
cli@37: 					processSelectionKey(conn, channel, selKey);
cli@37: 					conn.unlockReadLock();
cli@37: 				}
cli@37: 
cli@37: 			} catch (CancelledKeyException ex) {
cli@37: 				Log.get().warning("ChannelReader.run(): " + ex);
cli@37: 				Log.get().log(Level.INFO, "", ex);
cli@37: 			} catch (Exception ex) {
cli@37: 				ex.printStackTrace();
cli@37: 			}
cli@37: 
cli@37: 			// Eventually wait for a register operation
cli@37: 			synchronized (NNTPDaemon.RegisterGate) {
cli@37: 				// Do nothing; FindBugs may warn about an empty synchronized
cli@37: 				// statement, but we cannot use a wait()/notify() mechanism here.
cli@37: 				// If we used something like RegisterGate.wait() we block here
cli@37: 				// until the NNTPDaemon calls notify(). But the daemon only
cli@37: 				// calls notify() if itself is NOT blocked in the listening socket.
cli@37: 			}
cli@37: 		} // while(isRunning())
cli@37: 	}
cli@37: 
cli@37: 	private void processSelectionKey(final NNTPConnection connection,
cli@37: 		final SocketChannel socketChannel, final SelectionKey selKey)
cli@37: 		throws InterruptedException, IOException
cli@37: 	{
cli@37: 		assert selKey != null;
cli@37: 		assert selKey.isReadable();
cli@37: 
cli@37: 		// Some bytes are available for reading
cli@37: 		if (selKey.isValid()) {
cli@37: 			// Lock the channel
cli@37: 			//synchronized(socketChannel)
cli@37: 			{
cli@37: 				// Read the data into the appropriate buffer
cli@37: 				ByteBuffer buf = connection.getInputBuffer();
cli@37: 				int read = -1;
cli@37: 				try {
cli@37: 					read = socketChannel.read(buf);
cli@37: 				} catch (IOException ex) {
cli@37: 					// The connection was probably closed by the remote host
cli@37: 					// in a non-clean fashion
cli@37: 					Log.get().info("ChannelReader.processSelectionKey(): " + ex);
cli@37: 				} catch (Exception ex) {
cli@37: 					Log.get().warning("ChannelReader.processSelectionKey(): " + ex);
cli@37: 				}
cli@37: 
cli@37: 				if (read == -1) // End of stream
cli@37: 				{
cli@37: 					selKey.cancel();
cli@37: 				} else if (read > 0) // If some data was read
cli@37: 				{
cli@37: 					ConnectionWorker.addChannel(socketChannel);
cli@37: 				}
cli@37: 			}
cli@37: 		} else {
cli@37: 			// Should not happen
cli@37: 			Log.get().severe("Should not happen: " + selKey.toString());
cli@37: 		}
cli@37: 	}
chris@1: }