diff -r ed84c8bdd87b -r 969f001a0f5f src/org/sonews/daemon/ChannelReader.java --- a/src/org/sonews/daemon/ChannelReader.java Sun Aug 29 17:28:58 2010 +0200 +++ b/src/org/sonews/daemon/ChannelReader.java Fri Oct 14 13:32:34 2011 +0200 @@ -37,166 +37,141 @@ class ChannelReader extends AbstractDaemon { - private static ChannelReader instance = new ChannelReader(); + private static ChannelReader instance = new ChannelReader(); - /** - * @return Active ChannelReader instance. - */ - public static ChannelReader getInstance() - { - return instance; - } - - private Selector selector = null; - - protected ChannelReader() - { - } - - /** - * Sets the selector which is used by this reader to determine the channel - * to read from. - * @param selector - */ - public void setSelector(final Selector selector) - { - this.selector = selector; - } - - /** - * Run loop. Blocks until some data is available in a channel. - */ - @Override - public void run() - { - assert selector != null; + /** + * @return Active ChannelReader instance. + */ + public static ChannelReader getInstance() + { + return instance; + } + private Selector selector = null; - while(isRunning()) - { - try - { - // select() blocks until some SelectableChannels are ready for - // processing. There is no need to lock the selector as we have only - // one thread per selector. - selector.select(); + protected ChannelReader() + { + } - // Get list of selection keys with pending events. - // Note: the selected key set is not thread-safe - SocketChannel channel = null; - NNTPConnection conn = null; - final Set selKeys = selector.selectedKeys(); - SelectionKey selKey = null; + /** + * Sets the selector which is used by this reader to determine the channel + * to read from. + * @param selector + */ + public void setSelector(final Selector selector) + { + this.selector = selector; + } - synchronized (selKeys) - { - Iterator it = selKeys.iterator(); + /** + * Run loop. Blocks until some data is available in a channel. + */ + @Override + public void run() + { + assert selector != null; - // Process the first pending event - while (it.hasNext()) - { - selKey = (SelectionKey) it.next(); - channel = (SocketChannel) selKey.channel(); - conn = Connections.getInstance().get(channel); + while (isRunning()) { + try { + // select() blocks until some SelectableChannels are ready for + // processing. There is no need to lock the selector as we have only + // one thread per selector. + selector.select(); - // Because we cannot lock the selKey as that would cause a deadlock - // we lock the connection. To preserve the order of the received - // byte blocks a selection key for a connection that has pending - // read events is skipped. - if (conn == null || conn.tryReadLock()) - { - // Remove from set to indicate that it's being processed - it.remove(); - if (conn != null) - { - break; // End while loop - } - } - else - { - selKey = null; - channel = null; - conn = null; - } - } - } + // Get list of selection keys with pending events. + // Note: the selected key set is not thread-safe + SocketChannel channel = null; + NNTPConnection conn = null; + final Set selKeys = selector.selectedKeys(); + SelectionKey selKey = null; - // Do not lock the selKeys while processing because this causes - // a deadlock in sun.nio.ch.SelectorImpl.lockAndDoSelect() - if (selKey != null && channel != null && conn != null) - { - processSelectionKey(conn, channel, selKey); - conn.unlockReadLock(); - } + synchronized (selKeys) { + Iterator it = selKeys.iterator(); - } - catch(CancelledKeyException ex) - { - Log.get().warning("ChannelReader.run(): " + ex); - Log.get().log(Level.INFO, "", ex); - } - catch(Exception ex) - { - ex.printStackTrace(); - } - - // Eventually wait for a register operation - synchronized (NNTPDaemon.RegisterGate) - { - // Do nothing; FindBugs may warn about an empty synchronized - // statement, but we cannot use a wait()/notify() mechanism here. - // If we used something like RegisterGate.wait() we block here - // until the NNTPDaemon calls notify(). But the daemon only - // calls notify() if itself is NOT blocked in the listening socket. - } - } // while(isRunning()) - } - - private void processSelectionKey(final NNTPConnection connection, - final SocketChannel socketChannel, final SelectionKey selKey) - throws InterruptedException, IOException - { - assert selKey != null; - assert selKey.isReadable(); - - // Some bytes are available for reading - if(selKey.isValid()) - { - // Lock the channel - //synchronized(socketChannel) - { - // Read the data into the appropriate buffer - ByteBuffer buf = connection.getInputBuffer(); - int read = -1; - try - { - read = socketChannel.read(buf); - } - catch(IOException ex) - { - // The connection was probably closed by the remote host - // in a non-clean fashion - Log.get().info("ChannelReader.processSelectionKey(): " + ex); - } - catch(Exception ex) - { - Log.get().warning("ChannelReader.processSelectionKey(): " + ex); - } - - if(read == -1) // End of stream - { - selKey.cancel(); - } - else if(read > 0) // If some data was read - { - ConnectionWorker.addChannel(socketChannel); - } - } - } - else - { - // Should not happen - Log.get().severe("Should not happen: " + selKey.toString()); - } - } - + // Process the first pending event + while (it.hasNext()) { + selKey = (SelectionKey) it.next(); + channel = (SocketChannel) selKey.channel(); + conn = Connections.getInstance().get(channel); + + // Because we cannot lock the selKey as that would cause a deadlock + // we lock the connection. To preserve the order of the received + // byte blocks a selection key for a connection that has pending + // read events is skipped. + if (conn == null || conn.tryReadLock()) { + // Remove from set to indicate that it's being processed + it.remove(); + if (conn != null) { + break; // End while loop + } + } else { + selKey = null; + channel = null; + conn = null; + } + } + } + + // Do not lock the selKeys while processing because this causes + // a deadlock in sun.nio.ch.SelectorImpl.lockAndDoSelect() + if (selKey != null && channel != null && conn != null) { + processSelectionKey(conn, channel, selKey); + conn.unlockReadLock(); + } + + } catch (CancelledKeyException ex) { + Log.get().warning("ChannelReader.run(): " + ex); + Log.get().log(Level.INFO, "", ex); + } catch (Exception ex) { + ex.printStackTrace(); + } + + // Eventually wait for a register operation + synchronized (NNTPDaemon.RegisterGate) { + // Do nothing; FindBugs may warn about an empty synchronized + // statement, but we cannot use a wait()/notify() mechanism here. + // If we used something like RegisterGate.wait() we block here + // until the NNTPDaemon calls notify(). But the daemon only + // calls notify() if itself is NOT blocked in the listening socket. + } + } // while(isRunning()) + } + + private void processSelectionKey(final NNTPConnection connection, + final SocketChannel socketChannel, final SelectionKey selKey) + throws InterruptedException, IOException + { + assert selKey != null; + assert selKey.isReadable(); + + // Some bytes are available for reading + if (selKey.isValid()) { + // Lock the channel + //synchronized(socketChannel) + { + // Read the data into the appropriate buffer + ByteBuffer buf = connection.getInputBuffer(); + int read = -1; + try { + read = socketChannel.read(buf); + } catch (IOException ex) { + // The connection was probably closed by the remote host + // in a non-clean fashion + Log.get().info("ChannelReader.processSelectionKey(): " + ex); + } catch (Exception ex) { + Log.get().warning("ChannelReader.processSelectionKey(): " + ex); + } + + if (read == -1) // End of stream + { + selKey.cancel(); + } else if (read > 0) // If some data was read + { + ConnectionWorker.addChannel(socketChannel); + } + } + } else { + // Should not happen + Log.get().severe("Should not happen: " + selKey.toString()); + } + } }