diff -r f2293e8566f5 -r c404a87db5b7 src/org/sonews/daemon/ChannelReader.java
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/org/sonews/daemon/ChannelReader.java Sun Aug 29 17:43:58 2010 +0200
@@ -0,0 +1,202 @@
+/*
+ * SONEWS News Server
+ * see AUTHORS for the list of contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package org.sonews.daemon;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.logging.Level;
+import org.sonews.util.Log;
+
+/**
+ * A Thread task listening for OP_READ events from SocketChannels.
+ * @author Christian Lins
+ * @since sonews/0.5.0
+ */
+class ChannelReader extends AbstractDaemon
+{
+
+ private static ChannelReader instance = new ChannelReader();
+
+ /**
+ * @return Active ChannelReader instance.
+ */
+ public static ChannelReader getInstance()
+ {
+ return instance;
+ }
+
+ private Selector selector = null;
+
+ protected ChannelReader()
+ {
+ }
+
+ /**
+ * Sets the selector which is used by this reader to determine the channel
+ * to read from.
+ * @param selector
+ */
+ public void setSelector(final Selector selector)
+ {
+ this.selector = selector;
+ }
+
+ /**
+ * Run loop. Blocks until some data is available in a channel.
+ */
+ @Override
+ public void run()
+ {
+ assert selector != null;
+
+ while(isRunning())
+ {
+ try
+ {
+ // select() blocks until some SelectableChannels are ready for
+ // processing. There is no need to lock the selector as we have only
+ // one thread per selector.
+ selector.select();
+
+ // Get list of selection keys with pending events.
+ // Note: the selected key set is not thread-safe
+ SocketChannel channel = null;
+ NNTPConnection conn = null;
+ final Set selKeys = selector.selectedKeys();
+ SelectionKey selKey = null;
+
+ synchronized (selKeys)
+ {
+ Iterator it = selKeys.iterator();
+
+ // Process the first pending event
+ while (it.hasNext())
+ {
+ selKey = (SelectionKey) it.next();
+ channel = (SocketChannel) selKey.channel();
+ conn = Connections.getInstance().get(channel);
+
+ // Because we cannot lock the selKey as that would cause a deadlock
+ // we lock the connection. To preserve the order of the received
+ // byte blocks a selection key for a connection that has pending
+ // read events is skipped.
+ if (conn == null || conn.tryReadLock())
+ {
+ // Remove from set to indicate that it's being processed
+ it.remove();
+ if (conn != null)
+ {
+ break; // End while loop
+ }
+ }
+ else
+ {
+ selKey = null;
+ channel = null;
+ conn = null;
+ }
+ }
+ }
+
+ // Do not lock the selKeys while processing because this causes
+ // a deadlock in sun.nio.ch.SelectorImpl.lockAndDoSelect()
+ if (selKey != null && channel != null && conn != null)
+ {
+ processSelectionKey(conn, channel, selKey);
+ conn.unlockReadLock();
+ }
+
+ }
+ catch(CancelledKeyException ex)
+ {
+ Log.get().warning("ChannelReader.run(): " + ex);
+ Log.get().log(Level.INFO, "", ex);
+ }
+ catch(Exception ex)
+ {
+ ex.printStackTrace();
+ }
+
+ // Eventually wait for a register operation
+ synchronized (NNTPDaemon.RegisterGate)
+ {
+ // Do nothing; FindBugs may warn about an empty synchronized
+ // statement, but we cannot use a wait()/notify() mechanism here.
+ // If we used something like RegisterGate.wait() we block here
+ // until the NNTPDaemon calls notify(). But the daemon only
+ // calls notify() if itself is NOT blocked in the listening socket.
+ }
+ } // while(isRunning())
+ }
+
+ private void processSelectionKey(final NNTPConnection connection,
+ final SocketChannel socketChannel, final SelectionKey selKey)
+ throws InterruptedException, IOException
+ {
+ assert selKey != null;
+ assert selKey.isReadable();
+
+ // Some bytes are available for reading
+ if(selKey.isValid())
+ {
+ // Lock the channel
+ //synchronized(socketChannel)
+ {
+ // Read the data into the appropriate buffer
+ ByteBuffer buf = connection.getInputBuffer();
+ int read = -1;
+ try
+ {
+ read = socketChannel.read(buf);
+ }
+ catch(IOException ex)
+ {
+ // The connection was probably closed by the remote host
+ // in a non-clean fashion
+ Log.get().info("ChannelReader.processSelectionKey(): " + ex);
+ }
+ catch(Exception ex)
+ {
+ Log.get().warning("ChannelReader.processSelectionKey(): " + ex);
+ }
+
+ if(read == -1) // End of stream
+ {
+ selKey.cancel();
+ }
+ else if(read > 0) // If some data was read
+ {
+ ConnectionWorker.addChannel(socketChannel);
+ }
+ }
+ }
+ else
+ {
+ // Should not happen
+ Log.get().severe("Should not happen: " + selKey.toString());
+ }
+ }
+
+}