chris@1: /*
chris@1:  *   SONEWS News Server
chris@1:  *   see AUTHORS for the list of contributors
chris@1:  *
chris@1:  *   This program is free software: you can redistribute it and/or modify
chris@1:  *   it under the terms of the GNU General Public License as published by
chris@1:  *   the Free Software Foundation, either version 3 of the License, or
chris@1:  *   (at your option) any later version.
chris@1:  *
chris@1:  *   This program is distributed in the hope that it will be useful,
chris@1:  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1:  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1:  *   GNU General Public License for more details.
chris@1:  *
chris@1:  *   You should have received a copy of the GNU General Public License
chris@1:  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1:  */
chris@1: 
chris@1: package org.sonews.daemon;
chris@1: 
chris@1: import java.nio.ByteBuffer;
chris@1: import java.nio.channels.ClosedChannelException;
chris@1: import java.util.ArrayList;
chris@1: import java.util.List;
chris@1: 
chris@1: /**
chris@1:  * Class holding ByteBuffers for SocketChannels/NNTPConnection.
chris@1:  * Due to the complex nature of AIO/NIO we must properly handle the line 
chris@1:  * buffers for the input and output of the SocketChannels.
chris@1:  * @author Christian Lins
chris@1:  * @since sonews/0.5.0
chris@1:  */
chris@1: public class ChannelLineBuffers 
chris@1: {
chris@1:   
chris@1:   /**
chris@1:    * Size of one small buffer; 
chris@1:    * per default this is 512 bytes to fit one standard line.
chris@1:    */
chris@1:   public static final int BUFFER_SIZE = 512;
chris@1:   
chris@1:   private static int maxCachedBuffers = 2048; // Cached buffers maximum
chris@1:   
chris@1:   private static final List<ByteBuffer> freeSmallBuffers
chris@1:     = new ArrayList<ByteBuffer>(maxCachedBuffers);
chris@1:   
chris@1:   /**
chris@1:    * Allocates a predefined number of direct ByteBuffers (allocated via
chris@1:    * ByteBuffer.allocateDirect()). This method is Thread-safe, but should only
chris@1:    * called at startup.
chris@1:    */
chris@1:   public static void allocateDirect()
chris@1:   {
chris@1:     synchronized(freeSmallBuffers)
chris@1:     {
chris@1:       for(int n = 0; n < maxCachedBuffers; n++)
chris@1:       {
chris@1:         ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
chris@1:         freeSmallBuffers.add(buffer);
chris@1:       }
chris@1:     }
chris@1:   }
chris@1:   
chris@1:   private ByteBuffer       inputBuffer   = newLineBuffer();
chris@1:   private List<ByteBuffer> outputBuffers = new ArrayList<ByteBuffer>();
chris@1:   
chris@1:   /**
chris@1:    * Add the given ByteBuffer to the list of buffers to be send to the client.
chris@1:    * This method is Thread-safe.
chris@1:    * @param buffer
chris@1:    * @throws java.nio.channels.ClosedChannelException If the client channel was
chris@1:    * already closed.
chris@1:    */
chris@1:   public void addOutputBuffer(ByteBuffer buffer)
chris@1:     throws ClosedChannelException
chris@1:   {
chris@1:     if(outputBuffers == null)
chris@1:     {
chris@1:       throw new ClosedChannelException();
chris@1:     }
chris@1:     
chris@1:     synchronized(outputBuffers)
chris@1:     {
chris@1:       outputBuffers.add(buffer);
chris@1:     }
chris@1:   }
chris@1:   
chris@1:   /**
chris@1:    * Currently a channel has only one input buffer. This *may* be a bottleneck
chris@1:    * and should investigated in the future.
chris@1:    * @param channel
chris@1:    * @return The input buffer associated with given channel.
chris@1:    */
chris@1:   public ByteBuffer getInputBuffer()
chris@1:   {
chris@1:     return inputBuffer;
chris@1:   }
chris@1:   
chris@1:   /**
chris@1:    * Returns the current output buffer for writing(!) to SocketChannel.
chris@1:    * @param channel
chris@1:    * @return The next input buffer that contains unprocessed data or null
chris@1:    * if the connection was closed or there are no more unprocessed buffers.
chris@1:    */
chris@1:   public ByteBuffer getOutputBuffer()
chris@1:   {
chris@1:     synchronized(outputBuffers)
chris@1:     {
chris@1:       if(outputBuffers == null || outputBuffers.isEmpty())
chris@1:       {
chris@1:         return null;
chris@1:       }
chris@1:       else
chris@1:       {
chris@1:         ByteBuffer buffer = outputBuffers.get(0);
chris@1:         if(buffer.remaining() == 0)
chris@1:         {
chris@1:           outputBuffers.remove(0);
chris@1:           // Add old buffers to the list of free buffers
chris@1:           recycleBuffer(buffer);
chris@1:           buffer = getOutputBuffer();
chris@1:         }
chris@1:         return buffer;
chris@1:       }
chris@1:     }
chris@1:   }
chris@1:   
chris@1:   /**
chris@1:    * Goes through the input buffer of the given channel and searches
chris@1:    * for next line terminator. If a '\n' is found, the bytes up to the
chris@1:    * line terminator are returned as array of bytes (the line terminator
chris@1:    * is omitted). If none is found the method returns null.
chris@1:    * @param channel
chris@1:    * @return A ByteBuffer wrapping the line.
chris@1:    */
chris@1:   ByteBuffer nextInputLine()
chris@1:   {
chris@1:     if(inputBuffer == null)
chris@1:     {
chris@1:       return null;
chris@1:     }
chris@1:     
chris@1:     synchronized(inputBuffer)
chris@1:     {
chris@1:       ByteBuffer buffer = inputBuffer;
chris@1: 
chris@1:       // Mark the current write position
chris@1:       int mark = buffer.position();
chris@1: 
chris@1:       // Set position to 0 and limit to current position
chris@1:       buffer.flip();
chris@1: 
chris@1:       ByteBuffer lineBuffer = newLineBuffer();
chris@1: 
chris@1:       while (buffer.position() < buffer.limit())
chris@1:       {
chris@1:         byte b = buffer.get();
chris@1:         if (b == 10) // '\n'
chris@1:         {
chris@1:           // The bytes between the buffer's current position and its limit, 
chris@1:           // if any, are copied to the beginning of the buffer. That is, the 
chris@1:           // byte at index p = position() is copied to index zero, the byte at 
chris@1:           // index p + 1 is copied to index one, and so forth until the byte 
chris@1:           // at index limit() - 1 is copied to index n = limit() - 1 - p. 
chris@1:           // The buffer's position is then set to n+1 and its limit is set to 
chris@1:           // its capacity.
chris@1:           buffer.compact();
chris@1: 
chris@1:           lineBuffer.flip(); // limit to position, position to 0
chris@1:           return lineBuffer;
chris@1:         }
chris@1:         else
chris@1:         {
chris@1:           lineBuffer.put(b);
chris@1:         }
chris@1:       }
chris@1: 
chris@1:       buffer.limit(BUFFER_SIZE);
chris@1:       buffer.position(mark);
chris@1: 
chris@1:       if(buffer.hasRemaining())
chris@1:       {
chris@1:         return null;
chris@1:       }
chris@1:       else
chris@1:       {
chris@1:         // In the first 512 was no newline found, so the input is not standard
chris@1:         // compliant. We return the current buffer as new line and add a space
chris@1:         // to the beginning of the next line which corrects some overlong header
chris@1:         // lines.
chris@1:         inputBuffer = newLineBuffer();
chris@1:         inputBuffer.put((byte)' ');
chris@1:         buffer.flip();
chris@1:         return buffer;
chris@1:       }
chris@1:     }
chris@1:   }
chris@1:   
chris@1:   /**
chris@1:    * Returns a at least 512 bytes long ByteBuffer ready for usage.
chris@1:    * The method first try to reuse an already allocated (cached) buffer but
chris@1:    * if that fails returns a newly allocated direct buffer.
chris@1:    * Use recycleBuffer() method when you do not longer use the allocated buffer.
chris@1:    */
chris@1:   static ByteBuffer newLineBuffer()
chris@1:   {
chris@1:     ByteBuffer buf = null;
chris@1:     synchronized(freeSmallBuffers)
chris@1:     {
chris@1:       if(!freeSmallBuffers.isEmpty())
chris@1:       {
chris@1:         buf = freeSmallBuffers.remove(0);
chris@1:       }
chris@1:     }
chris@1:       
chris@1:     if(buf == null)
chris@1:     {
chris@1:       // Allocate a non-direct buffer
chris@1:       buf = ByteBuffer.allocate(BUFFER_SIZE);
chris@1:     }
chris@1:     
chris@1:     assert buf.position() == 0;
chris@1:     assert buf.limit() >= BUFFER_SIZE;
chris@1:     
chris@1:     return buf;
chris@1:   }
chris@1:   
chris@1:   /**
chris@1:    * Adds the given buffer to the list of free buffers if it is a valuable
chris@1:    * direct allocated buffer.
chris@1:    * @param buffer
chris@1:    */
chris@1:   public static void recycleBuffer(ByteBuffer buffer)
chris@1:   {
chris@1:     assert buffer != null;
chris@1: 
chris@1:     if(buffer.isDirect())
chris@1:     {
chris@3:       assert buffer.capacity() >= BUFFER_SIZE;
chris@3:       
chris@1:       // Add old buffers to the list of free buffers
chris@1:       synchronized(freeSmallBuffers)
chris@1:       {
chris@1:         buffer.clear(); // Set position to 0 and limit to capacity
chris@1:         freeSmallBuffers.add(buffer);
chris@1:       }
chris@1:     } // if(buffer.isDirect())
chris@1:   }
chris@1:   
chris@1:   /**
chris@1:    * Recycles all buffers of this ChannelLineBuffers object.
chris@1:    */
chris@1:   public void recycleBuffers()
chris@1:   {
chris@1:     synchronized(inputBuffer)
chris@1:     {
chris@1:       recycleBuffer(inputBuffer);
chris@1:       this.inputBuffer = null;
chris@1:     }
chris@1:     
chris@1:     synchronized(outputBuffers)
chris@1:     {
chris@1:       for(ByteBuffer buf : outputBuffers)
chris@1:       {
chris@1:         recycleBuffer(buf);
chris@1:       }
chris@1:       outputBuffers = null;
chris@1:     }
chris@1:   }
chris@1:   
chris@1: }