org/sonews/daemon/NNTPConnection.java
author chris <chris@marvin>
Fri Jun 26 16:48:50 2009 +0200 (2009-06-26)
changeset 1 6fceb66e1ad7
child 3 2fdc9cc89502
permissions -rw-r--r--
Hooray... sonews/0.5.0 final

HG: Enter commit message. Lines beginning with 'HG:' are removed.
HG: Remove all lines to abort the collapse operation.
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.daemon;
    20 
    21 import org.sonews.util.Log;
    22 import java.io.IOException;
    23 import java.net.InetSocketAddress;
    24 import java.nio.ByteBuffer;
    25 import java.nio.CharBuffer;
    26 import java.nio.channels.ClosedChannelException;
    27 import java.nio.channels.SelectionKey;
    28 import java.nio.channels.SocketChannel;
    29 import java.nio.charset.Charset;
    30 import java.util.Timer;
    31 import java.util.TimerTask;
    32 import org.sonews.daemon.command.ArticleCommand;
    33 import org.sonews.daemon.command.CapabilitiesCommand;
    34 import org.sonews.daemon.command.AbstractCommand;
    35 import org.sonews.daemon.command.GroupCommand;
    36 import org.sonews.daemon.command.HelpCommand;
    37 import org.sonews.daemon.command.ListCommand;
    38 import org.sonews.daemon.command.ListGroupCommand;
    39 import org.sonews.daemon.command.ModeReaderCommand;
    40 import org.sonews.daemon.command.NewGroupsCommand;
    41 import org.sonews.daemon.command.NextPrevCommand;
    42 import org.sonews.daemon.command.OverCommand;
    43 import org.sonews.daemon.command.PostCommand;
    44 import org.sonews.daemon.command.QuitCommand;
    45 import org.sonews.daemon.command.StatCommand;
    46 import org.sonews.daemon.command.UnsupportedCommand;
    47 import org.sonews.daemon.command.XDaemonCommand;
    48 import org.sonews.daemon.command.XPatCommand;
    49 import org.sonews.daemon.storage.Article;
    50 import org.sonews.daemon.storage.Group;
    51 import org.sonews.util.Stats;
    52 
    53 /**
    54  * For every SocketChannel (so TCP/IP connection) there is an instance of
    55  * this class.
    56  * @author Christian Lins
    57  * @since sonews/0.5.0
    58  */
    59 public final class NNTPConnection
    60 {
    61 
    62   public static final String NEWLINE            = "\r\n";    // RFC defines this as newline
    63   public static final String MESSAGE_ID_PATTERN = "<[^>]+>";
    64   
    65   private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon
    66   
    67   /** SocketChannel is generally thread-safe */
    68   private SocketChannel   channel        = null;
    69   private Charset         charset        = Charset.forName("UTF-8");
    70   private AbstractCommand command        = null;
    71   private Article         currentArticle = null;
    72   private Group           currentGroup   = null;
    73   private volatile long   lastActivity   = System.currentTimeMillis();
    74   private ChannelLineBuffers lineBuffers = new ChannelLineBuffers();
    75   private int             readLock       = 0;
    76   private final Object    readLockGate   = new Object();
    77   private SelectionKey    writeSelKey    = null;
    78   
    79   public NNTPConnection(final SocketChannel channel)
    80     throws IOException
    81   {
    82     if(channel == null)
    83     {
    84       throw new IllegalArgumentException("channel is null");
    85     }
    86 
    87     this.channel = channel;
    88     Stats.getInstance().clientConnect();
    89   }
    90   
    91   /**
    92    * Tries to get the read lock for this NNTPConnection. This method is Thread-
    93    * safe and returns true of the read lock was successfully set. If the lock
    94    * is still hold by another Thread the method returns false.
    95    */
    96   boolean tryReadLock()
    97   {
    98     // As synchronizing simple types may cause deadlocks,
    99     // we use a gate object.
   100     synchronized(readLockGate)
   101     {
   102       if(readLock != 0)
   103       {
   104         return false;
   105       }
   106       else
   107       {
   108         readLock = Thread.currentThread().hashCode();
   109         return true;
   110       }
   111     }
   112   }
   113   
   114   /**
   115    * Releases the read lock in a Thread-safe way.
   116    * @throws IllegalMonitorStateException if a Thread not holding the lock
   117    * tries to release it.
   118    */
   119   void unlockReadLock()
   120   {
   121     synchronized(readLockGate)
   122     {
   123       if(readLock == Thread.currentThread().hashCode())
   124       {
   125         readLock = 0;
   126       }
   127       else
   128       {
   129         throw new IllegalMonitorStateException();
   130       }
   131     }
   132   }
   133   
   134   /**
   135    * @return Current input buffer of this NNTPConnection instance.
   136    */
   137   public ByteBuffer getInputBuffer()
   138   {
   139     return this.lineBuffers.getInputBuffer();
   140   }
   141   
   142   /**
   143    * @return Output buffer of this NNTPConnection which has at least one byte
   144    * free storage.
   145    */
   146   public ByteBuffer getOutputBuffer()
   147   {
   148     return this.lineBuffers.getOutputBuffer();
   149   }
   150   
   151   /**
   152    * @return ChannelLineBuffers instance associated with this NNTPConnection.
   153    */
   154   public ChannelLineBuffers getBuffers()
   155   {
   156     return this.lineBuffers;
   157   }
   158   
   159   /**
   160    * @return true if this connection comes from a local remote address.
   161    */
   162   public boolean isLocalConnection()
   163   {
   164     return ((InetSocketAddress)this.channel.socket().getRemoteSocketAddress())
   165       .getHostName().equalsIgnoreCase("localhost");
   166   }
   167 
   168   void setWriteSelectionKey(SelectionKey selKey)
   169   {
   170     this.writeSelKey = selKey;
   171   }
   172 
   173   public void shutdownInput()
   174   {
   175     try
   176     {
   177       // Closes the input line of the channel's socket, so no new data
   178       // will be received and a timeout can be triggered.
   179       this.channel.socket().shutdownInput();
   180     }
   181     catch(IOException ex)
   182     {
   183       Log.msg("Exception in NNTPConnection.shutdownInput(): " + ex, false);
   184       if(Log.isDebug())
   185       {
   186         ex.printStackTrace();
   187       }
   188     }
   189   }
   190   
   191   public void shutdownOutput()
   192   {
   193     cancelTimer.schedule(new TimerTask() 
   194     {
   195       @Override
   196       public void run()
   197       {
   198         try
   199         {
   200           // Closes the output line of the channel's socket.
   201           channel.socket().shutdownOutput();
   202           channel.close();
   203         }
   204         catch(Exception ex)
   205         {
   206           Log.msg("NNTPConnection.shutdownOutput(): " + ex, false);
   207           if(Log.isDebug())
   208           {
   209             ex.printStackTrace();
   210           }
   211         }
   212       }
   213     }, 3000);
   214   }
   215   
   216   public SocketChannel getChannel()
   217   {
   218     return this.channel;
   219   }
   220   
   221   public Article getCurrentArticle()
   222   {
   223     return this.currentArticle;
   224   }
   225   
   226   public Charset getCurrentCharset()
   227   {
   228     return this.charset;
   229   }
   230   
   231   public Group getCurrentGroup()
   232   {
   233     return this.currentGroup;
   234   }
   235   
   236   public void setCurrentArticle(final Article article)
   237   {
   238     this.currentArticle = article;
   239   }
   240   
   241   public void setCurrentGroup(final Group group)
   242   {
   243     this.currentGroup = group;
   244   }
   245   
   246   public long getLastActivity()
   247   {
   248     return this.lastActivity;
   249   }
   250   
   251   /**
   252    * Due to the readLockGate there is no need to synchronize this method.
   253    * @param raw
   254    * @throws IllegalArgumentException if raw is null.
   255    * @throws IllegalStateException if calling thread does not own the readLock.
   256    */
   257   void lineReceived(byte[] raw)
   258   {
   259     if(raw == null)
   260     {
   261       throw new IllegalArgumentException("raw is null");
   262     }
   263     
   264     if(readLock == 0 || readLock != Thread.currentThread().hashCode())
   265     {
   266       throw new IllegalStateException("readLock not properly set");
   267     }
   268 
   269     this.lastActivity = System.currentTimeMillis();
   270     
   271     String line = new String(raw, this.charset);
   272     
   273     // There might be a trailing \r, but trim() is a bad idea
   274     // as it removes also leading spaces from long header lines.
   275     if(line.endsWith("\r"))
   276     {
   277       line = line.substring(0, line.length() - 1);
   278     }
   279     
   280     Log.msg("<< " + line, true);
   281     
   282     if(command == null)
   283     {
   284       command = parseCommandLine(line);
   285       assert command != null;
   286     }
   287 
   288     try
   289     {
   290       // The command object will process the line we just received
   291       command.processLine(line);
   292     }
   293     catch(ClosedChannelException ex0)
   294     {
   295       try
   296       {
   297         Log.msg("Connection to " + channel.socket().getRemoteSocketAddress() 
   298             + " closed: " + ex0, true);
   299       }
   300       catch(Exception ex0a)
   301       {
   302         ex0a.printStackTrace();
   303       }
   304     }
   305     catch(Exception ex1)
   306     {
   307       try
   308       {
   309         command = null;
   310         ex1.printStackTrace();
   311         println("500 Internal server error");
   312       }
   313       catch(Exception ex2)
   314       {
   315         ex2.printStackTrace();
   316       }
   317     }
   318 
   319     if(command == null || command.hasFinished())
   320     {
   321       command = null;
   322       charset = Charset.forName("UTF-8"); // Reset to default
   323     }
   324   }
   325   
   326   /**
   327    * This method performes several if/elseif constructs to determine the
   328    * fitting command object. 
   329    * TODO: This string comparisons are probably slow!
   330    * @param line
   331    * @return
   332    */
   333   private AbstractCommand parseCommandLine(String line)
   334   {
   335     AbstractCommand  cmd    = new UnsupportedCommand(this);
   336     String   cmdStr = line.split(" ")[0];
   337     
   338     if(cmdStr.equalsIgnoreCase("ARTICLE") || 
   339       cmdStr.equalsIgnoreCase("BODY"))
   340     {
   341       cmd = new ArticleCommand(this);
   342     }
   343     else if(cmdStr.equalsIgnoreCase("CAPABILITIES"))
   344     {
   345       cmd = new CapabilitiesCommand(this);
   346     }
   347     else if(cmdStr.equalsIgnoreCase("GROUP"))
   348     {
   349       cmd = new GroupCommand(this);
   350     }
   351     else if(cmdStr.equalsIgnoreCase("HEAD"))
   352     {
   353       cmd = new ArticleCommand(this);
   354     }
   355     else if(cmdStr.equalsIgnoreCase("HELP"))
   356     {
   357       cmd = new HelpCommand(this);
   358     }
   359     else if(cmdStr.equalsIgnoreCase("LIST"))
   360     {
   361       cmd = new ListCommand(this);
   362     }
   363     else if(cmdStr.equalsIgnoreCase("LISTGROUP"))
   364     {
   365       cmd = new ListGroupCommand(this);
   366     }
   367     else if(cmdStr.equalsIgnoreCase("MODE"))
   368     {
   369       cmd = new ModeReaderCommand(this);
   370     }
   371     else if(cmdStr.equalsIgnoreCase("NEWGROUPS"))
   372     {
   373       cmd = new NewGroupsCommand(this);
   374     }
   375     else if(cmdStr.equalsIgnoreCase("NEXT") ||
   376       cmdStr.equalsIgnoreCase("PREV"))
   377     {
   378       cmd = new NextPrevCommand(this);
   379     }
   380     else if(cmdStr.equalsIgnoreCase("OVER") ||
   381       cmdStr.equalsIgnoreCase("XOVER")) // for compatibility with older RFCs
   382     {
   383       cmd = new OverCommand(this);
   384     }
   385     else if(cmdStr.equalsIgnoreCase("POST"))
   386     {
   387       cmd = new PostCommand(this);
   388     }
   389     else if(cmdStr.equalsIgnoreCase("QUIT"))
   390     {
   391       cmd = new QuitCommand(this);
   392     }
   393     else if(cmdStr.equalsIgnoreCase("STAT"))
   394     {
   395       cmd = new StatCommand(this);
   396     }
   397     else if(cmdStr.equalsIgnoreCase("XDAEMON"))
   398     {
   399       cmd = new XDaemonCommand(this);
   400     }
   401     else if(cmdStr.equalsIgnoreCase("XPAT"))
   402     {
   403       cmd = new XPatCommand(this);
   404     }
   405     
   406     return cmd;
   407   }
   408   
   409   /**
   410    * Puts the given line into the output buffer, adds a newline character
   411    * and returns. The method returns immediately and does not block until
   412    * the line was sent. If line is longer than 510 octets it is split up in
   413    * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE).
   414    * @param line
   415    */
   416   public void println(final CharSequence line, final Charset charset)
   417     throws IOException
   418   {    
   419     writeToChannel(CharBuffer.wrap(line), charset, line);
   420     writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
   421   }
   422   
   423   /**
   424    * Encodes the given CharBuffer using the given Charset to a bunch of
   425    * ByteBuffers (each 512 bytes large) and enqueues them for writing at the
   426    * connected SocketChannel.
   427    * @throws java.io.IOException
   428    */
   429   private void writeToChannel(CharBuffer characters, final Charset charset,
   430     CharSequence debugLine)
   431     throws IOException
   432   {
   433     if(!charset.canEncode())
   434     {
   435       Log.msg("FATAL: Charset " + charset + " cannot encode!", false);
   436       return;
   437     }
   438     
   439     // Write characters to output buffers
   440     LineEncoder lenc = new LineEncoder(characters, charset);
   441     lenc.encode(lineBuffers);
   442     
   443     // Enable OP_WRITE events so that the buffers are processed
   444     try
   445     {
   446       this.writeSelKey.interestOps(SelectionKey.OP_WRITE);
   447       ChannelWriter.getInstance().getSelector().wakeup();
   448     }
   449     catch (Exception ex) // CancelledKeyException and ChannelCloseException
   450     {
   451       Log.msg("NNTPConnection.writeToChannel(): " + ex, false);
   452       return;
   453     }
   454 
   455     // Update last activity timestamp
   456     this.lastActivity = System.currentTimeMillis();
   457     if(debugLine != null)
   458     {
   459       Log.msg(">> " + debugLine, true);
   460     }
   461   }
   462   
   463   public void println(final CharSequence line)
   464     throws IOException
   465   {
   466     println(line, charset);
   467   }
   468   
   469   public void print(final String line)
   470     throws IOException
   471   {
   472     writeToChannel(CharBuffer.wrap(line), charset, line);
   473   }
   474   
   475   public void setCurrentCharset(final Charset charset)
   476   {
   477     this.charset = charset;
   478   }
   479   
   480 }