3 * see AUTHORS for the list of contributors
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 package org.sonews.daemon;
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.CancelledKeyException;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.Selector;
26 import java.nio.channels.SocketChannel;
27 import java.util.Iterator;
29 import java.util.logging.Level;
30 import org.sonews.util.Log;
33 * A Thread task listening for OP_READ events from SocketChannels.
34 * @author Christian Lins
37 class ChannelReader extends AbstractDaemon
40 private static ChannelReader instance = new ChannelReader();
43 * @return Active ChannelReader instance.
45 public static ChannelReader getInstance()
49 private Selector selector = null;
51 protected ChannelReader()
56 * Sets the selector which is used by this reader to determine the channel
60 public void setSelector(final Selector selector)
62 this.selector = selector;
66 * Run loop. Blocks until some data is available in a channel.
71 assert selector != null;
75 // select() blocks until some SelectableChannels are ready for
76 // processing. There is no need to lock the selector as we have only
77 // one thread per selector.
80 // Get list of selection keys with pending events.
81 // Note: the selected key set is not thread-safe
82 SocketChannel channel = null;
83 NNTPConnection conn = null;
84 final Set<SelectionKey> selKeys = selector.selectedKeys();
85 SelectionKey selKey = null;
87 synchronized (selKeys) {
88 Iterator it = selKeys.iterator();
90 // Process the first pending event
91 while (it.hasNext()) {
92 selKey = (SelectionKey) it.next();
93 channel = (SocketChannel) selKey.channel();
94 conn = Connections.getInstance().get(channel);
96 // Because we cannot lock the selKey as that would cause a deadlock
97 // we lock the connection. To preserve the order of the received
98 // byte blocks a selection key for a connection that has pending
99 // read events is skipped.
100 if (conn == null || conn.tryReadLock()) {
101 // Remove from set to indicate that it's being processed
104 break; // End while loop
114 // Do not lock the selKeys while processing because this causes
115 // a deadlock in sun.nio.ch.SelectorImpl.lockAndDoSelect()
116 if (selKey != null && channel != null && conn != null) {
117 processSelectionKey(conn, channel, selKey);
118 conn.unlockReadLock();
121 } catch (CancelledKeyException ex) {
122 Log.get().warning("ChannelReader.run(): " + ex);
123 Log.get().log(Level.INFO, "", ex);
124 } catch (Exception ex) {
125 ex.printStackTrace();
128 // Eventually wait for a register operation
129 synchronized (NNTPDaemon.RegisterGate) {
130 // Do nothing; FindBugs may warn about an empty synchronized
131 // statement, but we cannot use a wait()/notify() mechanism here.
132 // If we used something like RegisterGate.wait() we block here
133 // until the NNTPDaemon calls notify(). But the daemon only
134 // calls notify() if itself is NOT blocked in the listening socket.
136 } // while(isRunning())
139 private void processSelectionKey(final NNTPConnection connection,
140 final SocketChannel socketChannel, final SelectionKey selKey)
141 throws InterruptedException, IOException
143 assert selKey != null;
144 assert selKey.isReadable();
146 // Some bytes are available for reading
147 if (selKey.isValid()) {
149 //synchronized(socketChannel)
151 // Read the data into the appropriate buffer
152 ByteBuffer buf = connection.getInputBuffer();
155 read = socketChannel.read(buf);
156 } catch (IOException ex) {
157 // The connection was probably closed by the remote host
158 // in a non-clean fashion
159 Log.get().info("ChannelReader.processSelectionKey(): " + ex);
160 } catch (Exception ex) {
161 Log.get().warning("ChannelReader.processSelectionKey(): " + ex);
164 if (read == -1) // End of stream
167 } else if (read > 0) // If some data was read
169 ConnectionWorker.addChannel(socketChannel);
174 Log.get().severe("Should not happen: " + selKey.toString());