diff -r dd05c3f2fa24 -r c404a87db5b7 src/org/sonews/daemon/ChannelLineBuffers.java
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/org/sonews/daemon/ChannelLineBuffers.java Sun Aug 29 17:43:58 2010 +0200
@@ -0,0 +1,283 @@
+/*
+ * SONEWS News Server
+ * see AUTHORS for the list of contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package org.sonews.daemon;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Class holding ByteBuffers for SocketChannels/NNTPConnection.
+ * Due to the complex nature of AIO/NIO we must properly handle the line
+ * buffers for the input and output of the SocketChannels.
+ * @author Christian Lins
+ * @since sonews/0.5.0
+ */
+public class ChannelLineBuffers
+{
+
+ /**
+ * Size of one small buffer;
+ * per default this is 512 bytes to fit one standard line.
+ */
+ public static final int BUFFER_SIZE = 512;
+
+ private static int maxCachedBuffers = 2048; // Cached buffers maximum
+
+ private static final List freeSmallBuffers
+ = new ArrayList(maxCachedBuffers);
+
+ /**
+ * Allocates a predefined number of direct ByteBuffers (allocated via
+ * ByteBuffer.allocateDirect()). This method is Thread-safe, but should only
+ * called at startup.
+ */
+ public static void allocateDirect()
+ {
+ synchronized(freeSmallBuffers)
+ {
+ for(int n = 0; n < maxCachedBuffers; n++)
+ {
+ ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
+ freeSmallBuffers.add(buffer);
+ }
+ }
+ }
+
+ private ByteBuffer inputBuffer = newLineBuffer();
+ private List outputBuffers = new ArrayList();
+
+ /**
+ * Add the given ByteBuffer to the list of buffers to be send to the client.
+ * This method is Thread-safe.
+ * @param buffer
+ * @throws java.nio.channels.ClosedChannelException If the client channel was
+ * already closed.
+ */
+ public void addOutputBuffer(ByteBuffer buffer)
+ throws ClosedChannelException
+ {
+ if(outputBuffers == null)
+ {
+ throw new ClosedChannelException();
+ }
+
+ synchronized(outputBuffers)
+ {
+ outputBuffers.add(buffer);
+ }
+ }
+
+ /**
+ * Currently a channel has only one input buffer. This *may* be a bottleneck
+ * and should investigated in the future.
+ * @param channel
+ * @return The input buffer associated with given channel.
+ */
+ public ByteBuffer getInputBuffer()
+ {
+ return inputBuffer;
+ }
+
+ /**
+ * Returns the current output buffer for writing(!) to SocketChannel.
+ * @param channel
+ * @return The next input buffer that contains unprocessed data or null
+ * if the connection was closed or there are no more unprocessed buffers.
+ */
+ public ByteBuffer getOutputBuffer()
+ {
+ synchronized(outputBuffers)
+ {
+ if(outputBuffers == null || outputBuffers.isEmpty())
+ {
+ return null;
+ }
+ else
+ {
+ ByteBuffer buffer = outputBuffers.get(0);
+ if(buffer.remaining() == 0)
+ {
+ outputBuffers.remove(0);
+ // Add old buffers to the list of free buffers
+ recycleBuffer(buffer);
+ buffer = getOutputBuffer();
+ }
+ return buffer;
+ }
+ }
+ }
+
+ /**
+ * @return false if there are output buffers pending to be written to the
+ * client.
+ */
+ boolean isOutputBufferEmpty()
+ {
+ synchronized(outputBuffers)
+ {
+ return outputBuffers.isEmpty();
+ }
+ }
+
+ /**
+ * Goes through the input buffer of the given channel and searches
+ * for next line terminator. If a '\n' is found, the bytes up to the
+ * line terminator are returned as array of bytes (the line terminator
+ * is omitted). If none is found the method returns null.
+ * @param channel
+ * @return A ByteBuffer wrapping the line.
+ */
+ ByteBuffer nextInputLine()
+ {
+ if(inputBuffer == null)
+ {
+ return null;
+ }
+
+ synchronized(inputBuffer)
+ {
+ ByteBuffer buffer = inputBuffer;
+
+ // Mark the current write position
+ int mark = buffer.position();
+
+ // Set position to 0 and limit to current position
+ buffer.flip();
+
+ ByteBuffer lineBuffer = newLineBuffer();
+
+ while (buffer.position() < buffer.limit())
+ {
+ byte b = buffer.get();
+ if (b == 10) // '\n'
+ {
+ // The bytes between the buffer's current position and its limit,
+ // if any, are copied to the beginning of the buffer. That is, the
+ // byte at index p = position() is copied to index zero, the byte at
+ // index p + 1 is copied to index one, and so forth until the byte
+ // at index limit() - 1 is copied to index n = limit() - 1 - p.
+ // The buffer's position is then set to n+1 and its limit is set to
+ // its capacity.
+ buffer.compact();
+
+ lineBuffer.flip(); // limit to position, position to 0
+ return lineBuffer;
+ }
+ else
+ {
+ lineBuffer.put(b);
+ }
+ }
+
+ buffer.limit(BUFFER_SIZE);
+ buffer.position(mark);
+
+ if(buffer.hasRemaining())
+ {
+ return null;
+ }
+ else
+ {
+ // In the first 512 was no newline found, so the input is not standard
+ // compliant. We return the current buffer as new line and add a space
+ // to the beginning of the next line which corrects some overlong header
+ // lines.
+ inputBuffer = newLineBuffer();
+ inputBuffer.put((byte)' ');
+ buffer.flip();
+ return buffer;
+ }
+ }
+ }
+
+ /**
+ * Returns a at least 512 bytes long ByteBuffer ready for usage.
+ * The method first try to reuse an already allocated (cached) buffer but
+ * if that fails returns a newly allocated direct buffer.
+ * Use recycleBuffer() method when you do not longer use the allocated buffer.
+ */
+ static ByteBuffer newLineBuffer()
+ {
+ ByteBuffer buf = null;
+ synchronized(freeSmallBuffers)
+ {
+ if(!freeSmallBuffers.isEmpty())
+ {
+ buf = freeSmallBuffers.remove(0);
+ }
+ }
+
+ if(buf == null)
+ {
+ // Allocate a non-direct buffer
+ buf = ByteBuffer.allocate(BUFFER_SIZE);
+ }
+
+ assert buf.position() == 0;
+ assert buf.limit() >= BUFFER_SIZE;
+
+ return buf;
+ }
+
+ /**
+ * Adds the given buffer to the list of free buffers if it is a valuable
+ * direct allocated buffer.
+ * @param buffer
+ */
+ public static void recycleBuffer(ByteBuffer buffer)
+ {
+ assert buffer != null;
+
+ if(buffer.isDirect())
+ {
+ assert buffer.capacity() >= BUFFER_SIZE;
+
+ // Add old buffers to the list of free buffers
+ synchronized(freeSmallBuffers)
+ {
+ buffer.clear(); // Set position to 0 and limit to capacity
+ freeSmallBuffers.add(buffer);
+ }
+ } // if(buffer.isDirect())
+ }
+
+ /**
+ * Recycles all buffers of this ChannelLineBuffers object.
+ */
+ public void recycleBuffers()
+ {
+ synchronized(inputBuffer)
+ {
+ recycleBuffer(inputBuffer);
+ this.inputBuffer = null;
+ }
+
+ synchronized(outputBuffers)
+ {
+ for(ByteBuffer buf : outputBuffers)
+ {
+ recycleBuffer(buf);
+ }
+ outputBuffers = null;
+ }
+ }
+
+}