chris@1: /* chris@1: * SONEWS News Server chris@1: * see AUTHORS for the list of contributors chris@1: * chris@1: * This program is free software: you can redistribute it and/or modify chris@1: * it under the terms of the GNU General Public License as published by chris@1: * the Free Software Foundation, either version 3 of the License, or chris@1: * (at your option) any later version. chris@1: * chris@1: * This program is distributed in the hope that it will be useful, chris@1: * but WITHOUT ANY WARRANTY; without even the implied warranty of chris@1: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the chris@1: * GNU General Public License for more details. chris@1: * chris@1: * You should have received a copy of the GNU General Public License chris@1: * along with this program. If not, see . chris@1: */ chris@1: chris@1: package org.sonews.daemon; chris@1: chris@1: import java.nio.ByteBuffer; chris@1: import java.nio.channels.ClosedChannelException; chris@1: import java.util.ArrayList; chris@1: import java.util.List; chris@1: chris@1: /** chris@1: * Class holding ByteBuffers for SocketChannels/NNTPConnection. chris@1: * Due to the complex nature of AIO/NIO we must properly handle the line chris@1: * buffers for the input and output of the SocketChannels. chris@1: * @author Christian Lins chris@1: * @since sonews/0.5.0 chris@1: */ chris@1: public class ChannelLineBuffers chris@1: { chris@1: chris@1: /** chris@1: * Size of one small buffer; chris@1: * per default this is 512 bytes to fit one standard line. chris@1: */ chris@1: public static final int BUFFER_SIZE = 512; chris@1: chris@1: private static int maxCachedBuffers = 2048; // Cached buffers maximum chris@1: chris@1: private static final List freeSmallBuffers chris@1: = new ArrayList(maxCachedBuffers); chris@1: chris@1: /** chris@1: * Allocates a predefined number of direct ByteBuffers (allocated via chris@1: * ByteBuffer.allocateDirect()). This method is Thread-safe, but should only chris@1: * called at startup. chris@1: */ chris@1: public static void allocateDirect() chris@1: { chris@1: synchronized(freeSmallBuffers) chris@1: { chris@1: for(int n = 0; n < maxCachedBuffers; n++) chris@1: { chris@1: ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE); chris@1: freeSmallBuffers.add(buffer); chris@1: } chris@1: } chris@1: } chris@1: chris@1: private ByteBuffer inputBuffer = newLineBuffer(); chris@1: private List outputBuffers = new ArrayList(); chris@1: chris@1: /** chris@1: * Add the given ByteBuffer to the list of buffers to be send to the client. chris@1: * This method is Thread-safe. chris@1: * @param buffer chris@1: * @throws java.nio.channels.ClosedChannelException If the client channel was chris@1: * already closed. chris@1: */ chris@1: public void addOutputBuffer(ByteBuffer buffer) chris@1: throws ClosedChannelException chris@1: { chris@1: if(outputBuffers == null) chris@1: { chris@1: throw new ClosedChannelException(); chris@1: } chris@1: chris@1: synchronized(outputBuffers) chris@1: { chris@1: outputBuffers.add(buffer); chris@1: } chris@1: } chris@1: chris@1: /** chris@1: * Currently a channel has only one input buffer. This *may* be a bottleneck chris@1: * and should investigated in the future. chris@1: * @param channel chris@1: * @return The input buffer associated with given channel. chris@1: */ chris@1: public ByteBuffer getInputBuffer() chris@1: { chris@1: return inputBuffer; chris@1: } chris@1: chris@1: /** chris@1: * Returns the current output buffer for writing(!) to SocketChannel. chris@1: * @param channel chris@1: * @return The next input buffer that contains unprocessed data or null chris@1: * if the connection was closed or there are no more unprocessed buffers. chris@1: */ chris@1: public ByteBuffer getOutputBuffer() chris@1: { chris@1: synchronized(outputBuffers) chris@1: { chris@1: if(outputBuffers == null || outputBuffers.isEmpty()) chris@1: { chris@1: return null; chris@1: } chris@1: else chris@1: { chris@1: ByteBuffer buffer = outputBuffers.get(0); chris@1: if(buffer.remaining() == 0) chris@1: { chris@1: outputBuffers.remove(0); chris@1: // Add old buffers to the list of free buffers chris@1: recycleBuffer(buffer); chris@1: buffer = getOutputBuffer(); chris@1: } chris@1: return buffer; chris@1: } chris@1: } chris@1: } chris@1: chris@1: /** chris@1: * Goes through the input buffer of the given channel and searches chris@1: * for next line terminator. If a '\n' is found, the bytes up to the chris@1: * line terminator are returned as array of bytes (the line terminator chris@1: * is omitted). If none is found the method returns null. chris@1: * @param channel chris@1: * @return A ByteBuffer wrapping the line. chris@1: */ chris@1: ByteBuffer nextInputLine() chris@1: { chris@1: if(inputBuffer == null) chris@1: { chris@1: return null; chris@1: } chris@1: chris@1: synchronized(inputBuffer) chris@1: { chris@1: ByteBuffer buffer = inputBuffer; chris@1: chris@1: // Mark the current write position chris@1: int mark = buffer.position(); chris@1: chris@1: // Set position to 0 and limit to current position chris@1: buffer.flip(); chris@1: chris@1: ByteBuffer lineBuffer = newLineBuffer(); chris@1: chris@1: while (buffer.position() < buffer.limit()) chris@1: { chris@1: byte b = buffer.get(); chris@1: if (b == 10) // '\n' chris@1: { chris@1: // The bytes between the buffer's current position and its limit, chris@1: // if any, are copied to the beginning of the buffer. That is, the chris@1: // byte at index p = position() is copied to index zero, the byte at chris@1: // index p + 1 is copied to index one, and so forth until the byte chris@1: // at index limit() - 1 is copied to index n = limit() - 1 - p. chris@1: // The buffer's position is then set to n+1 and its limit is set to chris@1: // its capacity. chris@1: buffer.compact(); chris@1: chris@1: lineBuffer.flip(); // limit to position, position to 0 chris@1: return lineBuffer; chris@1: } chris@1: else chris@1: { chris@1: lineBuffer.put(b); chris@1: } chris@1: } chris@1: chris@1: buffer.limit(BUFFER_SIZE); chris@1: buffer.position(mark); chris@1: chris@1: if(buffer.hasRemaining()) chris@1: { chris@1: return null; chris@1: } chris@1: else chris@1: { chris@1: // In the first 512 was no newline found, so the input is not standard chris@1: // compliant. We return the current buffer as new line and add a space chris@1: // to the beginning of the next line which corrects some overlong header chris@1: // lines. chris@1: inputBuffer = newLineBuffer(); chris@1: inputBuffer.put((byte)' '); chris@1: buffer.flip(); chris@1: return buffer; chris@1: } chris@1: } chris@1: } chris@1: chris@1: /** chris@1: * Returns a at least 512 bytes long ByteBuffer ready for usage. chris@1: * The method first try to reuse an already allocated (cached) buffer but chris@1: * if that fails returns a newly allocated direct buffer. chris@1: * Use recycleBuffer() method when you do not longer use the allocated buffer. chris@1: */ chris@1: static ByteBuffer newLineBuffer() chris@1: { chris@1: ByteBuffer buf = null; chris@1: synchronized(freeSmallBuffers) chris@1: { chris@1: if(!freeSmallBuffers.isEmpty()) chris@1: { chris@1: buf = freeSmallBuffers.remove(0); chris@1: } chris@1: } chris@1: chris@1: if(buf == null) chris@1: { chris@1: // Allocate a non-direct buffer chris@1: buf = ByteBuffer.allocate(BUFFER_SIZE); chris@1: } chris@1: chris@1: assert buf.position() == 0; chris@1: assert buf.limit() >= BUFFER_SIZE; chris@1: chris@1: return buf; chris@1: } chris@1: chris@1: /** chris@1: * Adds the given buffer to the list of free buffers if it is a valuable chris@1: * direct allocated buffer. chris@1: * @param buffer chris@1: */ chris@1: public static void recycleBuffer(ByteBuffer buffer) chris@1: { chris@1: assert buffer != null; chris@1: chris@1: if(buffer.isDirect()) chris@1: { chris@3: assert buffer.capacity() >= BUFFER_SIZE; chris@3: chris@1: // Add old buffers to the list of free buffers chris@1: synchronized(freeSmallBuffers) chris@1: { chris@1: buffer.clear(); // Set position to 0 and limit to capacity chris@1: freeSmallBuffers.add(buffer); chris@1: } chris@1: } // if(buffer.isDirect()) chris@1: } chris@1: chris@1: /** chris@1: * Recycles all buffers of this ChannelLineBuffers object. chris@1: */ chris@1: public void recycleBuffers() chris@1: { chris@1: synchronized(inputBuffer) chris@1: { chris@1: recycleBuffer(inputBuffer); chris@1: this.inputBuffer = null; chris@1: } chris@1: chris@1: synchronized(outputBuffers) chris@1: { chris@1: for(ByteBuffer buf : outputBuffers) chris@1: { chris@1: recycleBuffer(buf); chris@1: } chris@1: outputBuffers = null; chris@1: } chris@1: } chris@1: chris@1: }