org/sonews/daemon/ChannelLineBuffers.java
author cli
Thu Aug 20 16:57:38 2009 +0200 (2009-08-20)
changeset 14 efce4ec25564
parent 1 6fceb66e1ad7
child 25 dd05c3f2fa24
permissions -rw-r--r--
Fix #548: API change; changed parameter type of Storage.getGroupsForList()
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.daemon;
    20 
    21 import java.nio.ByteBuffer;
    22 import java.nio.channels.ClosedChannelException;
    23 import java.util.ArrayList;
    24 import java.util.List;
    25 
    26 /**
    27  * Class holding ByteBuffers for SocketChannels/NNTPConnection.
    28  * Due to the complex nature of AIO/NIO we must properly handle the line 
    29  * buffers for the input and output of the SocketChannels.
    30  * @author Christian Lins
    31  * @since sonews/0.5.0
    32  */
    33 public class ChannelLineBuffers 
    34 {
    35   
    36   /**
    37    * Size of one small buffer; 
    38    * per default this is 512 bytes to fit one standard line.
    39    */
    40   public static final int BUFFER_SIZE = 512;
    41   
    42   private static int maxCachedBuffers = 2048; // Cached buffers maximum
    43   
    44   private static final List<ByteBuffer> freeSmallBuffers
    45     = new ArrayList<ByteBuffer>(maxCachedBuffers);
    46   
    47   /**
    48    * Allocates a predefined number of direct ByteBuffers (allocated via
    49    * ByteBuffer.allocateDirect()). This method is Thread-safe, but should only
    50    * called at startup.
    51    */
    52   public static void allocateDirect()
    53   {
    54     synchronized(freeSmallBuffers)
    55     {
    56       for(int n = 0; n < maxCachedBuffers; n++)
    57       {
    58         ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
    59         freeSmallBuffers.add(buffer);
    60       }
    61     }
    62   }
    63   
    64   private ByteBuffer       inputBuffer   = newLineBuffer();
    65   private List<ByteBuffer> outputBuffers = new ArrayList<ByteBuffer>();
    66   
    67   /**
    68    * Add the given ByteBuffer to the list of buffers to be send to the client.
    69    * This method is Thread-safe.
    70    * @param buffer
    71    * @throws java.nio.channels.ClosedChannelException If the client channel was
    72    * already closed.
    73    */
    74   public void addOutputBuffer(ByteBuffer buffer)
    75     throws ClosedChannelException
    76   {
    77     if(outputBuffers == null)
    78     {
    79       throw new ClosedChannelException();
    80     }
    81     
    82     synchronized(outputBuffers)
    83     {
    84       outputBuffers.add(buffer);
    85     }
    86   }
    87   
    88   /**
    89    * Currently a channel has only one input buffer. This *may* be a bottleneck
    90    * and should investigated in the future.
    91    * @param channel
    92    * @return The input buffer associated with given channel.
    93    */
    94   public ByteBuffer getInputBuffer()
    95   {
    96     return inputBuffer;
    97   }
    98   
    99   /**
   100    * Returns the current output buffer for writing(!) to SocketChannel.
   101    * @param channel
   102    * @return The next input buffer that contains unprocessed data or null
   103    * if the connection was closed or there are no more unprocessed buffers.
   104    */
   105   public ByteBuffer getOutputBuffer()
   106   {
   107     synchronized(outputBuffers)
   108     {
   109       if(outputBuffers == null || outputBuffers.isEmpty())
   110       {
   111         return null;
   112       }
   113       else
   114       {
   115         ByteBuffer buffer = outputBuffers.get(0);
   116         if(buffer.remaining() == 0)
   117         {
   118           outputBuffers.remove(0);
   119           // Add old buffers to the list of free buffers
   120           recycleBuffer(buffer);
   121           buffer = getOutputBuffer();
   122         }
   123         return buffer;
   124       }
   125     }
   126   }
   127   
   128   /**
   129    * Goes through the input buffer of the given channel and searches
   130    * for next line terminator. If a '\n' is found, the bytes up to the
   131    * line terminator are returned as array of bytes (the line terminator
   132    * is omitted). If none is found the method returns null.
   133    * @param channel
   134    * @return A ByteBuffer wrapping the line.
   135    */
   136   ByteBuffer nextInputLine()
   137   {
   138     if(inputBuffer == null)
   139     {
   140       return null;
   141     }
   142     
   143     synchronized(inputBuffer)
   144     {
   145       ByteBuffer buffer = inputBuffer;
   146 
   147       // Mark the current write position
   148       int mark = buffer.position();
   149 
   150       // Set position to 0 and limit to current position
   151       buffer.flip();
   152 
   153       ByteBuffer lineBuffer = newLineBuffer();
   154 
   155       while (buffer.position() < buffer.limit())
   156       {
   157         byte b = buffer.get();
   158         if (b == 10) // '\n'
   159         {
   160           // The bytes between the buffer's current position and its limit, 
   161           // if any, are copied to the beginning of the buffer. That is, the 
   162           // byte at index p = position() is copied to index zero, the byte at 
   163           // index p + 1 is copied to index one, and so forth until the byte 
   164           // at index limit() - 1 is copied to index n = limit() - 1 - p. 
   165           // The buffer's position is then set to n+1 and its limit is set to 
   166           // its capacity.
   167           buffer.compact();
   168 
   169           lineBuffer.flip(); // limit to position, position to 0
   170           return lineBuffer;
   171         }
   172         else
   173         {
   174           lineBuffer.put(b);
   175         }
   176       }
   177 
   178       buffer.limit(BUFFER_SIZE);
   179       buffer.position(mark);
   180 
   181       if(buffer.hasRemaining())
   182       {
   183         return null;
   184       }
   185       else
   186       {
   187         // In the first 512 was no newline found, so the input is not standard
   188         // compliant. We return the current buffer as new line and add a space
   189         // to the beginning of the next line which corrects some overlong header
   190         // lines.
   191         inputBuffer = newLineBuffer();
   192         inputBuffer.put((byte)' ');
   193         buffer.flip();
   194         return buffer;
   195       }
   196     }
   197   }
   198   
   199   /**
   200    * Returns a at least 512 bytes long ByteBuffer ready for usage.
   201    * The method first try to reuse an already allocated (cached) buffer but
   202    * if that fails returns a newly allocated direct buffer.
   203    * Use recycleBuffer() method when you do not longer use the allocated buffer.
   204    */
   205   static ByteBuffer newLineBuffer()
   206   {
   207     ByteBuffer buf = null;
   208     synchronized(freeSmallBuffers)
   209     {
   210       if(!freeSmallBuffers.isEmpty())
   211       {
   212         buf = freeSmallBuffers.remove(0);
   213       }
   214     }
   215       
   216     if(buf == null)
   217     {
   218       // Allocate a non-direct buffer
   219       buf = ByteBuffer.allocate(BUFFER_SIZE);
   220     }
   221     
   222     assert buf.position() == 0;
   223     assert buf.limit() >= BUFFER_SIZE;
   224     
   225     return buf;
   226   }
   227   
   228   /**
   229    * Adds the given buffer to the list of free buffers if it is a valuable
   230    * direct allocated buffer.
   231    * @param buffer
   232    */
   233   public static void recycleBuffer(ByteBuffer buffer)
   234   {
   235     assert buffer != null;
   236 
   237     if(buffer.isDirect())
   238     {
   239       assert buffer.capacity() >= BUFFER_SIZE;
   240       
   241       // Add old buffers to the list of free buffers
   242       synchronized(freeSmallBuffers)
   243       {
   244         buffer.clear(); // Set position to 0 and limit to capacity
   245         freeSmallBuffers.add(buffer);
   246       }
   247     } // if(buffer.isDirect())
   248   }
   249   
   250   /**
   251    * Recycles all buffers of this ChannelLineBuffers object.
   252    */
   253   public void recycleBuffers()
   254   {
   255     synchronized(inputBuffer)
   256     {
   257       recycleBuffer(inputBuffer);
   258       this.inputBuffer = null;
   259     }
   260     
   261     synchronized(outputBuffers)
   262     {
   263       for(ByteBuffer buf : outputBuffers)
   264       {
   265         recycleBuffer(buf);
   266       }
   267       outputBuffers = null;
   268     }
   269   }
   270   
   271 }