org/sonews/daemon/NNTPConnection.java
author chris <chris@marvin>
Wed Jul 01 10:48:22 2009 +0200 (2009-07-01)
changeset 2 1090e2141798
child 3 2fdc9cc89502
permissions -rw-r--r--
sonews/0.5.1 fixes merged
chris@1
     1
/*
chris@1
     2
 *   SONEWS News Server
chris@1
     3
 *   see AUTHORS for the list of contributors
chris@1
     4
 *
chris@1
     5
 *   This program is free software: you can redistribute it and/or modify
chris@1
     6
 *   it under the terms of the GNU General Public License as published by
chris@1
     7
 *   the Free Software Foundation, either version 3 of the License, or
chris@1
     8
 *   (at your option) any later version.
chris@1
     9
 *
chris@1
    10
 *   This program is distributed in the hope that it will be useful,
chris@1
    11
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1
    12
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1
    13
 *   GNU General Public License for more details.
chris@1
    14
 *
chris@1
    15
 *   You should have received a copy of the GNU General Public License
chris@1
    16
 *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1
    17
 */
chris@1
    18
chris@1
    19
package org.sonews.daemon;
chris@1
    20
chris@1
    21
import org.sonews.util.Log;
chris@1
    22
import java.io.IOException;
chris@1
    23
import java.net.InetSocketAddress;
chris@1
    24
import java.nio.ByteBuffer;
chris@1
    25
import java.nio.CharBuffer;
chris@1
    26
import java.nio.channels.ClosedChannelException;
chris@1
    27
import java.nio.channels.SelectionKey;
chris@1
    28
import java.nio.channels.SocketChannel;
chris@1
    29
import java.nio.charset.Charset;
chris@1
    30
import java.util.Timer;
chris@1
    31
import java.util.TimerTask;
chris@1
    32
import org.sonews.daemon.command.ArticleCommand;
chris@1
    33
import org.sonews.daemon.command.CapabilitiesCommand;
chris@1
    34
import org.sonews.daemon.command.AbstractCommand;
chris@1
    35
import org.sonews.daemon.command.GroupCommand;
chris@1
    36
import org.sonews.daemon.command.HelpCommand;
chris@1
    37
import org.sonews.daemon.command.ListCommand;
chris@1
    38
import org.sonews.daemon.command.ListGroupCommand;
chris@1
    39
import org.sonews.daemon.command.ModeReaderCommand;
chris@1
    40
import org.sonews.daemon.command.NewGroupsCommand;
chris@1
    41
import org.sonews.daemon.command.NextPrevCommand;
chris@1
    42
import org.sonews.daemon.command.OverCommand;
chris@1
    43
import org.sonews.daemon.command.PostCommand;
chris@1
    44
import org.sonews.daemon.command.QuitCommand;
chris@1
    45
import org.sonews.daemon.command.StatCommand;
chris@1
    46
import org.sonews.daemon.command.UnsupportedCommand;
chris@1
    47
import org.sonews.daemon.command.XDaemonCommand;
chris@1
    48
import org.sonews.daemon.command.XPatCommand;
chris@1
    49
import org.sonews.daemon.storage.Article;
chris@1
    50
import org.sonews.daemon.storage.Group;
chris@1
    51
import org.sonews.util.Stats;
chris@1
    52
chris@1
    53
/**
chris@1
    54
 * For every SocketChannel (so TCP/IP connection) there is an instance of
chris@1
    55
 * this class.
chris@1
    56
 * @author Christian Lins
chris@1
    57
 * @since sonews/0.5.0
chris@1
    58
 */
chris@1
    59
public final class NNTPConnection
chris@1
    60
{
chris@1
    61
chris@1
    62
  public static final String NEWLINE            = "\r\n";    // RFC defines this as newline
chris@1
    63
  public static final String MESSAGE_ID_PATTERN = "<[^>]+>";
chris@1
    64
  
chris@1
    65
  private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon
chris@1
    66
  
chris@1
    67
  /** SocketChannel is generally thread-safe */
chris@1
    68
  private SocketChannel   channel        = null;
chris@1
    69
  private Charset         charset        = Charset.forName("UTF-8");
chris@1
    70
  private AbstractCommand command        = null;
chris@1
    71
  private Article         currentArticle = null;
chris@1
    72
  private Group           currentGroup   = null;
chris@1
    73
  private volatile long   lastActivity   = System.currentTimeMillis();
chris@1
    74
  private ChannelLineBuffers lineBuffers = new ChannelLineBuffers();
chris@1
    75
  private int             readLock       = 0;
chris@1
    76
  private final Object    readLockGate   = new Object();
chris@1
    77
  private SelectionKey    writeSelKey    = null;
chris@1
    78
  
chris@1
    79
  public NNTPConnection(final SocketChannel channel)
chris@1
    80
    throws IOException
chris@1
    81
  {
chris@1
    82
    if(channel == null)
chris@1
    83
    {
chris@1
    84
      throw new IllegalArgumentException("channel is null");
chris@1
    85
    }
chris@1
    86
chris@1
    87
    this.channel = channel;
chris@1
    88
    Stats.getInstance().clientConnect();
chris@1
    89
  }
chris@1
    90
  
chris@1
    91
  /**
chris@1
    92
   * Tries to get the read lock for this NNTPConnection. This method is Thread-
chris@1
    93
   * safe and returns true of the read lock was successfully set. If the lock
chris@1
    94
   * is still hold by another Thread the method returns false.
chris@1
    95
   */
chris@1
    96
  boolean tryReadLock()
chris@1
    97
  {
chris@1
    98
    // As synchronizing simple types may cause deadlocks,
chris@1
    99
    // we use a gate object.
chris@1
   100
    synchronized(readLockGate)
chris@1
   101
    {
chris@1
   102
      if(readLock != 0)
chris@1
   103
      {
chris@1
   104
        return false;
chris@1
   105
      }
chris@1
   106
      else
chris@1
   107
      {
chris@1
   108
        readLock = Thread.currentThread().hashCode();
chris@1
   109
        return true;
chris@1
   110
      }
chris@1
   111
    }
chris@1
   112
  }
chris@1
   113
  
chris@1
   114
  /**
chris@1
   115
   * Releases the read lock in a Thread-safe way.
chris@1
   116
   * @throws IllegalMonitorStateException if a Thread not holding the lock
chris@1
   117
   * tries to release it.
chris@1
   118
   */
chris@1
   119
  void unlockReadLock()
chris@1
   120
  {
chris@1
   121
    synchronized(readLockGate)
chris@1
   122
    {
chris@1
   123
      if(readLock == Thread.currentThread().hashCode())
chris@1
   124
      {
chris@1
   125
        readLock = 0;
chris@1
   126
      }
chris@1
   127
      else
chris@1
   128
      {
chris@1
   129
        throw new IllegalMonitorStateException();
chris@1
   130
      }
chris@1
   131
    }
chris@1
   132
  }
chris@1
   133
  
chris@1
   134
  /**
chris@1
   135
   * @return Current input buffer of this NNTPConnection instance.
chris@1
   136
   */
chris@1
   137
  public ByteBuffer getInputBuffer()
chris@1
   138
  {
chris@1
   139
    return this.lineBuffers.getInputBuffer();
chris@1
   140
  }
chris@1
   141
  
chris@1
   142
  /**
chris@1
   143
   * @return Output buffer of this NNTPConnection which has at least one byte
chris@1
   144
   * free storage.
chris@1
   145
   */
chris@1
   146
  public ByteBuffer getOutputBuffer()
chris@1
   147
  {
chris@1
   148
    return this.lineBuffers.getOutputBuffer();
chris@1
   149
  }
chris@1
   150
  
chris@1
   151
  /**
chris@1
   152
   * @return ChannelLineBuffers instance associated with this NNTPConnection.
chris@1
   153
   */
chris@1
   154
  public ChannelLineBuffers getBuffers()
chris@1
   155
  {
chris@1
   156
    return this.lineBuffers;
chris@1
   157
  }
chris@1
   158
  
chris@1
   159
  /**
chris@1
   160
   * @return true if this connection comes from a local remote address.
chris@1
   161
   */
chris@1
   162
  public boolean isLocalConnection()
chris@1
   163
  {
chris@1
   164
    return ((InetSocketAddress)this.channel.socket().getRemoteSocketAddress())
chris@1
   165
      .getHostName().equalsIgnoreCase("localhost");
chris@1
   166
  }
chris@1
   167
chris@1
   168
  void setWriteSelectionKey(SelectionKey selKey)
chris@1
   169
  {
chris@1
   170
    this.writeSelKey = selKey;
chris@1
   171
  }
chris@1
   172
chris@1
   173
  public void shutdownInput()
chris@1
   174
  {
chris@1
   175
    try
chris@1
   176
    {
chris@1
   177
      // Closes the input line of the channel's socket, so no new data
chris@1
   178
      // will be received and a timeout can be triggered.
chris@1
   179
      this.channel.socket().shutdownInput();
chris@1
   180
    }
chris@1
   181
    catch(IOException ex)
chris@1
   182
    {
chris@1
   183
      Log.msg("Exception in NNTPConnection.shutdownInput(): " + ex, false);
chris@1
   184
      if(Log.isDebug())
chris@1
   185
      {
chris@1
   186
        ex.printStackTrace();
chris@1
   187
      }
chris@1
   188
    }
chris@1
   189
  }
chris@1
   190
  
chris@1
   191
  public void shutdownOutput()
chris@1
   192
  {
chris@1
   193
    cancelTimer.schedule(new TimerTask() 
chris@1
   194
    {
chris@1
   195
      @Override
chris@1
   196
      public void run()
chris@1
   197
      {
chris@1
   198
        try
chris@1
   199
        {
chris@1
   200
          // Closes the output line of the channel's socket.
chris@1
   201
          channel.socket().shutdownOutput();
chris@1
   202
          channel.close();
chris@1
   203
        }
chris@1
   204
        catch(Exception ex)
chris@1
   205
        {
chris@1
   206
          Log.msg("NNTPConnection.shutdownOutput(): " + ex, false);
chris@1
   207
          if(Log.isDebug())
chris@1
   208
          {
chris@1
   209
            ex.printStackTrace();
chris@1
   210
          }
chris@1
   211
        }
chris@1
   212
      }
chris@1
   213
    }, 3000);
chris@1
   214
  }
chris@1
   215
  
chris@1
   216
  public SocketChannel getChannel()
chris@1
   217
  {
chris@1
   218
    return this.channel;
chris@1
   219
  }
chris@1
   220
  
chris@1
   221
  public Article getCurrentArticle()
chris@1
   222
  {
chris@1
   223
    return this.currentArticle;
chris@1
   224
  }
chris@1
   225
  
chris@1
   226
  public Charset getCurrentCharset()
chris@1
   227
  {
chris@1
   228
    return this.charset;
chris@1
   229
  }
chris@1
   230
  
chris@1
   231
  public Group getCurrentGroup()
chris@1
   232
  {
chris@1
   233
    return this.currentGroup;
chris@1
   234
  }
chris@1
   235
  
chris@1
   236
  public void setCurrentArticle(final Article article)
chris@1
   237
  {
chris@1
   238
    this.currentArticle = article;
chris@1
   239
  }
chris@1
   240
  
chris@1
   241
  public void setCurrentGroup(final Group group)
chris@1
   242
  {
chris@1
   243
    this.currentGroup = group;
chris@1
   244
  }
chris@1
   245
  
chris@1
   246
  public long getLastActivity()
chris@1
   247
  {
chris@1
   248
    return this.lastActivity;
chris@1
   249
  }
chris@1
   250
  
chris@1
   251
  /**
chris@1
   252
   * Due to the readLockGate there is no need to synchronize this method.
chris@1
   253
   * @param raw
chris@1
   254
   * @throws IllegalArgumentException if raw is null.
chris@1
   255
   * @throws IllegalStateException if calling thread does not own the readLock.
chris@1
   256
   */
chris@1
   257
  void lineReceived(byte[] raw)
chris@1
   258
  {
chris@1
   259
    if(raw == null)
chris@1
   260
    {
chris@1
   261
      throw new IllegalArgumentException("raw is null");
chris@1
   262
    }
chris@1
   263
    
chris@1
   264
    if(readLock == 0 || readLock != Thread.currentThread().hashCode())
chris@1
   265
    {
chris@1
   266
      throw new IllegalStateException("readLock not properly set");
chris@1
   267
    }
chris@1
   268
chris@1
   269
    this.lastActivity = System.currentTimeMillis();
chris@1
   270
    
chris@1
   271
    String line = new String(raw, this.charset);
chris@1
   272
    
chris@1
   273
    // There might be a trailing \r, but trim() is a bad idea
chris@1
   274
    // as it removes also leading spaces from long header lines.
chris@1
   275
    if(line.endsWith("\r"))
chris@1
   276
    {
chris@1
   277
      line = line.substring(0, line.length() - 1);
chris@1
   278
    }
chris@1
   279
    
chris@1
   280
    Log.msg("<< " + line, true);
chris@1
   281
    
chris@1
   282
    if(command == null)
chris@1
   283
    {
chris@1
   284
      command = parseCommandLine(line);
chris@1
   285
      assert command != null;
chris@1
   286
    }
chris@1
   287
chris@1
   288
    try
chris@1
   289
    {
chris@1
   290
      // The command object will process the line we just received
chris@1
   291
      command.processLine(line);
chris@1
   292
    }
chris@1
   293
    catch(ClosedChannelException ex0)
chris@1
   294
    {
chris@1
   295
      try
chris@1
   296
      {
chris@1
   297
        Log.msg("Connection to " + channel.socket().getRemoteSocketAddress() 
chris@1
   298
            + " closed: " + ex0, true);
chris@1
   299
      }
chris@1
   300
      catch(Exception ex0a)
chris@1
   301
      {
chris@1
   302
        ex0a.printStackTrace();
chris@1
   303
      }
chris@1
   304
    }
chris@1
   305
    catch(Exception ex1)
chris@1
   306
    {
chris@1
   307
      try
chris@1
   308
      {
chris@1
   309
        command = null;
chris@1
   310
        ex1.printStackTrace();
chris@1
   311
        println("500 Internal server error");
chris@1
   312
      }
chris@1
   313
      catch(Exception ex2)
chris@1
   314
      {
chris@1
   315
        ex2.printStackTrace();
chris@1
   316
      }
chris@1
   317
    }
chris@1
   318
chris@1
   319
    if(command == null || command.hasFinished())
chris@1
   320
    {
chris@1
   321
      command = null;
chris@1
   322
      charset = Charset.forName("UTF-8"); // Reset to default
chris@1
   323
    }
chris@1
   324
  }
chris@1
   325
  
chris@1
   326
  /**
chris@1
   327
   * This method performes several if/elseif constructs to determine the
chris@1
   328
   * fitting command object. 
chris@1
   329
   * TODO: This string comparisons are probably slow!
chris@1
   330
   * @param line
chris@1
   331
   * @return
chris@1
   332
   */
chris@1
   333
  private AbstractCommand parseCommandLine(String line)
chris@1
   334
  {
chris@1
   335
    AbstractCommand  cmd    = new UnsupportedCommand(this);
chris@1
   336
    String   cmdStr = line.split(" ")[0];
chris@1
   337
    
chris@1
   338
    if(cmdStr.equalsIgnoreCase("ARTICLE") || 
chris@1
   339
      cmdStr.equalsIgnoreCase("BODY"))
chris@1
   340
    {
chris@1
   341
      cmd = new ArticleCommand(this);
chris@1
   342
    }
chris@1
   343
    else if(cmdStr.equalsIgnoreCase("CAPABILITIES"))
chris@1
   344
    {
chris@1
   345
      cmd = new CapabilitiesCommand(this);
chris@1
   346
    }
chris@1
   347
    else if(cmdStr.equalsIgnoreCase("GROUP"))
chris@1
   348
    {
chris@1
   349
      cmd = new GroupCommand(this);
chris@1
   350
    }
chris@1
   351
    else if(cmdStr.equalsIgnoreCase("HEAD"))
chris@1
   352
    {
chris@1
   353
      cmd = new ArticleCommand(this);
chris@1
   354
    }
chris@1
   355
    else if(cmdStr.equalsIgnoreCase("HELP"))
chris@1
   356
    {
chris@1
   357
      cmd = new HelpCommand(this);
chris@1
   358
    }
chris@1
   359
    else if(cmdStr.equalsIgnoreCase("LIST"))
chris@1
   360
    {
chris@1
   361
      cmd = new ListCommand(this);
chris@1
   362
    }
chris@1
   363
    else if(cmdStr.equalsIgnoreCase("LISTGROUP"))
chris@1
   364
    {
chris@1
   365
      cmd = new ListGroupCommand(this);
chris@1
   366
    }
chris@1
   367
    else if(cmdStr.equalsIgnoreCase("MODE"))
chris@1
   368
    {
chris@1
   369
      cmd = new ModeReaderCommand(this);
chris@1
   370
    }
chris@1
   371
    else if(cmdStr.equalsIgnoreCase("NEWGROUPS"))
chris@1
   372
    {
chris@1
   373
      cmd = new NewGroupsCommand(this);
chris@1
   374
    }
chris@1
   375
    else if(cmdStr.equalsIgnoreCase("NEXT") ||
chris@1
   376
      cmdStr.equalsIgnoreCase("PREV"))
chris@1
   377
    {
chris@1
   378
      cmd = new NextPrevCommand(this);
chris@1
   379
    }
chris@1
   380
    else if(cmdStr.equalsIgnoreCase("OVER") ||
chris@1
   381
      cmdStr.equalsIgnoreCase("XOVER")) // for compatibility with older RFCs
chris@1
   382
    {
chris@1
   383
      cmd = new OverCommand(this);
chris@1
   384
    }
chris@1
   385
    else if(cmdStr.equalsIgnoreCase("POST"))
chris@1
   386
    {
chris@1
   387
      cmd = new PostCommand(this);
chris@1
   388
    }
chris@1
   389
    else if(cmdStr.equalsIgnoreCase("QUIT"))
chris@1
   390
    {
chris@1
   391
      cmd = new QuitCommand(this);
chris@1
   392
    }
chris@1
   393
    else if(cmdStr.equalsIgnoreCase("STAT"))
chris@1
   394
    {
chris@1
   395
      cmd = new StatCommand(this);
chris@1
   396
    }
chris@1
   397
    else if(cmdStr.equalsIgnoreCase("XDAEMON"))
chris@1
   398
    {
chris@1
   399
      cmd = new XDaemonCommand(this);
chris@1
   400
    }
chris@1
   401
    else if(cmdStr.equalsIgnoreCase("XPAT"))
chris@1
   402
    {
chris@1
   403
      cmd = new XPatCommand(this);
chris@1
   404
    }
chris@1
   405
    
chris@1
   406
    return cmd;
chris@1
   407
  }
chris@1
   408
  
chris@1
   409
  /**
chris@1
   410
   * Puts the given line into the output buffer, adds a newline character
chris@1
   411
   * and returns. The method returns immediately and does not block until
chris@1
   412
   * the line was sent. If line is longer than 510 octets it is split up in
chris@1
   413
   * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE).
chris@1
   414
   * @param line
chris@1
   415
   */
chris@1
   416
  public void println(final CharSequence line, final Charset charset)
chris@1
   417
    throws IOException
chris@1
   418
  {    
chris@1
   419
    writeToChannel(CharBuffer.wrap(line), charset, line);
chris@1
   420
    writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
chris@1
   421
  }
chris@1
   422
  
chris@1
   423
  /**
chris@1
   424
   * Encodes the given CharBuffer using the given Charset to a bunch of
chris@1
   425
   * ByteBuffers (each 512 bytes large) and enqueues them for writing at the
chris@1
   426
   * connected SocketChannel.
chris@1
   427
   * @throws java.io.IOException
chris@1
   428
   */
chris@1
   429
  private void writeToChannel(CharBuffer characters, final Charset charset,
chris@1
   430
    CharSequence debugLine)
chris@1
   431
    throws IOException
chris@1
   432
  {
chris@1
   433
    if(!charset.canEncode())
chris@1
   434
    {
chris@1
   435
      Log.msg("FATAL: Charset " + charset + " cannot encode!", false);
chris@1
   436
      return;
chris@1
   437
    }
chris@1
   438
    
chris@1
   439
    // Write characters to output buffers
chris@1
   440
    LineEncoder lenc = new LineEncoder(characters, charset);
chris@1
   441
    lenc.encode(lineBuffers);
chris@1
   442
    
chris@1
   443
    // Enable OP_WRITE events so that the buffers are processed
chris@1
   444
    try
chris@1
   445
    {
chris@1
   446
      this.writeSelKey.interestOps(SelectionKey.OP_WRITE);
chris@1
   447
      ChannelWriter.getInstance().getSelector().wakeup();
chris@1
   448
    }
chris@1
   449
    catch (Exception ex) // CancelledKeyException and ChannelCloseException
chris@1
   450
    {
chris@1
   451
      Log.msg("NNTPConnection.writeToChannel(): " + ex, false);
chris@1
   452
      return;
chris@1
   453
    }
chris@1
   454
chris@1
   455
    // Update last activity timestamp
chris@1
   456
    this.lastActivity = System.currentTimeMillis();
chris@1
   457
    if(debugLine != null)
chris@1
   458
    {
chris@1
   459
      Log.msg(">> " + debugLine, true);
chris@1
   460
    }
chris@1
   461
  }
chris@1
   462
  
chris@1
   463
  public void println(final CharSequence line)
chris@1
   464
    throws IOException
chris@1
   465
  {
chris@1
   466
    println(line, charset);
chris@1
   467
  }
chris@1
   468
  
chris@1
   469
  public void print(final String line)
chris@1
   470
    throws IOException
chris@1
   471
  {
chris@1
   472
    writeToChannel(CharBuffer.wrap(line), charset, line);
chris@1
   473
  }
chris@1
   474
  
chris@1
   475
  public void setCurrentCharset(final Charset charset)
chris@1
   476
  {
chris@1
   477
    this.charset = charset;
chris@1
   478
  }
chris@1
   479
  
chris@1
   480
}