src/org/sonews/daemon/ChannelWriter.java
author cli
Sun Sep 11 17:01:19 2011 +0200 (2011-09-11)
changeset 49 8df94bfd3e2f
parent 35 ed84c8bdd87b
permissions -rwxr-xr-x
Fix for #14
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.daemon;
    20 
    21 import org.sonews.util.Log;
    22 import java.io.IOException;
    23 import java.nio.ByteBuffer;
    24 import java.nio.channels.CancelledKeyException;
    25 import java.nio.channels.SelectionKey;
    26 import java.nio.channels.Selector;
    27 import java.nio.channels.SocketChannel;
    28 import java.util.Iterator;
    29 
    30 /**
    31  * A Thread task that processes OP_WRITE events for SocketChannels.
    32  * @author Christian Lins
    33  * @since sonews/0.5.0
    34  */
    35 class ChannelWriter extends AbstractDaemon
    36 {
    37 
    38 	private static ChannelWriter instance = new ChannelWriter();
    39 
    40 	/**
    41 	 * @return Returns the active ChannelWriter instance.
    42 	 */
    43 	public static ChannelWriter getInstance()
    44 	{
    45 		return instance;
    46 	}
    47 	private Selector selector = null;
    48 
    49 	protected ChannelWriter()
    50 	{
    51 	}
    52 
    53 	/**
    54 	 * @return Selector associated with this instance.
    55 	 */
    56 	public Selector getSelector()
    57 	{
    58 		return this.selector;
    59 	}
    60 
    61 	/**
    62 	 * Sets the selector that is used by this ChannelWriter.
    63 	 * @param selector
    64 	 */
    65 	public void setSelector(final Selector selector)
    66 	{
    67 		this.selector = selector;
    68 	}
    69 
    70 	/**
    71 	 * Run loop.
    72 	 */
    73 	@Override
    74 	public void run()
    75 	{
    76 		assert selector != null;
    77 
    78 		while (isRunning()) {
    79 			try {
    80 				SelectionKey selKey = null;
    81 				SocketChannel socketChannel = null;
    82 				NNTPConnection connection = null;
    83 
    84 				// select() blocks until some SelectableChannels are ready for
    85 				// processing. There is no need to synchronize the selector as we
    86 				// have only one thread per selector.
    87 				selector.select(); // The return value of select can be ignored
    88 
    89 				// Get list of selection keys with pending OP_WRITE events.
    90 				// The keySET is not thread-safe whereas the keys itself are.
    91 				Iterator it = selector.selectedKeys().iterator();
    92 
    93 				while (it.hasNext()) {
    94 					// We remove the first event from the set and store it for
    95 					// later processing.
    96 					selKey = (SelectionKey) it.next();
    97 					socketChannel = (SocketChannel) selKey.channel();
    98 					connection = Connections.getInstance().get(socketChannel);
    99 
   100 					it.remove();
   101 					if (connection != null) {
   102 						break;
   103 					} else {
   104 						selKey = null;
   105 					}
   106 				}
   107 
   108 				if (selKey != null) {
   109 					try {
   110 						// Process the selected key.
   111 						// As there is only one OP_WRITE key for a given channel, we need
   112 						// not to synchronize this processing to retain the order.
   113 						processSelectionKey(connection, socketChannel, selKey);
   114 					} catch (IOException ex) {
   115 						Log.get().warning("Error writing to channel: " + ex);
   116 
   117 						// Cancel write events for this channel
   118 						selKey.cancel();
   119 						connection.shutdownInput();
   120 						connection.shutdownOutput();
   121 					}
   122 				}
   123 
   124 				// Eventually wait for a register operation
   125 				synchronized (NNTPDaemon.RegisterGate) { /* do nothing */ }
   126 			} catch (CancelledKeyException ex) {
   127 				Log.get().info("ChannelWriter.run(): " + ex);
   128 			} catch (Exception ex) {
   129 				ex.printStackTrace();
   130 			}
   131 		} // while(isRunning())
   132 	}
   133 
   134 	private void processSelectionKey(final NNTPConnection connection,
   135 		final SocketChannel socketChannel, final SelectionKey selKey)
   136 		throws InterruptedException, IOException
   137 	{
   138 		assert connection != null;
   139 		assert socketChannel != null;
   140 		assert selKey != null;
   141 		assert selKey.isWritable();
   142 
   143 		// SocketChannel is ready for writing
   144 		if (selKey.isValid()) {
   145 			// Lock the socket channel
   146 			synchronized (socketChannel) {
   147 				// Get next output buffer
   148 				ByteBuffer buf = connection.getOutputBuffer();
   149 				if (buf == null) {
   150 					// Currently we have nothing to write, so we stop the writeable
   151 					// events until we have something to write to the socket channel
   152 					//selKey.cancel();
   153 					selKey.interestOps(0);
   154 					// Update activity timestamp to prevent too early disconnects
   155 					// on slow client connections
   156 					connection.setLastActivity(System.currentTimeMillis());
   157 					return;
   158 				}
   159 
   160 				while (buf != null) // There is data to be send
   161 				{
   162 					// Write buffer to socket channel; this method does not block
   163 					if (socketChannel.write(buf) <= 0) {
   164 						// Perhaps there is data to be written, but the SocketChannel's
   165 						// buffer is full, so we stop writing to until the next event.
   166 						break;
   167 					} else {
   168 						// Retrieve next buffer if available; method may return the same
   169 						// buffer instance if it still have some bytes remaining
   170 						buf = connection.getOutputBuffer();
   171 					}
   172 				}
   173 			}
   174 		} else {
   175 			Log.get().warning("Invalid OP_WRITE key: " + selKey);
   176 
   177 			if (socketChannel.socket().isClosed()) {
   178 				connection.shutdownInput();
   179 				connection.shutdownOutput();
   180 				socketChannel.close();
   181 				Log.get().info("Connection closed.");
   182 			}
   183 		}
   184 	}
   185 }