chris@1: /*
chris@1:  *   SONEWS News Server
chris@1:  *   see AUTHORS for the list of contributors
chris@1:  *
chris@1:  *   This program is free software: you can redistribute it and/or modify
chris@1:  *   it under the terms of the GNU General Public License as published by
chris@1:  *   the Free Software Foundation, either version 3 of the License, or
chris@1:  *   (at your option) any later version.
chris@1:  *
chris@1:  *   This program is distributed in the hope that it will be useful,
chris@1:  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1:  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1:  *   GNU General Public License for more details.
chris@1:  *
chris@1:  *   You should have received a copy of the GNU General Public License
chris@1:  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1:  */
chris@1: 
chris@1: package org.sonews.daemon;
chris@1: 
chris@1: import java.io.IOException;
chris@1: import java.net.InetSocketAddress;
chris@3: import java.net.SocketException;
chris@1: import java.nio.ByteBuffer;
chris@1: import java.nio.CharBuffer;
chris@1: import java.nio.channels.ClosedChannelException;
chris@1: import java.nio.channels.SelectionKey;
chris@1: import java.nio.channels.SocketChannel;
chris@1: import java.nio.charset.Charset;
chris@3: import java.util.Arrays;
chris@1: import java.util.Timer;
chris@1: import java.util.TimerTask;
chris@3: import org.sonews.daemon.command.Command;
chris@3: import org.sonews.storage.Article;
chris@3: import org.sonews.storage.Channel;
cli@30: import org.sonews.storage.StorageBackendException;
cli@25: import org.sonews.util.Log;
chris@1: import org.sonews.util.Stats;
chris@1: 
chris@1: /**
chris@1:  * For every SocketChannel (so TCP/IP connection) there is an instance of
chris@1:  * this class.
chris@1:  * @author Christian Lins
chris@1:  * @since sonews/0.5.0
chris@1:  */
chris@1: public final class NNTPConnection
chris@1: {
chris@1: 
cli@37: 	public static final String NEWLINE = "\r\n";    // RFC defines this as newline
cli@37: 	public static final String MESSAGE_ID_PATTERN = "<[^>]+>";
cli@37: 	private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon
cli@37: 	/** SocketChannel is generally thread-safe */
cli@37: 	private SocketChannel channel = null;
cli@37: 	private Charset charset = Charset.forName("UTF-8");
cli@37: 	private Command command = null;
cli@37: 	private Article currentArticle = null;
cli@37: 	private Channel currentGroup = null;
cli@37: 	private volatile long lastActivity = System.currentTimeMillis();
cli@37: 	private ChannelLineBuffers lineBuffers = new ChannelLineBuffers();
cli@37: 	private int readLock = 0;
cli@37: 	private final Object readLockGate = new Object();
cli@37: 	private SelectionKey writeSelKey = null;
chris@1: 
cli@37: 	public NNTPConnection(final SocketChannel channel)
cli@37: 		throws IOException
cli@37: 	{
cli@37: 		if (channel == null) {
cli@37: 			throw new IllegalArgumentException("channel is null");
cli@37: 		}
chris@1: 
cli@37: 		this.channel = channel;
cli@37: 		Stats.getInstance().clientConnect();
cli@37: 	}
chris@1: 
cli@37: 	/**
cli@37: 	 * Tries to get the read lock for this NNTPConnection. This method is Thread-
cli@37: 	 * safe and returns true of the read lock was successfully set. If the lock
cli@37: 	 * is still hold by another Thread the method returns false.
cli@37: 	 */
cli@37: 	boolean tryReadLock()
cli@37: 	{
cli@37: 		// As synchronizing simple types may cause deadlocks,
cli@37: 		// we use a gate object.
cli@37: 		synchronized (readLockGate) {
cli@37: 			if (readLock != 0) {
cli@37: 				return false;
cli@37: 			} else {
cli@37: 				readLock = Thread.currentThread().hashCode();
cli@37: 				return true;
cli@37: 			}
cli@37: 		}
cli@37: 	}
chris@3: 
cli@37: 	/**
cli@37: 	 * Releases the read lock in a Thread-safe way.
cli@37: 	 * @throws IllegalMonitorStateException if a Thread not holding the lock
cli@37: 	 * tries to release it.
cli@37: 	 */
cli@37: 	void unlockReadLock()
cli@37: 	{
cli@37: 		synchronized (readLockGate) {
cli@37: 			if (readLock == Thread.currentThread().hashCode()) {
cli@37: 				readLock = 0;
cli@37: 			} else {
cli@37: 				throw new IllegalMonitorStateException();
cli@37: 			}
cli@37: 		}
cli@37: 	}
chris@1: 
cli@37: 	/**
cli@37: 	 * @return Current input buffer of this NNTPConnection instance.
cli@37: 	 */
cli@37: 	public ByteBuffer getInputBuffer()
cli@37: 	{
cli@37: 		return this.lineBuffers.getInputBuffer();
cli@37: 	}
chris@1: 
cli@37: 	/**
cli@37: 	 * @return Output buffer of this NNTPConnection which has at least one byte
cli@37: 	 * free storage.
cli@37: 	 */
cli@37: 	public ByteBuffer getOutputBuffer()
cli@37: 	{
cli@37: 		return this.lineBuffers.getOutputBuffer();
cli@37: 	}
cli@30: 
cli@37: 	/**
cli@37: 	 * @return ChannelLineBuffers instance associated with this NNTPConnection.
cli@37: 	 */
cli@37: 	public ChannelLineBuffers getBuffers()
cli@37: 	{
cli@37: 		return this.lineBuffers;
cli@37: 	}
chris@1: 
cli@37: 	/**
cli@37: 	 * @return true if this connection comes from a local remote address.
cli@37: 	 */
cli@37: 	public boolean isLocalConnection()
cli@37: 	{
cli@37: 		return ((InetSocketAddress) this.channel.socket().getRemoteSocketAddress()).getHostName().equalsIgnoreCase("localhost");
cli@37: 	}
chris@3: 
cli@37: 	void setWriteSelectionKey(SelectionKey selKey)
cli@37: 	{
cli@37: 		this.writeSelKey = selKey;
cli@37: 	}
chris@3: 
cli@37: 	public void shutdownInput()
cli@37: 	{
cli@37: 		try {
cli@37: 			// Closes the input line of the channel's socket, so no new data
cli@37: 			// will be received and a timeout can be triggered.
cli@37: 			this.channel.socket().shutdownInput();
cli@37: 		} catch (IOException ex) {
cli@37: 			Log.get().warning("Exception in NNTPConnection.shutdownInput(): " + ex);
cli@37: 		}
cli@37: 	}
chris@1: 
cli@37: 	public void shutdownOutput()
cli@37: 	{
cli@37: 		cancelTimer.schedule(new TimerTask()
cli@37: 		{
cli@25: 
cli@37: 			@Override
cli@37: 			public void run()
cli@37: 			{
cli@37: 				try {
cli@37: 					// Closes the output line of the channel's socket.
cli@37: 					channel.socket().shutdownOutput();
cli@37: 					channel.close();
cli@37: 				} catch (SocketException ex) {
cli@37: 					// Socket was already disconnected
cli@37: 					Log.get().info("NNTPConnection.shutdownOutput(): " + ex);
cli@37: 				} catch (Exception ex) {
cli@37: 					Log.get().warning("NNTPConnection.shutdownOutput(): " + ex);
cli@37: 				}
cli@37: 			}
cli@37: 		}, 3000);
cli@37: 	}
cli@37: 
cli@37: 	public SocketChannel getSocketChannel()
cli@37: 	{
cli@37: 		return this.channel;
cli@37: 	}
cli@37: 
cli@37: 	public Article getCurrentArticle()
cli@37: 	{
cli@37: 		return this.currentArticle;
cli@37: 	}
cli@37: 
cli@37: 	public Charset getCurrentCharset()
cli@37: 	{
cli@37: 		return this.charset;
cli@37: 	}
cli@37: 
cli@37: 	/**
cli@37: 	 * @return The currently selected communication channel (not SocketChannel)
cli@37: 	 */
cli@37: 	public Channel getCurrentChannel()
cli@37: 	{
cli@37: 		return this.currentGroup;
cli@37: 	}
cli@37: 
cli@37: 	public void setCurrentArticle(final Article article)
cli@37: 	{
cli@37: 		this.currentArticle = article;
cli@37: 	}
cli@37: 
cli@37: 	public void setCurrentGroup(final Channel group)
cli@37: 	{
cli@37: 		this.currentGroup = group;
cli@37: 	}
cli@37: 
cli@37: 	public long getLastActivity()
cli@37: 	{
cli@37: 		return this.lastActivity;
cli@37: 	}
cli@37: 
cli@37: 	/**
cli@37: 	 * Due to the readLockGate there is no need to synchronize this method.
cli@37: 	 * @param raw
cli@37: 	 * @throws IllegalArgumentException if raw is null.
cli@37: 	 * @throws IllegalStateException if calling thread does not own the readLock.
cli@37: 	 */
cli@37: 	void lineReceived(byte[] raw)
cli@37: 	{
cli@37: 		if (raw == null) {
cli@37: 			throw new IllegalArgumentException("raw is null");
cli@37: 		}
cli@37: 
cli@37: 		if (readLock == 0 || readLock != Thread.currentThread().hashCode()) {
cli@37: 			throw new IllegalStateException("readLock not properly set");
cli@37: 		}
cli@37: 
cli@37: 		this.lastActivity = System.currentTimeMillis();
cli@37: 
cli@37: 		String line = new String(raw, this.charset);
cli@37: 
cli@37: 		// There might be a trailing \r, but trim() is a bad idea
cli@37: 		// as it removes also leading spaces from long header lines.
cli@37: 		if (line.endsWith("\r")) {
cli@37: 			line = line.substring(0, line.length() - 1);
cli@37: 			raw = Arrays.copyOf(raw, raw.length - 1);
cli@37: 		}
cli@37: 
cli@37: 		Log.get().fine("<< " + line);
cli@37: 
cli@37: 		if (command == null) {
cli@37: 			command = parseCommandLine(line);
cli@37: 			assert command != null;
cli@37: 		}
cli@37: 
cli@37: 		try {
cli@37: 			// The command object will process the line we just received
cli@37: 			try {
cli@37: 				command.processLine(this, line, raw);
cli@37: 			} catch (StorageBackendException ex) {
cli@37: 				Log.get().info("Retry command processing after StorageBackendException");
cli@37: 
cli@37: 				// Try it a second time, so that the backend has time to recover
cli@37: 				command.processLine(this, line, raw);
cli@37: 			}
cli@37: 		} catch (ClosedChannelException ex0) {
cli@37: 			try {
cli@37: 				Log.get().info("Connection to " + channel.socket().getRemoteSocketAddress()
cli@37: 					+ " closed: " + ex0);
cli@37: 			} catch (Exception ex0a) {
cli@37: 				ex0a.printStackTrace();
cli@37: 			}
cli@37: 		} catch (Exception ex1) // This will catch a second StorageBackendException
cli@37: 		{
cli@37: 			try {
cli@37: 				command = null;
cli@37: 				ex1.printStackTrace();
cli@37: 				println("500 Internal server error");
cli@37: 			} catch (Exception ex2) {
cli@37: 				ex2.printStackTrace();
cli@37: 			}
cli@37: 		}
cli@37: 
cli@37: 		if (command == null || command.hasFinished()) {
cli@37: 			command = null;
cli@37: 			charset = Charset.forName("UTF-8"); // Reset to default
cli@37: 		}
cli@37: 	}
cli@37: 
cli@37: 	/**
cli@37: 	 * This method determines the fitting command processing class.
cli@37: 	 * @param line
cli@37: 	 * @return
cli@37: 	 */
cli@37: 	private Command parseCommandLine(String line)
cli@37: 	{
cli@37: 		String cmdStr = line.split(" ")[0];
cli@37: 		return CommandSelector.getInstance().get(cmdStr);
cli@37: 	}
cli@37: 
cli@37: 	/**
cli@37: 	 * Puts the given line into the output buffer, adds a newline character
cli@37: 	 * and returns. The method returns immediately and does not block until
cli@37: 	 * the line was sent. If line is longer than 510 octets it is split up in
cli@37: 	 * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE).
cli@37: 	 * @param line
cli@37: 	 */
cli@37: 	public void println(final CharSequence line, final Charset charset)
cli@37: 		throws IOException
cli@37: 	{
cli@37: 		writeToChannel(CharBuffer.wrap(line), charset, line);
cli@37: 		writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
cli@37: 	}
cli@37: 
cli@37: 	/**
cli@37: 	 * Writes the given raw lines to the output buffers and finishes with
cli@37: 	 * a newline character (\r\n).
cli@37: 	 * @param rawLines
cli@37: 	 */
cli@37: 	public void println(final byte[] rawLines)
cli@37: 		throws IOException
cli@37: 	{
cli@37: 		this.lineBuffers.addOutputBuffer(ByteBuffer.wrap(rawLines));
cli@37: 		writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
cli@37: 	}
cli@37: 
cli@37: 	/**
cli@37: 	 * Encodes the given CharBuffer using the given Charset to a bunch of
cli@37: 	 * ByteBuffers (each 512 bytes large) and enqueues them for writing at the
cli@37: 	 * connected SocketChannel.
cli@37: 	 * @throws java.io.IOException
cli@37: 	 */
cli@37: 	private void writeToChannel(CharBuffer characters, final Charset charset,
cli@37: 		CharSequence debugLine)
cli@37: 		throws IOException
cli@37: 	{
cli@37: 		if (!charset.canEncode()) {
cli@37: 			Log.get().severe("FATAL: Charset " + charset + " cannot encode!");
cli@37: 			return;
cli@37: 		}
cli@37: 
cli@37: 		// Write characters to output buffers
cli@37: 		LineEncoder lenc = new LineEncoder(characters, charset);
cli@37: 		lenc.encode(lineBuffers);
cli@37: 
cli@37: 		enableWriteEvents(debugLine);
cli@37: 	}
cli@37: 
cli@37: 	private void enableWriteEvents(CharSequence debugLine)
cli@37: 	{
cli@37: 		// Enable OP_WRITE events so that the buffers are processed
cli@37: 		try {
cli@37: 			this.writeSelKey.interestOps(SelectionKey.OP_WRITE);
cli@37: 			ChannelWriter.getInstance().getSelector().wakeup();
cli@37: 		} catch (Exception ex) // CancelledKeyException and ChannelCloseException
cli@37: 		{
cli@37: 			Log.get().warning("NNTPConnection.writeToChannel(): " + ex);
cli@37: 			return;
cli@37: 		}
cli@37: 
cli@37: 		// Update last activity timestamp
cli@37: 		this.lastActivity = System.currentTimeMillis();
cli@37: 		if (debugLine != null) {
cli@37: 			Log.get().fine(">> " + debugLine);
cli@37: 		}
cli@37: 	}
cli@37: 
cli@37: 	public void println(final CharSequence line)
cli@37: 		throws IOException
cli@37: 	{
cli@37: 		println(line, charset);
cli@37: 	}
cli@37: 
cli@37: 	public void print(final String line)
cli@37: 		throws IOException
cli@37: 	{
cli@37: 		writeToChannel(CharBuffer.wrap(line), charset, line);
cli@37: 	}
cli@37: 
cli@37: 	public void setCurrentCharset(final Charset charset)
cli@37: 	{
cli@37: 		this.charset = charset;
cli@37: 	}
cli@37: 
cli@37: 	void setLastActivity(long timestamp)
cli@37: 	{
cli@37: 		this.lastActivity = timestamp;
cli@37: 	}
chris@1: }