chris@1: /* chris@1: * SONEWS News Server chris@1: * see AUTHORS for the list of contributors chris@1: * chris@1: * This program is free software: you can redistribute it and/or modify chris@1: * it under the terms of the GNU General Public License as published by chris@1: * the Free Software Foundation, either version 3 of the License, or chris@1: * (at your option) any later version. chris@1: * chris@1: * This program is distributed in the hope that it will be useful, chris@1: * but WITHOUT ANY WARRANTY; without even the implied warranty of chris@1: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the chris@1: * GNU General Public License for more details. chris@1: * chris@1: * You should have received a copy of the GNU General Public License chris@1: * along with this program. If not, see . chris@1: */ chris@1: chris@1: package org.sonews.daemon; chris@1: chris@1: import org.sonews.util.Log; chris@1: import java.io.IOException; chris@1: import java.net.InetSocketAddress; chris@3: import java.net.SocketException; chris@1: import java.nio.ByteBuffer; chris@1: import java.nio.CharBuffer; chris@1: import java.nio.channels.ClosedChannelException; chris@1: import java.nio.channels.SelectionKey; chris@1: import java.nio.channels.SocketChannel; chris@1: import java.nio.charset.Charset; chris@3: import java.util.Arrays; chris@1: import java.util.Timer; chris@1: import java.util.TimerTask; chris@3: import org.sonews.daemon.command.Command; chris@3: import org.sonews.storage.Article; chris@3: import org.sonews.storage.Channel; chris@1: import org.sonews.util.Stats; chris@1: chris@1: /** chris@1: * For every SocketChannel (so TCP/IP connection) there is an instance of chris@1: * this class. chris@1: * @author Christian Lins chris@1: * @since sonews/0.5.0 chris@1: */ chris@1: public final class NNTPConnection chris@1: { chris@1: chris@1: public static final String NEWLINE = "\r\n"; // RFC defines this as newline chris@1: public static final String MESSAGE_ID_PATTERN = "<[^>]+>"; chris@1: chris@1: private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon chris@1: chris@1: /** SocketChannel is generally thread-safe */ chris@1: private SocketChannel channel = null; chris@1: private Charset charset = Charset.forName("UTF-8"); chris@3: private Command command = null; chris@1: private Article currentArticle = null; chris@3: private Channel currentGroup = null; chris@1: private volatile long lastActivity = System.currentTimeMillis(); chris@1: private ChannelLineBuffers lineBuffers = new ChannelLineBuffers(); chris@1: private int readLock = 0; chris@1: private final Object readLockGate = new Object(); chris@1: private SelectionKey writeSelKey = null; chris@1: chris@1: public NNTPConnection(final SocketChannel channel) chris@1: throws IOException chris@1: { chris@1: if(channel == null) chris@1: { chris@1: throw new IllegalArgumentException("channel is null"); chris@1: } chris@1: chris@1: this.channel = channel; chris@1: Stats.getInstance().clientConnect(); chris@1: } chris@1: chris@1: /** chris@1: * Tries to get the read lock for this NNTPConnection. This method is Thread- chris@1: * safe and returns true of the read lock was successfully set. If the lock chris@1: * is still hold by another Thread the method returns false. chris@1: */ chris@1: boolean tryReadLock() chris@1: { chris@1: // As synchronizing simple types may cause deadlocks, chris@1: // we use a gate object. chris@1: synchronized(readLockGate) chris@1: { chris@1: if(readLock != 0) chris@1: { chris@1: return false; chris@1: } chris@1: else chris@1: { chris@1: readLock = Thread.currentThread().hashCode(); chris@1: return true; chris@1: } chris@1: } chris@1: } chris@1: chris@1: /** chris@1: * Releases the read lock in a Thread-safe way. chris@1: * @throws IllegalMonitorStateException if a Thread not holding the lock chris@1: * tries to release it. chris@1: */ chris@1: void unlockReadLock() chris@1: { chris@1: synchronized(readLockGate) chris@1: { chris@1: if(readLock == Thread.currentThread().hashCode()) chris@1: { chris@1: readLock = 0; chris@1: } chris@1: else chris@1: { chris@1: throw new IllegalMonitorStateException(); chris@1: } chris@1: } chris@1: } chris@1: chris@1: /** chris@1: * @return Current input buffer of this NNTPConnection instance. chris@1: */ chris@1: public ByteBuffer getInputBuffer() chris@1: { chris@1: return this.lineBuffers.getInputBuffer(); chris@1: } chris@1: chris@1: /** chris@1: * @return Output buffer of this NNTPConnection which has at least one byte chris@1: * free storage. chris@1: */ chris@1: public ByteBuffer getOutputBuffer() chris@1: { chris@1: return this.lineBuffers.getOutputBuffer(); chris@1: } chris@1: chris@1: /** chris@1: * @return ChannelLineBuffers instance associated with this NNTPConnection. chris@1: */ chris@1: public ChannelLineBuffers getBuffers() chris@1: { chris@1: return this.lineBuffers; chris@1: } chris@1: chris@1: /** chris@1: * @return true if this connection comes from a local remote address. chris@1: */ chris@1: public boolean isLocalConnection() chris@1: { chris@1: return ((InetSocketAddress)this.channel.socket().getRemoteSocketAddress()) chris@1: .getHostName().equalsIgnoreCase("localhost"); chris@1: } chris@1: chris@1: void setWriteSelectionKey(SelectionKey selKey) chris@1: { chris@1: this.writeSelKey = selKey; chris@1: } chris@1: chris@1: public void shutdownInput() chris@1: { chris@1: try chris@1: { chris@1: // Closes the input line of the channel's socket, so no new data chris@1: // will be received and a timeout can be triggered. chris@1: this.channel.socket().shutdownInput(); chris@1: } chris@1: catch(IOException ex) chris@1: { chris@1: Log.msg("Exception in NNTPConnection.shutdownInput(): " + ex, false); chris@1: if(Log.isDebug()) chris@1: { chris@1: ex.printStackTrace(); chris@1: } chris@1: } chris@1: } chris@1: chris@1: public void shutdownOutput() chris@1: { chris@1: cancelTimer.schedule(new TimerTask() chris@1: { chris@1: @Override chris@1: public void run() chris@1: { chris@1: try chris@1: { chris@1: // Closes the output line of the channel's socket. chris@1: channel.socket().shutdownOutput(); chris@1: channel.close(); chris@1: } chris@3: catch(SocketException ex) chris@3: { chris@3: // Socket was already disconnected chris@3: Log.msg("NNTPConnection.shutdownOutput(): " + ex, true); chris@3: } chris@1: catch(Exception ex) chris@1: { chris@1: Log.msg("NNTPConnection.shutdownOutput(): " + ex, false); chris@1: if(Log.isDebug()) chris@1: { chris@1: ex.printStackTrace(); chris@1: } chris@1: } chris@1: } chris@1: }, 3000); chris@1: } chris@1: chris@3: public SocketChannel getSocketChannel() chris@1: { chris@1: return this.channel; chris@1: } chris@1: chris@1: public Article getCurrentArticle() chris@1: { chris@1: return this.currentArticle; chris@1: } chris@1: chris@1: public Charset getCurrentCharset() chris@1: { chris@1: return this.charset; chris@1: } chris@3: chris@3: /** chris@3: * @return The currently selected communication channel (not SocketChannel) chris@3: */ chris@3: public Channel getCurrentChannel() chris@1: { chris@1: return this.currentGroup; chris@1: } chris@1: chris@1: public void setCurrentArticle(final Article article) chris@1: { chris@1: this.currentArticle = article; chris@1: } chris@1: chris@3: public void setCurrentGroup(final Channel group) chris@1: { chris@1: this.currentGroup = group; chris@1: } chris@1: chris@1: public long getLastActivity() chris@1: { chris@1: return this.lastActivity; chris@1: } chris@1: chris@1: /** chris@1: * Due to the readLockGate there is no need to synchronize this method. chris@1: * @param raw chris@1: * @throws IllegalArgumentException if raw is null. chris@1: * @throws IllegalStateException if calling thread does not own the readLock. chris@1: */ chris@1: void lineReceived(byte[] raw) chris@1: { chris@1: if(raw == null) chris@1: { chris@1: throw new IllegalArgumentException("raw is null"); chris@1: } chris@1: chris@1: if(readLock == 0 || readLock != Thread.currentThread().hashCode()) chris@1: { chris@1: throw new IllegalStateException("readLock not properly set"); chris@1: } chris@1: chris@1: this.lastActivity = System.currentTimeMillis(); chris@1: chris@1: String line = new String(raw, this.charset); chris@1: chris@1: // There might be a trailing \r, but trim() is a bad idea chris@1: // as it removes also leading spaces from long header lines. chris@1: if(line.endsWith("\r")) chris@1: { chris@1: line = line.substring(0, line.length() - 1); chris@3: raw = Arrays.copyOf(raw, raw.length - 1); chris@1: } chris@1: chris@1: Log.msg("<< " + line, true); chris@1: chris@1: if(command == null) chris@1: { chris@1: command = parseCommandLine(line); chris@1: assert command != null; chris@1: } chris@1: chris@1: try chris@1: { chris@1: // The command object will process the line we just received chris@3: command.processLine(this, line, raw); chris@1: } chris@1: catch(ClosedChannelException ex0) chris@1: { chris@1: try chris@1: { chris@1: Log.msg("Connection to " + channel.socket().getRemoteSocketAddress() chris@1: + " closed: " + ex0, true); chris@1: } chris@1: catch(Exception ex0a) chris@1: { chris@1: ex0a.printStackTrace(); chris@1: } chris@1: } chris@1: catch(Exception ex1) chris@1: { chris@1: try chris@1: { chris@1: command = null; chris@1: ex1.printStackTrace(); chris@1: println("500 Internal server error"); chris@1: } chris@1: catch(Exception ex2) chris@1: { chris@1: ex2.printStackTrace(); chris@1: } chris@1: } chris@1: chris@1: if(command == null || command.hasFinished()) chris@1: { chris@1: command = null; chris@1: charset = Charset.forName("UTF-8"); // Reset to default chris@1: } chris@1: } chris@1: chris@1: /** chris@3: * This method determines the fitting command processing class. chris@1: * @param line chris@1: * @return chris@1: */ chris@3: private Command parseCommandLine(String line) chris@1: { chris@3: String cmdStr = line.split(" ")[0]; chris@3: return CommandSelector.getInstance().get(cmdStr); chris@1: } chris@1: chris@1: /** chris@1: * Puts the given line into the output buffer, adds a newline character chris@1: * and returns. The method returns immediately and does not block until chris@1: * the line was sent. If line is longer than 510 octets it is split up in chris@1: * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE). chris@1: * @param line chris@1: */ chris@1: public void println(final CharSequence line, final Charset charset) chris@1: throws IOException chris@1: { chris@1: writeToChannel(CharBuffer.wrap(line), charset, line); chris@1: writeToChannel(CharBuffer.wrap(NEWLINE), charset, null); chris@1: } chris@3: chris@3: /** chris@3: * Writes the given raw lines to the output buffers and finishes with chris@3: * a newline character (\r\n). chris@3: * @param rawLines chris@3: */ chris@3: public void println(final byte[] rawLines) chris@3: throws IOException chris@3: { chris@3: this.lineBuffers.addOutputBuffer(ByteBuffer.wrap(rawLines)); chris@3: writeToChannel(CharBuffer.wrap(NEWLINE), charset, null); chris@3: } chris@1: chris@1: /** chris@1: * Encodes the given CharBuffer using the given Charset to a bunch of chris@1: * ByteBuffers (each 512 bytes large) and enqueues them for writing at the chris@1: * connected SocketChannel. chris@1: * @throws java.io.IOException chris@1: */ chris@1: private void writeToChannel(CharBuffer characters, final Charset charset, chris@1: CharSequence debugLine) chris@1: throws IOException chris@1: { chris@1: if(!charset.canEncode()) chris@1: { chris@1: Log.msg("FATAL: Charset " + charset + " cannot encode!", false); chris@1: return; chris@1: } chris@1: chris@1: // Write characters to output buffers chris@1: LineEncoder lenc = new LineEncoder(characters, charset); chris@1: lenc.encode(lineBuffers); chris@1: chris@3: enableWriteEvents(debugLine); chris@3: } chris@3: chris@3: private void enableWriteEvents(CharSequence debugLine) chris@3: { chris@1: // Enable OP_WRITE events so that the buffers are processed chris@1: try chris@1: { chris@1: this.writeSelKey.interestOps(SelectionKey.OP_WRITE); chris@1: ChannelWriter.getInstance().getSelector().wakeup(); chris@1: } chris@1: catch (Exception ex) // CancelledKeyException and ChannelCloseException chris@1: { chris@1: Log.msg("NNTPConnection.writeToChannel(): " + ex, false); chris@1: return; chris@1: } chris@1: chris@1: // Update last activity timestamp chris@1: this.lastActivity = System.currentTimeMillis(); chris@1: if(debugLine != null) chris@1: { chris@1: Log.msg(">> " + debugLine, true); chris@1: } chris@1: } chris@1: chris@1: public void println(final CharSequence line) chris@1: throws IOException chris@1: { chris@1: println(line, charset); chris@1: } chris@1: chris@1: public void print(final String line) chris@1: throws IOException chris@1: { chris@1: writeToChannel(CharBuffer.wrap(line), charset, line); chris@1: } chris@1: chris@1: public void setCurrentCharset(final Charset charset) chris@1: { chris@1: this.charset = charset; chris@1: } chris@1: chris@1: }