1.1 --- a/org/sonews/daemon/ChannelLineBuffers.java Sun Aug 29 17:04:25 2010 +0200
1.2 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000
1.3 @@ -1,283 +0,0 @@
1.4 -/*
1.5 - * SONEWS News Server
1.6 - * see AUTHORS for the list of contributors
1.7 - *
1.8 - * This program is free software: you can redistribute it and/or modify
1.9 - * it under the terms of the GNU General Public License as published by
1.10 - * the Free Software Foundation, either version 3 of the License, or
1.11 - * (at your option) any later version.
1.12 - *
1.13 - * This program is distributed in the hope that it will be useful,
1.14 - * but WITHOUT ANY WARRANTY; without even the implied warranty of
1.15 - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1.16 - * GNU General Public License for more details.
1.17 - *
1.18 - * You should have received a copy of the GNU General Public License
1.19 - * along with this program. If not, see <http://www.gnu.org/licenses/>.
1.20 - */
1.21 -
1.22 -package org.sonews.daemon;
1.23 -
1.24 -import java.nio.ByteBuffer;
1.25 -import java.nio.channels.ClosedChannelException;
1.26 -import java.util.ArrayList;
1.27 -import java.util.List;
1.28 -
1.29 -/**
1.30 - * Class holding ByteBuffers for SocketChannels/NNTPConnection.
1.31 - * Due to the complex nature of AIO/NIO we must properly handle the line
1.32 - * buffers for the input and output of the SocketChannels.
1.33 - * @author Christian Lins
1.34 - * @since sonews/0.5.0
1.35 - */
1.36 -public class ChannelLineBuffers
1.37 -{
1.38 -
1.39 - /**
1.40 - * Size of one small buffer;
1.41 - * per default this is 512 bytes to fit one standard line.
1.42 - */
1.43 - public static final int BUFFER_SIZE = 512;
1.44 -
1.45 - private static int maxCachedBuffers = 2048; // Cached buffers maximum
1.46 -
1.47 - private static final List<ByteBuffer> freeSmallBuffers
1.48 - = new ArrayList<ByteBuffer>(maxCachedBuffers);
1.49 -
1.50 - /**
1.51 - * Allocates a predefined number of direct ByteBuffers (allocated via
1.52 - * ByteBuffer.allocateDirect()). This method is Thread-safe, but should only
1.53 - * called at startup.
1.54 - */
1.55 - public static void allocateDirect()
1.56 - {
1.57 - synchronized(freeSmallBuffers)
1.58 - {
1.59 - for(int n = 0; n < maxCachedBuffers; n++)
1.60 - {
1.61 - ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
1.62 - freeSmallBuffers.add(buffer);
1.63 - }
1.64 - }
1.65 - }
1.66 -
1.67 - private ByteBuffer inputBuffer = newLineBuffer();
1.68 - private List<ByteBuffer> outputBuffers = new ArrayList<ByteBuffer>();
1.69 -
1.70 - /**
1.71 - * Add the given ByteBuffer to the list of buffers to be send to the client.
1.72 - * This method is Thread-safe.
1.73 - * @param buffer
1.74 - * @throws java.nio.channels.ClosedChannelException If the client channel was
1.75 - * already closed.
1.76 - */
1.77 - public void addOutputBuffer(ByteBuffer buffer)
1.78 - throws ClosedChannelException
1.79 - {
1.80 - if(outputBuffers == null)
1.81 - {
1.82 - throw new ClosedChannelException();
1.83 - }
1.84 -
1.85 - synchronized(outputBuffers)
1.86 - {
1.87 - outputBuffers.add(buffer);
1.88 - }
1.89 - }
1.90 -
1.91 - /**
1.92 - * Currently a channel has only one input buffer. This *may* be a bottleneck
1.93 - * and should investigated in the future.
1.94 - * @param channel
1.95 - * @return The input buffer associated with given channel.
1.96 - */
1.97 - public ByteBuffer getInputBuffer()
1.98 - {
1.99 - return inputBuffer;
1.100 - }
1.101 -
1.102 - /**
1.103 - * Returns the current output buffer for writing(!) to SocketChannel.
1.104 - * @param channel
1.105 - * @return The next input buffer that contains unprocessed data or null
1.106 - * if the connection was closed or there are no more unprocessed buffers.
1.107 - */
1.108 - public ByteBuffer getOutputBuffer()
1.109 - {
1.110 - synchronized(outputBuffers)
1.111 - {
1.112 - if(outputBuffers == null || outputBuffers.isEmpty())
1.113 - {
1.114 - return null;
1.115 - }
1.116 - else
1.117 - {
1.118 - ByteBuffer buffer = outputBuffers.get(0);
1.119 - if(buffer.remaining() == 0)
1.120 - {
1.121 - outputBuffers.remove(0);
1.122 - // Add old buffers to the list of free buffers
1.123 - recycleBuffer(buffer);
1.124 - buffer = getOutputBuffer();
1.125 - }
1.126 - return buffer;
1.127 - }
1.128 - }
1.129 - }
1.130 -
1.131 - /**
1.132 - * @return false if there are output buffers pending to be written to the
1.133 - * client.
1.134 - */
1.135 - boolean isOutputBufferEmpty()
1.136 - {
1.137 - synchronized(outputBuffers)
1.138 - {
1.139 - return outputBuffers.isEmpty();
1.140 - }
1.141 - }
1.142 -
1.143 - /**
1.144 - * Goes through the input buffer of the given channel and searches
1.145 - * for next line terminator. If a '\n' is found, the bytes up to the
1.146 - * line terminator are returned as array of bytes (the line terminator
1.147 - * is omitted). If none is found the method returns null.
1.148 - * @param channel
1.149 - * @return A ByteBuffer wrapping the line.
1.150 - */
1.151 - ByteBuffer nextInputLine()
1.152 - {
1.153 - if(inputBuffer == null)
1.154 - {
1.155 - return null;
1.156 - }
1.157 -
1.158 - synchronized(inputBuffer)
1.159 - {
1.160 - ByteBuffer buffer = inputBuffer;
1.161 -
1.162 - // Mark the current write position
1.163 - int mark = buffer.position();
1.164 -
1.165 - // Set position to 0 and limit to current position
1.166 - buffer.flip();
1.167 -
1.168 - ByteBuffer lineBuffer = newLineBuffer();
1.169 -
1.170 - while (buffer.position() < buffer.limit())
1.171 - {
1.172 - byte b = buffer.get();
1.173 - if (b == 10) // '\n'
1.174 - {
1.175 - // The bytes between the buffer's current position and its limit,
1.176 - // if any, are copied to the beginning of the buffer. That is, the
1.177 - // byte at index p = position() is copied to index zero, the byte at
1.178 - // index p + 1 is copied to index one, and so forth until the byte
1.179 - // at index limit() - 1 is copied to index n = limit() - 1 - p.
1.180 - // The buffer's position is then set to n+1 and its limit is set to
1.181 - // its capacity.
1.182 - buffer.compact();
1.183 -
1.184 - lineBuffer.flip(); // limit to position, position to 0
1.185 - return lineBuffer;
1.186 - }
1.187 - else
1.188 - {
1.189 - lineBuffer.put(b);
1.190 - }
1.191 - }
1.192 -
1.193 - buffer.limit(BUFFER_SIZE);
1.194 - buffer.position(mark);
1.195 -
1.196 - if(buffer.hasRemaining())
1.197 - {
1.198 - return null;
1.199 - }
1.200 - else
1.201 - {
1.202 - // In the first 512 was no newline found, so the input is not standard
1.203 - // compliant. We return the current buffer as new line and add a space
1.204 - // to the beginning of the next line which corrects some overlong header
1.205 - // lines.
1.206 - inputBuffer = newLineBuffer();
1.207 - inputBuffer.put((byte)' ');
1.208 - buffer.flip();
1.209 - return buffer;
1.210 - }
1.211 - }
1.212 - }
1.213 -
1.214 - /**
1.215 - * Returns a at least 512 bytes long ByteBuffer ready for usage.
1.216 - * The method first try to reuse an already allocated (cached) buffer but
1.217 - * if that fails returns a newly allocated direct buffer.
1.218 - * Use recycleBuffer() method when you do not longer use the allocated buffer.
1.219 - */
1.220 - static ByteBuffer newLineBuffer()
1.221 - {
1.222 - ByteBuffer buf = null;
1.223 - synchronized(freeSmallBuffers)
1.224 - {
1.225 - if(!freeSmallBuffers.isEmpty())
1.226 - {
1.227 - buf = freeSmallBuffers.remove(0);
1.228 - }
1.229 - }
1.230 -
1.231 - if(buf == null)
1.232 - {
1.233 - // Allocate a non-direct buffer
1.234 - buf = ByteBuffer.allocate(BUFFER_SIZE);
1.235 - }
1.236 -
1.237 - assert buf.position() == 0;
1.238 - assert buf.limit() >= BUFFER_SIZE;
1.239 -
1.240 - return buf;
1.241 - }
1.242 -
1.243 - /**
1.244 - * Adds the given buffer to the list of free buffers if it is a valuable
1.245 - * direct allocated buffer.
1.246 - * @param buffer
1.247 - */
1.248 - public static void recycleBuffer(ByteBuffer buffer)
1.249 - {
1.250 - assert buffer != null;
1.251 -
1.252 - if(buffer.isDirect())
1.253 - {
1.254 - assert buffer.capacity() >= BUFFER_SIZE;
1.255 -
1.256 - // Add old buffers to the list of free buffers
1.257 - synchronized(freeSmallBuffers)
1.258 - {
1.259 - buffer.clear(); // Set position to 0 and limit to capacity
1.260 - freeSmallBuffers.add(buffer);
1.261 - }
1.262 - } // if(buffer.isDirect())
1.263 - }
1.264 -
1.265 - /**
1.266 - * Recycles all buffers of this ChannelLineBuffers object.
1.267 - */
1.268 - public void recycleBuffers()
1.269 - {
1.270 - synchronized(inputBuffer)
1.271 - {
1.272 - recycleBuffer(inputBuffer);
1.273 - this.inputBuffer = null;
1.274 - }
1.275 -
1.276 - synchronized(outputBuffers)
1.277 - {
1.278 - for(ByteBuffer buf : outputBuffers)
1.279 - {
1.280 - recycleBuffer(buf);
1.281 - }
1.282 - outputBuffers = null;
1.283 - }
1.284 - }
1.285 -
1.286 -}