1.1 --- a/src/org/sonews/daemon/ChannelWriter.java Sun Aug 29 17:28:58 2010 +0200
1.2 +++ b/src/org/sonews/daemon/ChannelWriter.java Sun Sep 11 15:05:04 2011 +0200
1.3 @@ -35,176 +35,151 @@
1.4 class ChannelWriter extends AbstractDaemon
1.5 {
1.6
1.7 - private static ChannelWriter instance = new ChannelWriter();
1.8 + private static ChannelWriter instance = new ChannelWriter();
1.9
1.10 - /**
1.11 - * @return Returns the active ChannelWriter instance.
1.12 - */
1.13 - public static ChannelWriter getInstance()
1.14 - {
1.15 - return instance;
1.16 - }
1.17 -
1.18 - private Selector selector = null;
1.19 -
1.20 - protected ChannelWriter()
1.21 - {
1.22 - }
1.23 -
1.24 - /**
1.25 - * @return Selector associated with this instance.
1.26 - */
1.27 - public Selector getSelector()
1.28 - {
1.29 - return this.selector;
1.30 - }
1.31 -
1.32 - /**
1.33 - * Sets the selector that is used by this ChannelWriter.
1.34 - * @param selector
1.35 - */
1.36 - public void setSelector(final Selector selector)
1.37 - {
1.38 - this.selector = selector;
1.39 - }
1.40 -
1.41 - /**
1.42 - * Run loop.
1.43 - */
1.44 - @Override
1.45 - public void run()
1.46 - {
1.47 - assert selector != null;
1.48 + /**
1.49 + * @return Returns the active ChannelWriter instance.
1.50 + */
1.51 + public static ChannelWriter getInstance()
1.52 + {
1.53 + return instance;
1.54 + }
1.55 + private Selector selector = null;
1.56
1.57 - while(isRunning())
1.58 - {
1.59 - try
1.60 - {
1.61 - SelectionKey selKey = null;
1.62 - SocketChannel socketChannel = null;
1.63 - NNTPConnection connection = null;
1.64 + protected ChannelWriter()
1.65 + {
1.66 + }
1.67
1.68 - // select() blocks until some SelectableChannels are ready for
1.69 - // processing. There is no need to synchronize the selector as we
1.70 - // have only one thread per selector.
1.71 - selector.select(); // The return value of select can be ignored
1.72 + /**
1.73 + * @return Selector associated with this instance.
1.74 + */
1.75 + public Selector getSelector()
1.76 + {
1.77 + return this.selector;
1.78 + }
1.79
1.80 - // Get list of selection keys with pending OP_WRITE events.
1.81 - // The keySET is not thread-safe whereas the keys itself are.
1.82 - Iterator it = selector.selectedKeys().iterator();
1.83 + /**
1.84 + * Sets the selector that is used by this ChannelWriter.
1.85 + * @param selector
1.86 + */
1.87 + public void setSelector(final Selector selector)
1.88 + {
1.89 + this.selector = selector;
1.90 + }
1.91
1.92 - while (it.hasNext())
1.93 - {
1.94 - // We remove the first event from the set and store it for
1.95 - // later processing.
1.96 - selKey = (SelectionKey) it.next();
1.97 - socketChannel = (SocketChannel) selKey.channel();
1.98 - connection = Connections.getInstance().get(socketChannel);
1.99 + /**
1.100 + * Run loop.
1.101 + */
1.102 + @Override
1.103 + public void run()
1.104 + {
1.105 + assert selector != null;
1.106
1.107 - it.remove();
1.108 - if (connection != null)
1.109 - {
1.110 - break;
1.111 - }
1.112 - else
1.113 - {
1.114 - selKey = null;
1.115 - }
1.116 - }
1.117 -
1.118 - if (selKey != null)
1.119 - {
1.120 - try
1.121 - {
1.122 - // Process the selected key.
1.123 - // As there is only one OP_WRITE key for a given channel, we need
1.124 - // not to synchronize this processing to retain the order.
1.125 - processSelectionKey(connection, socketChannel, selKey);
1.126 - }
1.127 - catch (IOException ex)
1.128 - {
1.129 - Log.get().warning("Error writing to channel: " + ex);
1.130 + while (isRunning()) {
1.131 + try {
1.132 + SelectionKey selKey = null;
1.133 + SocketChannel socketChannel = null;
1.134 + NNTPConnection connection = null;
1.135
1.136 - // Cancel write events for this channel
1.137 - selKey.cancel();
1.138 - connection.shutdownInput();
1.139 - connection.shutdownOutput();
1.140 - }
1.141 - }
1.142 -
1.143 - // Eventually wait for a register operation
1.144 - synchronized(NNTPDaemon.RegisterGate) { /* do nothing */ }
1.145 - }
1.146 - catch(CancelledKeyException ex)
1.147 - {
1.148 - Log.get().info("ChannelWriter.run(): " + ex);
1.149 - }
1.150 - catch(Exception ex)
1.151 - {
1.152 - ex.printStackTrace();
1.153 - }
1.154 - } // while(isRunning())
1.155 - }
1.156 -
1.157 - private void processSelectionKey(final NNTPConnection connection,
1.158 - final SocketChannel socketChannel, final SelectionKey selKey)
1.159 - throws InterruptedException, IOException
1.160 - {
1.161 - assert connection != null;
1.162 - assert socketChannel != null;
1.163 - assert selKey != null;
1.164 - assert selKey.isWritable();
1.165 + // select() blocks until some SelectableChannels are ready for
1.166 + // processing. There is no need to synchronize the selector as we
1.167 + // have only one thread per selector.
1.168 + selector.select(); // The return value of select can be ignored
1.169
1.170 - // SocketChannel is ready for writing
1.171 - if(selKey.isValid())
1.172 - {
1.173 - // Lock the socket channel
1.174 - synchronized(socketChannel)
1.175 - {
1.176 - // Get next output buffer
1.177 - ByteBuffer buf = connection.getOutputBuffer();
1.178 - if(buf == null)
1.179 - {
1.180 - // Currently we have nothing to write, so we stop the writeable
1.181 - // events until we have something to write to the socket channel
1.182 - //selKey.cancel();
1.183 - selKey.interestOps(0);
1.184 - // Update activity timestamp to prevent too early disconnects
1.185 - // on slow client connections
1.186 - connection.setLastActivity(System.currentTimeMillis());
1.187 - return;
1.188 - }
1.189 -
1.190 - while(buf != null) // There is data to be send
1.191 - {
1.192 - // Write buffer to socket channel; this method does not block
1.193 - if(socketChannel.write(buf) <= 0)
1.194 - {
1.195 - // Perhaps there is data to be written, but the SocketChannel's
1.196 - // buffer is full, so we stop writing to until the next event.
1.197 - break;
1.198 - }
1.199 - else
1.200 - {
1.201 - // Retrieve next buffer if available; method may return the same
1.202 - // buffer instance if it still have some bytes remaining
1.203 - buf = connection.getOutputBuffer();
1.204 - }
1.205 - }
1.206 - }
1.207 - }
1.208 - else
1.209 - {
1.210 - Log.get().warning("Invalid OP_WRITE key: " + selKey);
1.211 + // Get list of selection keys with pending OP_WRITE events.
1.212 + // The keySET is not thread-safe whereas the keys itself are.
1.213 + Iterator it = selector.selectedKeys().iterator();
1.214
1.215 - if(socketChannel.socket().isClosed())
1.216 - {
1.217 - connection.shutdownInput();
1.218 - connection.shutdownOutput();
1.219 - socketChannel.close();
1.220 - Log.get().info("Connection closed.");
1.221 - }
1.222 - }
1.223 - }
1.224 -
1.225 + while (it.hasNext()) {
1.226 + // We remove the first event from the set and store it for
1.227 + // later processing.
1.228 + selKey = (SelectionKey) it.next();
1.229 + socketChannel = (SocketChannel) selKey.channel();
1.230 + connection = Connections.getInstance().get(socketChannel);
1.231 +
1.232 + it.remove();
1.233 + if (connection != null) {
1.234 + break;
1.235 + } else {
1.236 + selKey = null;
1.237 + }
1.238 + }
1.239 +
1.240 + if (selKey != null) {
1.241 + try {
1.242 + // Process the selected key.
1.243 + // As there is only one OP_WRITE key for a given channel, we need
1.244 + // not to synchronize this processing to retain the order.
1.245 + processSelectionKey(connection, socketChannel, selKey);
1.246 + } catch (IOException ex) {
1.247 + Log.get().warning("Error writing to channel: " + ex);
1.248 +
1.249 + // Cancel write events for this channel
1.250 + selKey.cancel();
1.251 + connection.shutdownInput();
1.252 + connection.shutdownOutput();
1.253 + }
1.254 + }
1.255 +
1.256 + // Eventually wait for a register operation
1.257 + synchronized (NNTPDaemon.RegisterGate) { /* do nothing */ }
1.258 + } catch (CancelledKeyException ex) {
1.259 + Log.get().info("ChannelWriter.run(): " + ex);
1.260 + } catch (Exception ex) {
1.261 + ex.printStackTrace();
1.262 + }
1.263 + } // while(isRunning())
1.264 + }
1.265 +
1.266 + private void processSelectionKey(final NNTPConnection connection,
1.267 + final SocketChannel socketChannel, final SelectionKey selKey)
1.268 + throws InterruptedException, IOException
1.269 + {
1.270 + assert connection != null;
1.271 + assert socketChannel != null;
1.272 + assert selKey != null;
1.273 + assert selKey.isWritable();
1.274 +
1.275 + // SocketChannel is ready for writing
1.276 + if (selKey.isValid()) {
1.277 + // Lock the socket channel
1.278 + synchronized (socketChannel) {
1.279 + // Get next output buffer
1.280 + ByteBuffer buf = connection.getOutputBuffer();
1.281 + if (buf == null) {
1.282 + // Currently we have nothing to write, so we stop the writeable
1.283 + // events until we have something to write to the socket channel
1.284 + //selKey.cancel();
1.285 + selKey.interestOps(0);
1.286 + // Update activity timestamp to prevent too early disconnects
1.287 + // on slow client connections
1.288 + connection.setLastActivity(System.currentTimeMillis());
1.289 + return;
1.290 + }
1.291 +
1.292 + while (buf != null) // There is data to be send
1.293 + {
1.294 + // Write buffer to socket channel; this method does not block
1.295 + if (socketChannel.write(buf) <= 0) {
1.296 + // Perhaps there is data to be written, but the SocketChannel's
1.297 + // buffer is full, so we stop writing to until the next event.
1.298 + break;
1.299 + } else {
1.300 + // Retrieve next buffer if available; method may return the same
1.301 + // buffer instance if it still have some bytes remaining
1.302 + buf = connection.getOutputBuffer();
1.303 + }
1.304 + }
1.305 + }
1.306 + } else {
1.307 + Log.get().warning("Invalid OP_WRITE key: " + selKey);
1.308 +
1.309 + if (socketChannel.socket().isClosed()) {
1.310 + connection.shutdownInput();
1.311 + connection.shutdownOutput();
1.312 + socketChannel.close();
1.313 + Log.get().info("Connection closed.");
1.314 + }
1.315 + }
1.316 + }
1.317 }