src/org/sonews/daemon/ChannelLineBuffers.java
author cli
Sun Sep 11 17:36:47 2011 +0200 (2011-09-11)
changeset 50 0bf10add82d9
parent 35 ed84c8bdd87b
permissions -rwxr-xr-x
Fix for issue #17. Error when posting to mailinglist is now reported back to user as NNTP error.
chris@1
     1
/*
chris@1
     2
 *   SONEWS News Server
chris@1
     3
 *   see AUTHORS for the list of contributors
chris@1
     4
 *
chris@1
     5
 *   This program is free software: you can redistribute it and/or modify
chris@1
     6
 *   it under the terms of the GNU General Public License as published by
chris@1
     7
 *   the Free Software Foundation, either version 3 of the License, or
chris@1
     8
 *   (at your option) any later version.
chris@1
     9
 *
chris@1
    10
 *   This program is distributed in the hope that it will be useful,
chris@1
    11
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1
    12
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1
    13
 *   GNU General Public License for more details.
chris@1
    14
 *
chris@1
    15
 *   You should have received a copy of the GNU General Public License
chris@1
    16
 *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1
    17
 */
chris@1
    18
chris@1
    19
package org.sonews.daemon;
chris@1
    20
chris@1
    21
import java.nio.ByteBuffer;
chris@1
    22
import java.nio.channels.ClosedChannelException;
chris@1
    23
import java.util.ArrayList;
chris@1
    24
import java.util.List;
chris@1
    25
chris@1
    26
/**
 * Class holding ByteBuffers for SocketChannels/NNTPConnection.
 * Due to the complex nature of AIO/NIO we must properly handle the line
 * buffers for the input and output of the SocketChannels.
 * <p>
 * Line-sized buffers are served from a static pool of direct buffers (see
 * {@link #allocateDirect()}); when the pool is exhausted, heap buffers are
 * allocated instead. Once {@link #recycleBuffers()} has been called the
 * instance is considered closed: the query methods then report "nothing
 * available" and {@link #addOutputBuffer(ByteBuffer)} throws a
 * {@link ClosedChannelException}.
 * @author Christian Lins
 * @since sonews/0.5.0
 */
public class ChannelLineBuffers
{

	/**
	 * Size of one small buffer;
	 * per default this is 512 bytes to fit one standard line.
	 */
	public static final int BUFFER_SIZE = 512;

	/** Maximum number of buffers kept in the pool of free buffers. */
	private static final int MAX_CACHED_BUFFERS = 2048;

	/** Pool of free direct buffers; every access is guarded by its own monitor. */
	private static final List<ByteBuffer> freeSmallBuffers = new ArrayList<ByteBuffer>(MAX_CACHED_BUFFERS);

	/**
	 * Fills the pool of free buffers with direct ByteBuffers (allocated via
	 * ByteBuffer.allocateDirect()) up to the maximum number of cached buffers.
	 * This method is Thread-safe, but should only be called at startup.
	 * Calling it again only tops the pool up; it never grows beyond the
	 * maximum.
	 */
	public static void allocateDirect()
	{
		synchronized (freeSmallBuffers) {
			while (freeSmallBuffers.size() < MAX_CACHED_BUFFERS) {
				freeSmallBuffers.add(ByteBuffer.allocateDirect(BUFFER_SIZE));
			}
		}
	}

	/** Buffer the channel reads into; null once this object was recycled. */
	private ByteBuffer inputBuffer = newLineBuffer();

	/** Buffers waiting to be written, in order; null once this object was recycled. */
	private List<ByteBuffer> outputBuffers = new ArrayList<ByteBuffer>();

	/**
	 * Adds the given ByteBuffer to the list of buffers to be sent to the client.
	 * This method is Thread-safe.
	 * @param buffer The buffer to enqueue.
	 * @throws java.nio.channels.ClosedChannelException If the client channel was
	 * already closed, i.e. recycleBuffers() was called on this object.
	 */
	public void addOutputBuffer(ByteBuffer buffer)
		throws ClosedChannelException
	{
		// Work on a snapshot: the field is set to null by recycleBuffers()
		final List<ByteBuffer> buffers = outputBuffers;
		if (buffers == null) {
			throw new ClosedChannelException();
		}

		synchronized (buffers) {
			// recycleBuffers() clears the field while holding this monitor, so
			// this second check sees a close that happened in the meantime.
			if (outputBuffers == null) {
				throw new ClosedChannelException();
			}
			buffers.add(buffer);
		}
	}

	/**
	 * Currently a channel has only one input buffer. This *may* be a bottleneck
	 * and should be investigated in the future.
	 * @return The input buffer of this object or null if the buffers were
	 * already recycled.
	 */
	public ByteBuffer getInputBuffer()
	{
		return inputBuffer;
	}

	/**
	 * Returns the current output buffer for writing(!) to SocketChannel.
	 * Buffers at the head of the queue that have no remaining bytes are
	 * removed and handed back to the pool of free buffers.
	 * @return The next output buffer that contains unprocessed data or null
	 * if the connection was closed or there are no more unprocessed buffers.
	 */
	public ByteBuffer getOutputBuffer()
	{
		final List<ByteBuffer> buffers = outputBuffers;
		if (buffers == null) {
			return null; // already closed
		}

		synchronized (buffers) {
			if (outputBuffers == null) {
				return null; // closed while waiting for the monitor
			}

			while (!buffers.isEmpty()) {
				ByteBuffer head = buffers.get(0);
				if (head.hasRemaining()) {
					return head;
				}
				// Completely written: drop it and add it to the free buffers
				buffers.remove(0);
				recycleBuffer(head);
			}
			return null;
		}
	}

	/**
	 * @return false if there are output buffers pending to be written to the
	 * client; true if there are none or the buffers were already recycled.
	 */
	boolean isOutputBufferEmpty()
	{
		final List<ByteBuffer> buffers = outputBuffers;
		if (buffers == null) {
			return true; // closed: nothing can be pending anymore
		}

		synchronized (buffers) {
			return buffers.isEmpty();
		}
	}

	/**
	 * Goes through the input buffer and searches for the next line
	 * terminator. If a '\n' is found, the bytes up to the line terminator
	 * are returned (the '\n' itself is omitted, a preceding '\r' is not
	 * stripped) and the remaining input is moved to the beginning of the
	 * input buffer.
	 * If no terminator is found and the input buffer still has room, the
	 * method returns null and leaves the input buffer untouched.
	 * If no terminator is found and the input buffer is full, the whole
	 * input buffer is returned as line and replaced by a fresh one.
	 * The caller should hand the returned buffer to recycleBuffer() when it
	 * is no longer needed.
	 * @return A ByteBuffer containing the line, ready for reading, or null if
	 * there is no complete line yet or the buffers were already recycled.
	 */
	ByteBuffer nextInputLine()
	{
		final ByteBuffer buffer = inputBuffer;
		if (buffer == null) {
			return null;
		}

		synchronized (buffer) {
			if (inputBuffer != buffer) {
				// Recycled or replaced while we were waiting for the monitor;
				// the snapshot must not be parsed anymore.
				return null;
			}

			// Remember the current write position
			int mark = buffer.position();

			// Set position to 0 and limit to current position
			buffer.flip();

			ByteBuffer lineBuffer = newLineBuffer();

			while (buffer.hasRemaining()) {
				byte b = buffer.get();
				if (b == '\n') {
					// Move the bytes behind the terminator to the beginning of
					// the input buffer; afterwards its position is right behind
					// those bytes and its limit is its capacity, i.e. it is
					// ready for further channel reads.
					buffer.compact();

					lineBuffer.flip(); // limit to position, position to 0
					return lineBuffer;
				}
				lineBuffer.put(b);
			}

			// No terminator found: the partial copy is of no use. Give the
			// scratch buffer back, otherwise every call without a complete
			// line would drain one buffer from the pool.
			recycleBuffer(lineBuffer);

			// Restore the write state of the input buffer
			buffer.limit(BUFFER_SIZE);
			buffer.position(mark);

			if (buffer.hasRemaining()) {
				return null;
			}

			// In the first 512 bytes was no newline found, so the input is not
			// standard compliant. We return the current buffer as new line and
			// add a space to the beginning of the next line which corrects some
			// overlong header lines.
			inputBuffer = newLineBuffer();
			inputBuffer.put((byte) ' ');
			buffer.flip();
			return buffer;
		}
	}

	/**
	 * Returns an at least 512 bytes long ByteBuffer ready for usage.
	 * The method first tries to reuse an already allocated (cached) direct
	 * buffer; if none is available it returns a newly allocated non-direct
	 * buffer.
	 * Use the recycleBuffer() method when you no longer use the allocated
	 * buffer.
	 * @return An empty buffer with position 0.
	 */
	static ByteBuffer newLineBuffer()
	{
		ByteBuffer buf = null;
		synchronized (freeSmallBuffers) {
			// Take from the tail: removing the first element of an ArrayList
			// would shift all remaining entries on every allocation.
			int last = freeSmallBuffers.size() - 1;
			if (last >= 0) {
				buf = freeSmallBuffers.remove(last);
			}
		}

		if (buf == null) {
			// Pool exhausted: allocate a non-direct buffer
			buf = ByteBuffer.allocate(BUFFER_SIZE);
		}

		assert buf.position() == 0;
		assert buf.limit() >= BUFFER_SIZE;

		return buf;
	}

	/**
	 * Adds the given buffer to the list of free buffers if it is a valuable
	 * direct allocated buffer and the pool is not yet full. Non-direct
	 * buffers are ignored and left to the garbage collector.
	 * The buffer must not be used by the caller afterwards.
	 * @param buffer The buffer that is no longer needed; must not be null.
	 */
	public static void recycleBuffer(ByteBuffer buffer)
	{
		assert buffer != null;

		if (buffer.isDirect()) {
			assert buffer.capacity() >= BUFFER_SIZE;

			synchronized (freeSmallBuffers) {
				// Keep the pool within its documented maximum
				if (freeSmallBuffers.size() < MAX_CACHED_BUFFERS) {
					buffer.clear(); // Set position to 0 and limit to capacity
					freeSmallBuffers.add(buffer);
				}
			}
		}
	}

	/**
	 * Recycles all buffers of this ChannelLineBuffers object and marks it as
	 * closed. Calling this method more than once has no further effect.
	 */
	public void recycleBuffers()
	{
		final ByteBuffer input = inputBuffer;
		if (input != null) {
			synchronized (input) {
				// Re-read the field: nextInputLine() may have swapped the buffer
				if (inputBuffer != null) {
					recycleBuffer(inputBuffer);
					inputBuffer = null;
				}
			}
		}

		final List<ByteBuffer> buffers = outputBuffers;
		if (buffers != null) {
			synchronized (buffers) {
				if (outputBuffers != null) {
					for (ByteBuffer buf : buffers) {
						recycleBuffer(buf);
					}
					outputBuffers = null;
				}
			}
		}
	}
}