src/org/sonews/daemon/NNTPConnection.java
author František Kučera <franta-hg@frantovo.cz>
Tue Oct 25 15:06:59 2011 +0200 (2011-10-25)
changeset 110 c81406884e16
parent 101 d54786065fa3
child 113 a059aecd1794
permissions -rwxr-xr-x
Drupal: text/plain – podpora značky <br/>
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 package org.sonews.daemon;
    19 
    20 import java.io.ByteArrayOutputStream;
    21 import java.io.IOException;
    22 import java.net.InetSocketAddress;
    23 import java.net.SocketException;
    24 import java.nio.ByteBuffer;
    25 import java.nio.CharBuffer;
    26 import java.nio.channels.ClosedChannelException;
    27 import java.nio.channels.SelectionKey;
    28 import java.nio.channels.SocketChannel;
    29 import java.nio.charset.Charset;
    30 import java.util.Arrays;
    31 import java.util.Timer;
    32 import java.util.TimerTask;
    33 import java.util.logging.Level;
    34 import org.sonews.daemon.command.Command;
    35 import org.sonews.storage.Article;
    36 import org.sonews.storage.Group;
    37 import org.sonews.storage.StorageBackendException;
    38 import org.sonews.util.Log;
    39 import org.sonews.util.Stats;
    40 import org.sonews.util.io.CRLFOutputStream;
    41 import org.sonews.util.io.SMTPOutputStream;
    42 
    43 /**
    44  * For every SocketChannel (so TCP/IP connection) there is an instance of
    45  * this class.
    46  * @author Christian Lins
    47  * @since sonews/0.5.0
    48  */
    49 public final class NNTPConnection {
    50 
    51 	public static final String NEWLINE = "\r\n";    // RFC defines this as newline
    52 	public static final String MESSAGE_ID_PATTERN = "<[^>]+>";
    53 	private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon
    54 	/** SocketChannel is generally thread-safe */
    55 	private SocketChannel channel = null;
    56 	private Charset charset = Charset.forName("UTF-8");
    57 	private Command command = null;
    58 	private Article currentArticle = null;
    59 	private Group currentGroup = null;
    60 	private volatile long lastActivity = System.currentTimeMillis();
    61 	private ChannelLineBuffers lineBuffers = new ChannelLineBuffers();
    62 	private int readLock = 0;
    63 	private final Object readLockGate = new Object();
    64 	private SelectionKey writeSelKey = null;
    65 	
    66 	private String username;
    67 	private boolean userAuthenticated = false;
    68 
    69 	public NNTPConnection(final SocketChannel channel)
    70 			throws IOException {
    71 		if (channel == null) {
    72 			throw new IllegalArgumentException("channel is null");
    73 		}
    74 
    75 		this.channel = channel;
    76 		Stats.getInstance().clientConnect();
    77 	}
    78 
    79 	/**
    80 	 * Tries to get the read lock for this NNTPConnection. This method is Thread-
    81 	 * safe and returns true of the read lock was successfully set. If the lock
    82 	 * is still hold by another Thread the method returns false.
    83 	 */
    84 	boolean tryReadLock() {
    85 		// As synchronizing simple types may cause deadlocks,
    86 		// we use a gate object.
    87 		synchronized (readLockGate) {
    88 			if (readLock != 0) {
    89 				return false;
    90 			} else {
    91 				readLock = Thread.currentThread().hashCode();
    92 				return true;
    93 			}
    94 		}
    95 	}
    96 
    97 	/**
    98 	 * Releases the read lock in a Thread-safe way.
    99 	 * @throws IllegalMonitorStateException if a Thread not holding the lock
   100 	 * tries to release it.
   101 	 */
   102 	void unlockReadLock() {
   103 		synchronized (readLockGate) {
   104 			if (readLock == Thread.currentThread().hashCode()) {
   105 				readLock = 0;
   106 			} else {
   107 				throw new IllegalMonitorStateException();
   108 			}
   109 		}
   110 	}
   111 
   112 	/**
   113 	 * @return Current input buffer of this NNTPConnection instance.
   114 	 */
   115 	public ByteBuffer getInputBuffer() {
   116 		return this.lineBuffers.getInputBuffer();
   117 	}
   118 
   119 	/**
   120 	 * @return Output buffer of this NNTPConnection which has at least one byte
   121 	 * free storage.
   122 	 */
   123 	public ByteBuffer getOutputBuffer() {
   124 		return this.lineBuffers.getOutputBuffer();
   125 	}
   126 
   127 	/**
   128 	 * @return ChannelLineBuffers instance associated with this NNTPConnection.
   129 	 */
   130 	public ChannelLineBuffers getBuffers() {
   131 		return this.lineBuffers;
   132 	}
   133 
   134 	/**
   135 	 * @return true if this connection comes from a local remote address.
   136 	 */
   137 	public boolean isLocalConnection() {
   138 		return ((InetSocketAddress) this.channel.socket().getRemoteSocketAddress()).getHostName().equalsIgnoreCase("localhost");
   139 	}
   140 
   141 	void setWriteSelectionKey(SelectionKey selKey) {
   142 		this.writeSelKey = selKey;
   143 	}
   144 
   145 	public void shutdownInput() {
   146 		try {
   147 			// Closes the input line of the channel's socket, so no new data
   148 			// will be received and a timeout can be triggered.
   149 			this.channel.socket().shutdownInput();
   150 		} catch (IOException ex) {
   151 			Log.get().warning("Exception in NNTPConnection.shutdownInput(): " + ex);
   152 		}
   153 	}
   154 
   155 	public void shutdownOutput() {
   156 		cancelTimer.schedule(new TimerTask() {
   157 			@Override
   158 			public void run() {
   159 				try {
   160 					// Closes the output line of the channel's socket.
   161 					channel.socket().shutdownOutput();
   162 					channel.close();
   163 				} catch (SocketException ex) {
   164 					// Socket was already disconnected
   165 					Log.get().info("NNTPConnection.shutdownOutput(): " + ex);
   166 				} catch (Exception ex) {
   167 					Log.get().warning("NNTPConnection.shutdownOutput(): " + ex);
   168 				}
   169 			}
   170 		}, 3000);
   171 	}
   172 
   173 	public SocketChannel getSocketChannel() {
   174 		return this.channel;
   175 	}
   176 
   177 	public Article getCurrentArticle() {
   178 		return this.currentArticle;
   179 	}
   180 
   181 	public Charset getCurrentCharset() {
   182 		return this.charset;
   183 	}
   184 
   185 	/**
   186 	 * @return The currently selected communication channel (not SocketChannel)
   187 	 */
   188 	public Group getCurrentChannel() {
   189 		return this.currentGroup;
   190 	}
   191 
   192 	public void setCurrentArticle(final Article article) {
   193 		this.currentArticle = article;
   194 	}
   195 
   196 	public void setCurrentGroup(final Group group) {
   197 		this.currentGroup = group;
   198 	}
   199 
   200 	public long getLastActivity() {
   201 		return this.lastActivity;
   202 	}
   203 
   204 	/**
   205 	 * Due to the readLockGate there is no need to synchronize this method.
   206 	 * @param raw
   207 	 * @throws IllegalArgumentException if raw is null.
   208 	 * @throws IllegalStateException if calling thread does not own the readLock.
   209 	 */
   210 	void lineReceived(byte[] raw) {
   211 		if (raw == null) {
   212 			throw new IllegalArgumentException("raw is null");
   213 		}
   214 
   215 		if (readLock == 0 || readLock != Thread.currentThread().hashCode()) {
   216 			throw new IllegalStateException("readLock not properly set");
   217 		}
   218 
   219 		this.lastActivity = System.currentTimeMillis();
   220 
   221 		String line = new String(raw, this.charset);
   222 
   223 		// There might be a trailing \r, but trim() is a bad idea
   224 		// as it removes also leading spaces from long header lines.
   225 		if (line.endsWith("\r")) {
   226 			line = line.substring(0, line.length() - 1);
   227 			raw = Arrays.copyOf(raw, raw.length - 1);
   228 		}
   229 
   230 		Log.get().fine("<< " + line);
   231 
   232 		if (command == null) {
   233 			command = parseCommandLine(line);
   234 			assert command != null;
   235 		}
   236 
   237 		try {
   238 			// The command object will process the line we just received
   239 			try {
   240 				command.processLine(this, line, raw);
   241 			} catch (StorageBackendException ex) {
   242 				Log.get().info("Retry command processing after StorageBackendException");
   243 
   244 				// Try it a second time, so that the backend has time to recover
   245 				command.processLine(this, line, raw);
   246 			}
   247 		} catch (ClosedChannelException ex0) {
   248 			try {
   249 				StringBuilder strBuf = new StringBuilder();
   250 				strBuf.append("Connection to ");
   251 				strBuf.append(channel.socket().getRemoteSocketAddress());
   252 				strBuf.append(" closed: ");
   253 				strBuf.append(ex0);
   254 				Log.get().info(strBuf.toString());
   255 			} catch (Exception ex0a) {
   256 				ex0a.printStackTrace();
   257 			}
   258 		} catch (Exception ex1) { // This will catch a second StorageBackendException
   259 			try {
   260 				command = null;
   261 				Log.get().log(Level.WARNING, ex1.getLocalizedMessage(), ex1);
   262 				println("403 Internal server error");
   263 
   264 				// Should we end the connection here?
   265 				// RFC says we MUST return 400 before closing the connection
   266 				shutdownInput();
   267 				shutdownOutput();
   268 			} catch (Exception ex2) {
   269 				ex2.printStackTrace();
   270 			}
   271 		}
   272 
   273 		if (command == null || command.hasFinished()) {
   274 			command = null;
   275 			charset = Charset.forName("UTF-8"); // Reset to default
   276 		}
   277 	}
   278 
   279 	/**
   280 	 * This method determines the fitting command processing class.
   281 	 * @param line
   282 	 * @return
   283 	 */
   284 	private Command parseCommandLine(String line) {
   285 		String cmdStr = line.split(" ")[0];
   286 		return CommandSelector.getInstance().get(cmdStr);
   287 	}
   288 
   289 	/**
   290 	 * Puts the given line into the output buffer, adds a newline character
   291 	 * and returns. The method returns immediately and does not block until
   292 	 * the line was sent. If line is longer than 510 octets it is split up in
   293 	 * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE).
   294 	 * @param line
   295 	 */
   296 	public void println(final CharSequence line, final Charset charset)
   297 			throws IOException {
   298 		writeToChannel(CharBuffer.wrap(line), charset, line);
   299 		writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
   300 	}
   301 
   302 	/**
   303 	 * Writes the given raw lines to the output buffers and finishes with
   304 	 * a newline character (\r\n).
   305 	 * @param rawLines
   306 	 */
   307 	public void println(final byte[] rawLines)
   308 			throws IOException {
   309 		this.lineBuffers.addOutputBuffer(ByteBuffer.wrap(rawLines));
   310 		writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
   311 	}
   312 	
   313 	/**
   314 	 * Same as {@link #println(byte[]) } but escapes lines containing single dot,
   315 	 * which has special meaning in protocol (end of message).
   316 	 * 
   317 	 * This method is safe to be used for writing messages – if message contains 
   318 	 * a line with single dot, it will be doubled and thus not interpreted 
   319 	 * by NNTP client as end of message
   320 	 * 
   321 	 * @param rawLines
   322 	 * @throws IOException 
   323 	 */
   324 	public void printlnEscapeDots(final byte[] rawLines) throws IOException {
   325 		// TODO: optimalizace
   326 		
   327 		ByteArrayOutputStream baos = new ByteArrayOutputStream(rawLines.length + 10);
   328 		CRLFOutputStream crlfStream = new CRLFOutputStream(baos);
   329 		SMTPOutputStream smtpStream = new SMTPOutputStream(crlfStream);
   330 		smtpStream.write(rawLines);
   331 		
   332 		println(baos.toByteArray());
   333 		
   334 		smtpStream.close();
   335 	}
   336 
   337 	/**
   338 	 * Encodes the given CharBuffer using the given Charset to a bunch of
   339 	 * ByteBuffers (each 512 bytes large) and enqueues them for writing at the
   340 	 * connected SocketChannel.
   341 	 * @throws java.io.IOException
   342 	 */
   343 	private void writeToChannel(CharBuffer characters, final Charset charset,
   344 			CharSequence debugLine)
   345 			throws IOException {
   346 		if (!charset.canEncode()) {
   347 			Log.get().severe("FATAL: Charset " + charset + " cannot encode!");
   348 			return;
   349 		}
   350 
   351 		// Write characters to output buffers
   352 		LineEncoder lenc = new LineEncoder(characters, charset);
   353 		lenc.encode(lineBuffers);
   354 
   355 		enableWriteEvents(debugLine);
   356 	}
   357 
   358 	private void enableWriteEvents(CharSequence debugLine) {
   359 		// Enable OP_WRITE events so that the buffers are processed
   360 		try {
   361 			this.writeSelKey.interestOps(SelectionKey.OP_WRITE);
   362 			ChannelWriter.getInstance().getSelector().wakeup();
   363 		} catch (Exception ex) // CancelledKeyException and ChannelCloseException
   364 		{
   365 			Log.get().warning("NNTPConnection.writeToChannel(): " + ex);
   366 			return;
   367 		}
   368 
   369 		// Update last activity timestamp
   370 		this.lastActivity = System.currentTimeMillis();
   371 		if (debugLine != null) {
   372 			Log.get().fine(">> " + debugLine);
   373 		}
   374 	}
   375 
   376 	public void println(final CharSequence line)
   377 			throws IOException {
   378 		println(line, charset);
   379 	}
   380 
   381 	public void print(final String line)
   382 			throws IOException {
   383 		writeToChannel(CharBuffer.wrap(line), charset, line);
   384 	}
   385 
   386 	public void setCurrentCharset(final Charset charset) {
   387 		this.charset = charset;
   388 	}
   389 
   390 	void setLastActivity(long timestamp) {
   391 		this.lastActivity = timestamp;
   392 	}
   393 
   394 	/**
   395 	 * @return Current username. 
   396 	 * But user may not have been authenticated yet.
   397 	 * You must check {@link #isUserAuthenticated()}
   398 	 */
   399 	public String getUsername() {
   400 		return username;
   401 	}
   402 
   403 	/**
   404 	 * This method is to be called from AUTHINFO USER Command implementation.
   405 	 * @param username username from AUTHINFO USER username.
   406 	 */
   407 	public void setUsername(String username) {
   408 		this.username = username;
   409 	}
   410 
   411 	/**
   412 	 * @return true if current user (see {@link #getUsername()}) has been succesfully authenticated.
   413 	 */
   414 	public boolean isUserAuthenticated() {
   415 		return userAuthenticated;
   416 	}
   417 
   418 	/**
   419 	 * This method is to be called from AUTHINFO PASS Command implementation.
   420 	 * @param userAuthenticated true if user has provided right password in AUTHINFO PASS password.
   421 	 */
   422 	public void setUserAuthenticated(boolean userAuthenticated) {
   423 		this.userAuthenticated = userAuthenticated;
   424 	}
   425 }