src/org/sonews/daemon/ChannelWriter.java
author cli
Sun Sep 11 15:05:04 2011 +0200 (2011-09-11)
changeset 48 b78e77619152
parent 35 ed84c8bdd87b
permissions -rwxr-xr-x
Merge Channel and Group classes.
chris@1
     1
/*
chris@1
     2
 *   SONEWS News Server
chris@1
     3
 *   see AUTHORS for the list of contributors
chris@1
     4
 *
chris@1
     5
 *   This program is free software: you can redistribute it and/or modify
chris@1
     6
 *   it under the terms of the GNU General Public License as published by
chris@1
     7
 *   the Free Software Foundation, either version 3 of the License, or
chris@1
     8
 *   (at your option) any later version.
chris@1
     9
 *
chris@1
    10
 *   This program is distributed in the hope that it will be useful,
chris@1
    11
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1
    12
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1
    13
 *   GNU General Public License for more details.
chris@1
    14
 *
chris@1
    15
 *   You should have received a copy of the GNU General Public License
chris@1
    16
 *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1
    17
 */
chris@1
    18
chris@1
    19
package org.sonews.daemon;
chris@1
    20
chris@1
    21
import org.sonews.util.Log;
chris@1
    22
import java.io.IOException;
chris@1
    23
import java.nio.ByteBuffer;
chris@1
    24
import java.nio.channels.CancelledKeyException;
chris@1
    25
import java.nio.channels.SelectionKey;
chris@1
    26
import java.nio.channels.Selector;
chris@1
    27
import java.nio.channels.SocketChannel;
chris@1
    28
import java.util.Iterator;
chris@1
    29
chris@1
    30
/**
chris@1
    31
 * A Thread task that processes OP_WRITE events for SocketChannels.
chris@1
    32
 * @author Christian Lins
chris@1
    33
 * @since sonews/0.5.0
chris@1
    34
 */
chris@1
    35
class ChannelWriter extends AbstractDaemon
chris@1
    36
{
chris@1
    37
cli@37
    38
	private static ChannelWriter instance = new ChannelWriter();
chris@1
    39
cli@37
    40
	/**
cli@37
    41
	 * @return Returns the active ChannelWriter instance.
cli@37
    42
	 */
cli@37
    43
	public static ChannelWriter getInstance()
cli@37
    44
	{
cli@37
    45
		return instance;
cli@37
    46
	}
cli@37
    47
	private Selector selector = null;
chris@1
    48
cli@37
    49
	protected ChannelWriter()
cli@37
    50
	{
cli@37
    51
	}
chris@1
    52
cli@37
    53
	/**
cli@37
    54
	 * @return Selector associated with this instance.
cli@37
    55
	 */
cli@37
    56
	public Selector getSelector()
cli@37
    57
	{
cli@37
    58
		return this.selector;
cli@37
    59
	}
chris@1
    60
cli@37
    61
	/**
cli@37
    62
	 * Sets the selector that is used by this ChannelWriter.
cli@37
    63
	 * @param selector
cli@37
    64
	 */
cli@37
    65
	public void setSelector(final Selector selector)
cli@37
    66
	{
cli@37
    67
		this.selector = selector;
cli@37
    68
	}
chris@1
    69
cli@37
    70
	/**
cli@37
    71
	 * Run loop.
cli@37
    72
	 */
cli@37
    73
	@Override
cli@37
    74
	public void run()
cli@37
    75
	{
cli@37
    76
		assert selector != null;
chris@1
    77
cli@37
    78
		while (isRunning()) {
cli@37
    79
			try {
cli@37
    80
				SelectionKey selKey = null;
cli@37
    81
				SocketChannel socketChannel = null;
cli@37
    82
				NNTPConnection connection = null;
chris@1
    83
cli@37
    84
				// select() blocks until some SelectableChannels are ready for
cli@37
    85
				// processing. There is no need to synchronize the selector as we
cli@37
    86
				// have only one thread per selector.
cli@37
    87
				selector.select(); // The return value of select can be ignored
chris@1
    88
cli@37
    89
				// Get list of selection keys with pending OP_WRITE events.
cli@37
    90
				// The keySET is not thread-safe whereas the keys itself are.
cli@37
    91
				Iterator it = selector.selectedKeys().iterator();
chris@1
    92
cli@37
    93
				while (it.hasNext()) {
cli@37
    94
					// We remove the first event from the set and store it for
cli@37
    95
					// later processing.
cli@37
    96
					selKey = (SelectionKey) it.next();
cli@37
    97
					socketChannel = (SocketChannel) selKey.channel();
cli@37
    98
					connection = Connections.getInstance().get(socketChannel);
cli@37
    99
cli@37
   100
					it.remove();
cli@37
   101
					if (connection != null) {
cli@37
   102
						break;
cli@37
   103
					} else {
cli@37
   104
						selKey = null;
cli@37
   105
					}
cli@37
   106
				}
cli@37
   107
cli@37
   108
				if (selKey != null) {
cli@37
   109
					try {
cli@37
   110
						// Process the selected key.
cli@37
   111
						// As there is only one OP_WRITE key for a given channel, we need
cli@37
   112
						// not to synchronize this processing to retain the order.
cli@37
   113
						processSelectionKey(connection, socketChannel, selKey);
cli@37
   114
					} catch (IOException ex) {
cli@37
   115
						Log.get().warning("Error writing to channel: " + ex);
cli@37
   116
cli@37
   117
						// Cancel write events for this channel
cli@37
   118
						selKey.cancel();
cli@37
   119
						connection.shutdownInput();
cli@37
   120
						connection.shutdownOutput();
cli@37
   121
					}
cli@37
   122
				}
cli@37
   123
cli@37
   124
				// Eventually wait for a register operation
cli@37
   125
				synchronized (NNTPDaemon.RegisterGate) { /* do nothing */ }
cli@37
   126
			} catch (CancelledKeyException ex) {
cli@37
   127
				Log.get().info("ChannelWriter.run(): " + ex);
cli@37
   128
			} catch (Exception ex) {
cli@37
   129
				ex.printStackTrace();
cli@37
   130
			}
cli@37
   131
		} // while(isRunning())
cli@37
   132
	}
cli@37
   133
cli@37
   134
	private void processSelectionKey(final NNTPConnection connection,
cli@37
   135
		final SocketChannel socketChannel, final SelectionKey selKey)
cli@37
   136
		throws InterruptedException, IOException
cli@37
   137
	{
cli@37
   138
		assert connection != null;
cli@37
   139
		assert socketChannel != null;
cli@37
   140
		assert selKey != null;
cli@37
   141
		assert selKey.isWritable();
cli@37
   142
cli@37
   143
		// SocketChannel is ready for writing
cli@37
   144
		if (selKey.isValid()) {
cli@37
   145
			// Lock the socket channel
cli@37
   146
			synchronized (socketChannel) {
cli@37
   147
				// Get next output buffer
cli@37
   148
				ByteBuffer buf = connection.getOutputBuffer();
cli@37
   149
				if (buf == null) {
cli@37
   150
					// Currently we have nothing to write, so we stop the writeable
cli@37
   151
					// events until we have something to write to the socket channel
cli@37
   152
					//selKey.cancel();
cli@37
   153
					selKey.interestOps(0);
cli@37
   154
					// Update activity timestamp to prevent too early disconnects
cli@37
   155
					// on slow client connections
cli@37
   156
					connection.setLastActivity(System.currentTimeMillis());
cli@37
   157
					return;
cli@37
   158
				}
cli@37
   159
cli@37
   160
				while (buf != null) // There is data to be send
cli@37
   161
				{
cli@37
   162
					// Write buffer to socket channel; this method does not block
cli@37
   163
					if (socketChannel.write(buf) <= 0) {
cli@37
   164
						// Perhaps there is data to be written, but the SocketChannel's
cli@37
   165
						// buffer is full, so we stop writing to until the next event.
cli@37
   166
						break;
cli@37
   167
					} else {
cli@37
   168
						// Retrieve next buffer if available; method may return the same
cli@37
   169
						// buffer instance if it still have some bytes remaining
cli@37
   170
						buf = connection.getOutputBuffer();
cli@37
   171
					}
cli@37
   172
				}
cli@37
   173
			}
cli@37
   174
		} else {
cli@37
   175
			Log.get().warning("Invalid OP_WRITE key: " + selKey);
cli@37
   176
cli@37
   177
			if (socketChannel.socket().isClosed()) {
cli@37
   178
				connection.shutdownInput();
cli@37
   179
				connection.shutdownOutput();
cli@37
   180
				socketChannel.close();
cli@37
   181
				Log.get().info("Connection closed.");
cli@37
   182
			}
cli@37
   183
		}
cli@37
   184
	}
chris@1
   185
}