diff -r dd05c3f2fa24 -r c404a87db5b7 src/org/sonews/daemon/ChannelLineBuffers.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/org/sonews/daemon/ChannelLineBuffers.java Sun Aug 29 17:43:58 2010 +0200 @@ -0,0 +1,283 @@ +/* + * SONEWS News Server + * see AUTHORS for the list of contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package org.sonews.daemon; + +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.List; + +/** + * Class holding ByteBuffers for SocketChannels/NNTPConnection. + * Due to the complex nature of AIO/NIO we must properly handle the line + * buffers for the input and output of the SocketChannels. + * @author Christian Lins + * @since sonews/0.5.0 + */ +public class ChannelLineBuffers +{ + + /** + * Size of one small buffer; + * per default this is 512 bytes to fit one standard line. + */ + public static final int BUFFER_SIZE = 512; + + private static int maxCachedBuffers = 2048; // Cached buffers maximum + + private static final List freeSmallBuffers + = new ArrayList(maxCachedBuffers); + + /** + * Allocates a predefined number of direct ByteBuffers (allocated via + * ByteBuffer.allocateDirect()). This method is Thread-safe, but should only + * called at startup. + */ + public static void allocateDirect() + { + synchronized(freeSmallBuffers) + { + for(int n = 0; n < maxCachedBuffers; n++) + { + ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE); + freeSmallBuffers.add(buffer); + } + } + } + + private ByteBuffer inputBuffer = newLineBuffer(); + private List outputBuffers = new ArrayList(); + + /** + * Add the given ByteBuffer to the list of buffers to be send to the client. + * This method is Thread-safe. + * @param buffer + * @throws java.nio.channels.ClosedChannelException If the client channel was + * already closed. + */ + public void addOutputBuffer(ByteBuffer buffer) + throws ClosedChannelException + { + if(outputBuffers == null) + { + throw new ClosedChannelException(); + } + + synchronized(outputBuffers) + { + outputBuffers.add(buffer); + } + } + + /** + * Currently a channel has only one input buffer. This *may* be a bottleneck + * and should investigated in the future. + * @param channel + * @return The input buffer associated with given channel. + */ + public ByteBuffer getInputBuffer() + { + return inputBuffer; + } + + /** + * Returns the current output buffer for writing(!) to SocketChannel. + * @param channel + * @return The next input buffer that contains unprocessed data or null + * if the connection was closed or there are no more unprocessed buffers. + */ + public ByteBuffer getOutputBuffer() + { + synchronized(outputBuffers) + { + if(outputBuffers == null || outputBuffers.isEmpty()) + { + return null; + } + else + { + ByteBuffer buffer = outputBuffers.get(0); + if(buffer.remaining() == 0) + { + outputBuffers.remove(0); + // Add old buffers to the list of free buffers + recycleBuffer(buffer); + buffer = getOutputBuffer(); + } + return buffer; + } + } + } + + /** + * @return false if there are output buffers pending to be written to the + * client. + */ + boolean isOutputBufferEmpty() + { + synchronized(outputBuffers) + { + return outputBuffers.isEmpty(); + } + } + + /** + * Goes through the input buffer of the given channel and searches + * for next line terminator. If a '\n' is found, the bytes up to the + * line terminator are returned as array of bytes (the line terminator + * is omitted). If none is found the method returns null. + * @param channel + * @return A ByteBuffer wrapping the line. + */ + ByteBuffer nextInputLine() + { + if(inputBuffer == null) + { + return null; + } + + synchronized(inputBuffer) + { + ByteBuffer buffer = inputBuffer; + + // Mark the current write position + int mark = buffer.position(); + + // Set position to 0 and limit to current position + buffer.flip(); + + ByteBuffer lineBuffer = newLineBuffer(); + + while (buffer.position() < buffer.limit()) + { + byte b = buffer.get(); + if (b == 10) // '\n' + { + // The bytes between the buffer's current position and its limit, + // if any, are copied to the beginning of the buffer. That is, the + // byte at index p = position() is copied to index zero, the byte at + // index p + 1 is copied to index one, and so forth until the byte + // at index limit() - 1 is copied to index n = limit() - 1 - p. + // The buffer's position is then set to n+1 and its limit is set to + // its capacity. + buffer.compact(); + + lineBuffer.flip(); // limit to position, position to 0 + return lineBuffer; + } + else + { + lineBuffer.put(b); + } + } + + buffer.limit(BUFFER_SIZE); + buffer.position(mark); + + if(buffer.hasRemaining()) + { + return null; + } + else + { + // In the first 512 was no newline found, so the input is not standard + // compliant. We return the current buffer as new line and add a space + // to the beginning of the next line which corrects some overlong header + // lines. + inputBuffer = newLineBuffer(); + inputBuffer.put((byte)' '); + buffer.flip(); + return buffer; + } + } + } + + /** + * Returns a at least 512 bytes long ByteBuffer ready for usage. + * The method first try to reuse an already allocated (cached) buffer but + * if that fails returns a newly allocated direct buffer. + * Use recycleBuffer() method when you do not longer use the allocated buffer. + */ + static ByteBuffer newLineBuffer() + { + ByteBuffer buf = null; + synchronized(freeSmallBuffers) + { + if(!freeSmallBuffers.isEmpty()) + { + buf = freeSmallBuffers.remove(0); + } + } + + if(buf == null) + { + // Allocate a non-direct buffer + buf = ByteBuffer.allocate(BUFFER_SIZE); + } + + assert buf.position() == 0; + assert buf.limit() >= BUFFER_SIZE; + + return buf; + } + + /** + * Adds the given buffer to the list of free buffers if it is a valuable + * direct allocated buffer. + * @param buffer + */ + public static void recycleBuffer(ByteBuffer buffer) + { + assert buffer != null; + + if(buffer.isDirect()) + { + assert buffer.capacity() >= BUFFER_SIZE; + + // Add old buffers to the list of free buffers + synchronized(freeSmallBuffers) + { + buffer.clear(); // Set position to 0 and limit to capacity + freeSmallBuffers.add(buffer); + } + } // if(buffer.isDirect()) + } + + /** + * Recycles all buffers of this ChannelLineBuffers object. + */ + public void recycleBuffers() + { + synchronized(inputBuffer) + { + recycleBuffer(inputBuffer); + this.inputBuffer = null; + } + + synchronized(outputBuffers) + { + for(ByteBuffer buf : outputBuffers) + { + recycleBuffer(buf); + } + outputBuffers = null; + } + } + +}