org/sonews/daemon/NNTPConnection.java
changeset 1 6fceb66e1ad7
child 3 2fdc9cc89502
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/org/sonews/daemon/NNTPConnection.java	Fri Jun 26 16:48:50 2009 +0200
     1.3 @@ -0,0 +1,480 @@
     1.4 +/*
     1.5 + *   SONEWS News Server
     1.6 + *   see AUTHORS for the list of contributors
     1.7 + *
     1.8 + *   This program is free software: you can redistribute it and/or modify
     1.9 + *   it under the terms of the GNU General Public License as published by
    1.10 + *   the Free Software Foundation, either version 3 of the License, or
    1.11 + *   (at your option) any later version.
    1.12 + *
    1.13 + *   This program is distributed in the hope that it will be useful,
    1.14 + *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    1.15 + *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    1.16 + *   GNU General Public License for more details.
    1.17 + *
    1.18 + *   You should have received a copy of the GNU General Public License
    1.19 + *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    1.20 + */
    1.21 +
    1.22 +package org.sonews.daemon;
    1.23 +
    1.24 +import org.sonews.util.Log;
    1.25 +import java.io.IOException;
    1.26 +import java.net.InetSocketAddress;
    1.27 +import java.nio.ByteBuffer;
    1.28 +import java.nio.CharBuffer;
    1.29 +import java.nio.channels.ClosedChannelException;
    1.30 +import java.nio.channels.SelectionKey;
    1.31 +import java.nio.channels.SocketChannel;
    1.32 +import java.nio.charset.Charset;
    1.33 +import java.util.Timer;
    1.34 +import java.util.TimerTask;
    1.35 +import org.sonews.daemon.command.ArticleCommand;
    1.36 +import org.sonews.daemon.command.CapabilitiesCommand;
    1.37 +import org.sonews.daemon.command.AbstractCommand;
    1.38 +import org.sonews.daemon.command.GroupCommand;
    1.39 +import org.sonews.daemon.command.HelpCommand;
    1.40 +import org.sonews.daemon.command.ListCommand;
    1.41 +import org.sonews.daemon.command.ListGroupCommand;
    1.42 +import org.sonews.daemon.command.ModeReaderCommand;
    1.43 +import org.sonews.daemon.command.NewGroupsCommand;
    1.44 +import org.sonews.daemon.command.NextPrevCommand;
    1.45 +import org.sonews.daemon.command.OverCommand;
    1.46 +import org.sonews.daemon.command.PostCommand;
    1.47 +import org.sonews.daemon.command.QuitCommand;
    1.48 +import org.sonews.daemon.command.StatCommand;
    1.49 +import org.sonews.daemon.command.UnsupportedCommand;
    1.50 +import org.sonews.daemon.command.XDaemonCommand;
    1.51 +import org.sonews.daemon.command.XPatCommand;
    1.52 +import org.sonews.daemon.storage.Article;
    1.53 +import org.sonews.daemon.storage.Group;
    1.54 +import org.sonews.util.Stats;
    1.55 +
    1.56 +/**
    1.57 + * For every SocketChannel (so TCP/IP connection) there is an instance of
    1.58 + * this class.
    1.59 + * @author Christian Lins
    1.60 + * @since sonews/0.5.0
    1.61 + */
    1.62 +public final class NNTPConnection
    1.63 +{
    1.64 +
    1.65 +  public static final String NEWLINE            = "\r\n";    // RFC defines this as newline
    1.66 +  public static final String MESSAGE_ID_PATTERN = "<[^>]+>";
    1.67 +  
    1.68 +  private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon
    1.69 +  
    1.70 +  /** SocketChannel is generally thread-safe */
    1.71 +  private SocketChannel   channel        = null;
    1.72 +  private Charset         charset        = Charset.forName("UTF-8");
    1.73 +  private AbstractCommand command        = null;
    1.74 +  private Article         currentArticle = null;
    1.75 +  private Group           currentGroup   = null;
    1.76 +  private volatile long   lastActivity   = System.currentTimeMillis();
    1.77 +  private ChannelLineBuffers lineBuffers = new ChannelLineBuffers();
    1.78 +  private int             readLock       = 0;
    1.79 +  private final Object    readLockGate   = new Object();
    1.80 +  private SelectionKey    writeSelKey    = null;
    1.81 +  
    1.82 +  public NNTPConnection(final SocketChannel channel)
    1.83 +    throws IOException
    1.84 +  {
    1.85 +    if(channel == null)
    1.86 +    {
    1.87 +      throw new IllegalArgumentException("channel is null");
    1.88 +    }
    1.89 +
    1.90 +    this.channel = channel;
    1.91 +    Stats.getInstance().clientConnect();
    1.92 +  }
    1.93 +  
    1.94 +  /**
    1.95 +   * Tries to get the read lock for this NNTPConnection. This method is Thread-
    1.96 +   * safe and returns true of the read lock was successfully set. If the lock
    1.97 +   * is still hold by another Thread the method returns false.
    1.98 +   */
    1.99 +  boolean tryReadLock()
   1.100 +  {
   1.101 +    // As synchronizing simple types may cause deadlocks,
   1.102 +    // we use a gate object.
   1.103 +    synchronized(readLockGate)
   1.104 +    {
   1.105 +      if(readLock != 0)
   1.106 +      {
   1.107 +        return false;
   1.108 +      }
   1.109 +      else
   1.110 +      {
   1.111 +        readLock = Thread.currentThread().hashCode();
   1.112 +        return true;
   1.113 +      }
   1.114 +    }
   1.115 +  }
   1.116 +  
   1.117 +  /**
   1.118 +   * Releases the read lock in a Thread-safe way.
   1.119 +   * @throws IllegalMonitorStateException if a Thread not holding the lock
   1.120 +   * tries to release it.
   1.121 +   */
   1.122 +  void unlockReadLock()
   1.123 +  {
   1.124 +    synchronized(readLockGate)
   1.125 +    {
   1.126 +      if(readLock == Thread.currentThread().hashCode())
   1.127 +      {
   1.128 +        readLock = 0;
   1.129 +      }
   1.130 +      else
   1.131 +      {
   1.132 +        throw new IllegalMonitorStateException();
   1.133 +      }
   1.134 +    }
   1.135 +  }
   1.136 +  
   1.137 +  /**
   1.138 +   * @return Current input buffer of this NNTPConnection instance.
   1.139 +   */
   1.140 +  public ByteBuffer getInputBuffer()
   1.141 +  {
   1.142 +    return this.lineBuffers.getInputBuffer();
   1.143 +  }
   1.144 +  
   1.145 +  /**
   1.146 +   * @return Output buffer of this NNTPConnection which has at least one byte
   1.147 +   * free storage.
   1.148 +   */
   1.149 +  public ByteBuffer getOutputBuffer()
   1.150 +  {
   1.151 +    return this.lineBuffers.getOutputBuffer();
   1.152 +  }
   1.153 +  
   1.154 +  /**
   1.155 +   * @return ChannelLineBuffers instance associated with this NNTPConnection.
   1.156 +   */
   1.157 +  public ChannelLineBuffers getBuffers()
   1.158 +  {
   1.159 +    return this.lineBuffers;
   1.160 +  }
   1.161 +  
   1.162 +  /**
   1.163 +   * @return true if this connection comes from a local remote address.
   1.164 +   */
   1.165 +  public boolean isLocalConnection()
   1.166 +  {
   1.167 +    return ((InetSocketAddress)this.channel.socket().getRemoteSocketAddress())
   1.168 +      .getHostName().equalsIgnoreCase("localhost");
   1.169 +  }
   1.170 +
   1.171 +  void setWriteSelectionKey(SelectionKey selKey)
   1.172 +  {
   1.173 +    this.writeSelKey = selKey;
   1.174 +  }
   1.175 +
   1.176 +  public void shutdownInput()
   1.177 +  {
   1.178 +    try
   1.179 +    {
   1.180 +      // Closes the input line of the channel's socket, so no new data
   1.181 +      // will be received and a timeout can be triggered.
   1.182 +      this.channel.socket().shutdownInput();
   1.183 +    }
   1.184 +    catch(IOException ex)
   1.185 +    {
   1.186 +      Log.msg("Exception in NNTPConnection.shutdownInput(): " + ex, false);
   1.187 +      if(Log.isDebug())
   1.188 +      {
   1.189 +        ex.printStackTrace();
   1.190 +      }
   1.191 +    }
   1.192 +  }
   1.193 +  
   1.194 +  public void shutdownOutput()
   1.195 +  {
   1.196 +    cancelTimer.schedule(new TimerTask() 
   1.197 +    {
   1.198 +      @Override
   1.199 +      public void run()
   1.200 +      {
   1.201 +        try
   1.202 +        {
   1.203 +          // Closes the output line of the channel's socket.
   1.204 +          channel.socket().shutdownOutput();
   1.205 +          channel.close();
   1.206 +        }
   1.207 +        catch(Exception ex)
   1.208 +        {
   1.209 +          Log.msg("NNTPConnection.shutdownOutput(): " + ex, false);
   1.210 +          if(Log.isDebug())
   1.211 +          {
   1.212 +            ex.printStackTrace();
   1.213 +          }
   1.214 +        }
   1.215 +      }
   1.216 +    }, 3000);
   1.217 +  }
   1.218 +  
   1.219 +  public SocketChannel getChannel()
   1.220 +  {
   1.221 +    return this.channel;
   1.222 +  }
   1.223 +  
   1.224 +  public Article getCurrentArticle()
   1.225 +  {
   1.226 +    return this.currentArticle;
   1.227 +  }
   1.228 +  
   1.229 +  public Charset getCurrentCharset()
   1.230 +  {
   1.231 +    return this.charset;
   1.232 +  }
   1.233 +  
   1.234 +  public Group getCurrentGroup()
   1.235 +  {
   1.236 +    return this.currentGroup;
   1.237 +  }
   1.238 +  
   1.239 +  public void setCurrentArticle(final Article article)
   1.240 +  {
   1.241 +    this.currentArticle = article;
   1.242 +  }
   1.243 +  
   1.244 +  public void setCurrentGroup(final Group group)
   1.245 +  {
   1.246 +    this.currentGroup = group;
   1.247 +  }
   1.248 +  
   1.249 +  public long getLastActivity()
   1.250 +  {
   1.251 +    return this.lastActivity;
   1.252 +  }
   1.253 +  
   1.254 +  /**
   1.255 +   * Due to the readLockGate there is no need to synchronize this method.
   1.256 +   * @param raw
   1.257 +   * @throws IllegalArgumentException if raw is null.
   1.258 +   * @throws IllegalStateException if calling thread does not own the readLock.
   1.259 +   */
   1.260 +  void lineReceived(byte[] raw)
   1.261 +  {
   1.262 +    if(raw == null)
   1.263 +    {
   1.264 +      throw new IllegalArgumentException("raw is null");
   1.265 +    }
   1.266 +    
   1.267 +    if(readLock == 0 || readLock != Thread.currentThread().hashCode())
   1.268 +    {
   1.269 +      throw new IllegalStateException("readLock not properly set");
   1.270 +    }
   1.271 +
   1.272 +    this.lastActivity = System.currentTimeMillis();
   1.273 +    
   1.274 +    String line = new String(raw, this.charset);
   1.275 +    
   1.276 +    // There might be a trailing \r, but trim() is a bad idea
   1.277 +    // as it removes also leading spaces from long header lines.
   1.278 +    if(line.endsWith("\r"))
   1.279 +    {
   1.280 +      line = line.substring(0, line.length() - 1);
   1.281 +    }
   1.282 +    
   1.283 +    Log.msg("<< " + line, true);
   1.284 +    
   1.285 +    if(command == null)
   1.286 +    {
   1.287 +      command = parseCommandLine(line);
   1.288 +      assert command != null;
   1.289 +    }
   1.290 +
   1.291 +    try
   1.292 +    {
   1.293 +      // The command object will process the line we just received
   1.294 +      command.processLine(line);
   1.295 +    }
   1.296 +    catch(ClosedChannelException ex0)
   1.297 +    {
   1.298 +      try
   1.299 +      {
   1.300 +        Log.msg("Connection to " + channel.socket().getRemoteSocketAddress() 
   1.301 +            + " closed: " + ex0, true);
   1.302 +      }
   1.303 +      catch(Exception ex0a)
   1.304 +      {
   1.305 +        ex0a.printStackTrace();
   1.306 +      }
   1.307 +    }
   1.308 +    catch(Exception ex1)
   1.309 +    {
   1.310 +      try
   1.311 +      {
   1.312 +        command = null;
   1.313 +        ex1.printStackTrace();
   1.314 +        println("500 Internal server error");
   1.315 +      }
   1.316 +      catch(Exception ex2)
   1.317 +      {
   1.318 +        ex2.printStackTrace();
   1.319 +      }
   1.320 +    }
   1.321 +
   1.322 +    if(command == null || command.hasFinished())
   1.323 +    {
   1.324 +      command = null;
   1.325 +      charset = Charset.forName("UTF-8"); // Reset to default
   1.326 +    }
   1.327 +  }
   1.328 +  
   1.329 +  /**
   1.330 +   * This method performes several if/elseif constructs to determine the
   1.331 +   * fitting command object. 
   1.332 +   * TODO: This string comparisons are probably slow!
   1.333 +   * @param line
   1.334 +   * @return
   1.335 +   */
   1.336 +  private AbstractCommand parseCommandLine(String line)
   1.337 +  {
   1.338 +    AbstractCommand  cmd    = new UnsupportedCommand(this);
   1.339 +    String   cmdStr = line.split(" ")[0];
   1.340 +    
   1.341 +    if(cmdStr.equalsIgnoreCase("ARTICLE") || 
   1.342 +      cmdStr.equalsIgnoreCase("BODY"))
   1.343 +    {
   1.344 +      cmd = new ArticleCommand(this);
   1.345 +    }
   1.346 +    else if(cmdStr.equalsIgnoreCase("CAPABILITIES"))
   1.347 +    {
   1.348 +      cmd = new CapabilitiesCommand(this);
   1.349 +    }
   1.350 +    else if(cmdStr.equalsIgnoreCase("GROUP"))
   1.351 +    {
   1.352 +      cmd = new GroupCommand(this);
   1.353 +    }
   1.354 +    else if(cmdStr.equalsIgnoreCase("HEAD"))
   1.355 +    {
   1.356 +      cmd = new ArticleCommand(this);
   1.357 +    }
   1.358 +    else if(cmdStr.equalsIgnoreCase("HELP"))
   1.359 +    {
   1.360 +      cmd = new HelpCommand(this);
   1.361 +    }
   1.362 +    else if(cmdStr.equalsIgnoreCase("LIST"))
   1.363 +    {
   1.364 +      cmd = new ListCommand(this);
   1.365 +    }
   1.366 +    else if(cmdStr.equalsIgnoreCase("LISTGROUP"))
   1.367 +    {
   1.368 +      cmd = new ListGroupCommand(this);
   1.369 +    }
   1.370 +    else if(cmdStr.equalsIgnoreCase("MODE"))
   1.371 +    {
   1.372 +      cmd = new ModeReaderCommand(this);
   1.373 +    }
   1.374 +    else if(cmdStr.equalsIgnoreCase("NEWGROUPS"))
   1.375 +    {
   1.376 +      cmd = new NewGroupsCommand(this);
   1.377 +    }
   1.378 +    else if(cmdStr.equalsIgnoreCase("NEXT") ||
   1.379 +      cmdStr.equalsIgnoreCase("PREV"))
   1.380 +    {
   1.381 +      cmd = new NextPrevCommand(this);
   1.382 +    }
   1.383 +    else if(cmdStr.equalsIgnoreCase("OVER") ||
   1.384 +      cmdStr.equalsIgnoreCase("XOVER")) // for compatibility with older RFCs
   1.385 +    {
   1.386 +      cmd = new OverCommand(this);
   1.387 +    }
   1.388 +    else if(cmdStr.equalsIgnoreCase("POST"))
   1.389 +    {
   1.390 +      cmd = new PostCommand(this);
   1.391 +    }
   1.392 +    else if(cmdStr.equalsIgnoreCase("QUIT"))
   1.393 +    {
   1.394 +      cmd = new QuitCommand(this);
   1.395 +    }
   1.396 +    else if(cmdStr.equalsIgnoreCase("STAT"))
   1.397 +    {
   1.398 +      cmd = new StatCommand(this);
   1.399 +    }
   1.400 +    else if(cmdStr.equalsIgnoreCase("XDAEMON"))
   1.401 +    {
   1.402 +      cmd = new XDaemonCommand(this);
   1.403 +    }
   1.404 +    else if(cmdStr.equalsIgnoreCase("XPAT"))
   1.405 +    {
   1.406 +      cmd = new XPatCommand(this);
   1.407 +    }
   1.408 +    
   1.409 +    return cmd;
   1.410 +  }
   1.411 +  
   1.412 +  /**
   1.413 +   * Puts the given line into the output buffer, adds a newline character
   1.414 +   * and returns. The method returns immediately and does not block until
   1.415 +   * the line was sent. If line is longer than 510 octets it is split up in
   1.416 +   * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE).
   1.417 +   * @param line
   1.418 +   */
   1.419 +  public void println(final CharSequence line, final Charset charset)
   1.420 +    throws IOException
   1.421 +  {    
   1.422 +    writeToChannel(CharBuffer.wrap(line), charset, line);
   1.423 +    writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
   1.424 +  }
   1.425 +  
   1.426 +  /**
   1.427 +   * Encodes the given CharBuffer using the given Charset to a bunch of
   1.428 +   * ByteBuffers (each 512 bytes large) and enqueues them for writing at the
   1.429 +   * connected SocketChannel.
   1.430 +   * @throws java.io.IOException
   1.431 +   */
   1.432 +  private void writeToChannel(CharBuffer characters, final Charset charset,
   1.433 +    CharSequence debugLine)
   1.434 +    throws IOException
   1.435 +  {
   1.436 +    if(!charset.canEncode())
   1.437 +    {
   1.438 +      Log.msg("FATAL: Charset " + charset + " cannot encode!", false);
   1.439 +      return;
   1.440 +    }
   1.441 +    
   1.442 +    // Write characters to output buffers
   1.443 +    LineEncoder lenc = new LineEncoder(characters, charset);
   1.444 +    lenc.encode(lineBuffers);
   1.445 +    
   1.446 +    // Enable OP_WRITE events so that the buffers are processed
   1.447 +    try
   1.448 +    {
   1.449 +      this.writeSelKey.interestOps(SelectionKey.OP_WRITE);
   1.450 +      ChannelWriter.getInstance().getSelector().wakeup();
   1.451 +    }
   1.452 +    catch (Exception ex) // CancelledKeyException and ChannelCloseException
   1.453 +    {
   1.454 +      Log.msg("NNTPConnection.writeToChannel(): " + ex, false);
   1.455 +      return;
   1.456 +    }
   1.457 +
   1.458 +    // Update last activity timestamp
   1.459 +    this.lastActivity = System.currentTimeMillis();
   1.460 +    if(debugLine != null)
   1.461 +    {
   1.462 +      Log.msg(">> " + debugLine, true);
   1.463 +    }
   1.464 +  }
   1.465 +  
   1.466 +  public void println(final CharSequence line)
   1.467 +    throws IOException
   1.468 +  {
   1.469 +    println(line, charset);
   1.470 +  }
   1.471 +  
   1.472 +  public void print(final String line)
   1.473 +    throws IOException
   1.474 +  {
   1.475 +    writeToChannel(CharBuffer.wrap(line), charset, line);
   1.476 +  }
   1.477 +  
   1.478 +  public void setCurrentCharset(final Charset charset)
   1.479 +  {
   1.480 +    this.charset = charset;
   1.481 +  }
   1.482 +  
   1.483 +}