1.1 --- a/org/sonews/daemon/NNTPConnection.java Sun Aug 29 17:04:25 2010 +0200
1.2 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000
1.3 @@ -1,428 +0,0 @@
1.4 -/*
1.5 - * SONEWS News Server
1.6 - * see AUTHORS for the list of contributors
1.7 - *
1.8 - * This program is free software: you can redistribute it and/or modify
1.9 - * it under the terms of the GNU General Public License as published by
1.10 - * the Free Software Foundation, either version 3 of the License, or
1.11 - * (at your option) any later version.
1.12 - *
1.13 - * This program is distributed in the hope that it will be useful,
1.14 - * but WITHOUT ANY WARRANTY; without even the implied warranty of
1.15 - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1.16 - * GNU General Public License for more details.
1.17 - *
1.18 - * You should have received a copy of the GNU General Public License
1.19 - * along with this program. If not, see <http://www.gnu.org/licenses/>.
1.20 - */
1.21 -
1.22 -package org.sonews.daemon;
1.23 -
1.24 -import java.io.IOException;
1.25 -import java.net.InetSocketAddress;
1.26 -import java.net.SocketException;
1.27 -import java.nio.ByteBuffer;
1.28 -import java.nio.CharBuffer;
1.29 -import java.nio.channels.ClosedChannelException;
1.30 -import java.nio.channels.SelectionKey;
1.31 -import java.nio.channels.SocketChannel;
1.32 -import java.nio.charset.Charset;
1.33 -import java.util.Arrays;
1.34 -import java.util.Timer;
1.35 -import java.util.TimerTask;
1.36 -import org.sonews.daemon.command.Command;
1.37 -import org.sonews.storage.Article;
1.38 -import org.sonews.storage.Channel;
1.39 -import org.sonews.storage.StorageBackendException;
1.40 -import org.sonews.util.Log;
1.41 -import org.sonews.util.Stats;
1.42 -
1.43 -/**
1.44 - * For every SocketChannel (so TCP/IP connection) there is an instance of
1.45 - * this class.
1.46 - * @author Christian Lins
1.47 - * @since sonews/0.5.0
1.48 - */
1.49 -public final class NNTPConnection
1.50 -{
1.51 -
1.52 - public static final String NEWLINE = "\r\n"; // RFC defines this as newline
1.53 - public static final String MESSAGE_ID_PATTERN = "<[^>]+>";
1.54 -
1.55 - private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon
1.56 -
1.57 - /** SocketChannel is generally thread-safe */
1.58 - private SocketChannel channel = null;
1.59 - private Charset charset = Charset.forName("UTF-8");
1.60 - private Command command = null;
1.61 - private Article currentArticle = null;
1.62 - private Channel currentGroup = null;
1.63 - private volatile long lastActivity = System.currentTimeMillis();
1.64 - private ChannelLineBuffers lineBuffers = new ChannelLineBuffers();
1.65 - private int readLock = 0;
1.66 - private final Object readLockGate = new Object();
1.67 - private SelectionKey writeSelKey = null;
1.68 -
1.69 - public NNTPConnection(final SocketChannel channel)
1.70 - throws IOException
1.71 - {
1.72 - if(channel == null)
1.73 - {
1.74 - throw new IllegalArgumentException("channel is null");
1.75 - }
1.76 -
1.77 - this.channel = channel;
1.78 - Stats.getInstance().clientConnect();
1.79 - }
1.80 -
1.81 - /**
1.82 - * Tries to get the read lock for this NNTPConnection. This method is Thread-
1.83 - * safe and returns true of the read lock was successfully set. If the lock
1.84 - * is still hold by another Thread the method returns false.
1.85 - */
1.86 - boolean tryReadLock()
1.87 - {
1.88 - // As synchronizing simple types may cause deadlocks,
1.89 - // we use a gate object.
1.90 - synchronized(readLockGate)
1.91 - {
1.92 - if(readLock != 0)
1.93 - {
1.94 - return false;
1.95 - }
1.96 - else
1.97 - {
1.98 - readLock = Thread.currentThread().hashCode();
1.99 - return true;
1.100 - }
1.101 - }
1.102 - }
1.103 -
1.104 - /**
1.105 - * Releases the read lock in a Thread-safe way.
1.106 - * @throws IllegalMonitorStateException if a Thread not holding the lock
1.107 - * tries to release it.
1.108 - */
1.109 - void unlockReadLock()
1.110 - {
1.111 - synchronized(readLockGate)
1.112 - {
1.113 - if(readLock == Thread.currentThread().hashCode())
1.114 - {
1.115 - readLock = 0;
1.116 - }
1.117 - else
1.118 - {
1.119 - throw new IllegalMonitorStateException();
1.120 - }
1.121 - }
1.122 - }
1.123 -
1.124 - /**
1.125 - * @return Current input buffer of this NNTPConnection instance.
1.126 - */
1.127 - public ByteBuffer getInputBuffer()
1.128 - {
1.129 - return this.lineBuffers.getInputBuffer();
1.130 - }
1.131 -
1.132 - /**
1.133 - * @return Output buffer of this NNTPConnection which has at least one byte
1.134 - * free storage.
1.135 - */
1.136 - public ByteBuffer getOutputBuffer()
1.137 - {
1.138 - return this.lineBuffers.getOutputBuffer();
1.139 - }
1.140 -
1.141 - /**
1.142 - * @return ChannelLineBuffers instance associated with this NNTPConnection.
1.143 - */
1.144 - public ChannelLineBuffers getBuffers()
1.145 - {
1.146 - return this.lineBuffers;
1.147 - }
1.148 -
1.149 - /**
1.150 - * @return true if this connection comes from a local remote address.
1.151 - */
1.152 - public boolean isLocalConnection()
1.153 - {
1.154 - return ((InetSocketAddress)this.channel.socket().getRemoteSocketAddress())
1.155 - .getHostName().equalsIgnoreCase("localhost");
1.156 - }
1.157 -
1.158 - void setWriteSelectionKey(SelectionKey selKey)
1.159 - {
1.160 - this.writeSelKey = selKey;
1.161 - }
1.162 -
1.163 - public void shutdownInput()
1.164 - {
1.165 - try
1.166 - {
1.167 - // Closes the input line of the channel's socket, so no new data
1.168 - // will be received and a timeout can be triggered.
1.169 - this.channel.socket().shutdownInput();
1.170 - }
1.171 - catch(IOException ex)
1.172 - {
1.173 - Log.get().warning("Exception in NNTPConnection.shutdownInput(): " + ex);
1.174 - }
1.175 - }
1.176 -
1.177 - public void shutdownOutput()
1.178 - {
1.179 - cancelTimer.schedule(new TimerTask()
1.180 - {
1.181 - @Override
1.182 - public void run()
1.183 - {
1.184 - try
1.185 - {
1.186 - // Closes the output line of the channel's socket.
1.187 - channel.socket().shutdownOutput();
1.188 - channel.close();
1.189 - }
1.190 - catch(SocketException ex)
1.191 - {
1.192 - // Socket was already disconnected
1.193 - Log.get().info("NNTPConnection.shutdownOutput(): " + ex);
1.194 - }
1.195 - catch(Exception ex)
1.196 - {
1.197 - Log.get().warning("NNTPConnection.shutdownOutput(): " + ex);
1.198 - }
1.199 - }
1.200 - }, 3000);
1.201 - }
1.202 -
1.203 - public SocketChannel getSocketChannel()
1.204 - {
1.205 - return this.channel;
1.206 - }
1.207 -
1.208 - public Article getCurrentArticle()
1.209 - {
1.210 - return this.currentArticle;
1.211 - }
1.212 -
1.213 - public Charset getCurrentCharset()
1.214 - {
1.215 - return this.charset;
1.216 - }
1.217 -
1.218 - /**
1.219 - * @return The currently selected communication channel (not SocketChannel)
1.220 - */
1.221 - public Channel getCurrentChannel()
1.222 - {
1.223 - return this.currentGroup;
1.224 - }
1.225 -
1.226 - public void setCurrentArticle(final Article article)
1.227 - {
1.228 - this.currentArticle = article;
1.229 - }
1.230 -
1.231 - public void setCurrentGroup(final Channel group)
1.232 - {
1.233 - this.currentGroup = group;
1.234 - }
1.235 -
1.236 - public long getLastActivity()
1.237 - {
1.238 - return this.lastActivity;
1.239 - }
1.240 -
1.241 - /**
1.242 - * Due to the readLockGate there is no need to synchronize this method.
1.243 - * @param raw
1.244 - * @throws IllegalArgumentException if raw is null.
1.245 - * @throws IllegalStateException if calling thread does not own the readLock.
1.246 - */
1.247 - void lineReceived(byte[] raw)
1.248 - {
1.249 - if(raw == null)
1.250 - {
1.251 - throw new IllegalArgumentException("raw is null");
1.252 - }
1.253 -
1.254 - if(readLock == 0 || readLock != Thread.currentThread().hashCode())
1.255 - {
1.256 - throw new IllegalStateException("readLock not properly set");
1.257 - }
1.258 -
1.259 - this.lastActivity = System.currentTimeMillis();
1.260 -
1.261 - String line = new String(raw, this.charset);
1.262 -
1.263 - // There might be a trailing \r, but trim() is a bad idea
1.264 - // as it removes also leading spaces from long header lines.
1.265 - if(line.endsWith("\r"))
1.266 - {
1.267 - line = line.substring(0, line.length() - 1);
1.268 - raw = Arrays.copyOf(raw, raw.length - 1);
1.269 - }
1.270 -
1.271 - Log.get().fine("<< " + line);
1.272 -
1.273 - if(command == null)
1.274 - {
1.275 - command = parseCommandLine(line);
1.276 - assert command != null;
1.277 - }
1.278 -
1.279 - try
1.280 - {
1.281 - // The command object will process the line we just received
1.282 - try
1.283 - {
1.284 - command.processLine(this, line, raw);
1.285 - }
1.286 - catch(StorageBackendException ex)
1.287 - {
1.288 - Log.get().info("Retry command processing after StorageBackendException");
1.289 -
1.290 - // Try it a second time, so that the backend has time to recover
1.291 - command.processLine(this, line, raw);
1.292 - }
1.293 - }
1.294 - catch(ClosedChannelException ex0)
1.295 - {
1.296 - try
1.297 - {
1.298 - Log.get().info("Connection to " + channel.socket().getRemoteSocketAddress()
1.299 - + " closed: " + ex0);
1.300 - }
1.301 - catch(Exception ex0a)
1.302 - {
1.303 - ex0a.printStackTrace();
1.304 - }
1.305 - }
1.306 - catch(Exception ex1) // This will catch a second StorageBackendException
1.307 - {
1.308 - try
1.309 - {
1.310 - command = null;
1.311 - ex1.printStackTrace();
1.312 - println("500 Internal server error");
1.313 - }
1.314 - catch(Exception ex2)
1.315 - {
1.316 - ex2.printStackTrace();
1.317 - }
1.318 - }
1.319 -
1.320 - if(command == null || command.hasFinished())
1.321 - {
1.322 - command = null;
1.323 - charset = Charset.forName("UTF-8"); // Reset to default
1.324 - }
1.325 - }
1.326 -
1.327 - /**
1.328 - * This method determines the fitting command processing class.
1.329 - * @param line
1.330 - * @return
1.331 - */
1.332 - private Command parseCommandLine(String line)
1.333 - {
1.334 - String cmdStr = line.split(" ")[0];
1.335 - return CommandSelector.getInstance().get(cmdStr);
1.336 - }
1.337 -
1.338 - /**
1.339 - * Puts the given line into the output buffer, adds a newline character
1.340 - * and returns. The method returns immediately and does not block until
1.341 - * the line was sent. If line is longer than 510 octets it is split up in
1.342 - * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE).
1.343 - * @param line
1.344 - */
1.345 - public void println(final CharSequence line, final Charset charset)
1.346 - throws IOException
1.347 - {
1.348 - writeToChannel(CharBuffer.wrap(line), charset, line);
1.349 - writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
1.350 - }
1.351 -
1.352 - /**
1.353 - * Writes the given raw lines to the output buffers and finishes with
1.354 - * a newline character (\r\n).
1.355 - * @param rawLines
1.356 - */
1.357 - public void println(final byte[] rawLines)
1.358 - throws IOException
1.359 - {
1.360 - this.lineBuffers.addOutputBuffer(ByteBuffer.wrap(rawLines));
1.361 - writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
1.362 - }
1.363 -
1.364 - /**
1.365 - * Encodes the given CharBuffer using the given Charset to a bunch of
1.366 - * ByteBuffers (each 512 bytes large) and enqueues them for writing at the
1.367 - * connected SocketChannel.
1.368 - * @throws java.io.IOException
1.369 - */
1.370 - private void writeToChannel(CharBuffer characters, final Charset charset,
1.371 - CharSequence debugLine)
1.372 - throws IOException
1.373 - {
1.374 - if(!charset.canEncode())
1.375 - {
1.376 - Log.get().severe("FATAL: Charset " + charset + " cannot encode!");
1.377 - return;
1.378 - }
1.379 -
1.380 - // Write characters to output buffers
1.381 - LineEncoder lenc = new LineEncoder(characters, charset);
1.382 - lenc.encode(lineBuffers);
1.383 -
1.384 - enableWriteEvents(debugLine);
1.385 - }
1.386 -
1.387 - private void enableWriteEvents(CharSequence debugLine)
1.388 - {
1.389 - // Enable OP_WRITE events so that the buffers are processed
1.390 - try
1.391 - {
1.392 - this.writeSelKey.interestOps(SelectionKey.OP_WRITE);
1.393 - ChannelWriter.getInstance().getSelector().wakeup();
1.394 - }
1.395 - catch(Exception ex) // CancelledKeyException and ChannelCloseException
1.396 - {
1.397 - Log.get().warning("NNTPConnection.writeToChannel(): " + ex);
1.398 - return;
1.399 - }
1.400 -
1.401 - // Update last activity timestamp
1.402 - this.lastActivity = System.currentTimeMillis();
1.403 - if(debugLine != null)
1.404 - {
1.405 - Log.get().fine(">> " + debugLine);
1.406 - }
1.407 - }
1.408 -
1.409 - public void println(final CharSequence line)
1.410 - throws IOException
1.411 - {
1.412 - println(line, charset);
1.413 - }
1.414 -
1.415 - public void print(final String line)
1.416 - throws IOException
1.417 - {
1.418 - writeToChannel(CharBuffer.wrap(line), charset, line);
1.419 - }
1.420 -
1.421 - public void setCurrentCharset(final Charset charset)
1.422 - {
1.423 - this.charset = charset;
1.424 - }
1.425 -
1.426 - void setLastActivity(long timestamp)
1.427 - {
1.428 - this.lastActivity = timestamp;
1.429 - }
1.430 -
1.431 -}