src/org/sonews/daemon/ChannelLineBuffers.java
author cli
Sun Sep 11 17:01:19 2011 +0200 (2011-09-11)
changeset 49 8df94bfd3e2f
parent 35 ed84c8bdd87b
permissions -rwxr-xr-x
Fix for #14
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.daemon;
    20 
    21 import java.nio.ByteBuffer;
    22 import java.nio.channels.ClosedChannelException;
    23 import java.util.ArrayList;
    24 import java.util.List;
    25 
    26 /**
    27  * Class holding ByteBuffers for SocketChannels/NNTPConnection.
    28  * Due to the complex nature of AIO/NIO we must properly handle the line 
    29  * buffers for the input and output of the SocketChannels.
    30  * @author Christian Lins
    31  * @since sonews/0.5.0
    32  */
    33 public class ChannelLineBuffers
    34 {
    35 
    36 	/**
    37 	 * Size of one small buffer;
    38 	 * per default this is 512 bytes to fit one standard line.
    39 	 */
    40 	public static final int BUFFER_SIZE = 512;
    41 	private static int maxCachedBuffers = 2048; // Cached buffers maximum
    42 	private static final List<ByteBuffer> freeSmallBuffers = new ArrayList<ByteBuffer>(maxCachedBuffers);
    43 
    44 	/**
    45 	 * Allocates a predefined number of direct ByteBuffers (allocated via
    46 	 * ByteBuffer.allocateDirect()). This method is Thread-safe, but should only
    47 	 * called at startup.
    48 	 */
    49 	public static void allocateDirect()
    50 	{
    51 		synchronized (freeSmallBuffers) {
    52 			for (int n = 0; n < maxCachedBuffers; n++) {
    53 				ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
    54 				freeSmallBuffers.add(buffer);
    55 			}
    56 		}
    57 	}
    58 	private ByteBuffer inputBuffer = newLineBuffer();
    59 	private List<ByteBuffer> outputBuffers = new ArrayList<ByteBuffer>();
    60 
    61 	/**
    62 	 * Add the given ByteBuffer to the list of buffers to be send to the client.
    63 	 * This method is Thread-safe.
    64 	 * @param buffer
    65 	 * @throws java.nio.channels.ClosedChannelException If the client channel was
    66 	 * already closed.
    67 	 */
    68 	public void addOutputBuffer(ByteBuffer buffer)
    69 		throws ClosedChannelException
    70 	{
    71 		if (outputBuffers == null) {
    72 			throw new ClosedChannelException();
    73 		}
    74 
    75 		synchronized (outputBuffers) {
    76 			outputBuffers.add(buffer);
    77 		}
    78 	}
    79 
    80 	/**
    81 	 * Currently a channel has only one input buffer. This *may* be a bottleneck
    82 	 * and should investigated in the future.
    83 	 * @param channel
    84 	 * @return The input buffer associated with given channel.
    85 	 */
    86 	public ByteBuffer getInputBuffer()
    87 	{
    88 		return inputBuffer;
    89 	}
    90 
    91 	/**
    92 	 * Returns the current output buffer for writing(!) to SocketChannel.
    93 	 * @param channel
    94 	 * @return The next input buffer that contains unprocessed data or null
    95 	 * if the connection was closed or there are no more unprocessed buffers.
    96 	 */
    97 	public ByteBuffer getOutputBuffer()
    98 	{
    99 		synchronized (outputBuffers) {
   100 			if (outputBuffers == null || outputBuffers.isEmpty()) {
   101 				return null;
   102 			} else {
   103 				ByteBuffer buffer = outputBuffers.get(0);
   104 				if (buffer.remaining() == 0) {
   105 					outputBuffers.remove(0);
   106 					// Add old buffers to the list of free buffers
   107 					recycleBuffer(buffer);
   108 					buffer = getOutputBuffer();
   109 				}
   110 				return buffer;
   111 			}
   112 		}
   113 	}
   114 
   115 	/**
   116 	 * @return false if there are output buffers pending to be written to the
   117 	 * client.
   118 	 */
   119 	boolean isOutputBufferEmpty()
   120 	{
   121 		synchronized (outputBuffers) {
   122 			return outputBuffers.isEmpty();
   123 		}
   124 	}
   125 
   126 	/**
   127 	 * Goes through the input buffer of the given channel and searches
   128 	 * for next line terminator. If a '\n' is found, the bytes up to the
   129 	 * line terminator are returned as array of bytes (the line terminator
   130 	 * is omitted). If none is found the method returns null.
   131 	 * @param channel
   132 	 * @return A ByteBuffer wrapping the line.
   133 	 */
   134 	ByteBuffer nextInputLine()
   135 	{
   136 		if (inputBuffer == null) {
   137 			return null;
   138 		}
   139 
   140 		synchronized (inputBuffer) {
   141 			ByteBuffer buffer = inputBuffer;
   142 
   143 			// Mark the current write position
   144 			int mark = buffer.position();
   145 
   146 			// Set position to 0 and limit to current position
   147 			buffer.flip();
   148 
   149 			ByteBuffer lineBuffer = newLineBuffer();
   150 
   151 			while (buffer.position() < buffer.limit()) {
   152 				byte b = buffer.get();
   153 				if (b == 10) // '\n'
   154 				{
   155 					// The bytes between the buffer's current position and its limit,
   156 					// if any, are copied to the beginning of the buffer. That is, the
   157 					// byte at index p = position() is copied to index zero, the byte at
   158 					// index p + 1 is copied to index one, and so forth until the byte
   159 					// at index limit() - 1 is copied to index n = limit() - 1 - p.
   160 					// The buffer's position is then set to n+1 and its limit is set to
   161 					// its capacity.
   162 					buffer.compact();
   163 
   164 					lineBuffer.flip(); // limit to position, position to 0
   165 					return lineBuffer;
   166 				} else {
   167 					lineBuffer.put(b);
   168 				}
   169 			}
   170 
   171 			buffer.limit(BUFFER_SIZE);
   172 			buffer.position(mark);
   173 
   174 			if (buffer.hasRemaining()) {
   175 				return null;
   176 			} else {
   177 				// In the first 512 was no newline found, so the input is not standard
   178 				// compliant. We return the current buffer as new line and add a space
   179 				// to the beginning of the next line which corrects some overlong header
   180 				// lines.
   181 				inputBuffer = newLineBuffer();
   182 				inputBuffer.put((byte) ' ');
   183 				buffer.flip();
   184 				return buffer;
   185 			}
   186 		}
   187 	}
   188 
   189 	/**
   190 	 * Returns a at least 512 bytes long ByteBuffer ready for usage.
   191 	 * The method first try to reuse an already allocated (cached) buffer but
   192 	 * if that fails returns a newly allocated direct buffer.
   193 	 * Use recycleBuffer() method when you do not longer use the allocated buffer.
   194 	 */
   195 	static ByteBuffer newLineBuffer()
   196 	{
   197 		ByteBuffer buf = null;
   198 		synchronized (freeSmallBuffers) {
   199 			if (!freeSmallBuffers.isEmpty()) {
   200 				buf = freeSmallBuffers.remove(0);
   201 			}
   202 		}
   203 
   204 		if (buf == null) {
   205 			// Allocate a non-direct buffer
   206 			buf = ByteBuffer.allocate(BUFFER_SIZE);
   207 		}
   208 
   209 		assert buf.position() == 0;
   210 		assert buf.limit() >= BUFFER_SIZE;
   211 
   212 		return buf;
   213 	}
   214 
   215 	/**
   216 	 * Adds the given buffer to the list of free buffers if it is a valuable
   217 	 * direct allocated buffer.
   218 	 * @param buffer
   219 	 */
   220 	public static void recycleBuffer(ByteBuffer buffer)
   221 	{
   222 		assert buffer != null;
   223 
   224 		if (buffer.isDirect()) {
   225 			assert buffer.capacity() >= BUFFER_SIZE;
   226 
   227 			// Add old buffers to the list of free buffers
   228 			synchronized (freeSmallBuffers) {
   229 				buffer.clear(); // Set position to 0 and limit to capacity
   230 				freeSmallBuffers.add(buffer);
   231 			}
   232 		} // if(buffer.isDirect())
   233 	}
   234 
   235 	/**
   236 	 * Recycles all buffers of this ChannelLineBuffers object.
   237 	 */
   238 	public void recycleBuffers()
   239 	{
   240 		synchronized (inputBuffer) {
   241 			recycleBuffer(inputBuffer);
   242 			this.inputBuffer = null;
   243 		}
   244 
   245 		synchronized (outputBuffers) {
   246 			for (ByteBuffer buf : outputBuffers) {
   247 				recycleBuffer(buf);
   248 			}
   249 			outputBuffers = null;
   250 		}
   251 	}
   252 }