org/sonews/daemon/NNTPConnection.java
author cli
Wed Aug 12 13:03:23 2009 +0200 (2009-08-12)
changeset 7 0b76e099eb96
parent 1 6fceb66e1ad7
child 15 f2293e8566f5
permissions -rw-r--r--
PullFeeder sends an additional "MODE READER" to peers.
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.daemon;
    20 
    21 import org.sonews.util.Log;
    22 import java.io.IOException;
    23 import java.net.InetSocketAddress;
    24 import java.net.SocketException;
    25 import java.nio.ByteBuffer;
    26 import java.nio.CharBuffer;
    27 import java.nio.channels.ClosedChannelException;
    28 import java.nio.channels.SelectionKey;
    29 import java.nio.channels.SocketChannel;
    30 import java.nio.charset.Charset;
    31 import java.util.Arrays;
    32 import java.util.Timer;
    33 import java.util.TimerTask;
    34 import org.sonews.daemon.command.Command;
    35 import org.sonews.storage.Article;
    36 import org.sonews.storage.Channel;
    37 import org.sonews.util.Stats;
    38 
    39 /**
    40  * For every SocketChannel (so TCP/IP connection) there is an instance of
    41  * this class.
    42  * @author Christian Lins
    43  * @since sonews/0.5.0
    44  */
    45 public final class NNTPConnection
    46 {
    47 
    48   public static final String NEWLINE            = "\r\n";    // RFC defines this as newline
    49   public static final String MESSAGE_ID_PATTERN = "<[^>]+>";
    50   
    51   private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon
    52   
    53   /** SocketChannel is generally thread-safe */
    54   private SocketChannel   channel        = null;
    55   private Charset         charset        = Charset.forName("UTF-8");
    56   private Command         command        = null;
    57   private Article         currentArticle = null;
    58   private Channel         currentGroup   = null;
    59   private volatile long   lastActivity   = System.currentTimeMillis();
    60   private ChannelLineBuffers lineBuffers = new ChannelLineBuffers();
    61   private int             readLock       = 0;
    62   private final Object    readLockGate   = new Object();
    63   private SelectionKey    writeSelKey    = null;
    64   
    65   public NNTPConnection(final SocketChannel channel)
    66     throws IOException
    67   {
    68     if(channel == null)
    69     {
    70       throw new IllegalArgumentException("channel is null");
    71     }
    72 
    73     this.channel = channel;
    74     Stats.getInstance().clientConnect();
    75   }
    76   
    77   /**
    78    * Tries to get the read lock for this NNTPConnection. This method is Thread-
    79    * safe and returns true of the read lock was successfully set. If the lock
    80    * is still hold by another Thread the method returns false.
    81    */
    82   boolean tryReadLock()
    83   {
    84     // As synchronizing simple types may cause deadlocks,
    85     // we use a gate object.
    86     synchronized(readLockGate)
    87     {
    88       if(readLock != 0)
    89       {
    90         return false;
    91       }
    92       else
    93       {
    94         readLock = Thread.currentThread().hashCode();
    95         return true;
    96       }
    97     }
    98   }
    99   
   100   /**
   101    * Releases the read lock in a Thread-safe way.
   102    * @throws IllegalMonitorStateException if a Thread not holding the lock
   103    * tries to release it.
   104    */
   105   void unlockReadLock()
   106   {
   107     synchronized(readLockGate)
   108     {
   109       if(readLock == Thread.currentThread().hashCode())
   110       {
   111         readLock = 0;
   112       }
   113       else
   114       {
   115         throw new IllegalMonitorStateException();
   116       }
   117     }
   118   }
   119   
   120   /**
   121    * @return Current input buffer of this NNTPConnection instance.
   122    */
   123   public ByteBuffer getInputBuffer()
   124   {
   125     return this.lineBuffers.getInputBuffer();
   126   }
   127   
   128   /**
   129    * @return Output buffer of this NNTPConnection which has at least one byte
   130    * free storage.
   131    */
   132   public ByteBuffer getOutputBuffer()
   133   {
   134     return this.lineBuffers.getOutputBuffer();
   135   }
   136   
   137   /**
   138    * @return ChannelLineBuffers instance associated with this NNTPConnection.
   139    */
   140   public ChannelLineBuffers getBuffers()
   141   {
   142     return this.lineBuffers;
   143   }
   144   
   145   /**
   146    * @return true if this connection comes from a local remote address.
   147    */
   148   public boolean isLocalConnection()
   149   {
   150     return ((InetSocketAddress)this.channel.socket().getRemoteSocketAddress())
   151       .getHostName().equalsIgnoreCase("localhost");
   152   }
   153 
   154   void setWriteSelectionKey(SelectionKey selKey)
   155   {
   156     this.writeSelKey = selKey;
   157   }
   158 
   159   public void shutdownInput()
   160   {
   161     try
   162     {
   163       // Closes the input line of the channel's socket, so no new data
   164       // will be received and a timeout can be triggered.
   165       this.channel.socket().shutdownInput();
   166     }
   167     catch(IOException ex)
   168     {
   169       Log.msg("Exception in NNTPConnection.shutdownInput(): " + ex, false);
   170       if(Log.isDebug())
   171       {
   172         ex.printStackTrace();
   173       }
   174     }
   175   }
   176   
   177   public void shutdownOutput()
   178   {
   179     cancelTimer.schedule(new TimerTask() 
   180     {
   181       @Override
   182       public void run()
   183       {
   184         try
   185         {
   186           // Closes the output line of the channel's socket.
   187           channel.socket().shutdownOutput();
   188           channel.close();
   189         }
   190         catch(SocketException ex)
   191         {
   192           // Socket was already disconnected
   193           Log.msg("NNTPConnection.shutdownOutput(): " + ex, true);
   194         }
   195         catch(Exception ex)
   196         {
   197           Log.msg("NNTPConnection.shutdownOutput(): " + ex, false);
   198           if(Log.isDebug())
   199           {
   200             ex.printStackTrace();
   201           }
   202         }
   203       }
   204     }, 3000);
   205   }
   206   
   207   public SocketChannel getSocketChannel()
   208   {
   209     return this.channel;
   210   }
   211   
   212   public Article getCurrentArticle()
   213   {
   214     return this.currentArticle;
   215   }
   216   
   217   public Charset getCurrentCharset()
   218   {
   219     return this.charset;
   220   }
   221 
   222   /**
   223    * @return The currently selected communication channel (not SocketChannel)
   224    */
   225   public Channel getCurrentChannel()
   226   {
   227     return this.currentGroup;
   228   }
   229   
   230   public void setCurrentArticle(final Article article)
   231   {
   232     this.currentArticle = article;
   233   }
   234   
   235   public void setCurrentGroup(final Channel group)
   236   {
   237     this.currentGroup = group;
   238   }
   239   
   240   public long getLastActivity()
   241   {
   242     return this.lastActivity;
   243   }
   244   
   245   /**
   246    * Due to the readLockGate there is no need to synchronize this method.
   247    * @param raw
   248    * @throws IllegalArgumentException if raw is null.
   249    * @throws IllegalStateException if calling thread does not own the readLock.
   250    */
   251   void lineReceived(byte[] raw)
   252   {
   253     if(raw == null)
   254     {
   255       throw new IllegalArgumentException("raw is null");
   256     }
   257     
   258     if(readLock == 0 || readLock != Thread.currentThread().hashCode())
   259     {
   260       throw new IllegalStateException("readLock not properly set");
   261     }
   262 
   263     this.lastActivity = System.currentTimeMillis();
   264     
   265     String line = new String(raw, this.charset);
   266     
   267     // There might be a trailing \r, but trim() is a bad idea
   268     // as it removes also leading spaces from long header lines.
   269     if(line.endsWith("\r"))
   270     {
   271       line = line.substring(0, line.length() - 1);
   272       raw  = Arrays.copyOf(raw, raw.length - 1);
   273     }
   274     
   275     Log.msg("<< " + line, true);
   276     
   277     if(command == null)
   278     {
   279       command = parseCommandLine(line);
   280       assert command != null;
   281     }
   282 
   283     try
   284     {
   285       // The command object will process the line we just received
   286       command.processLine(this, line, raw);
   287     }
   288     catch(ClosedChannelException ex0)
   289     {
   290       try
   291       {
   292         Log.msg("Connection to " + channel.socket().getRemoteSocketAddress() 
   293             + " closed: " + ex0, true);
   294       }
   295       catch(Exception ex0a)
   296       {
   297         ex0a.printStackTrace();
   298       }
   299     }
   300     catch(Exception ex1)
   301     {
   302       try
   303       {
   304         command = null;
   305         ex1.printStackTrace();
   306         println("500 Internal server error");
   307       }
   308       catch(Exception ex2)
   309       {
   310         ex2.printStackTrace();
   311       }
   312     }
   313 
   314     if(command == null || command.hasFinished())
   315     {
   316       command = null;
   317       charset = Charset.forName("UTF-8"); // Reset to default
   318     }
   319   }
   320   
   321   /**
   322    * This method determines the fitting command processing class.
   323    * @param line
   324    * @return
   325    */
   326   private Command parseCommandLine(String line)
   327   {
   328     String cmdStr = line.split(" ")[0];
   329     return CommandSelector.getInstance().get(cmdStr);
   330   }
   331   
   332   /**
   333    * Puts the given line into the output buffer, adds a newline character
   334    * and returns. The method returns immediately and does not block until
   335    * the line was sent. If line is longer than 510 octets it is split up in
   336    * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE).
   337    * @param line
   338    */
   339   public void println(final CharSequence line, final Charset charset)
   340     throws IOException
   341   {    
   342     writeToChannel(CharBuffer.wrap(line), charset, line);
   343     writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
   344   }
   345 
   346   /**
   347    * Writes the given raw lines to the output buffers and finishes with
   348    * a newline character (\r\n).
   349    * @param rawLines
   350    */
   351   public void println(final byte[] rawLines)
   352     throws IOException
   353   {
   354     this.lineBuffers.addOutputBuffer(ByteBuffer.wrap(rawLines));
   355     writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
   356   }
   357   
   358   /**
   359    * Encodes the given CharBuffer using the given Charset to a bunch of
   360    * ByteBuffers (each 512 bytes large) and enqueues them for writing at the
   361    * connected SocketChannel.
   362    * @throws java.io.IOException
   363    */
   364   private void writeToChannel(CharBuffer characters, final Charset charset,
   365     CharSequence debugLine)
   366     throws IOException
   367   {
   368     if(!charset.canEncode())
   369     {
   370       Log.msg("FATAL: Charset " + charset + " cannot encode!", false);
   371       return;
   372     }
   373     
   374     // Write characters to output buffers
   375     LineEncoder lenc = new LineEncoder(characters, charset);
   376     lenc.encode(lineBuffers);
   377     
   378     enableWriteEvents(debugLine);
   379   }
   380 
   381   private void enableWriteEvents(CharSequence debugLine)
   382   {
   383     // Enable OP_WRITE events so that the buffers are processed
   384     try
   385     {
   386       this.writeSelKey.interestOps(SelectionKey.OP_WRITE);
   387       ChannelWriter.getInstance().getSelector().wakeup();
   388     }
   389     catch (Exception ex) // CancelledKeyException and ChannelCloseException
   390     {
   391       Log.msg("NNTPConnection.writeToChannel(): " + ex, false);
   392       return;
   393     }
   394 
   395     // Update last activity timestamp
   396     this.lastActivity = System.currentTimeMillis();
   397     if(debugLine != null)
   398     {
   399       Log.msg(">> " + debugLine, true);
   400     }
   401   }
   402   
   403   public void println(final CharSequence line)
   404     throws IOException
   405   {
   406     println(line, charset);
   407   }
   408   
   409   public void print(final String line)
   410     throws IOException
   411   {
   412     writeToChannel(CharBuffer.wrap(line), charset, line);
   413   }
   414   
   415   public void setCurrentCharset(final Charset charset)
   416   {
   417     this.charset = charset;
   418   }
   419   
   420 }