diff -r ed84c8bdd87b -r 74139325d305 src/org/sonews/daemon/ChannelLineBuffers.java --- a/src/org/sonews/daemon/ChannelLineBuffers.java Sun Aug 29 17:28:58 2010 +0200 +++ b/src/org/sonews/daemon/ChannelLineBuffers.java Sun Aug 29 18:17:37 2010 +0200 @@ -30,254 +30,223 @@ * @author Christian Lins * @since sonews/0.5.0 */ -public class ChannelLineBuffers +public class ChannelLineBuffers { - - /** - * Size of one small buffer; - * per default this is 512 bytes to fit one standard line. - */ - public static final int BUFFER_SIZE = 512; - - private static int maxCachedBuffers = 2048; // Cached buffers maximum - - private static final List freeSmallBuffers - = new ArrayList(maxCachedBuffers); - - /** - * Allocates a predefined number of direct ByteBuffers (allocated via - * ByteBuffer.allocateDirect()). This method is Thread-safe, but should only - * called at startup. - */ - public static void allocateDirect() - { - synchronized(freeSmallBuffers) - { - for(int n = 0; n < maxCachedBuffers; n++) - { - ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE); - freeSmallBuffers.add(buffer); - } - } - } - - private ByteBuffer inputBuffer = newLineBuffer(); - private List outputBuffers = new ArrayList(); - - /** - * Add the given ByteBuffer to the list of buffers to be send to the client. - * This method is Thread-safe. - * @param buffer - * @throws java.nio.channels.ClosedChannelException If the client channel was - * already closed. - */ - public void addOutputBuffer(ByteBuffer buffer) - throws ClosedChannelException - { - if(outputBuffers == null) - { - throw new ClosedChannelException(); - } - - synchronized(outputBuffers) - { - outputBuffers.add(buffer); - } - } - - /** - * Currently a channel has only one input buffer. This *may* be a bottleneck - * and should investigated in the future. - * @param channel - * @return The input buffer associated with given channel. - */ - public ByteBuffer getInputBuffer() - { - return inputBuffer; - } - - /** - * Returns the current output buffer for writing(!) to SocketChannel. - * @param channel - * @return The next input buffer that contains unprocessed data or null - * if the connection was closed or there are no more unprocessed buffers. - */ - public ByteBuffer getOutputBuffer() - { - synchronized(outputBuffers) - { - if(outputBuffers == null || outputBuffers.isEmpty()) - { - return null; - } - else - { - ByteBuffer buffer = outputBuffers.get(0); - if(buffer.remaining() == 0) - { - outputBuffers.remove(0); - // Add old buffers to the list of free buffers - recycleBuffer(buffer); - buffer = getOutputBuffer(); - } - return buffer; - } - } - } - /** - * @return false if there are output buffers pending to be written to the - * client. - */ - boolean isOutputBufferEmpty() - { - synchronized(outputBuffers) - { - return outputBuffers.isEmpty(); - } - } - - /** - * Goes through the input buffer of the given channel and searches - * for next line terminator. If a '\n' is found, the bytes up to the - * line terminator are returned as array of bytes (the line terminator - * is omitted). If none is found the method returns null. - * @param channel - * @return A ByteBuffer wrapping the line. - */ - ByteBuffer nextInputLine() - { - if(inputBuffer == null) - { - return null; - } - - synchronized(inputBuffer) - { - ByteBuffer buffer = inputBuffer; + /** + * Size of one small buffer; + * per default this is 512 bytes to fit one standard line. + */ + public static final int BUFFER_SIZE = 512; + private static int maxCachedBuffers = 2048; // Cached buffers maximum + private static final List freeSmallBuffers = new ArrayList(maxCachedBuffers); - // Mark the current write position - int mark = buffer.position(); + /** + * Allocates a predefined number of direct ByteBuffers (allocated via + * ByteBuffer.allocateDirect()). This method is Thread-safe, but should only + * called at startup. + */ + public static void allocateDirect() + { + synchronized (freeSmallBuffers) { + for (int n = 0; n < maxCachedBuffers; n++) { + ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE); + freeSmallBuffers.add(buffer); + } + } + } + private ByteBuffer inputBuffer = newLineBuffer(); + private List outputBuffers = new ArrayList(); - // Set position to 0 and limit to current position - buffer.flip(); + /** + * Add the given ByteBuffer to the list of buffers to be send to the client. + * This method is Thread-safe. + * @param buffer + * @throws java.nio.channels.ClosedChannelException If the client channel was + * already closed. + */ + public void addOutputBuffer(ByteBuffer buffer) + throws ClosedChannelException + { + if (outputBuffers == null) { + throw new ClosedChannelException(); + } - ByteBuffer lineBuffer = newLineBuffer(); + synchronized (outputBuffers) { + outputBuffers.add(buffer); + } + } - while (buffer.position() < buffer.limit()) - { - byte b = buffer.get(); - if (b == 10) // '\n' - { - // The bytes between the buffer's current position and its limit, - // if any, are copied to the beginning of the buffer. That is, the - // byte at index p = position() is copied to index zero, the byte at - // index p + 1 is copied to index one, and so forth until the byte - // at index limit() - 1 is copied to index n = limit() - 1 - p. - // The buffer's position is then set to n+1 and its limit is set to - // its capacity. - buffer.compact(); + /** + * Currently a channel has only one input buffer. This *may* be a bottleneck + * and should investigated in the future. + * @param channel + * @return The input buffer associated with given channel. + */ + public ByteBuffer getInputBuffer() + { + return inputBuffer; + } - lineBuffer.flip(); // limit to position, position to 0 - return lineBuffer; - } - else - { - lineBuffer.put(b); - } - } + /** + * Returns the current output buffer for writing(!) to SocketChannel. + * @param channel + * @return The next input buffer that contains unprocessed data or null + * if the connection was closed or there are no more unprocessed buffers. + */ + public ByteBuffer getOutputBuffer() + { + synchronized (outputBuffers) { + if (outputBuffers == null || outputBuffers.isEmpty()) { + return null; + } else { + ByteBuffer buffer = outputBuffers.get(0); + if (buffer.remaining() == 0) { + outputBuffers.remove(0); + // Add old buffers to the list of free buffers + recycleBuffer(buffer); + buffer = getOutputBuffer(); + } + return buffer; + } + } + } - buffer.limit(BUFFER_SIZE); - buffer.position(mark); + /** + * @return false if there are output buffers pending to be written to the + * client. + */ + boolean isOutputBufferEmpty() + { + synchronized (outputBuffers) { + return outputBuffers.isEmpty(); + } + } - if(buffer.hasRemaining()) - { - return null; - } - else - { - // In the first 512 was no newline found, so the input is not standard - // compliant. We return the current buffer as new line and add a space - // to the beginning of the next line which corrects some overlong header - // lines. - inputBuffer = newLineBuffer(); - inputBuffer.put((byte)' '); - buffer.flip(); - return buffer; - } - } - } - - /** - * Returns a at least 512 bytes long ByteBuffer ready for usage. - * The method first try to reuse an already allocated (cached) buffer but - * if that fails returns a newly allocated direct buffer. - * Use recycleBuffer() method when you do not longer use the allocated buffer. - */ - static ByteBuffer newLineBuffer() - { - ByteBuffer buf = null; - synchronized(freeSmallBuffers) - { - if(!freeSmallBuffers.isEmpty()) - { - buf = freeSmallBuffers.remove(0); - } - } - - if(buf == null) - { - // Allocate a non-direct buffer - buf = ByteBuffer.allocate(BUFFER_SIZE); - } - - assert buf.position() == 0; - assert buf.limit() >= BUFFER_SIZE; - - return buf; - } - - /** - * Adds the given buffer to the list of free buffers if it is a valuable - * direct allocated buffer. - * @param buffer - */ - public static void recycleBuffer(ByteBuffer buffer) - { - assert buffer != null; + /** + * Goes through the input buffer of the given channel and searches + * for next line terminator. If a '\n' is found, the bytes up to the + * line terminator are returned as array of bytes (the line terminator + * is omitted). If none is found the method returns null. + * @param channel + * @return A ByteBuffer wrapping the line. + */ + ByteBuffer nextInputLine() + { + if (inputBuffer == null) { + return null; + } - if(buffer.isDirect()) - { - assert buffer.capacity() >= BUFFER_SIZE; - - // Add old buffers to the list of free buffers - synchronized(freeSmallBuffers) - { - buffer.clear(); // Set position to 0 and limit to capacity - freeSmallBuffers.add(buffer); - } - } // if(buffer.isDirect()) - } - - /** - * Recycles all buffers of this ChannelLineBuffers object. - */ - public void recycleBuffers() - { - synchronized(inputBuffer) - { - recycleBuffer(inputBuffer); - this.inputBuffer = null; - } - - synchronized(outputBuffers) - { - for(ByteBuffer buf : outputBuffers) - { - recycleBuffer(buf); - } - outputBuffers = null; - } - } - + synchronized (inputBuffer) { + ByteBuffer buffer = inputBuffer; + + // Mark the current write position + int mark = buffer.position(); + + // Set position to 0 and limit to current position + buffer.flip(); + + ByteBuffer lineBuffer = newLineBuffer(); + + while (buffer.position() < buffer.limit()) { + byte b = buffer.get(); + if (b == 10) // '\n' + { + // The bytes between the buffer's current position and its limit, + // if any, are copied to the beginning of the buffer. That is, the + // byte at index p = position() is copied to index zero, the byte at + // index p + 1 is copied to index one, and so forth until the byte + // at index limit() - 1 is copied to index n = limit() - 1 - p. + // The buffer's position is then set to n+1 and its limit is set to + // its capacity. + buffer.compact(); + + lineBuffer.flip(); // limit to position, position to 0 + return lineBuffer; + } else { + lineBuffer.put(b); + } + } + + buffer.limit(BUFFER_SIZE); + buffer.position(mark); + + if (buffer.hasRemaining()) { + return null; + } else { + // In the first 512 was no newline found, so the input is not standard + // compliant. We return the current buffer as new line and add a space + // to the beginning of the next line which corrects some overlong header + // lines. + inputBuffer = newLineBuffer(); + inputBuffer.put((byte) ' '); + buffer.flip(); + return buffer; + } + } + } + + /** + * Returns a at least 512 bytes long ByteBuffer ready for usage. + * The method first try to reuse an already allocated (cached) buffer but + * if that fails returns a newly allocated direct buffer. + * Use recycleBuffer() method when you do not longer use the allocated buffer. + */ + static ByteBuffer newLineBuffer() + { + ByteBuffer buf = null; + synchronized (freeSmallBuffers) { + if (!freeSmallBuffers.isEmpty()) { + buf = freeSmallBuffers.remove(0); + } + } + + if (buf == null) { + // Allocate a non-direct buffer + buf = ByteBuffer.allocate(BUFFER_SIZE); + } + + assert buf.position() == 0; + assert buf.limit() >= BUFFER_SIZE; + + return buf; + } + + /** + * Adds the given buffer to the list of free buffers if it is a valuable + * direct allocated buffer. + * @param buffer + */ + public static void recycleBuffer(ByteBuffer buffer) + { + assert buffer != null; + + if (buffer.isDirect()) { + assert buffer.capacity() >= BUFFER_SIZE; + + // Add old buffers to the list of free buffers + synchronized (freeSmallBuffers) { + buffer.clear(); // Set position to 0 and limit to capacity + freeSmallBuffers.add(buffer); + } + } // if(buffer.isDirect()) + } + + /** + * Recycles all buffers of this ChannelLineBuffers object. + */ + public void recycleBuffers() + { + synchronized (inputBuffer) { + recycleBuffer(inputBuffer); + this.inputBuffer = null; + } + + synchronized (outputBuffers) { + for (ByteBuffer buf : outputBuffers) { + recycleBuffer(buf); + } + outputBuffers = null; + } + } }