src/org/sonews/daemon/NNTPConnection.java
author cli
Sun Aug 29 23:23:15 2010 +0200 (2010-08-29)
changeset 38 fdfc7225f799
parent 35 ed84c8bdd87b
child 48 b78e77619152
permissions -rw-r--r--
Implement JDBCDatabase.update(Article) method to fix issue #7.
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.daemon;
    20 
    21 import java.io.IOException;
    22 import java.net.InetSocketAddress;
    23 import java.net.SocketException;
    24 import java.nio.ByteBuffer;
    25 import java.nio.CharBuffer;
    26 import java.nio.channels.ClosedChannelException;
    27 import java.nio.channels.SelectionKey;
    28 import java.nio.channels.SocketChannel;
    29 import java.nio.charset.Charset;
    30 import java.util.Arrays;
    31 import java.util.Timer;
    32 import java.util.TimerTask;
    33 import org.sonews.daemon.command.Command;
    34 import org.sonews.storage.Article;
    35 import org.sonews.storage.Channel;
    36 import org.sonews.storage.StorageBackendException;
    37 import org.sonews.util.Log;
    38 import org.sonews.util.Stats;
    39 
    40 /**
    41  * For every SocketChannel (so TCP/IP connection) there is an instance of
    42  * this class.
    43  * @author Christian Lins
    44  * @since sonews/0.5.0
    45  */
    46 public final class NNTPConnection
    47 {
    48 
    49 	public static final String NEWLINE = "\r\n";    // RFC defines this as newline
    50 	public static final String MESSAGE_ID_PATTERN = "<[^>]+>";
    51 	private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon
    52 	/** SocketChannel is generally thread-safe */
    53 	private SocketChannel channel = null;
    54 	private Charset charset = Charset.forName("UTF-8");
    55 	private Command command = null;
    56 	private Article currentArticle = null;
    57 	private Channel currentGroup = null;
    58 	private volatile long lastActivity = System.currentTimeMillis();
    59 	private ChannelLineBuffers lineBuffers = new ChannelLineBuffers();
    60 	private int readLock = 0;
    61 	private final Object readLockGate = new Object();
    62 	private SelectionKey writeSelKey = null;
    63 
    64 	public NNTPConnection(final SocketChannel channel)
    65 		throws IOException
    66 	{
    67 		if (channel == null) {
    68 			throw new IllegalArgumentException("channel is null");
    69 		}
    70 
    71 		this.channel = channel;
    72 		Stats.getInstance().clientConnect();
    73 	}
    74 
    75 	/**
    76 	 * Tries to get the read lock for this NNTPConnection. This method is Thread-
    77 	 * safe and returns true of the read lock was successfully set. If the lock
    78 	 * is still hold by another Thread the method returns false.
    79 	 */
    80 	boolean tryReadLock()
    81 	{
    82 		// As synchronizing simple types may cause deadlocks,
    83 		// we use a gate object.
    84 		synchronized (readLockGate) {
    85 			if (readLock != 0) {
    86 				return false;
    87 			} else {
    88 				readLock = Thread.currentThread().hashCode();
    89 				return true;
    90 			}
    91 		}
    92 	}
    93 
    94 	/**
    95 	 * Releases the read lock in a Thread-safe way.
    96 	 * @throws IllegalMonitorStateException if a Thread not holding the lock
    97 	 * tries to release it.
    98 	 */
    99 	void unlockReadLock()
   100 	{
   101 		synchronized (readLockGate) {
   102 			if (readLock == Thread.currentThread().hashCode()) {
   103 				readLock = 0;
   104 			} else {
   105 				throw new IllegalMonitorStateException();
   106 			}
   107 		}
   108 	}
   109 
   110 	/**
   111 	 * @return Current input buffer of this NNTPConnection instance.
   112 	 */
   113 	public ByteBuffer getInputBuffer()
   114 	{
   115 		return this.lineBuffers.getInputBuffer();
   116 	}
   117 
   118 	/**
   119 	 * @return Output buffer of this NNTPConnection which has at least one byte
   120 	 * free storage.
   121 	 */
   122 	public ByteBuffer getOutputBuffer()
   123 	{
   124 		return this.lineBuffers.getOutputBuffer();
   125 	}
   126 
   127 	/**
   128 	 * @return ChannelLineBuffers instance associated with this NNTPConnection.
   129 	 */
   130 	public ChannelLineBuffers getBuffers()
   131 	{
   132 		return this.lineBuffers;
   133 	}
   134 
   135 	/**
   136 	 * @return true if this connection comes from a local remote address.
   137 	 */
   138 	public boolean isLocalConnection()
   139 	{
   140 		return ((InetSocketAddress) this.channel.socket().getRemoteSocketAddress()).getHostName().equalsIgnoreCase("localhost");
   141 	}
   142 
   143 	void setWriteSelectionKey(SelectionKey selKey)
   144 	{
   145 		this.writeSelKey = selKey;
   146 	}
   147 
   148 	public void shutdownInput()
   149 	{
   150 		try {
   151 			// Closes the input line of the channel's socket, so no new data
   152 			// will be received and a timeout can be triggered.
   153 			this.channel.socket().shutdownInput();
   154 		} catch (IOException ex) {
   155 			Log.get().warning("Exception in NNTPConnection.shutdownInput(): " + ex);
   156 		}
   157 	}
   158 
   159 	public void shutdownOutput()
   160 	{
   161 		cancelTimer.schedule(new TimerTask()
   162 		{
   163 
   164 			@Override
   165 			public void run()
   166 			{
   167 				try {
   168 					// Closes the output line of the channel's socket.
   169 					channel.socket().shutdownOutput();
   170 					channel.close();
   171 				} catch (SocketException ex) {
   172 					// Socket was already disconnected
   173 					Log.get().info("NNTPConnection.shutdownOutput(): " + ex);
   174 				} catch (Exception ex) {
   175 					Log.get().warning("NNTPConnection.shutdownOutput(): " + ex);
   176 				}
   177 			}
   178 		}, 3000);
   179 	}
   180 
   181 	public SocketChannel getSocketChannel()
   182 	{
   183 		return this.channel;
   184 	}
   185 
   186 	public Article getCurrentArticle()
   187 	{
   188 		return this.currentArticle;
   189 	}
   190 
   191 	public Charset getCurrentCharset()
   192 	{
   193 		return this.charset;
   194 	}
   195 
   196 	/**
   197 	 * @return The currently selected communication channel (not SocketChannel)
   198 	 */
   199 	public Channel getCurrentChannel()
   200 	{
   201 		return this.currentGroup;
   202 	}
   203 
   204 	public void setCurrentArticle(final Article article)
   205 	{
   206 		this.currentArticle = article;
   207 	}
   208 
   209 	public void setCurrentGroup(final Channel group)
   210 	{
   211 		this.currentGroup = group;
   212 	}
   213 
   214 	public long getLastActivity()
   215 	{
   216 		return this.lastActivity;
   217 	}
   218 
   219 	/**
   220 	 * Due to the readLockGate there is no need to synchronize this method.
   221 	 * @param raw
   222 	 * @throws IllegalArgumentException if raw is null.
   223 	 * @throws IllegalStateException if calling thread does not own the readLock.
   224 	 */
   225 	void lineReceived(byte[] raw)
   226 	{
   227 		if (raw == null) {
   228 			throw new IllegalArgumentException("raw is null");
   229 		}
   230 
   231 		if (readLock == 0 || readLock != Thread.currentThread().hashCode()) {
   232 			throw new IllegalStateException("readLock not properly set");
   233 		}
   234 
   235 		this.lastActivity = System.currentTimeMillis();
   236 
   237 		String line = new String(raw, this.charset);
   238 
   239 		// There might be a trailing \r, but trim() is a bad idea
   240 		// as it removes also leading spaces from long header lines.
   241 		if (line.endsWith("\r")) {
   242 			line = line.substring(0, line.length() - 1);
   243 			raw = Arrays.copyOf(raw, raw.length - 1);
   244 		}
   245 
   246 		Log.get().fine("<< " + line);
   247 
   248 		if (command == null) {
   249 			command = parseCommandLine(line);
   250 			assert command != null;
   251 		}
   252 
   253 		try {
   254 			// The command object will process the line we just received
   255 			try {
   256 				command.processLine(this, line, raw);
   257 			} catch (StorageBackendException ex) {
   258 				Log.get().info("Retry command processing after StorageBackendException");
   259 
   260 				// Try it a second time, so that the backend has time to recover
   261 				command.processLine(this, line, raw);
   262 			}
   263 		} catch (ClosedChannelException ex0) {
   264 			try {
   265 				Log.get().info("Connection to " + channel.socket().getRemoteSocketAddress()
   266 					+ " closed: " + ex0);
   267 			} catch (Exception ex0a) {
   268 				ex0a.printStackTrace();
   269 			}
   270 		} catch (Exception ex1) // This will catch a second StorageBackendException
   271 		{
   272 			try {
   273 				command = null;
   274 				ex1.printStackTrace();
   275 				println("500 Internal server error");
   276 			} catch (Exception ex2) {
   277 				ex2.printStackTrace();
   278 			}
   279 		}
   280 
   281 		if (command == null || command.hasFinished()) {
   282 			command = null;
   283 			charset = Charset.forName("UTF-8"); // Reset to default
   284 		}
   285 	}
   286 
   287 	/**
   288 	 * This method determines the fitting command processing class.
   289 	 * @param line
   290 	 * @return
   291 	 */
   292 	private Command parseCommandLine(String line)
   293 	{
   294 		String cmdStr = line.split(" ")[0];
   295 		return CommandSelector.getInstance().get(cmdStr);
   296 	}
   297 
   298 	/**
   299 	 * Puts the given line into the output buffer, adds a newline character
   300 	 * and returns. The method returns immediately and does not block until
   301 	 * the line was sent. If line is longer than 510 octets it is split up in
   302 	 * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE).
   303 	 * @param line
   304 	 */
   305 	public void println(final CharSequence line, final Charset charset)
   306 		throws IOException
   307 	{
   308 		writeToChannel(CharBuffer.wrap(line), charset, line);
   309 		writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
   310 	}
   311 
   312 	/**
   313 	 * Writes the given raw lines to the output buffers and finishes with
   314 	 * a newline character (\r\n).
   315 	 * @param rawLines
   316 	 */
   317 	public void println(final byte[] rawLines)
   318 		throws IOException
   319 	{
   320 		this.lineBuffers.addOutputBuffer(ByteBuffer.wrap(rawLines));
   321 		writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
   322 	}
   323 
   324 	/**
   325 	 * Encodes the given CharBuffer using the given Charset to a bunch of
   326 	 * ByteBuffers (each 512 bytes large) and enqueues them for writing at the
   327 	 * connected SocketChannel.
   328 	 * @throws java.io.IOException
   329 	 */
   330 	private void writeToChannel(CharBuffer characters, final Charset charset,
   331 		CharSequence debugLine)
   332 		throws IOException
   333 	{
   334 		if (!charset.canEncode()) {
   335 			Log.get().severe("FATAL: Charset " + charset + " cannot encode!");
   336 			return;
   337 		}
   338 
   339 		// Write characters to output buffers
   340 		LineEncoder lenc = new LineEncoder(characters, charset);
   341 		lenc.encode(lineBuffers);
   342 
   343 		enableWriteEvents(debugLine);
   344 	}
   345 
   346 	private void enableWriteEvents(CharSequence debugLine)
   347 	{
   348 		// Enable OP_WRITE events so that the buffers are processed
   349 		try {
   350 			this.writeSelKey.interestOps(SelectionKey.OP_WRITE);
   351 			ChannelWriter.getInstance().getSelector().wakeup();
   352 		} catch (Exception ex) // CancelledKeyException and ChannelCloseException
   353 		{
   354 			Log.get().warning("NNTPConnection.writeToChannel(): " + ex);
   355 			return;
   356 		}
   357 
   358 		// Update last activity timestamp
   359 		this.lastActivity = System.currentTimeMillis();
   360 		if (debugLine != null) {
   361 			Log.get().fine(">> " + debugLine);
   362 		}
   363 	}
   364 
   365 	public void println(final CharSequence line)
   366 		throws IOException
   367 	{
   368 		println(line, charset);
   369 	}
   370 
   371 	public void print(final String line)
   372 		throws IOException
   373 	{
   374 		writeToChannel(CharBuffer.wrap(line), charset, line);
   375 	}
   376 
   377 	public void setCurrentCharset(final Charset charset)
   378 	{
   379 		this.charset = charset;
   380 	}
   381 
   382 	void setLastActivity(long timestamp)
   383 	{
   384 		this.lastActivity = timestamp;
   385 	}
   386 }