org/sonews/daemon/NNTPConnection.java
author cli
Fri Dec 25 15:42:46 2009 +0100 (2009-12-25)
changeset 25 dd05c3f2fa24
parent 15 f2293e8566f5
child 30 146b3275b792
permissions -rw-r--r--
Fix for too early disconnects on slow client connections. (#563)
chris@1
     1
/*
chris@1
     2
 *   SONEWS News Server
chris@1
     3
 *   see AUTHORS for the list of contributors
chris@1
     4
 *
chris@1
     5
 *   This program is free software: you can redistribute it and/or modify
chris@1
     6
 *   it under the terms of the GNU General Public License as published by
chris@1
     7
 *   the Free Software Foundation, either version 3 of the License, or
chris@1
     8
 *   (at your option) any later version.
chris@1
     9
 *
chris@1
    10
 *   This program is distributed in the hope that it will be useful,
chris@1
    11
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1
    12
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1
    13
 *   GNU General Public License for more details.
chris@1
    14
 *
chris@1
    15
 *   You should have received a copy of the GNU General Public License
chris@1
    16
 *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1
    17
 */
chris@1
    18
chris@1
    19
package org.sonews.daemon;
chris@1
    20
chris@1
    21
import java.io.IOException;
chris@1
    22
import java.net.InetSocketAddress;
chris@3
    23
import java.net.SocketException;
chris@1
    24
import java.nio.ByteBuffer;
chris@1
    25
import java.nio.CharBuffer;
chris@1
    26
import java.nio.channels.ClosedChannelException;
chris@1
    27
import java.nio.channels.SelectionKey;
chris@1
    28
import java.nio.channels.SocketChannel;
chris@1
    29
import java.nio.charset.Charset;
chris@3
    30
import java.util.Arrays;
chris@1
    31
import java.util.Timer;
chris@1
    32
import java.util.TimerTask;
chris@3
    33
import org.sonews.daemon.command.Command;
chris@3
    34
import org.sonews.storage.Article;
chris@3
    35
import org.sonews.storage.Channel;
cli@25
    36
import org.sonews.util.Log;
chris@1
    37
import org.sonews.util.Stats;
chris@1
    38
chris@1
    39
/**
chris@1
    40
 * For every SocketChannel (so TCP/IP connection) there is an instance of
chris@1
    41
 * this class.
chris@1
    42
 * @author Christian Lins
chris@1
    43
 * @since sonews/0.5.0
chris@1
    44
 */
chris@1
    45
public final class NNTPConnection
chris@1
    46
{
chris@1
    47
chris@1
    48
  public static final String NEWLINE            = "\r\n";    // RFC defines this as newline
chris@1
    49
  public static final String MESSAGE_ID_PATTERN = "<[^>]+>";
chris@1
    50
  
chris@1
    51
  private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon
chris@1
    52
  
chris@1
    53
  /** SocketChannel is generally thread-safe */
chris@1
    54
  private SocketChannel   channel        = null;
chris@1
    55
  private Charset         charset        = Charset.forName("UTF-8");
chris@3
    56
  private Command         command        = null;
chris@1
    57
  private Article         currentArticle = null;
chris@3
    58
  private Channel         currentGroup   = null;
chris@1
    59
  private volatile long   lastActivity   = System.currentTimeMillis();
chris@1
    60
  private ChannelLineBuffers lineBuffers = new ChannelLineBuffers();
chris@1
    61
  private int             readLock       = 0;
chris@1
    62
  private final Object    readLockGate   = new Object();
chris@1
    63
  private SelectionKey    writeSelKey    = null;
chris@1
    64
  
chris@1
    65
  public NNTPConnection(final SocketChannel channel)
chris@1
    66
    throws IOException
chris@1
    67
  {
chris@1
    68
    if(channel == null)
chris@1
    69
    {
chris@1
    70
      throw new IllegalArgumentException("channel is null");
chris@1
    71
    }
chris@1
    72
chris@1
    73
    this.channel = channel;
chris@1
    74
    Stats.getInstance().clientConnect();
chris@1
    75
  }
chris@1
    76
  
chris@1
    77
  /**
chris@1
    78
   * Tries to get the read lock for this NNTPConnection. This method is Thread-
chris@1
    79
   * safe and returns true of the read lock was successfully set. If the lock
chris@1
    80
   * is still hold by another Thread the method returns false.
chris@1
    81
   */
chris@1
    82
  boolean tryReadLock()
chris@1
    83
  {
chris@1
    84
    // As synchronizing simple types may cause deadlocks,
chris@1
    85
    // we use a gate object.
chris@1
    86
    synchronized(readLockGate)
chris@1
    87
    {
chris@1
    88
      if(readLock != 0)
chris@1
    89
      {
chris@1
    90
        return false;
chris@1
    91
      }
chris@1
    92
      else
chris@1
    93
      {
chris@1
    94
        readLock = Thread.currentThread().hashCode();
chris@1
    95
        return true;
chris@1
    96
      }
chris@1
    97
    }
chris@1
    98
  }
chris@1
    99
  
chris@1
   100
  /**
chris@1
   101
   * Releases the read lock in a Thread-safe way.
chris@1
   102
   * @throws IllegalMonitorStateException if a Thread not holding the lock
chris@1
   103
   * tries to release it.
chris@1
   104
   */
chris@1
   105
  void unlockReadLock()
chris@1
   106
  {
chris@1
   107
    synchronized(readLockGate)
chris@1
   108
    {
chris@1
   109
      if(readLock == Thread.currentThread().hashCode())
chris@1
   110
      {
chris@1
   111
        readLock = 0;
chris@1
   112
      }
chris@1
   113
      else
chris@1
   114
      {
chris@1
   115
        throw new IllegalMonitorStateException();
chris@1
   116
      }
chris@1
   117
    }
chris@1
   118
  }
chris@1
   119
  
chris@1
   120
  /**
chris@1
   121
   * @return Current input buffer of this NNTPConnection instance.
chris@1
   122
   */
chris@1
   123
  public ByteBuffer getInputBuffer()
chris@1
   124
  {
chris@1
   125
    return this.lineBuffers.getInputBuffer();
chris@1
   126
  }
chris@1
   127
  
chris@1
   128
  /**
chris@1
   129
   * @return Output buffer of this NNTPConnection which has at least one byte
chris@1
   130
   * free storage.
chris@1
   131
   */
chris@1
   132
  public ByteBuffer getOutputBuffer()
chris@1
   133
  {
chris@1
   134
    return this.lineBuffers.getOutputBuffer();
chris@1
   135
  }
chris@1
   136
  
chris@1
   137
  /**
chris@1
   138
   * @return ChannelLineBuffers instance associated with this NNTPConnection.
chris@1
   139
   */
chris@1
   140
  public ChannelLineBuffers getBuffers()
chris@1
   141
  {
chris@1
   142
    return this.lineBuffers;
chris@1
   143
  }
chris@1
   144
  
chris@1
   145
  /**
chris@1
   146
   * @return true if this connection comes from a local remote address.
chris@1
   147
   */
chris@1
   148
  public boolean isLocalConnection()
chris@1
   149
  {
chris@1
   150
    return ((InetSocketAddress)this.channel.socket().getRemoteSocketAddress())
chris@1
   151
      .getHostName().equalsIgnoreCase("localhost");
chris@1
   152
  }
chris@1
   153
chris@1
   154
  void setWriteSelectionKey(SelectionKey selKey)
chris@1
   155
  {
chris@1
   156
    this.writeSelKey = selKey;
chris@1
   157
  }
chris@1
   158
chris@1
   159
  public void shutdownInput()
chris@1
   160
  {
chris@1
   161
    try
chris@1
   162
    {
chris@1
   163
      // Closes the input line of the channel's socket, so no new data
chris@1
   164
      // will be received and a timeout can be triggered.
chris@1
   165
      this.channel.socket().shutdownInput();
chris@1
   166
    }
chris@1
   167
    catch(IOException ex)
chris@1
   168
    {
cli@15
   169
      Log.get().warning("Exception in NNTPConnection.shutdownInput(): " + ex);
chris@1
   170
    }
chris@1
   171
  }
chris@1
   172
  
chris@1
   173
  public void shutdownOutput()
chris@1
   174
  {
chris@1
   175
    cancelTimer.schedule(new TimerTask() 
chris@1
   176
    {
chris@1
   177
      @Override
chris@1
   178
      public void run()
chris@1
   179
      {
chris@1
   180
        try
chris@1
   181
        {
chris@1
   182
          // Closes the output line of the channel's socket.
chris@1
   183
          channel.socket().shutdownOutput();
chris@1
   184
          channel.close();
chris@1
   185
        }
chris@3
   186
        catch(SocketException ex)
chris@3
   187
        {
chris@3
   188
          // Socket was already disconnected
cli@15
   189
          Log.get().info("NNTPConnection.shutdownOutput(): " + ex);
chris@3
   190
        }
chris@1
   191
        catch(Exception ex)
chris@1
   192
        {
cli@15
   193
          Log.get().warning("NNTPConnection.shutdownOutput(): " + ex);
chris@1
   194
        }
chris@1
   195
      }
chris@1
   196
    }, 3000);
chris@1
   197
  }
chris@1
   198
  
chris@3
   199
  public SocketChannel getSocketChannel()
chris@1
   200
  {
chris@1
   201
    return this.channel;
chris@1
   202
  }
chris@1
   203
  
chris@1
   204
  public Article getCurrentArticle()
chris@1
   205
  {
chris@1
   206
    return this.currentArticle;
chris@1
   207
  }
chris@1
   208
  
chris@1
   209
  public Charset getCurrentCharset()
chris@1
   210
  {
chris@1
   211
    return this.charset;
chris@1
   212
  }
chris@3
   213
chris@3
   214
  /**
chris@3
   215
   * @return The currently selected communication channel (not SocketChannel)
chris@3
   216
   */
chris@3
   217
  public Channel getCurrentChannel()
chris@1
   218
  {
chris@1
   219
    return this.currentGroup;
chris@1
   220
  }
chris@1
   221
  
chris@1
   222
  public void setCurrentArticle(final Article article)
chris@1
   223
  {
chris@1
   224
    this.currentArticle = article;
chris@1
   225
  }
chris@1
   226
  
chris@3
   227
  public void setCurrentGroup(final Channel group)
chris@1
   228
  {
chris@1
   229
    this.currentGroup = group;
chris@1
   230
  }
chris@1
   231
  
chris@1
   232
  public long getLastActivity()
chris@1
   233
  {
chris@1
   234
    return this.lastActivity;
chris@1
   235
  }
chris@1
   236
  
chris@1
   237
  /**
chris@1
   238
   * Due to the readLockGate there is no need to synchronize this method.
chris@1
   239
   * @param raw
chris@1
   240
   * @throws IllegalArgumentException if raw is null.
chris@1
   241
   * @throws IllegalStateException if calling thread does not own the readLock.
chris@1
   242
   */
chris@1
   243
  void lineReceived(byte[] raw)
chris@1
   244
  {
chris@1
   245
    if(raw == null)
chris@1
   246
    {
chris@1
   247
      throw new IllegalArgumentException("raw is null");
chris@1
   248
    }
chris@1
   249
    
chris@1
   250
    if(readLock == 0 || readLock != Thread.currentThread().hashCode())
chris@1
   251
    {
chris@1
   252
      throw new IllegalStateException("readLock not properly set");
chris@1
   253
    }
chris@1
   254
chris@1
   255
    this.lastActivity = System.currentTimeMillis();
chris@1
   256
    
chris@1
   257
    String line = new String(raw, this.charset);
chris@1
   258
    
chris@1
   259
    // There might be a trailing \r, but trim() is a bad idea
chris@1
   260
    // as it removes also leading spaces from long header lines.
chris@1
   261
    if(line.endsWith("\r"))
chris@1
   262
    {
chris@1
   263
      line = line.substring(0, line.length() - 1);
chris@3
   264
      raw  = Arrays.copyOf(raw, raw.length - 1);
chris@1
   265
    }
chris@1
   266
    
cli@15
   267
    Log.get().fine("<< " + line);
chris@1
   268
    
chris@1
   269
    if(command == null)
chris@1
   270
    {
chris@1
   271
      command = parseCommandLine(line);
chris@1
   272
      assert command != null;
chris@1
   273
    }
chris@1
   274
chris@1
   275
    try
chris@1
   276
    {
chris@1
   277
      // The command object will process the line we just received
chris@3
   278
      command.processLine(this, line, raw);
chris@1
   279
    }
chris@1
   280
    catch(ClosedChannelException ex0)
chris@1
   281
    {
chris@1
   282
      try
chris@1
   283
      {
cli@15
   284
        Log.get().info("Connection to " + channel.socket().getRemoteSocketAddress()
cli@15
   285
            + " closed: " + ex0);
chris@1
   286
      }
chris@1
   287
      catch(Exception ex0a)
chris@1
   288
      {
chris@1
   289
        ex0a.printStackTrace();
chris@1
   290
      }
chris@1
   291
    }
chris@1
   292
    catch(Exception ex1)
chris@1
   293
    {
chris@1
   294
      try
chris@1
   295
      {
chris@1
   296
        command = null;
chris@1
   297
        ex1.printStackTrace();
chris@1
   298
        println("500 Internal server error");
chris@1
   299
      }
chris@1
   300
      catch(Exception ex2)
chris@1
   301
      {
chris@1
   302
        ex2.printStackTrace();
chris@1
   303
      }
chris@1
   304
    }
chris@1
   305
chris@1
   306
    if(command == null || command.hasFinished())
chris@1
   307
    {
chris@1
   308
      command = null;
chris@1
   309
      charset = Charset.forName("UTF-8"); // Reset to default
chris@1
   310
    }
chris@1
   311
  }
chris@1
   312
  
chris@1
   313
  /**
chris@3
   314
   * This method determines the fitting command processing class.
chris@1
   315
   * @param line
chris@1
   316
   * @return
chris@1
   317
   */
chris@3
   318
  private Command parseCommandLine(String line)
chris@1
   319
  {
chris@3
   320
    String cmdStr = line.split(" ")[0];
chris@3
   321
    return CommandSelector.getInstance().get(cmdStr);
chris@1
   322
  }
chris@1
   323
  
chris@1
   324
  /**
chris@1
   325
   * Puts the given line into the output buffer, adds a newline character
chris@1
   326
   * and returns. The method returns immediately and does not block until
chris@1
   327
   * the line was sent. If line is longer than 510 octets it is split up in
chris@1
   328
   * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE).
chris@1
   329
   * @param line
chris@1
   330
   */
chris@1
   331
  public void println(final CharSequence line, final Charset charset)
chris@1
   332
    throws IOException
chris@1
   333
  {    
chris@1
   334
    writeToChannel(CharBuffer.wrap(line), charset, line);
chris@1
   335
    writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
chris@1
   336
  }
chris@3
   337
chris@3
   338
  /**
chris@3
   339
   * Writes the given raw lines to the output buffers and finishes with
chris@3
   340
   * a newline character (\r\n).
chris@3
   341
   * @param rawLines
chris@3
   342
   */
chris@3
   343
  public void println(final byte[] rawLines)
chris@3
   344
    throws IOException
chris@3
   345
  {
chris@3
   346
    this.lineBuffers.addOutputBuffer(ByteBuffer.wrap(rawLines));
chris@3
   347
    writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
chris@3
   348
  }
chris@1
   349
  
chris@1
   350
  /**
chris@1
   351
   * Encodes the given CharBuffer using the given Charset to a bunch of
chris@1
   352
   * ByteBuffers (each 512 bytes large) and enqueues them for writing at the
chris@1
   353
   * connected SocketChannel.
chris@1
   354
   * @throws java.io.IOException
chris@1
   355
   */
chris@1
   356
  private void writeToChannel(CharBuffer characters, final Charset charset,
chris@1
   357
    CharSequence debugLine)
chris@1
   358
    throws IOException
chris@1
   359
  {
chris@1
   360
    if(!charset.canEncode())
chris@1
   361
    {
cli@15
   362
      Log.get().severe("FATAL: Charset " + charset + " cannot encode!");
chris@1
   363
      return;
chris@1
   364
    }
chris@1
   365
    
chris@1
   366
    // Write characters to output buffers
chris@1
   367
    LineEncoder lenc = new LineEncoder(characters, charset);
chris@1
   368
    lenc.encode(lineBuffers);
chris@1
   369
    
chris@3
   370
    enableWriteEvents(debugLine);
chris@3
   371
  }
chris@3
   372
chris@3
   373
  private void enableWriteEvents(CharSequence debugLine)
chris@3
   374
  {
chris@1
   375
    // Enable OP_WRITE events so that the buffers are processed
chris@1
   376
    try
chris@1
   377
    {
chris@1
   378
      this.writeSelKey.interestOps(SelectionKey.OP_WRITE);
chris@1
   379
      ChannelWriter.getInstance().getSelector().wakeup();
chris@1
   380
    }
cli@15
   381
    catch(Exception ex) // CancelledKeyException and ChannelCloseException
chris@1
   382
    {
cli@15
   383
      Log.get().warning("NNTPConnection.writeToChannel(): " + ex);
chris@1
   384
      return;
chris@1
   385
    }
chris@1
   386
chris@1
   387
    // Update last activity timestamp
chris@1
   388
    this.lastActivity = System.currentTimeMillis();
chris@1
   389
    if(debugLine != null)
chris@1
   390
    {
cli@15
   391
      Log.get().fine(">> " + debugLine);
chris@1
   392
    }
chris@1
   393
  }
chris@1
   394
  
chris@1
   395
  public void println(final CharSequence line)
chris@1
   396
    throws IOException
chris@1
   397
  {
chris@1
   398
    println(line, charset);
chris@1
   399
  }
chris@1
   400
  
chris@1
   401
  public void print(final String line)
chris@1
   402
    throws IOException
chris@1
   403
  {
chris@1
   404
    writeToChannel(CharBuffer.wrap(line), charset, line);
chris@1
   405
  }
chris@1
   406
  
chris@1
   407
  public void setCurrentCharset(final Charset charset)
chris@1
   408
  {
chris@1
   409
    this.charset = charset;
chris@1
   410
  }
cli@25
   411
cli@25
   412
  void setLastActivity(long timestamp)
cli@25
   413
  {
cli@25
   414
    this.lastActivity = timestamp;
cli@25
   415
  }
chris@1
   416
  
chris@1
   417
}