chris@1: /* chris@1: * SONEWS News Server chris@1: * see AUTHORS for the list of contributors chris@1: * chris@1: * This program is free software: you can redistribute it and/or modify chris@1: * it under the terms of the GNU General Public License as published by chris@1: * the Free Software Foundation, either version 3 of the License, or chris@1: * (at your option) any later version. chris@1: * chris@1: * This program is distributed in the hope that it will be useful, chris@1: * but WITHOUT ANY WARRANTY; without even the implied warranty of chris@1: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the chris@1: * GNU General Public License for more details. chris@1: * chris@1: * You should have received a copy of the GNU General Public License chris@1: * along with this program. If not, see . chris@1: */ chris@1: chris@1: package org.sonews.daemon; chris@1: chris@1: import java.io.IOException; chris@1: import java.nio.ByteBuffer; chris@1: import java.nio.channels.CancelledKeyException; chris@1: import java.nio.channels.SelectionKey; chris@1: import java.nio.channels.Selector; chris@1: import java.nio.channels.SocketChannel; chris@1: import java.util.Iterator; chris@1: import java.util.Set; chris@3: import org.sonews.util.Log; chris@1: chris@1: /** chris@1: * A Thread task listening for OP_READ events from SocketChannels. chris@1: * @author Christian Lins chris@1: * @since sonews/0.5.0 chris@1: */ chris@1: class ChannelReader extends AbstractDaemon chris@1: { chris@1: chris@1: private static ChannelReader instance = new ChannelReader(); chris@1: chris@1: /** chris@1: * @return Active ChannelReader instance. chris@1: */ chris@1: public static ChannelReader getInstance() chris@1: { chris@1: return instance; chris@1: } chris@1: chris@1: private Selector selector = null; chris@1: chris@1: protected ChannelReader() chris@1: { chris@1: } chris@1: chris@1: /** chris@1: * Sets the selector which is used by this reader to determine the channel chris@1: * to read from. chris@1: * @param selector chris@1: */ chris@1: public void setSelector(final Selector selector) chris@1: { chris@1: this.selector = selector; chris@1: } chris@1: chris@1: /** chris@1: * Run loop. Blocks until some data is available in a channel. chris@1: */ chris@1: @Override chris@1: public void run() chris@1: { chris@1: assert selector != null; chris@1: chris@1: while(isRunning()) chris@1: { chris@1: try chris@1: { chris@1: // select() blocks until some SelectableChannels are ready for chris@1: // processing. There is no need to lock the selector as we have only chris@1: // one thread per selector. chris@1: selector.select(); chris@1: chris@1: // Get list of selection keys with pending events. chris@1: // Note: the selected key set is not thread-safe chris@1: SocketChannel channel = null; chris@1: NNTPConnection conn = null; chris@1: final Set selKeys = selector.selectedKeys(); chris@1: SelectionKey selKey = null; chris@1: chris@1: synchronized (selKeys) chris@1: { chris@1: Iterator it = selKeys.iterator(); chris@1: chris@1: // Process the first pending event chris@1: while (it.hasNext()) chris@1: { chris@1: selKey = (SelectionKey) it.next(); chris@1: channel = (SocketChannel) selKey.channel(); chris@1: conn = Connections.getInstance().get(channel); chris@1: chris@1: // Because we cannot lock the selKey as that would cause a deadlock chris@1: // we lock the connection. To preserve the order of the received chris@1: // byte blocks a selection key for a connection that has pending chris@1: // read events is skipped. chris@1: if (conn == null || conn.tryReadLock()) chris@1: { chris@1: // Remove from set to indicate that it's being processed chris@1: it.remove(); chris@1: if (conn != null) chris@1: { chris@1: break; // End while loop chris@1: } chris@1: } chris@1: else chris@1: { chris@1: selKey = null; chris@1: channel = null; chris@1: conn = null; chris@1: } chris@1: } chris@1: } chris@1: chris@1: // Do not lock the selKeys while processing because this causes chris@1: // a deadlock in sun.nio.ch.SelectorImpl.lockAndDoSelect() chris@1: if (selKey != null && channel != null && conn != null) chris@1: { chris@1: processSelectionKey(conn, channel, selKey); chris@1: conn.unlockReadLock(); chris@1: } chris@1: chris@1: } chris@1: catch(CancelledKeyException ex) chris@1: { chris@1: Log.msg("ChannelReader.run(): " + ex, false); chris@1: if(Log.isDebug()) chris@1: { chris@1: ex.printStackTrace(); chris@1: } chris@1: } chris@1: catch(Exception ex) chris@1: { chris@1: ex.printStackTrace(); chris@1: } chris@1: chris@1: // Eventually wait for a register operation chris@1: synchronized (NNTPDaemon.RegisterGate) chris@1: { chris@1: // Do nothing; FindBugs may warn about an empty synchronized chris@1: // statement, but we cannot use a wait()/notify() mechanism here. chris@1: // If we used something like RegisterGate.wait() we block here chris@1: // until the NNTPDaemon calls notify(). But the daemon only chris@1: // calls notify() if itself is NOT blocked in the listening socket. chris@1: } chris@1: } // while(isRunning()) chris@1: } chris@1: chris@1: private void processSelectionKey(final NNTPConnection connection, chris@1: final SocketChannel socketChannel, final SelectionKey selKey) chris@1: throws InterruptedException, IOException chris@1: { chris@1: assert selKey != null; chris@1: assert selKey.isReadable(); chris@1: chris@1: // Some bytes are available for reading chris@1: if(selKey.isValid()) chris@3: { chris@1: // Lock the channel chris@1: //synchronized(socketChannel) chris@1: { chris@1: // Read the data into the appropriate buffer chris@1: ByteBuffer buf = connection.getInputBuffer(); chris@1: int read = -1; chris@1: try chris@1: { chris@1: read = socketChannel.read(buf); chris@3: } chris@3: catch(IOException ex) chris@3: { chris@3: // The connection was probably closed by the remote host chris@3: // in a non-clean fashion chris@3: Log.msg("ChannelReader.processSelectionKey(): " + ex, true); chris@3: } chris@1: catch(Exception ex) chris@1: { chris@1: Log.msg("ChannelReader.processSelectionKey(): " + ex, false); chris@1: if(Log.isDebug()) chris@1: { chris@1: ex.printStackTrace(); chris@1: } chris@1: } chris@1: chris@1: if(read == -1) // End of stream chris@1: { chris@1: selKey.cancel(); chris@1: } chris@1: else if(read > 0) // If some data was read chris@1: { chris@1: ConnectionWorker.addChannel(socketChannel); chris@1: } chris@1: } chris@1: } chris@1: else chris@1: { chris@1: // Should not happen chris@1: Log.msg(selKey, false); chris@1: } chris@1: } chris@1: chris@1: }