org/sonews/daemon/NNTPConnection.java
author cli
Wed Aug 12 13:03:23 2009 +0200 (2009-08-12)
changeset 7 0b76e099eb96
parent 1 6fceb66e1ad7
child 15 f2293e8566f5
permissions -rw-r--r--
PullFeeder sends an additional "MODE READER" to peers.
chris@1
     1
/*
chris@1
     2
 *   SONEWS News Server
chris@1
     3
 *   see AUTHORS for the list of contributors
chris@1
     4
 *
chris@1
     5
 *   This program is free software: you can redistribute it and/or modify
chris@1
     6
 *   it under the terms of the GNU General Public License as published by
chris@1
     7
 *   the Free Software Foundation, either version 3 of the License, or
chris@1
     8
 *   (at your option) any later version.
chris@1
     9
 *
chris@1
    10
 *   This program is distributed in the hope that it will be useful,
chris@1
    11
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1
    12
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1
    13
 *   GNU General Public License for more details.
chris@1
    14
 *
chris@1
    15
 *   You should have received a copy of the GNU General Public License
chris@1
    16
 *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1
    17
 */
chris@1
    18
chris@1
    19
package org.sonews.daemon;
chris@1
    20
chris@1
    21
import org.sonews.util.Log;
chris@1
    22
import java.io.IOException;
chris@1
    23
import java.net.InetSocketAddress;
chris@3
    24
import java.net.SocketException;
chris@1
    25
import java.nio.ByteBuffer;
chris@1
    26
import java.nio.CharBuffer;
chris@1
    27
import java.nio.channels.ClosedChannelException;
chris@1
    28
import java.nio.channels.SelectionKey;
chris@1
    29
import java.nio.channels.SocketChannel;
chris@1
    30
import java.nio.charset.Charset;
chris@3
    31
import java.util.Arrays;
chris@1
    32
import java.util.Timer;
chris@1
    33
import java.util.TimerTask;
chris@3
    34
import org.sonews.daemon.command.Command;
chris@3
    35
import org.sonews.storage.Article;
chris@3
    36
import org.sonews.storage.Channel;
chris@1
    37
import org.sonews.util.Stats;
chris@1
    38
chris@1
    39
/**
chris@1
    40
 * For every SocketChannel (so TCP/IP connection) there is an instance of
chris@1
    41
 * this class.
chris@1
    42
 * @author Christian Lins
chris@1
    43
 * @since sonews/0.5.0
chris@1
    44
 */
chris@1
    45
public final class NNTPConnection
chris@1
    46
{
chris@1
    47
chris@1
    48
  public static final String NEWLINE            = "\r\n";    // RFC defines this as newline
chris@1
    49
  public static final String MESSAGE_ID_PATTERN = "<[^>]+>";
chris@1
    50
  
chris@1
    51
  private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon
chris@1
    52
  
chris@1
    53
  /** SocketChannel is generally thread-safe */
chris@1
    54
  private SocketChannel   channel        = null;
chris@1
    55
  private Charset         charset        = Charset.forName("UTF-8");
chris@3
    56
  private Command         command        = null;
chris@1
    57
  private Article         currentArticle = null;
chris@3
    58
  private Channel         currentGroup   = null;
chris@1
    59
  private volatile long   lastActivity   = System.currentTimeMillis();
chris@1
    60
  private ChannelLineBuffers lineBuffers = new ChannelLineBuffers();
chris@1
    61
  private int             readLock       = 0;
chris@1
    62
  private final Object    readLockGate   = new Object();
chris@1
    63
  private SelectionKey    writeSelKey    = null;
chris@1
    64
  
chris@1
    65
  public NNTPConnection(final SocketChannel channel)
chris@1
    66
    throws IOException
chris@1
    67
  {
chris@1
    68
    if(channel == null)
chris@1
    69
    {
chris@1
    70
      throw new IllegalArgumentException("channel is null");
chris@1
    71
    }
chris@1
    72
chris@1
    73
    this.channel = channel;
chris@1
    74
    Stats.getInstance().clientConnect();
chris@1
    75
  }
chris@1
    76
  
chris@1
    77
  /**
chris@1
    78
   * Tries to get the read lock for this NNTPConnection. This method is Thread-
chris@1
    79
   * safe and returns true of the read lock was successfully set. If the lock
chris@1
    80
   * is still hold by another Thread the method returns false.
chris@1
    81
   */
chris@1
    82
  boolean tryReadLock()
chris@1
    83
  {
chris@1
    84
    // As synchronizing simple types may cause deadlocks,
chris@1
    85
    // we use a gate object.
chris@1
    86
    synchronized(readLockGate)
chris@1
    87
    {
chris@1
    88
      if(readLock != 0)
chris@1
    89
      {
chris@1
    90
        return false;
chris@1
    91
      }
chris@1
    92
      else
chris@1
    93
      {
chris@1
    94
        readLock = Thread.currentThread().hashCode();
chris@1
    95
        return true;
chris@1
    96
      }
chris@1
    97
    }
chris@1
    98
  }
chris@1
    99
  
chris@1
   100
  /**
chris@1
   101
   * Releases the read lock in a Thread-safe way.
chris@1
   102
   * @throws IllegalMonitorStateException if a Thread not holding the lock
chris@1
   103
   * tries to release it.
chris@1
   104
   */
chris@1
   105
  void unlockReadLock()
chris@1
   106
  {
chris@1
   107
    synchronized(readLockGate)
chris@1
   108
    {
chris@1
   109
      if(readLock == Thread.currentThread().hashCode())
chris@1
   110
      {
chris@1
   111
        readLock = 0;
chris@1
   112
      }
chris@1
   113
      else
chris@1
   114
      {
chris@1
   115
        throw new IllegalMonitorStateException();
chris@1
   116
      }
chris@1
   117
    }
chris@1
   118
  }
chris@1
   119
  
chris@1
   120
  /**
chris@1
   121
   * @return Current input buffer of this NNTPConnection instance.
chris@1
   122
   */
chris@1
   123
  public ByteBuffer getInputBuffer()
chris@1
   124
  {
chris@1
   125
    return this.lineBuffers.getInputBuffer();
chris@1
   126
  }
chris@1
   127
  
chris@1
   128
  /**
chris@1
   129
   * @return Output buffer of this NNTPConnection which has at least one byte
chris@1
   130
   * free storage.
chris@1
   131
   */
chris@1
   132
  public ByteBuffer getOutputBuffer()
chris@1
   133
  {
chris@1
   134
    return this.lineBuffers.getOutputBuffer();
chris@1
   135
  }
chris@1
   136
  
chris@1
   137
  /**
chris@1
   138
   * @return ChannelLineBuffers instance associated with this NNTPConnection.
chris@1
   139
   */
chris@1
   140
  public ChannelLineBuffers getBuffers()
chris@1
   141
  {
chris@1
   142
    return this.lineBuffers;
chris@1
   143
  }
chris@1
   144
  
chris@1
   145
  /**
chris@1
   146
   * @return true if this connection comes from a local remote address.
chris@1
   147
   */
chris@1
   148
  public boolean isLocalConnection()
chris@1
   149
  {
chris@1
   150
    return ((InetSocketAddress)this.channel.socket().getRemoteSocketAddress())
chris@1
   151
      .getHostName().equalsIgnoreCase("localhost");
chris@1
   152
  }
chris@1
   153
chris@1
   154
  void setWriteSelectionKey(SelectionKey selKey)
chris@1
   155
  {
chris@1
   156
    this.writeSelKey = selKey;
chris@1
   157
  }
chris@1
   158
chris@1
   159
  public void shutdownInput()
chris@1
   160
  {
chris@1
   161
    try
chris@1
   162
    {
chris@1
   163
      // Closes the input line of the channel's socket, so no new data
chris@1
   164
      // will be received and a timeout can be triggered.
chris@1
   165
      this.channel.socket().shutdownInput();
chris@1
   166
    }
chris@1
   167
    catch(IOException ex)
chris@1
   168
    {
chris@1
   169
      Log.msg("Exception in NNTPConnection.shutdownInput(): " + ex, false);
chris@1
   170
      if(Log.isDebug())
chris@1
   171
      {
chris@1
   172
        ex.printStackTrace();
chris@1
   173
      }
chris@1
   174
    }
chris@1
   175
  }
chris@1
   176
  
chris@1
   177
  public void shutdownOutput()
chris@1
   178
  {
chris@1
   179
    cancelTimer.schedule(new TimerTask() 
chris@1
   180
    {
chris@1
   181
      @Override
chris@1
   182
      public void run()
chris@1
   183
      {
chris@1
   184
        try
chris@1
   185
        {
chris@1
   186
          // Closes the output line of the channel's socket.
chris@1
   187
          channel.socket().shutdownOutput();
chris@1
   188
          channel.close();
chris@1
   189
        }
chris@3
   190
        catch(SocketException ex)
chris@3
   191
        {
chris@3
   192
          // Socket was already disconnected
chris@3
   193
          Log.msg("NNTPConnection.shutdownOutput(): " + ex, true);
chris@3
   194
        }
chris@1
   195
        catch(Exception ex)
chris@1
   196
        {
chris@1
   197
          Log.msg("NNTPConnection.shutdownOutput(): " + ex, false);
chris@1
   198
          if(Log.isDebug())
chris@1
   199
          {
chris@1
   200
            ex.printStackTrace();
chris@1
   201
          }
chris@1
   202
        }
chris@1
   203
      }
chris@1
   204
    }, 3000);
chris@1
   205
  }
chris@1
   206
  
chris@3
   207
  public SocketChannel getSocketChannel()
chris@1
   208
  {
chris@1
   209
    return this.channel;
chris@1
   210
  }
chris@1
   211
  
chris@1
   212
  public Article getCurrentArticle()
chris@1
   213
  {
chris@1
   214
    return this.currentArticle;
chris@1
   215
  }
chris@1
   216
  
chris@1
   217
  public Charset getCurrentCharset()
chris@1
   218
  {
chris@1
   219
    return this.charset;
chris@1
   220
  }
chris@3
   221
chris@3
   222
  /**
chris@3
   223
   * @return The currently selected communication channel (not SocketChannel)
chris@3
   224
   */
chris@3
   225
  public Channel getCurrentChannel()
chris@1
   226
  {
chris@1
   227
    return this.currentGroup;
chris@1
   228
  }
chris@1
   229
  
chris@1
   230
  public void setCurrentArticle(final Article article)
chris@1
   231
  {
chris@1
   232
    this.currentArticle = article;
chris@1
   233
  }
chris@1
   234
  
chris@3
   235
  public void setCurrentGroup(final Channel group)
chris@1
   236
  {
chris@1
   237
    this.currentGroup = group;
chris@1
   238
  }
chris@1
   239
  
chris@1
   240
  public long getLastActivity()
chris@1
   241
  {
chris@1
   242
    return this.lastActivity;
chris@1
   243
  }
chris@1
   244
  
chris@1
   245
  /**
chris@1
   246
   * Due to the readLockGate there is no need to synchronize this method.
chris@1
   247
   * @param raw
chris@1
   248
   * @throws IllegalArgumentException if raw is null.
chris@1
   249
   * @throws IllegalStateException if calling thread does not own the readLock.
chris@1
   250
   */
chris@1
   251
  void lineReceived(byte[] raw)
chris@1
   252
  {
chris@1
   253
    if(raw == null)
chris@1
   254
    {
chris@1
   255
      throw new IllegalArgumentException("raw is null");
chris@1
   256
    }
chris@1
   257
    
chris@1
   258
    if(readLock == 0 || readLock != Thread.currentThread().hashCode())
chris@1
   259
    {
chris@1
   260
      throw new IllegalStateException("readLock not properly set");
chris@1
   261
    }
chris@1
   262
chris@1
   263
    this.lastActivity = System.currentTimeMillis();
chris@1
   264
    
chris@1
   265
    String line = new String(raw, this.charset);
chris@1
   266
    
chris@1
   267
    // There might be a trailing \r, but trim() is a bad idea
chris@1
   268
    // as it removes also leading spaces from long header lines.
chris@1
   269
    if(line.endsWith("\r"))
chris@1
   270
    {
chris@1
   271
      line = line.substring(0, line.length() - 1);
chris@3
   272
      raw  = Arrays.copyOf(raw, raw.length - 1);
chris@1
   273
    }
chris@1
   274
    
chris@1
   275
    Log.msg("<< " + line, true);
chris@1
   276
    
chris@1
   277
    if(command == null)
chris@1
   278
    {
chris@1
   279
      command = parseCommandLine(line);
chris@1
   280
      assert command != null;
chris@1
   281
    }
chris@1
   282
chris@1
   283
    try
chris@1
   284
    {
chris@1
   285
      // The command object will process the line we just received
chris@3
   286
      command.processLine(this, line, raw);
chris@1
   287
    }
chris@1
   288
    catch(ClosedChannelException ex0)
chris@1
   289
    {
chris@1
   290
      try
chris@1
   291
      {
chris@1
   292
        Log.msg("Connection to " + channel.socket().getRemoteSocketAddress() 
chris@1
   293
            + " closed: " + ex0, true);
chris@1
   294
      }
chris@1
   295
      catch(Exception ex0a)
chris@1
   296
      {
chris@1
   297
        ex0a.printStackTrace();
chris@1
   298
      }
chris@1
   299
    }
chris@1
   300
    catch(Exception ex1)
chris@1
   301
    {
chris@1
   302
      try
chris@1
   303
      {
chris@1
   304
        command = null;
chris@1
   305
        ex1.printStackTrace();
chris@1
   306
        println("500 Internal server error");
chris@1
   307
      }
chris@1
   308
      catch(Exception ex2)
chris@1
   309
      {
chris@1
   310
        ex2.printStackTrace();
chris@1
   311
      }
chris@1
   312
    }
chris@1
   313
chris@1
   314
    if(command == null || command.hasFinished())
chris@1
   315
    {
chris@1
   316
      command = null;
chris@1
   317
      charset = Charset.forName("UTF-8"); // Reset to default
chris@1
   318
    }
chris@1
   319
  }
chris@1
   320
  
chris@1
   321
  /**
chris@3
   322
   * This method determines the fitting command processing class.
chris@1
   323
   * @param line
chris@1
   324
   * @return
chris@1
   325
   */
chris@3
   326
  private Command parseCommandLine(String line)
chris@1
   327
  {
chris@3
   328
    String cmdStr = line.split(" ")[0];
chris@3
   329
    return CommandSelector.getInstance().get(cmdStr);
chris@1
   330
  }
chris@1
   331
  
chris@1
   332
  /**
chris@1
   333
   * Puts the given line into the output buffer, adds a newline character
chris@1
   334
   * and returns. The method returns immediately and does not block until
chris@1
   335
   * the line was sent. If line is longer than 510 octets it is split up in
chris@1
   336
   * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE).
chris@1
   337
   * @param line
chris@1
   338
   */
chris@1
   339
  public void println(final CharSequence line, final Charset charset)
chris@1
   340
    throws IOException
chris@1
   341
  {    
chris@1
   342
    writeToChannel(CharBuffer.wrap(line), charset, line);
chris@1
   343
    writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
chris@1
   344
  }
chris@3
   345
chris@3
   346
  /**
chris@3
   347
   * Writes the given raw lines to the output buffers and finishes with
chris@3
   348
   * a newline character (\r\n).
chris@3
   349
   * @param rawLines
chris@3
   350
   */
chris@3
   351
  public void println(final byte[] rawLines)
chris@3
   352
    throws IOException
chris@3
   353
  {
chris@3
   354
    this.lineBuffers.addOutputBuffer(ByteBuffer.wrap(rawLines));
chris@3
   355
    writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
chris@3
   356
  }
chris@1
   357
  
chris@1
   358
  /**
chris@1
   359
   * Encodes the given CharBuffer using the given Charset to a bunch of
chris@1
   360
   * ByteBuffers (each 512 bytes large) and enqueues them for writing at the
chris@1
   361
   * connected SocketChannel.
chris@1
   362
   * @throws java.io.IOException
chris@1
   363
   */
chris@1
   364
  private void writeToChannel(CharBuffer characters, final Charset charset,
chris@1
   365
    CharSequence debugLine)
chris@1
   366
    throws IOException
chris@1
   367
  {
chris@1
   368
    if(!charset.canEncode())
chris@1
   369
    {
chris@1
   370
      Log.msg("FATAL: Charset " + charset + " cannot encode!", false);
chris@1
   371
      return;
chris@1
   372
    }
chris@1
   373
    
chris@1
   374
    // Write characters to output buffers
chris@1
   375
    LineEncoder lenc = new LineEncoder(characters, charset);
chris@1
   376
    lenc.encode(lineBuffers);
chris@1
   377
    
chris@3
   378
    enableWriteEvents(debugLine);
chris@3
   379
  }
chris@3
   380
chris@3
   381
  private void enableWriteEvents(CharSequence debugLine)
chris@3
   382
  {
chris@1
   383
    // Enable OP_WRITE events so that the buffers are processed
chris@1
   384
    try
chris@1
   385
    {
chris@1
   386
      this.writeSelKey.interestOps(SelectionKey.OP_WRITE);
chris@1
   387
      ChannelWriter.getInstance().getSelector().wakeup();
chris@1
   388
    }
chris@1
   389
    catch (Exception ex) // CancelledKeyException and ChannelCloseException
chris@1
   390
    {
chris@1
   391
      Log.msg("NNTPConnection.writeToChannel(): " + ex, false);
chris@1
   392
      return;
chris@1
   393
    }
chris@1
   394
chris@1
   395
    // Update last activity timestamp
chris@1
   396
    this.lastActivity = System.currentTimeMillis();
chris@1
   397
    if(debugLine != null)
chris@1
   398
    {
chris@1
   399
      Log.msg(">> " + debugLine, true);
chris@1
   400
    }
chris@1
   401
  }
chris@1
   402
  
chris@1
   403
  public void println(final CharSequence line)
chris@1
   404
    throws IOException
chris@1
   405
  {
chris@1
   406
    println(line, charset);
chris@1
   407
  }
chris@1
   408
  
chris@1
   409
  public void print(final String line)
chris@1
   410
    throws IOException
chris@1
   411
  {
chris@1
   412
    writeToChannel(CharBuffer.wrap(line), charset, line);
chris@1
   413
  }
chris@1
   414
  
chris@1
   415
  public void setCurrentCharset(final Charset charset)
chris@1
   416
  {
chris@1
   417
    this.charset = charset;
chris@1
   418
  }
chris@1
   419
  
chris@1
   420
}