src/org/sonews/daemon/NNTPConnection.java
author František Kučera <franta-hg@frantovo.cz>
Wed Oct 19 21:40:51 2011 +0200 (2011-10-19)
changeset 101 d54786065fa3
parent 49 8df94bfd3e2f
child 108 fdc075324ef3
permissions -rwxr-xr-x
Drupal: ověřování uživatelů.
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 package org.sonews.daemon;
    19 
    20 import java.io.IOException;
    21 import java.net.InetSocketAddress;
    22 import java.net.SocketException;
    23 import java.nio.ByteBuffer;
    24 import java.nio.CharBuffer;
    25 import java.nio.channels.ClosedChannelException;
    26 import java.nio.channels.SelectionKey;
    27 import java.nio.channels.SocketChannel;
    28 import java.nio.charset.Charset;
    29 import java.util.Arrays;
    30 import java.util.Timer;
    31 import java.util.TimerTask;
    32 import java.util.logging.Level;
    33 import org.sonews.daemon.command.Command;
    34 import org.sonews.storage.Article;
    35 import org.sonews.storage.Group;
    36 import org.sonews.storage.StorageBackendException;
    37 import org.sonews.util.Log;
    38 import org.sonews.util.Stats;
    39 
    40 /**
    41  * For every SocketChannel (so TCP/IP connection) there is an instance of
    42  * this class.
    43  * @author Christian Lins
    44  * @since sonews/0.5.0
    45  */
    46 public final class NNTPConnection {
    47 
    48 	public static final String NEWLINE = "\r\n";    // RFC defines this as newline
    49 	public static final String MESSAGE_ID_PATTERN = "<[^>]+>";
    50 	private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon
    51 	/** SocketChannel is generally thread-safe */
    52 	private SocketChannel channel = null;
    53 	private Charset charset = Charset.forName("UTF-8");
    54 	private Command command = null;
    55 	private Article currentArticle = null;
    56 	private Group currentGroup = null;
    57 	private volatile long lastActivity = System.currentTimeMillis();
    58 	private ChannelLineBuffers lineBuffers = new ChannelLineBuffers();
    59 	private int readLock = 0;
    60 	private final Object readLockGate = new Object();
    61 	private SelectionKey writeSelKey = null;
    62 	
    63 	private String username;
    64 	private boolean userAuthenticated = false;
    65 
    66 	public NNTPConnection(final SocketChannel channel)
    67 			throws IOException {
    68 		if (channel == null) {
    69 			throw new IllegalArgumentException("channel is null");
    70 		}
    71 
    72 		this.channel = channel;
    73 		Stats.getInstance().clientConnect();
    74 	}
    75 
    76 	/**
    77 	 * Tries to get the read lock for this NNTPConnection. This method is Thread-
    78 	 * safe and returns true of the read lock was successfully set. If the lock
    79 	 * is still hold by another Thread the method returns false.
    80 	 */
    81 	boolean tryReadLock() {
    82 		// As synchronizing simple types may cause deadlocks,
    83 		// we use a gate object.
    84 		synchronized (readLockGate) {
    85 			if (readLock != 0) {
    86 				return false;
    87 			} else {
    88 				readLock = Thread.currentThread().hashCode();
    89 				return true;
    90 			}
    91 		}
    92 	}
    93 
    94 	/**
    95 	 * Releases the read lock in a Thread-safe way.
    96 	 * @throws IllegalMonitorStateException if a Thread not holding the lock
    97 	 * tries to release it.
    98 	 */
    99 	void unlockReadLock() {
   100 		synchronized (readLockGate) {
   101 			if (readLock == Thread.currentThread().hashCode()) {
   102 				readLock = 0;
   103 			} else {
   104 				throw new IllegalMonitorStateException();
   105 			}
   106 		}
   107 	}
   108 
   109 	/**
   110 	 * @return Current input buffer of this NNTPConnection instance.
   111 	 */
   112 	public ByteBuffer getInputBuffer() {
   113 		return this.lineBuffers.getInputBuffer();
   114 	}
   115 
   116 	/**
   117 	 * @return Output buffer of this NNTPConnection which has at least one byte
   118 	 * free storage.
   119 	 */
   120 	public ByteBuffer getOutputBuffer() {
   121 		return this.lineBuffers.getOutputBuffer();
   122 	}
   123 
   124 	/**
   125 	 * @return ChannelLineBuffers instance associated with this NNTPConnection.
   126 	 */
   127 	public ChannelLineBuffers getBuffers() {
   128 		return this.lineBuffers;
   129 	}
   130 
   131 	/**
   132 	 * @return true if this connection comes from a local remote address.
   133 	 */
   134 	public boolean isLocalConnection() {
   135 		return ((InetSocketAddress) this.channel.socket().getRemoteSocketAddress()).getHostName().equalsIgnoreCase("localhost");
   136 	}
   137 
   138 	void setWriteSelectionKey(SelectionKey selKey) {
   139 		this.writeSelKey = selKey;
   140 	}
   141 
   142 	public void shutdownInput() {
   143 		try {
   144 			// Closes the input line of the channel's socket, so no new data
   145 			// will be received and a timeout can be triggered.
   146 			this.channel.socket().shutdownInput();
   147 		} catch (IOException ex) {
   148 			Log.get().warning("Exception in NNTPConnection.shutdownInput(): " + ex);
   149 		}
   150 	}
   151 
   152 	public void shutdownOutput() {
   153 		cancelTimer.schedule(new TimerTask() {
   154 			@Override
   155 			public void run() {
   156 				try {
   157 					// Closes the output line of the channel's socket.
   158 					channel.socket().shutdownOutput();
   159 					channel.close();
   160 				} catch (SocketException ex) {
   161 					// Socket was already disconnected
   162 					Log.get().info("NNTPConnection.shutdownOutput(): " + ex);
   163 				} catch (Exception ex) {
   164 					Log.get().warning("NNTPConnection.shutdownOutput(): " + ex);
   165 				}
   166 			}
   167 		}, 3000);
   168 	}
   169 
   170 	public SocketChannel getSocketChannel() {
   171 		return this.channel;
   172 	}
   173 
   174 	public Article getCurrentArticle() {
   175 		return this.currentArticle;
   176 	}
   177 
   178 	public Charset getCurrentCharset() {
   179 		return this.charset;
   180 	}
   181 
   182 	/**
   183 	 * @return The currently selected communication channel (not SocketChannel)
   184 	 */
   185 	public Group getCurrentChannel() {
   186 		return this.currentGroup;
   187 	}
   188 
   189 	public void setCurrentArticle(final Article article) {
   190 		this.currentArticle = article;
   191 	}
   192 
   193 	public void setCurrentGroup(final Group group) {
   194 		this.currentGroup = group;
   195 	}
   196 
   197 	public long getLastActivity() {
   198 		return this.lastActivity;
   199 	}
   200 
   201 	/**
   202 	 * Due to the readLockGate there is no need to synchronize this method.
   203 	 * @param raw
   204 	 * @throws IllegalArgumentException if raw is null.
   205 	 * @throws IllegalStateException if calling thread does not own the readLock.
   206 	 */
   207 	void lineReceived(byte[] raw) {
   208 		if (raw == null) {
   209 			throw new IllegalArgumentException("raw is null");
   210 		}
   211 
   212 		if (readLock == 0 || readLock != Thread.currentThread().hashCode()) {
   213 			throw new IllegalStateException("readLock not properly set");
   214 		}
   215 
   216 		this.lastActivity = System.currentTimeMillis();
   217 
   218 		String line = new String(raw, this.charset);
   219 
   220 		// There might be a trailing \r, but trim() is a bad idea
   221 		// as it removes also leading spaces from long header lines.
   222 		if (line.endsWith("\r")) {
   223 			line = line.substring(0, line.length() - 1);
   224 			raw = Arrays.copyOf(raw, raw.length - 1);
   225 		}
   226 
   227 		Log.get().fine("<< " + line);
   228 
   229 		if (command == null) {
   230 			command = parseCommandLine(line);
   231 			assert command != null;
   232 		}
   233 
   234 		try {
   235 			// The command object will process the line we just received
   236 			try {
   237 				command.processLine(this, line, raw);
   238 			} catch (StorageBackendException ex) {
   239 				Log.get().info("Retry command processing after StorageBackendException");
   240 
   241 				// Try it a second time, so that the backend has time to recover
   242 				command.processLine(this, line, raw);
   243 			}
   244 		} catch (ClosedChannelException ex0) {
   245 			try {
   246 				StringBuilder strBuf = new StringBuilder();
   247 				strBuf.append("Connection to ");
   248 				strBuf.append(channel.socket().getRemoteSocketAddress());
   249 				strBuf.append(" closed: ");
   250 				strBuf.append(ex0);
   251 				Log.get().info(strBuf.toString());
   252 			} catch (Exception ex0a) {
   253 				ex0a.printStackTrace();
   254 			}
   255 		} catch (Exception ex1) { // This will catch a second StorageBackendException
   256 			try {
   257 				command = null;
   258 				Log.get().log(Level.WARNING, ex1.getLocalizedMessage(), ex1);
   259 				println("403 Internal server error");
   260 
   261 				// Should we end the connection here?
   262 				// RFC says we MUST return 400 before closing the connection
   263 				shutdownInput();
   264 				shutdownOutput();
   265 			} catch (Exception ex2) {
   266 				ex2.printStackTrace();
   267 			}
   268 		}
   269 
   270 		if (command == null || command.hasFinished()) {
   271 			command = null;
   272 			charset = Charset.forName("UTF-8"); // Reset to default
   273 		}
   274 	}
   275 
   276 	/**
   277 	 * This method determines the fitting command processing class.
   278 	 * @param line
   279 	 * @return
   280 	 */
   281 	private Command parseCommandLine(String line) {
   282 		String cmdStr = line.split(" ")[0];
   283 		return CommandSelector.getInstance().get(cmdStr);
   284 	}
   285 
   286 	/**
   287 	 * Puts the given line into the output buffer, adds a newline character
   288 	 * and returns. The method returns immediately and does not block until
   289 	 * the line was sent. If line is longer than 510 octets it is split up in
   290 	 * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE).
   291 	 * @param line
   292 	 */
   293 	public void println(final CharSequence line, final Charset charset)
   294 			throws IOException {
   295 		writeToChannel(CharBuffer.wrap(line), charset, line);
   296 		writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
   297 	}
   298 
   299 	/**
   300 	 * Writes the given raw lines to the output buffers and finishes with
   301 	 * a newline character (\r\n).
   302 	 * @param rawLines
   303 	 */
   304 	public void println(final byte[] rawLines)
   305 			throws IOException {
   306 		this.lineBuffers.addOutputBuffer(ByteBuffer.wrap(rawLines));
   307 		writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
   308 	}
   309 
   310 	/**
   311 	 * Encodes the given CharBuffer using the given Charset to a bunch of
   312 	 * ByteBuffers (each 512 bytes large) and enqueues them for writing at the
   313 	 * connected SocketChannel.
   314 	 * @throws java.io.IOException
   315 	 */
   316 	private void writeToChannel(CharBuffer characters, final Charset charset,
   317 			CharSequence debugLine)
   318 			throws IOException {
   319 		if (!charset.canEncode()) {
   320 			Log.get().severe("FATAL: Charset " + charset + " cannot encode!");
   321 			return;
   322 		}
   323 
   324 		// Write characters to output buffers
   325 		LineEncoder lenc = new LineEncoder(characters, charset);
   326 		lenc.encode(lineBuffers);
   327 
   328 		enableWriteEvents(debugLine);
   329 	}
   330 
   331 	private void enableWriteEvents(CharSequence debugLine) {
   332 		// Enable OP_WRITE events so that the buffers are processed
   333 		try {
   334 			this.writeSelKey.interestOps(SelectionKey.OP_WRITE);
   335 			ChannelWriter.getInstance().getSelector().wakeup();
   336 		} catch (Exception ex) // CancelledKeyException and ChannelCloseException
   337 		{
   338 			Log.get().warning("NNTPConnection.writeToChannel(): " + ex);
   339 			return;
   340 		}
   341 
   342 		// Update last activity timestamp
   343 		this.lastActivity = System.currentTimeMillis();
   344 		if (debugLine != null) {
   345 			Log.get().fine(">> " + debugLine);
   346 		}
   347 	}
   348 
   349 	public void println(final CharSequence line)
   350 			throws IOException {
   351 		println(line, charset);
   352 	}
   353 
   354 	public void print(final String line)
   355 			throws IOException {
   356 		writeToChannel(CharBuffer.wrap(line), charset, line);
   357 	}
   358 
   359 	public void setCurrentCharset(final Charset charset) {
   360 		this.charset = charset;
   361 	}
   362 
   363 	void setLastActivity(long timestamp) {
   364 		this.lastActivity = timestamp;
   365 	}
   366 
   367 	/**
   368 	 * @return Current username. 
   369 	 * But user may not have been authenticated yet.
   370 	 * You must check {@link #isUserAuthenticated()}
   371 	 */
   372 	public String getUsername() {
   373 		return username;
   374 	}
   375 
   376 	/**
   377 	 * This method is to be called from AUTHINFO USER Command implementation.
   378 	 * @param username username from AUTHINFO USER username.
   379 	 */
   380 	public void setUsername(String username) {
   381 		this.username = username;
   382 	}
   383 
   384 	/**
   385 	 * @return true if current user (see {@link #getUsername()}) has been succesfully authenticated.
   386 	 */
   387 	public boolean isUserAuthenticated() {
   388 		return userAuthenticated;
   389 	}
   390 
   391 	/**
   392 	 * This method is to be called from AUTHINFO PASS Command implementation.
   393 	 * @param userAuthenticated true if user has provided right password in AUTHINFO PASS password.
   394 	 */
   395 	public void setUserAuthenticated(boolean userAuthenticated) {
   396 		this.userAuthenticated = userAuthenticated;
   397 	}
   398 }