1.1 --- a/src/org/sonews/daemon/ChannelLineBuffers.java Sun Aug 29 17:28:58 2010 +0200
1.2 +++ b/src/org/sonews/daemon/ChannelLineBuffers.java Wed Sep 14 23:25:00 2011 +0200
1.3 @@ -30,254 +30,223 @@
1.4 * @author Christian Lins
1.5 * @since sonews/0.5.0
1.6 */
1.7 -public class ChannelLineBuffers
1.8 +public class ChannelLineBuffers
1.9 {
1.10 -
1.11 - /**
1.12 - * Size of one small buffer;
1.13 - * per default this is 512 bytes to fit one standard line.
1.14 - */
1.15 - public static final int BUFFER_SIZE = 512;
1.16 -
1.17 - private static int maxCachedBuffers = 2048; // Cached buffers maximum
1.18 -
1.19 - private static final List<ByteBuffer> freeSmallBuffers
1.20 - = new ArrayList<ByteBuffer>(maxCachedBuffers);
1.21 -
1.22 - /**
1.23 - * Allocates a predefined number of direct ByteBuffers (allocated via
1.24 - * ByteBuffer.allocateDirect()). This method is Thread-safe, but should only
1.25 - * called at startup.
1.26 - */
1.27 - public static void allocateDirect()
1.28 - {
1.29 - synchronized(freeSmallBuffers)
1.30 - {
1.31 - for(int n = 0; n < maxCachedBuffers; n++)
1.32 - {
1.33 - ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
1.34 - freeSmallBuffers.add(buffer);
1.35 - }
1.36 - }
1.37 - }
1.38 -
1.39 - private ByteBuffer inputBuffer = newLineBuffer();
1.40 - private List<ByteBuffer> outputBuffers = new ArrayList<ByteBuffer>();
1.41 -
1.42 - /**
1.43 - * Add the given ByteBuffer to the list of buffers to be send to the client.
1.44 - * This method is Thread-safe.
1.45 - * @param buffer
1.46 - * @throws java.nio.channels.ClosedChannelException If the client channel was
1.47 - * already closed.
1.48 - */
1.49 - public void addOutputBuffer(ByteBuffer buffer)
1.50 - throws ClosedChannelException
1.51 - {
1.52 - if(outputBuffers == null)
1.53 - {
1.54 - throw new ClosedChannelException();
1.55 - }
1.56 -
1.57 - synchronized(outputBuffers)
1.58 - {
1.59 - outputBuffers.add(buffer);
1.60 - }
1.61 - }
1.62 -
1.63 - /**
1.64 - * Currently a channel has only one input buffer. This *may* be a bottleneck
1.65 - * and should investigated in the future.
1.66 - * @param channel
1.67 - * @return The input buffer associated with given channel.
1.68 - */
1.69 - public ByteBuffer getInputBuffer()
1.70 - {
1.71 - return inputBuffer;
1.72 - }
1.73 -
1.74 - /**
1.75 - * Returns the current output buffer for writing(!) to SocketChannel.
1.76 - * @param channel
1.77 - * @return The next input buffer that contains unprocessed data or null
1.78 - * if the connection was closed or there are no more unprocessed buffers.
1.79 - */
1.80 - public ByteBuffer getOutputBuffer()
1.81 - {
1.82 - synchronized(outputBuffers)
1.83 - {
1.84 - if(outputBuffers == null || outputBuffers.isEmpty())
1.85 - {
1.86 - return null;
1.87 - }
1.88 - else
1.89 - {
1.90 - ByteBuffer buffer = outputBuffers.get(0);
1.91 - if(buffer.remaining() == 0)
1.92 - {
1.93 - outputBuffers.remove(0);
1.94 - // Add old buffers to the list of free buffers
1.95 - recycleBuffer(buffer);
1.96 - buffer = getOutputBuffer();
1.97 - }
1.98 - return buffer;
1.99 - }
1.100 - }
1.101 - }
1.102
1.103 - /**
1.104 - * @return false if there are output buffers pending to be written to the
1.105 - * client.
1.106 - */
1.107 - boolean isOutputBufferEmpty()
1.108 - {
1.109 - synchronized(outputBuffers)
1.110 - {
1.111 - return outputBuffers.isEmpty();
1.112 - }
1.113 - }
1.114 -
1.115 - /**
1.116 - * Goes through the input buffer of the given channel and searches
1.117 - * for next line terminator. If a '\n' is found, the bytes up to the
1.118 - * line terminator are returned as array of bytes (the line terminator
1.119 - * is omitted). If none is found the method returns null.
1.120 - * @param channel
1.121 - * @return A ByteBuffer wrapping the line.
1.122 - */
1.123 - ByteBuffer nextInputLine()
1.124 - {
1.125 - if(inputBuffer == null)
1.126 - {
1.127 - return null;
1.128 - }
1.129 -
1.130 - synchronized(inputBuffer)
1.131 - {
1.132 - ByteBuffer buffer = inputBuffer;
1.133 + /**
1.134 + * Size of one small buffer;
1.135 + * per default this is 512 bytes to fit one standard line.
1.136 + */
1.137 + public static final int BUFFER_SIZE = 512;
1.138 + private static int maxCachedBuffers = 2048; // Cached buffers maximum
1.139 + private static final List<ByteBuffer> freeSmallBuffers = new ArrayList<ByteBuffer>(maxCachedBuffers);
1.140
1.141 - // Mark the current write position
1.142 - int mark = buffer.position();
1.143 + /**
1.144 + * Allocates a predefined number of direct ByteBuffers (allocated via
1.145 + * ByteBuffer.allocateDirect()). This method is Thread-safe, but should only
1.146 + * called at startup.
1.147 + */
1.148 + public static void allocateDirect()
1.149 + {
1.150 + synchronized (freeSmallBuffers) {
1.151 + for (int n = 0; n < maxCachedBuffers; n++) {
1.152 + ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
1.153 + freeSmallBuffers.add(buffer);
1.154 + }
1.155 + }
1.156 + }
1.157 + private ByteBuffer inputBuffer = newLineBuffer();
1.158 + private List<ByteBuffer> outputBuffers = new ArrayList<ByteBuffer>();
1.159
1.160 - // Set position to 0 and limit to current position
1.161 - buffer.flip();
1.162 + /**
1.163 + * Add the given ByteBuffer to the list of buffers to be send to the client.
1.164 + * This method is Thread-safe.
1.165 + * @param buffer
1.166 + * @throws java.nio.channels.ClosedChannelException If the client channel was
1.167 + * already closed.
1.168 + */
1.169 + public void addOutputBuffer(ByteBuffer buffer)
1.170 + throws ClosedChannelException
1.171 + {
1.172 + if (outputBuffers == null) {
1.173 + throw new ClosedChannelException();
1.174 + }
1.175
1.176 - ByteBuffer lineBuffer = newLineBuffer();
1.177 + synchronized (outputBuffers) {
1.178 + outputBuffers.add(buffer);
1.179 + }
1.180 + }
1.181
1.182 - while (buffer.position() < buffer.limit())
1.183 - {
1.184 - byte b = buffer.get();
1.185 - if (b == 10) // '\n'
1.186 - {
1.187 - // The bytes between the buffer's current position and its limit,
1.188 - // if any, are copied to the beginning of the buffer. That is, the
1.189 - // byte at index p = position() is copied to index zero, the byte at
1.190 - // index p + 1 is copied to index one, and so forth until the byte
1.191 - // at index limit() - 1 is copied to index n = limit() - 1 - p.
1.192 - // The buffer's position is then set to n+1 and its limit is set to
1.193 - // its capacity.
1.194 - buffer.compact();
1.195 + /**
1.196 + * Currently a channel has only one input buffer. This *may* be a bottleneck
1.197 + * and should investigated in the future.
1.198 + * @param channel
1.199 + * @return The input buffer associated with given channel.
1.200 + */
1.201 + public ByteBuffer getInputBuffer()
1.202 + {
1.203 + return inputBuffer;
1.204 + }
1.205
1.206 - lineBuffer.flip(); // limit to position, position to 0
1.207 - return lineBuffer;
1.208 - }
1.209 - else
1.210 - {
1.211 - lineBuffer.put(b);
1.212 - }
1.213 - }
1.214 + /**
1.215 + * Returns the current output buffer for writing(!) to SocketChannel.
1.216 + * @param channel
1.217 + * @return The next input buffer that contains unprocessed data or null
1.218 + * if the connection was closed or there are no more unprocessed buffers.
1.219 + */
1.220 + public ByteBuffer getOutputBuffer()
1.221 + {
1.222 + synchronized (outputBuffers) {
1.223 + if (outputBuffers == null || outputBuffers.isEmpty()) {
1.224 + return null;
1.225 + } else {
1.226 + ByteBuffer buffer = outputBuffers.get(0);
1.227 + if (buffer.remaining() == 0) {
1.228 + outputBuffers.remove(0);
1.229 + // Add old buffers to the list of free buffers
1.230 + recycleBuffer(buffer);
1.231 + buffer = getOutputBuffer();
1.232 + }
1.233 + return buffer;
1.234 + }
1.235 + }
1.236 + }
1.237
1.238 - buffer.limit(BUFFER_SIZE);
1.239 - buffer.position(mark);
1.240 + /**
1.241 + * @return false if there are output buffers pending to be written to the
1.242 + * client.
1.243 + */
1.244 + boolean isOutputBufferEmpty()
1.245 + {
1.246 + synchronized (outputBuffers) {
1.247 + return outputBuffers.isEmpty();
1.248 + }
1.249 + }
1.250
1.251 - if(buffer.hasRemaining())
1.252 - {
1.253 - return null;
1.254 - }
1.255 - else
1.256 - {
1.257 - // In the first 512 was no newline found, so the input is not standard
1.258 - // compliant. We return the current buffer as new line and add a space
1.259 - // to the beginning of the next line which corrects some overlong header
1.260 - // lines.
1.261 - inputBuffer = newLineBuffer();
1.262 - inputBuffer.put((byte)' ');
1.263 - buffer.flip();
1.264 - return buffer;
1.265 - }
1.266 - }
1.267 - }
1.268 -
1.269 - /**
1.270 - * Returns a at least 512 bytes long ByteBuffer ready for usage.
1.271 - * The method first try to reuse an already allocated (cached) buffer but
1.272 - * if that fails returns a newly allocated direct buffer.
1.273 - * Use recycleBuffer() method when you do not longer use the allocated buffer.
1.274 - */
1.275 - static ByteBuffer newLineBuffer()
1.276 - {
1.277 - ByteBuffer buf = null;
1.278 - synchronized(freeSmallBuffers)
1.279 - {
1.280 - if(!freeSmallBuffers.isEmpty())
1.281 - {
1.282 - buf = freeSmallBuffers.remove(0);
1.283 - }
1.284 - }
1.285 -
1.286 - if(buf == null)
1.287 - {
1.288 - // Allocate a non-direct buffer
1.289 - buf = ByteBuffer.allocate(BUFFER_SIZE);
1.290 - }
1.291 -
1.292 - assert buf.position() == 0;
1.293 - assert buf.limit() >= BUFFER_SIZE;
1.294 -
1.295 - return buf;
1.296 - }
1.297 -
1.298 - /**
1.299 - * Adds the given buffer to the list of free buffers if it is a valuable
1.300 - * direct allocated buffer.
1.301 - * @param buffer
1.302 - */
1.303 - public static void recycleBuffer(ByteBuffer buffer)
1.304 - {
1.305 - assert buffer != null;
1.306 + /**
1.307 + * Goes through the input buffer of the given channel and searches
1.308 + * for next line terminator. If a '\n' is found, the bytes up to the
1.309 + * line terminator are returned as array of bytes (the line terminator
1.310 + * is omitted). If none is found the method returns null.
1.311 + * @param channel
1.312 + * @return A ByteBuffer wrapping the line.
1.313 + */
1.314 + ByteBuffer nextInputLine()
1.315 + {
1.316 + if (inputBuffer == null) {
1.317 + return null;
1.318 + }
1.319
1.320 - if(buffer.isDirect())
1.321 - {
1.322 - assert buffer.capacity() >= BUFFER_SIZE;
1.323 -
1.324 - // Add old buffers to the list of free buffers
1.325 - synchronized(freeSmallBuffers)
1.326 - {
1.327 - buffer.clear(); // Set position to 0 and limit to capacity
1.328 - freeSmallBuffers.add(buffer);
1.329 - }
1.330 - } // if(buffer.isDirect())
1.331 - }
1.332 -
1.333 - /**
1.334 - * Recycles all buffers of this ChannelLineBuffers object.
1.335 - */
1.336 - public void recycleBuffers()
1.337 - {
1.338 - synchronized(inputBuffer)
1.339 - {
1.340 - recycleBuffer(inputBuffer);
1.341 - this.inputBuffer = null;
1.342 - }
1.343 -
1.344 - synchronized(outputBuffers)
1.345 - {
1.346 - for(ByteBuffer buf : outputBuffers)
1.347 - {
1.348 - recycleBuffer(buf);
1.349 - }
1.350 - outputBuffers = null;
1.351 - }
1.352 - }
1.353 -
1.354 + synchronized (inputBuffer) {
1.355 + ByteBuffer buffer = inputBuffer;
1.356 +
1.357 + // Mark the current write position
1.358 + int mark = buffer.position();
1.359 +
1.360 + // Set position to 0 and limit to current position
1.361 + buffer.flip();
1.362 +
1.363 + ByteBuffer lineBuffer = newLineBuffer();
1.364 +
1.365 + while (buffer.position() < buffer.limit()) {
1.366 + byte b = buffer.get();
1.367 + if (b == 10) // '\n'
1.368 + {
1.369 + // The bytes between the buffer's current position and its limit,
1.370 + // if any, are copied to the beginning of the buffer. That is, the
1.371 + // byte at index p = position() is copied to index zero, the byte at
1.372 + // index p + 1 is copied to index one, and so forth until the byte
1.373 + // at index limit() - 1 is copied to index n = limit() - 1 - p.
1.374 + // The buffer's position is then set to n+1 and its limit is set to
1.375 + // its capacity.
1.376 + buffer.compact();
1.377 +
1.378 + lineBuffer.flip(); // limit to position, position to 0
1.379 + return lineBuffer;
1.380 + } else {
1.381 + lineBuffer.put(b);
1.382 + }
1.383 + }
1.384 +
1.385 + buffer.limit(BUFFER_SIZE);
1.386 + buffer.position(mark);
1.387 +
1.388 + if (buffer.hasRemaining()) {
1.389 + return null;
1.390 + } else {
1.391 + // In the first 512 was no newline found, so the input is not standard
1.392 + // compliant. We return the current buffer as new line and add a space
1.393 + // to the beginning of the next line which corrects some overlong header
1.394 + // lines.
1.395 + inputBuffer = newLineBuffer();
1.396 + inputBuffer.put((byte) ' ');
1.397 + buffer.flip();
1.398 + return buffer;
1.399 + }
1.400 + }
1.401 + }
1.402 +
1.403 + /**
1.404 + * Returns a at least 512 bytes long ByteBuffer ready for usage.
1.405 + * The method first try to reuse an already allocated (cached) buffer but
1.406 + * if that fails returns a newly allocated direct buffer.
1.407 + * Use recycleBuffer() method when you do not longer use the allocated buffer.
1.408 + */
1.409 + static ByteBuffer newLineBuffer()
1.410 + {
1.411 + ByteBuffer buf = null;
1.412 + synchronized (freeSmallBuffers) {
1.413 + if (!freeSmallBuffers.isEmpty()) {
1.414 + buf = freeSmallBuffers.remove(0);
1.415 + }
1.416 + }
1.417 +
1.418 + if (buf == null) {
1.419 + // Allocate a non-direct buffer
1.420 + buf = ByteBuffer.allocate(BUFFER_SIZE);
1.421 + }
1.422 +
1.423 + assert buf.position() == 0;
1.424 + assert buf.limit() >= BUFFER_SIZE;
1.425 +
1.426 + return buf;
1.427 + }
1.428 +
1.429 + /**
1.430 + * Adds the given buffer to the list of free buffers if it is a valuable
1.431 + * direct allocated buffer.
1.432 + * @param buffer
1.433 + */
1.434 + public static void recycleBuffer(ByteBuffer buffer)
1.435 + {
1.436 + assert buffer != null;
1.437 +
1.438 + if (buffer.isDirect()) {
1.439 + assert buffer.capacity() >= BUFFER_SIZE;
1.440 +
1.441 + // Add old buffers to the list of free buffers
1.442 + synchronized (freeSmallBuffers) {
1.443 + buffer.clear(); // Set position to 0 and limit to capacity
1.444 + freeSmallBuffers.add(buffer);
1.445 + }
1.446 + } // if(buffer.isDirect())
1.447 + }
1.448 +
1.449 + /**
1.450 + * Recycles all buffers of this ChannelLineBuffers object.
1.451 + */
1.452 + public void recycleBuffers()
1.453 + {
1.454 + synchronized (inputBuffer) {
1.455 + recycleBuffer(inputBuffer);
1.456 + this.inputBuffer = null;
1.457 + }
1.458 +
1.459 + synchronized (outputBuffers) {
1.460 + for (ByteBuffer buf : outputBuffers) {
1.461 + recycleBuffer(buf);
1.462 + }
1.463 + outputBuffers = null;
1.464 + }
1.465 + }
1.466 }