1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/org/sonews/daemon/ChannelLineBuffers.java Wed Jul 01 10:48:22 2009 +0200
1.3 @@ -0,0 +1,270 @@
1.4 +/*
1.5 + * SONEWS News Server
1.6 + * see AUTHORS for the list of contributors
1.7 + *
1.8 + * This program is free software: you can redistribute it and/or modify
1.9 + * it under the terms of the GNU General Public License as published by
1.10 + * the Free Software Foundation, either version 3 of the License, or
1.11 + * (at your option) any later version.
1.12 + *
1.13 + * This program is distributed in the hope that it will be useful,
1.14 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
1.15 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1.16 + * GNU General Public License for more details.
1.17 + *
1.18 + * You should have received a copy of the GNU General Public License
1.19 + * along with this program. If not, see <http://www.gnu.org/licenses/>.
1.20 + */
1.21 +
1.22 +package org.sonews.daemon;
1.23 +
1.24 +import java.nio.ByteBuffer;
1.25 +import java.nio.channels.ClosedChannelException;
1.26 +import java.util.ArrayList;
1.27 +import java.util.List;
1.28 +
1.29 +/**
1.30 + * Class holding ByteBuffers for SocketChannels/NNTPConnection.
1.31 + * Due to the complex nature of AIO/NIO we must properly handle the line
1.32 + * buffers for the input and output of the SocketChannels.
1.33 + * @author Christian Lins
1.34 + * @since sonews/0.5.0
1.35 + */
1.36 +public class ChannelLineBuffers
1.37 +{
1.38 +
1.39 + /**
1.40 + * Size of one small buffer;
1.41 + * per default this is 512 bytes to fit one standard line.
1.42 + */
1.43 + public static final int BUFFER_SIZE = 512;
1.44 +
1.45 + private static int maxCachedBuffers = 2048; // Cached buffers maximum
1.46 +
1.47 + private static final List<ByteBuffer> freeSmallBuffers
1.48 + = new ArrayList<ByteBuffer>(maxCachedBuffers);
1.49 +
1.50 + /**
1.51 + * Allocates a predefined number of direct ByteBuffers (allocated via
1.52 + * ByteBuffer.allocateDirect()). This method is Thread-safe, but should only
1.53 + * called at startup.
1.54 + */
1.55 + public static void allocateDirect()
1.56 + {
1.57 + synchronized(freeSmallBuffers)
1.58 + {
1.59 + for(int n = 0; n < maxCachedBuffers; n++)
1.60 + {
1.61 + ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
1.62 + freeSmallBuffers.add(buffer);
1.63 + }
1.64 + }
1.65 + }
1.66 +
1.67 + private ByteBuffer inputBuffer = newLineBuffer();
1.68 + private List<ByteBuffer> outputBuffers = new ArrayList<ByteBuffer>();
1.69 +
1.70 + /**
1.71 + * Add the given ByteBuffer to the list of buffers to be send to the client.
1.72 + * This method is Thread-safe.
1.73 + * @param buffer
1.74 + * @throws java.nio.channels.ClosedChannelException If the client channel was
1.75 + * already closed.
1.76 + */
1.77 + public void addOutputBuffer(ByteBuffer buffer)
1.78 + throws ClosedChannelException
1.79 + {
1.80 + if(outputBuffers == null)
1.81 + {
1.82 + throw new ClosedChannelException();
1.83 + }
1.84 +
1.85 + synchronized(outputBuffers)
1.86 + {
1.87 + outputBuffers.add(buffer);
1.88 + }
1.89 + }
1.90 +
1.91 + /**
1.92 + * Currently a channel has only one input buffer. This *may* be a bottleneck
1.93 + * and should investigated in the future.
1.94 + * @param channel
1.95 + * @return The input buffer associated with given channel.
1.96 + */
1.97 + public ByteBuffer getInputBuffer()
1.98 + {
1.99 + return inputBuffer;
1.100 + }
1.101 +
1.102 + /**
1.103 + * Returns the current output buffer for writing(!) to SocketChannel.
1.104 + * @param channel
1.105 + * @return The next input buffer that contains unprocessed data or null
1.106 + * if the connection was closed or there are no more unprocessed buffers.
1.107 + */
1.108 + public ByteBuffer getOutputBuffer()
1.109 + {
1.110 + synchronized(outputBuffers)
1.111 + {
1.112 + if(outputBuffers == null || outputBuffers.isEmpty())
1.113 + {
1.114 + return null;
1.115 + }
1.116 + else
1.117 + {
1.118 + ByteBuffer buffer = outputBuffers.get(0);
1.119 + if(buffer.remaining() == 0)
1.120 + {
1.121 + outputBuffers.remove(0);
1.122 + // Add old buffers to the list of free buffers
1.123 + recycleBuffer(buffer);
1.124 + buffer = getOutputBuffer();
1.125 + }
1.126 + return buffer;
1.127 + }
1.128 + }
1.129 + }
1.130 +
1.131 + /**
1.132 + * Goes through the input buffer of the given channel and searches
1.133 + * for next line terminator. If a '\n' is found, the bytes up to the
1.134 + * line terminator are returned as array of bytes (the line terminator
1.135 + * is omitted). If none is found the method returns null.
1.136 + * @param channel
1.137 + * @return A ByteBuffer wrapping the line.
1.138 + */
1.139 + ByteBuffer nextInputLine()
1.140 + {
1.141 + if(inputBuffer == null)
1.142 + {
1.143 + return null;
1.144 + }
1.145 +
1.146 + synchronized(inputBuffer)
1.147 + {
1.148 + ByteBuffer buffer = inputBuffer;
1.149 +
1.150 + // Mark the current write position
1.151 + int mark = buffer.position();
1.152 +
1.153 + // Set position to 0 and limit to current position
1.154 + buffer.flip();
1.155 +
1.156 + ByteBuffer lineBuffer = newLineBuffer();
1.157 +
1.158 + while (buffer.position() < buffer.limit())
1.159 + {
1.160 + byte b = buffer.get();
1.161 + if (b == 10) // '\n'
1.162 + {
1.163 + // The bytes between the buffer's current position and its limit,
1.164 + // if any, are copied to the beginning of the buffer. That is, the
1.165 + // byte at index p = position() is copied to index zero, the byte at
1.166 + // index p + 1 is copied to index one, and so forth until the byte
1.167 + // at index limit() - 1 is copied to index n = limit() - 1 - p.
1.168 + // The buffer's position is then set to n+1 and its limit is set to
1.169 + // its capacity.
1.170 + buffer.compact();
1.171 +
1.172 + lineBuffer.flip(); // limit to position, position to 0
1.173 + return lineBuffer;
1.174 + }
1.175 + else
1.176 + {
1.177 + lineBuffer.put(b);
1.178 + }
1.179 + }
1.180 +
1.181 + buffer.limit(BUFFER_SIZE);
1.182 + buffer.position(mark);
1.183 +
1.184 + if(buffer.hasRemaining())
1.185 + {
1.186 + return null;
1.187 + }
1.188 + else
1.189 + {
1.190 + // In the first 512 was no newline found, so the input is not standard
1.191 + // compliant. We return the current buffer as new line and add a space
1.192 + // to the beginning of the next line which corrects some overlong header
1.193 + // lines.
1.194 + inputBuffer = newLineBuffer();
1.195 + inputBuffer.put((byte)' ');
1.196 + buffer.flip();
1.197 + return buffer;
1.198 + }
1.199 + }
1.200 + }
1.201 +
1.202 + /**
1.203 + * Returns a at least 512 bytes long ByteBuffer ready for usage.
1.204 + * The method first try to reuse an already allocated (cached) buffer but
1.205 + * if that fails returns a newly allocated direct buffer.
1.206 + * Use recycleBuffer() method when you do not longer use the allocated buffer.
1.207 + */
1.208 + static ByteBuffer newLineBuffer()
1.209 + {
1.210 + ByteBuffer buf = null;
1.211 + synchronized(freeSmallBuffers)
1.212 + {
1.213 + if(!freeSmallBuffers.isEmpty())
1.214 + {
1.215 + buf = freeSmallBuffers.remove(0);
1.216 + }
1.217 + }
1.218 +
1.219 + if(buf == null)
1.220 + {
1.221 + // Allocate a non-direct buffer
1.222 + buf = ByteBuffer.allocate(BUFFER_SIZE);
1.223 + }
1.224 +
1.225 + assert buf.position() == 0;
1.226 + assert buf.limit() >= BUFFER_SIZE;
1.227 +
1.228 + return buf;
1.229 + }
1.230 +
1.231 + /**
1.232 + * Adds the given buffer to the list of free buffers if it is a valuable
1.233 + * direct allocated buffer.
1.234 + * @param buffer
1.235 + */
1.236 + public static void recycleBuffer(ByteBuffer buffer)
1.237 + {
1.238 + assert buffer != null;
1.239 + assert buffer.capacity() >= BUFFER_SIZE;
1.240 +
1.241 + if(buffer.isDirect())
1.242 + {
1.243 + // Add old buffers to the list of free buffers
1.244 + synchronized(freeSmallBuffers)
1.245 + {
1.246 + buffer.clear(); // Set position to 0 and limit to capacity
1.247 + freeSmallBuffers.add(buffer);
1.248 + }
1.249 + } // if(buffer.isDirect())
1.250 + }
1.251 +
1.252 + /**
1.253 + * Recycles all buffers of this ChannelLineBuffers object.
1.254 + */
1.255 + public void recycleBuffers()
1.256 + {
1.257 + synchronized(inputBuffer)
1.258 + {
1.259 + recycleBuffer(inputBuffer);
1.260 + this.inputBuffer = null;
1.261 + }
1.262 +
1.263 + synchronized(outputBuffers)
1.264 + {
1.265 + for(ByteBuffer buf : outputBuffers)
1.266 + {
1.267 + recycleBuffer(buf);
1.268 + }
1.269 + outputBuffers = null;
1.270 + }
1.271 + }
1.272 +
1.273 +}