chris@1: /* chris@1: * SONEWS News Server chris@1: * see AUTHORS for the list of contributors chris@1: * chris@1: * This program is free software: you can redistribute it and/or modify chris@1: * it under the terms of the GNU General Public License as published by chris@1: * the Free Software Foundation, either version 3 of the License, or chris@1: * (at your option) any later version. chris@1: * chris@1: * This program is distributed in the hope that it will be useful, chris@1: * but WITHOUT ANY WARRANTY; without even the implied warranty of chris@1: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the chris@1: * GNU General Public License for more details. chris@1: * chris@1: * You should have received a copy of the GNU General Public License chris@1: * along with this program. If not, see . chris@1: */ chris@1: chris@1: package org.sonews.daemon; chris@1: chris@1: import java.io.IOException; chris@1: import java.nio.ByteBuffer; chris@1: import java.nio.channels.CancelledKeyException; chris@1: import java.nio.channels.SelectionKey; chris@1: import java.nio.channels.Selector; chris@1: import java.nio.channels.SocketChannel; chris@1: import java.util.Iterator; chris@1: import java.util.Set; cli@15: import java.util.logging.Level; chris@3: import org.sonews.util.Log; chris@1: chris@1: /** chris@1: * A Thread task listening for OP_READ events from SocketChannels. chris@1: * @author Christian Lins chris@1: * @since sonews/0.5.0 chris@1: */ chris@1: class ChannelReader extends AbstractDaemon chris@1: { chris@1: cli@37: private static ChannelReader instance = new ChannelReader(); chris@1: cli@37: /** cli@37: * @return Active ChannelReader instance. cli@37: */ cli@37: public static ChannelReader getInstance() cli@37: { cli@37: return instance; cli@37: } cli@37: private Selector selector = null; chris@1: cli@37: protected ChannelReader() cli@37: { cli@37: } chris@1: cli@37: /** cli@37: * Sets the selector which is used by this reader to determine the channel cli@37: * to read from. cli@37: * @param selector cli@37: */ cli@37: public void setSelector(final Selector selector) cli@37: { cli@37: this.selector = selector; cli@37: } chris@1: cli@37: /** cli@37: * Run loop. Blocks until some data is available in a channel. cli@37: */ cli@37: @Override cli@37: public void run() cli@37: { cli@37: assert selector != null; chris@1: cli@37: while (isRunning()) { cli@37: try { cli@37: // select() blocks until some SelectableChannels are ready for cli@37: // processing. There is no need to lock the selector as we have only cli@37: // one thread per selector. cli@37: selector.select(); chris@1: cli@37: // Get list of selection keys with pending events. cli@37: // Note: the selected key set is not thread-safe cli@37: SocketChannel channel = null; cli@37: NNTPConnection conn = null; cli@37: final Set selKeys = selector.selectedKeys(); cli@37: SelectionKey selKey = null; chris@1: cli@37: synchronized (selKeys) { cli@37: Iterator it = selKeys.iterator(); chris@1: cli@37: // Process the first pending event cli@37: while (it.hasNext()) { cli@37: selKey = (SelectionKey) it.next(); cli@37: channel = (SocketChannel) selKey.channel(); cli@37: conn = Connections.getInstance().get(channel); cli@37: cli@37: // Because we cannot lock the selKey as that would cause a deadlock cli@37: // we lock the connection. To preserve the order of the received cli@37: // byte blocks a selection key for a connection that has pending cli@37: // read events is skipped. cli@37: if (conn == null || conn.tryReadLock()) { cli@37: // Remove from set to indicate that it's being processed cli@37: it.remove(); cli@37: if (conn != null) { cli@37: break; // End while loop cli@37: } cli@37: } else { cli@37: selKey = null; cli@37: channel = null; cli@37: conn = null; cli@37: } cli@37: } cli@37: } cli@37: cli@37: // Do not lock the selKeys while processing because this causes cli@37: // a deadlock in sun.nio.ch.SelectorImpl.lockAndDoSelect() cli@37: if (selKey != null && channel != null && conn != null) { cli@37: processSelectionKey(conn, channel, selKey); cli@37: conn.unlockReadLock(); cli@37: } cli@37: cli@37: } catch (CancelledKeyException ex) { cli@37: Log.get().warning("ChannelReader.run(): " + ex); cli@37: Log.get().log(Level.INFO, "", ex); cli@37: } catch (Exception ex) { cli@37: ex.printStackTrace(); cli@37: } cli@37: cli@37: // Eventually wait for a register operation cli@37: synchronized (NNTPDaemon.RegisterGate) { cli@37: // Do nothing; FindBugs may warn about an empty synchronized cli@37: // statement, but we cannot use a wait()/notify() mechanism here. cli@37: // If we used something like RegisterGate.wait() we block here cli@37: // until the NNTPDaemon calls notify(). But the daemon only cli@37: // calls notify() if itself is NOT blocked in the listening socket. cli@37: } cli@37: } // while(isRunning()) cli@37: } cli@37: cli@37: private void processSelectionKey(final NNTPConnection connection, cli@37: final SocketChannel socketChannel, final SelectionKey selKey) cli@37: throws InterruptedException, IOException cli@37: { cli@37: assert selKey != null; cli@37: assert selKey.isReadable(); cli@37: cli@37: // Some bytes are available for reading cli@37: if (selKey.isValid()) { cli@37: // Lock the channel cli@37: //synchronized(socketChannel) cli@37: { cli@37: // Read the data into the appropriate buffer cli@37: ByteBuffer buf = connection.getInputBuffer(); cli@37: int read = -1; cli@37: try { cli@37: read = socketChannel.read(buf); cli@37: } catch (IOException ex) { cli@37: // The connection was probably closed by the remote host cli@37: // in a non-clean fashion cli@37: Log.get().info("ChannelReader.processSelectionKey(): " + ex); cli@37: } catch (Exception ex) { cli@37: Log.get().warning("ChannelReader.processSelectionKey(): " + ex); cli@37: } cli@37: cli@37: if (read == -1) // End of stream cli@37: { cli@37: selKey.cancel(); cli@37: } else if (read > 0) // If some data was read cli@37: { cli@37: ConnectionWorker.addChannel(socketChannel); cli@37: } cli@37: } cli@37: } else { cli@37: // Should not happen cli@37: Log.get().severe("Should not happen: " + selKey.toString()); cli@37: } cli@37: } chris@1: }