src/org/sonews/daemon/NNTPConnection.java
changeset 35 ed84c8bdd87b
parent 30 146b3275b792
child 37 74139325d305
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/src/org/sonews/daemon/NNTPConnection.java	Sun Aug 29 17:28:58 2010 +0200
     1.3 @@ -0,0 +1,428 @@
     1.4 +/*
     1.5 + *   SONEWS News Server
     1.6 + *   see AUTHORS for the list of contributors
     1.7 + *
     1.8 + *   This program is free software: you can redistribute it and/or modify
     1.9 + *   it under the terms of the GNU General Public License as published by
    1.10 + *   the Free Software Foundation, either version 3 of the License, or
    1.11 + *   (at your option) any later version.
    1.12 + *
    1.13 + *   This program is distributed in the hope that it will be useful,
    1.14 + *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    1.15 + *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    1.16 + *   GNU General Public License for more details.
    1.17 + *
    1.18 + *   You should have received a copy of the GNU General Public License
    1.19 + *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    1.20 + */
    1.21 +
    1.22 +package org.sonews.daemon;
    1.23 +
    1.24 +import java.io.IOException;
    1.25 +import java.net.InetSocketAddress;
    1.26 +import java.net.SocketException;
    1.27 +import java.nio.ByteBuffer;
    1.28 +import java.nio.CharBuffer;
    1.29 +import java.nio.channels.ClosedChannelException;
    1.30 +import java.nio.channels.SelectionKey;
    1.31 +import java.nio.channels.SocketChannel;
    1.32 +import java.nio.charset.Charset;
    1.33 +import java.util.Arrays;
    1.34 +import java.util.Timer;
    1.35 +import java.util.TimerTask;
    1.36 +import org.sonews.daemon.command.Command;
    1.37 +import org.sonews.storage.Article;
    1.38 +import org.sonews.storage.Channel;
    1.39 +import org.sonews.storage.StorageBackendException;
    1.40 +import org.sonews.util.Log;
    1.41 +import org.sonews.util.Stats;
    1.42 +
    1.43 +/**
    1.44 + * For every SocketChannel (so TCP/IP connection) there is an instance of
    1.45 + * this class.
    1.46 + * @author Christian Lins
    1.47 + * @since sonews/0.5.0
    1.48 + */
    1.49 +public final class NNTPConnection
    1.50 +{
    1.51 +
    1.52 +  public static final String NEWLINE            = "\r\n";    // RFC defines this as newline
    1.53 +  public static final String MESSAGE_ID_PATTERN = "<[^>]+>";
    1.54 +  
    1.55 +  private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon
    1.56 +  
    1.57 +  /** SocketChannel is generally thread-safe */
    1.58 +  private SocketChannel   channel        = null;
    1.59 +  private Charset         charset        = Charset.forName("UTF-8");
    1.60 +  private Command         command        = null;
    1.61 +  private Article         currentArticle = null;
    1.62 +  private Channel         currentGroup   = null;
    1.63 +  private volatile long   lastActivity   = System.currentTimeMillis();
    1.64 +  private ChannelLineBuffers lineBuffers = new ChannelLineBuffers();
    1.65 +  private int             readLock       = 0;
    1.66 +  private final Object    readLockGate   = new Object();
    1.67 +  private SelectionKey    writeSelKey    = null;
    1.68 +  
    1.69 +  public NNTPConnection(final SocketChannel channel)
    1.70 +    throws IOException
    1.71 +  {
    1.72 +    if(channel == null)
    1.73 +    {
    1.74 +      throw new IllegalArgumentException("channel is null");
    1.75 +    }
    1.76 +
    1.77 +    this.channel = channel;
    1.78 +    Stats.getInstance().clientConnect();
    1.79 +  }
    1.80 +  
    1.81 +  /**
    1.82 +   * Tries to get the read lock for this NNTPConnection. This method is Thread-
    1.83 +   * safe and returns true of the read lock was successfully set. If the lock
    1.84 +   * is still hold by another Thread the method returns false.
    1.85 +   */
    1.86 +  boolean tryReadLock()
    1.87 +  {
    1.88 +    // As synchronizing simple types may cause deadlocks,
    1.89 +    // we use a gate object.
    1.90 +    synchronized(readLockGate)
    1.91 +    {
    1.92 +      if(readLock != 0)
    1.93 +      {
    1.94 +        return false;
    1.95 +      }
    1.96 +      else
    1.97 +      {
    1.98 +        readLock = Thread.currentThread().hashCode();
    1.99 +        return true;
   1.100 +      }
   1.101 +    }
   1.102 +  }
   1.103 +  
   1.104 +  /**
   1.105 +   * Releases the read lock in a Thread-safe way.
   1.106 +   * @throws IllegalMonitorStateException if a Thread not holding the lock
   1.107 +   * tries to release it.
   1.108 +   */
   1.109 +  void unlockReadLock()
   1.110 +  {
   1.111 +    synchronized(readLockGate)
   1.112 +    {
   1.113 +      if(readLock == Thread.currentThread().hashCode())
   1.114 +      {
   1.115 +        readLock = 0;
   1.116 +      }
   1.117 +      else
   1.118 +      {
   1.119 +        throw new IllegalMonitorStateException();
   1.120 +      }
   1.121 +    }
   1.122 +  }
   1.123 +  
   1.124 +  /**
   1.125 +   * @return Current input buffer of this NNTPConnection instance.
   1.126 +   */
   1.127 +  public ByteBuffer getInputBuffer()
   1.128 +  {
   1.129 +    return this.lineBuffers.getInputBuffer();
   1.130 +  }
   1.131 +  
   1.132 +  /**
   1.133 +   * @return Output buffer of this NNTPConnection which has at least one byte
   1.134 +   * free storage.
   1.135 +   */
   1.136 +  public ByteBuffer getOutputBuffer()
   1.137 +  {
   1.138 +    return this.lineBuffers.getOutputBuffer();
   1.139 +  }
   1.140 +  
   1.141 +  /**
   1.142 +   * @return ChannelLineBuffers instance associated with this NNTPConnection.
   1.143 +   */
   1.144 +  public ChannelLineBuffers getBuffers()
   1.145 +  {
   1.146 +    return this.lineBuffers;
   1.147 +  }
   1.148 +  
   1.149 +  /**
   1.150 +   * @return true if this connection comes from a local remote address.
   1.151 +   */
   1.152 +  public boolean isLocalConnection()
   1.153 +  {
   1.154 +    return ((InetSocketAddress)this.channel.socket().getRemoteSocketAddress())
   1.155 +      .getHostName().equalsIgnoreCase("localhost");
   1.156 +  }
   1.157 +
   1.158 +  void setWriteSelectionKey(SelectionKey selKey)
   1.159 +  {
   1.160 +    this.writeSelKey = selKey;
   1.161 +  }
   1.162 +
   1.163 +  public void shutdownInput()
   1.164 +  {
   1.165 +    try
   1.166 +    {
   1.167 +      // Closes the input line of the channel's socket, so no new data
   1.168 +      // will be received and a timeout can be triggered.
   1.169 +      this.channel.socket().shutdownInput();
   1.170 +    }
   1.171 +    catch(IOException ex)
   1.172 +    {
   1.173 +      Log.get().warning("Exception in NNTPConnection.shutdownInput(): " + ex);
   1.174 +    }
   1.175 +  }
   1.176 +  
   1.177 +  public void shutdownOutput()
   1.178 +  {
   1.179 +    cancelTimer.schedule(new TimerTask() 
   1.180 +    {
   1.181 +      @Override
   1.182 +      public void run()
   1.183 +      {
   1.184 +        try
   1.185 +        {
   1.186 +          // Closes the output line of the channel's socket.
   1.187 +          channel.socket().shutdownOutput();
   1.188 +          channel.close();
   1.189 +        }
   1.190 +        catch(SocketException ex)
   1.191 +        {
   1.192 +          // Socket was already disconnected
   1.193 +          Log.get().info("NNTPConnection.shutdownOutput(): " + ex);
   1.194 +        }
   1.195 +        catch(Exception ex)
   1.196 +        {
   1.197 +          Log.get().warning("NNTPConnection.shutdownOutput(): " + ex);
   1.198 +        }
   1.199 +      }
   1.200 +    }, 3000);
   1.201 +  }
   1.202 +  
   1.203 +  public SocketChannel getSocketChannel()
   1.204 +  {
   1.205 +    return this.channel;
   1.206 +  }
   1.207 +  
   1.208 +  public Article getCurrentArticle()
   1.209 +  {
   1.210 +    return this.currentArticle;
   1.211 +  }
   1.212 +  
   1.213 +  public Charset getCurrentCharset()
   1.214 +  {
   1.215 +    return this.charset;
   1.216 +  }
   1.217 +
   1.218 +  /**
   1.219 +   * @return The currently selected communication channel (not SocketChannel)
   1.220 +   */
   1.221 +  public Channel getCurrentChannel()
   1.222 +  {
   1.223 +    return this.currentGroup;
   1.224 +  }
   1.225 +  
   1.226 +  public void setCurrentArticle(final Article article)
   1.227 +  {
   1.228 +    this.currentArticle = article;
   1.229 +  }
   1.230 +  
   1.231 +  public void setCurrentGroup(final Channel group)
   1.232 +  {
   1.233 +    this.currentGroup = group;
   1.234 +  }
   1.235 +  
   1.236 +  public long getLastActivity()
   1.237 +  {
   1.238 +    return this.lastActivity;
   1.239 +  }
   1.240 +  
   1.241 +  /**
   1.242 +   * Due to the readLockGate there is no need to synchronize this method.
   1.243 +   * @param raw
   1.244 +   * @throws IllegalArgumentException if raw is null.
   1.245 +   * @throws IllegalStateException if calling thread does not own the readLock.
   1.246 +   */
   1.247 +  void lineReceived(byte[] raw)
   1.248 +  {
   1.249 +    if(raw == null)
   1.250 +    {
   1.251 +      throw new IllegalArgumentException("raw is null");
   1.252 +    }
   1.253 +    
   1.254 +    if(readLock == 0 || readLock != Thread.currentThread().hashCode())
   1.255 +    {
   1.256 +      throw new IllegalStateException("readLock not properly set");
   1.257 +    }
   1.258 +
   1.259 +    this.lastActivity = System.currentTimeMillis();
   1.260 +    
   1.261 +    String line = new String(raw, this.charset);
   1.262 +    
   1.263 +    // There might be a trailing \r, but trim() is a bad idea
   1.264 +    // as it removes also leading spaces from long header lines.
   1.265 +    if(line.endsWith("\r"))
   1.266 +    {
   1.267 +      line = line.substring(0, line.length() - 1);
   1.268 +      raw  = Arrays.copyOf(raw, raw.length - 1);
   1.269 +    }
   1.270 +    
   1.271 +    Log.get().fine("<< " + line);
   1.272 +    
   1.273 +    if(command == null)
   1.274 +    {
   1.275 +      command = parseCommandLine(line);
   1.276 +      assert command != null;
   1.277 +    }
   1.278 +
   1.279 +    try
   1.280 +    {
   1.281 +      // The command object will process the line we just received
   1.282 +      try
   1.283 +      {
   1.284 +        command.processLine(this, line, raw);
   1.285 +      }
   1.286 +      catch(StorageBackendException ex)
   1.287 +      {
   1.288 +        Log.get().info("Retry command processing after StorageBackendException");
   1.289 +
   1.290 +        // Try it a second time, so that the backend has time to recover
   1.291 +        command.processLine(this, line, raw);
   1.292 +      }
   1.293 +    }
   1.294 +    catch(ClosedChannelException ex0)
   1.295 +    {
   1.296 +      try
   1.297 +      {
   1.298 +        Log.get().info("Connection to " + channel.socket().getRemoteSocketAddress()
   1.299 +            + " closed: " + ex0);
   1.300 +      }
   1.301 +      catch(Exception ex0a)
   1.302 +      {
   1.303 +        ex0a.printStackTrace();
   1.304 +      }
   1.305 +    }
   1.306 +    catch(Exception ex1) // This will catch a second StorageBackendException
   1.307 +    {
   1.308 +      try
   1.309 +      {
   1.310 +        command = null;
   1.311 +        ex1.printStackTrace();
   1.312 +        println("500 Internal server error");
   1.313 +      }
   1.314 +      catch(Exception ex2)
   1.315 +      {
   1.316 +        ex2.printStackTrace();
   1.317 +      }
   1.318 +    }
   1.319 +
   1.320 +    if(command == null || command.hasFinished())
   1.321 +    {
   1.322 +      command = null;
   1.323 +      charset = Charset.forName("UTF-8"); // Reset to default
   1.324 +    }
   1.325 +  }
   1.326 +  
   1.327 +  /**
   1.328 +   * This method determines the fitting command processing class.
   1.329 +   * @param line
   1.330 +   * @return
   1.331 +   */
   1.332 +  private Command parseCommandLine(String line)
   1.333 +  {
   1.334 +    String cmdStr = line.split(" ")[0];
   1.335 +    return CommandSelector.getInstance().get(cmdStr);
   1.336 +  }
   1.337 +  
   1.338 +  /**
   1.339 +   * Puts the given line into the output buffer, adds a newline character
   1.340 +   * and returns. The method returns immediately and does not block until
   1.341 +   * the line was sent. If line is longer than 510 octets it is split up in
   1.342 +   * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE).
   1.343 +   * @param line
   1.344 +   */
   1.345 +  public void println(final CharSequence line, final Charset charset)
   1.346 +    throws IOException
   1.347 +  {    
   1.348 +    writeToChannel(CharBuffer.wrap(line), charset, line);
   1.349 +    writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
   1.350 +  }
   1.351 +
   1.352 +  /**
   1.353 +   * Writes the given raw lines to the output buffers and finishes with
   1.354 +   * a newline character (\r\n).
   1.355 +   * @param rawLines
   1.356 +   */
   1.357 +  public void println(final byte[] rawLines)
   1.358 +    throws IOException
   1.359 +  {
   1.360 +    this.lineBuffers.addOutputBuffer(ByteBuffer.wrap(rawLines));
   1.361 +    writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
   1.362 +  }
   1.363 +  
   1.364 +  /**
   1.365 +   * Encodes the given CharBuffer using the given Charset to a bunch of
   1.366 +   * ByteBuffers (each 512 bytes large) and enqueues them for writing at the
   1.367 +   * connected SocketChannel.
   1.368 +   * @throws java.io.IOException
   1.369 +   */
   1.370 +  private void writeToChannel(CharBuffer characters, final Charset charset,
   1.371 +    CharSequence debugLine)
   1.372 +    throws IOException
   1.373 +  {
   1.374 +    if(!charset.canEncode())
   1.375 +    {
   1.376 +      Log.get().severe("FATAL: Charset " + charset + " cannot encode!");
   1.377 +      return;
   1.378 +    }
   1.379 +    
   1.380 +    // Write characters to output buffers
   1.381 +    LineEncoder lenc = new LineEncoder(characters, charset);
   1.382 +    lenc.encode(lineBuffers);
   1.383 +    
   1.384 +    enableWriteEvents(debugLine);
   1.385 +  }
   1.386 +
   1.387 +  private void enableWriteEvents(CharSequence debugLine)
   1.388 +  {
   1.389 +    // Enable OP_WRITE events so that the buffers are processed
   1.390 +    try
   1.391 +    {
   1.392 +      this.writeSelKey.interestOps(SelectionKey.OP_WRITE);
   1.393 +      ChannelWriter.getInstance().getSelector().wakeup();
   1.394 +    }
   1.395 +    catch(Exception ex) // CancelledKeyException and ChannelCloseException
   1.396 +    {
   1.397 +      Log.get().warning("NNTPConnection.writeToChannel(): " + ex);
   1.398 +      return;
   1.399 +    }
   1.400 +
   1.401 +    // Update last activity timestamp
   1.402 +    this.lastActivity = System.currentTimeMillis();
   1.403 +    if(debugLine != null)
   1.404 +    {
   1.405 +      Log.get().fine(">> " + debugLine);
   1.406 +    }
   1.407 +  }
   1.408 +  
   1.409 +  public void println(final CharSequence line)
   1.410 +    throws IOException
   1.411 +  {
   1.412 +    println(line, charset);
   1.413 +  }
   1.414 +  
   1.415 +  public void print(final String line)
   1.416 +    throws IOException
   1.417 +  {
   1.418 +    writeToChannel(CharBuffer.wrap(line), charset, line);
   1.419 +  }
   1.420 +  
   1.421 +  public void setCurrentCharset(final Charset charset)
   1.422 +  {
   1.423 +    this.charset = charset;
   1.424 +  }
   1.425 +
   1.426 +  void setLastActivity(long timestamp)
   1.427 +  {
   1.428 +    this.lastActivity = timestamp;
   1.429 +  }
   1.430 +  
   1.431 +}