/*
 *   SONEWS News Server
 *   see AUTHORS for the list of contributors
 *
 *   This program is free software: you can redistribute it and/or modify
 *   it under the terms of the GNU General Public License as published by
 *   the Free Software Foundation, either version 3 of the License, or
 *   (at your option) any later version.
 *
 *   This program is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
package org.sonews.daemon;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.logging.Level;
import org.sonews.daemon.command.Command;
import org.sonews.storage.Article;
import org.sonews.storage.Group;
import org.sonews.storage.StorageBackendException;
import org.sonews.util.Log;
import org.sonews.util.Stats;

/**
 * For every SocketChannel (so TCP/IP connection) there is an instance of
 * this class. It holds the per-connection state: the line buffers, the
 * command currently being processed, the selected article/group, the
 * charset used for decoding/encoding lines and the authentication state.
 * @author Christian Lins
 * @since sonews/0.5.0
 */
public final class NNTPConnection {

    public static final String NEWLINE = "\r\n"; // RFC defines this as newline
    public static final String MESSAGE_ID_PATTERN = "<[^>]+>";

    /** Charset a connection starts with and falls back to after each command. */
    private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");

    /** Timer (running as daemon thread) used to delay closing of channels. */
    private static final Timer cancelTimer = new Timer(true);

    /** SocketChannel is generally thread-safe */
    private SocketChannel channel = null;
    private Charset charset = DEFAULT_CHARSET;
    private Command command = null;
    private Article currentArticle = null;
    private Group currentGroup = null;
    private volatile long lastActivity = System.currentTimeMillis();
    private ChannelLineBuffers lineBuffers = new ChannelLineBuffers();

    /**
     * Thread currently holding the read lock or null if the lock is free.
     * Only accessed while holding {@link #readLockGate}. The owning Thread
     * itself is stored (instead of its hash code) because hash codes of
     * different threads may collide and may legally be 0.
     */
    private Thread readLockOwner = null;
    private final Object readLockGate = new Object();
    private SelectionKey writeSelKey = null;

    private String username;
    private boolean userAuthenticated = false;

    /**
     * Creates the connection state for the given channel and registers the
     * new client at the Stats instance.
     * @param channel the channel of the client connection, must not be null.
     * @throws IOException declared for callers; not thrown by the code
     * visible in this constructor.
     * @throws IllegalArgumentException if channel is null.
     */
    public NNTPConnection(final SocketChannel channel)
            throws IOException {
        if (channel == null) {
            throw new IllegalArgumentException("channel is null");
        }

        this.channel = channel;
        Stats.getInstance().clientConnect();
    }

    /**
     * Tries to get the read lock for this NNTPConnection. This method is
     * Thread-safe and returns true if the read lock was successfully set.
     * If the lock is still held (by another Thread or by the calling Thread
     * itself; the lock is not reentrant) the method returns false.
     * @return true if the calling Thread now owns the read lock.
     */
    boolean tryReadLock() {
        synchronized (readLockGate) {
            if (readLockOwner != null) {
                return false;
            }
            readLockOwner = Thread.currentThread();
            return true;
        }
    }

    /**
     * Releases the read lock in a Thread-safe way.
     * @throws IllegalMonitorStateException if a Thread not holding the lock
     * tries to release it.
     */
    void unlockReadLock() {
        synchronized (readLockGate) {
            if (readLockOwner != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            readLockOwner = null;
        }
    }

    /**
     * @return true if the calling Thread currently owns the read lock.
     */
    private boolean holdsReadLock() {
        synchronized (readLockGate) {
            return readLockOwner == Thread.currentThread();
        }
    }

    /**
     * @return Current input buffer of this NNTPConnection instance.
     */
    public ByteBuffer getInputBuffer() {
        return this.lineBuffers.getInputBuffer();
    }

    /**
     * @return Output buffer of this NNTPConnection which has at least one byte
     * free storage.
     */
    public ByteBuffer getOutputBuffer() {
        return this.lineBuffers.getOutputBuffer();
    }

    /**
     * @return ChannelLineBuffers instance associated with this NNTPConnection.
     */
    public ChannelLineBuffers getBuffers() {
        return this.lineBuffers;
    }

    /**
     * Checks whether the remote end of this connection is a loopback
     * address. The decision is made on the IP address and not on the host
     * name: a host name has to be obtained through a name service lookup,
     * which may block and whose result is not under control of this server.
     * @return true if this connection comes from a loopback address, false
     * otherwise (also if the channel's socket is not connected).
     */
    public boolean isLocalConnection() {
        final InetSocketAddress remote =
                (InetSocketAddress) this.channel.socket().getRemoteSocketAddress();
        return remote != null
                && remote.getAddress() != null
                && remote.getAddress().isLoopbackAddress();
    }

    /**
     * Sets the SelectionKey that is used to enable write events for this
     * connection's channel.
     * @param selKey the key whose interest set is switched to OP_WRITE when
     * output is enqueued.
     */
    void setWriteSelectionKey(SelectionKey selKey) {
        this.writeSelKey = selKey;
    }

    /**
     * Shuts down the input side of the channel's socket. An IOException is
     * logged as warning and not propagated.
     */
    public void shutdownInput() {
        try {
            // Closes the input line of the channel's socket, so no new data
            // will be received and a timeout can be triggered.
            this.channel.socket().shutdownInput();
        } catch (IOException ex) {
            Log.get().warning("Exception in NNTPConnection.shutdownInput(): " + ex);
        }
    }

    /**
     * Schedules the shutdown of the socket's output side and the closing of
     * the channel. The work is done by the shared timer after a delay of
     * 3000 ms; this method returns immediately. Exceptions raised by the
     * delayed task are logged and not propagated.
     */
    public void shutdownOutput() {
        cancelTimer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    // Closes the output line of the channel's socket.
                    channel.socket().shutdownOutput();
                    channel.close();
                } catch (SocketException ex) {
                    // Socket was already disconnected
                    Log.get().info("NNTPConnection.shutdownOutput(): " + ex);
                } catch (Exception ex) {
                    Log.get().warning("NNTPConnection.shutdownOutput(): " + ex);
                }
            }
        }, 3000);
    }

    /**
     * @return The SocketChannel this connection was created with.
     */
    public SocketChannel getSocketChannel() {
        return this.channel;
    }

    /**
     * @return The currently selected article or null if none was set.
     */
    public Article getCurrentArticle() {
        return this.currentArticle;
    }

    /**
     * @return The charset currently used to decode received lines and to
     * encode sent lines.
     */
    public Charset getCurrentCharset() {
        return this.charset;
    }

    /**
     * @return The currently selected communication channel (not SocketChannel),
     * i.e. the value last passed to {@link #setCurrentGroup(Group)}.
     */
    public Group getCurrentChannel() {
        return this.currentGroup;
    }

    /**
     * @param article the article to remember as currently selected.
     */
    public void setCurrentArticle(final Article article) {
        this.currentArticle = article;
    }

    /**
     * @param group the group to remember as currently selected.
     */
    public void setCurrentGroup(final Group group) {
        this.currentGroup = group;
    }

    /**
     * @return Timestamp (milliseconds, as of System.currentTimeMillis()) of
     * the last recorded activity on this connection.
     */
    public long getLastActivity() {
        return this.lastActivity;
    }

    /**
     * Processes one line received from the client. The line is decoded with
     * the current charset, a trailing \r is stripped and the line is handed
     * to the active command (a new command is selected if none is active).
     * Due to the readLockGate there is no need to synchronize this method.
     * @param raw the raw bytes of the received line.
     * @throws IllegalArgumentException if raw is null.
     * @throws IllegalStateException if calling thread does not own the readLock.
     */
    void lineReceived(byte[] raw) {
        if (raw == null) {
            throw new IllegalArgumentException("raw is null");
        }

        if (!holdsReadLock()) {
            throw new IllegalStateException("readLock not properly set");
        }

        this.lastActivity = System.currentTimeMillis();

        String line = new String(raw, this.charset);

        // There might be a trailing \r, but trim() is a bad idea
        // as it removes also leading spaces from long header lines.
        if (line.endsWith("\r")) {
            line = line.substring(0, line.length() - 1);
            raw = Arrays.copyOf(raw, raw.length - 1);
        }

        Log.get().fine("<< " + line);

        if (command == null) {
            command = parseCommandLine(line);
            assert command != null;
        }

        try {
            // The command object will process the line we just received
            try {
                command.processLine(this, line, raw);
            } catch (StorageBackendException ex) {
                Log.get().info("Retry command processing after StorageBackendException");

                // Try it a second time, so that the backend has time to recover
                command.processLine(this, line, raw);
            }
        } catch (ClosedChannelException ex0) {
            try {
                StringBuilder strBuf = new StringBuilder();
                strBuf.append("Connection to ");
                strBuf.append(channel.socket().getRemoteSocketAddress());
                strBuf.append(" closed: ");
                strBuf.append(ex0);
                Log.get().info(strBuf.toString());
            } catch (Exception ex0a) {
                Log.get().log(Level.WARNING,
                        "NNTPConnection.lineReceived(): could not log closed channel", ex0a);
            }
        } catch (Exception ex1) { // This will catch a second StorageBackendException
            try {
                command = null;
                Log.get().log(Level.WARNING, ex1.getLocalizedMessage(), ex1);
                println("403 Internal server error");

                // Should we end the connection here?
                // RFC says we MUST return 400 before closing the connection
                shutdownInput();
                shutdownOutput();
            } catch (Exception ex2) {
                Log.get().log(Level.WARNING,
                        "NNTPConnection.lineReceived(): error while reporting internal error", ex2);
            }
        }

        if (command == null || command.hasFinished()) {
            command = null;
            charset = DEFAULT_CHARSET; // Reset to default
        }
    }

    /**
     * This method determines the fitting command processing class. The
     * command name is the part of the line before the first space (the whole
     * line if it contains no space). String.split() is deliberately not used
     * here: for a line consisting of spaces only it returns an empty array.
     * @param line the received line.
     * @return the Command instance the CommandSelector returns for the
     * command name.
     */
    private Command parseCommandLine(String line) {
        final int firstSpace = line.indexOf(' ');
        final String cmdStr = (firstSpace < 0) ? line : line.substring(0, firstSpace);
        return CommandSelector.getInstance().get(cmdStr);
    }

    /**
     * Puts the given line into the output buffer, adds a newline character
     * and returns. The method returns immediately and does not block until
     * the line was sent. If line is longer than 510 octets it is split up in
     * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE).
     * @param line the line to send (without line terminator).
     * @param charset the charset used to encode the line.
     * @throws IOException if enqueueing the encoded line fails.
     */
    public void println(final CharSequence line, final Charset charset)
            throws IOException {
        writeToChannel(CharBuffer.wrap(line), charset, line);
        writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
    }

    /**
     * Writes the given raw lines to the output buffers and finishes with
     * a newline character (\r\n).
     * @param rawLines the already encoded bytes to send.
     * @throws IOException if enqueueing the data fails.
     */
    public void println(final byte[] rawLines)
            throws IOException {
        this.lineBuffers.addOutputBuffer(ByteBuffer.wrap(rawLines));
        writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
    }

    /**
     * Encodes the given CharBuffer using the given Charset to a bunch of
     * ByteBuffers (each 512 bytes large) and enqueues them for writing at the
     * connected SocketChannel. If the charset cannot encode, the problem is
     * logged and nothing is written.
     * @param characters the characters to send.
     * @param charset the charset used for encoding.
     * @param debugLine text written to the log after enqueueing, may be null.
     * @throws java.io.IOException
     */
    private void writeToChannel(CharBuffer characters, final Charset charset,
            CharSequence debugLine)
            throws IOException {
        if (!charset.canEncode()) {
            Log.get().severe("FATAL: Charset " + charset + " cannot encode!");
            return;
        }

        // Write characters to output buffers
        LineEncoder lenc = new LineEncoder(characters, charset);
        lenc.encode(lineBuffers);

        enableWriteEvents(debugLine);
    }

    /**
     * Switches the write selection key to OP_WRITE, wakes up the selector of
     * the ChannelWriter and updates the activity timestamp. If enabling the
     * write events fails, a warning is logged and the timestamp is left
     * untouched.
     * @param debugLine text written to the log on success, may be null.
     */
    private void enableWriteEvents(CharSequence debugLine) {
        // Enable OP_WRITE events so that the buffers are processed
        try {
            this.writeSelKey.interestOps(SelectionKey.OP_WRITE);
            ChannelWriter.getInstance().getSelector().wakeup();
        } catch (Exception ex) { // CancelledKeyException and ChannelCloseException
            Log.get().warning("NNTPConnection.writeToChannel(): " + ex);
            return;
        }

        // Update last activity timestamp
        this.lastActivity = System.currentTimeMillis();
        if (debugLine != null) {
            Log.get().fine(">> " + debugLine);
        }
    }

    /**
     * Sends the given line followed by a newline using the current charset.
     * @param line the line to send (without line terminator).
     * @throws IOException if enqueueing the encoded line fails.
     */
    public void println(final CharSequence line)
            throws IOException {
        println(line, charset);
    }

    /**
     * Sends the given text using the current charset without appending a
     * newline.
     * @param line the text to send.
     * @throws IOException if enqueueing the encoded text fails.
     */
    public void print(final String line)
            throws IOException {
        writeToChannel(CharBuffer.wrap(line), charset, line);
    }

    /**
     * Sets the charset used for decoding and encoding lines. The charset is
     * reset to UTF-8 as soon as the current command has finished.
     * @param charset the charset to use.
     */
    public void setCurrentCharset(final Charset charset) {
        this.charset = charset;
    }

    /**
     * @param timestamp the new value of the last activity timestamp.
     */
    void setLastActivity(long timestamp) {
        this.lastActivity = timestamp;
    }

    /**
     * @return Current username.
     * But user may not have been authenticated yet.
     * You must check {@link #isUserAuthenticated()}
     */
    public String getUsername() {
        return username;
    }

    /**
     * This method is to be called from AUTHINFO USER Command implementation.
     * @param username username from AUTHINFO USER username.
     */
    public void setUsername(String username) {
        this.username = username;
    }

    /**
     * @return true if current user (see {@link #getUsername()}) has been successfully authenticated.
     */
    public boolean isUserAuthenticated() {
        return userAuthenticated;
    }

    /**
     * This method is to be called from AUTHINFO PASS Command implementation.
     * @param userAuthenticated true if user has provided right password in AUTHINFO PASS password.
     */
    public void setUserAuthenticated(boolean userAuthenticated) {
        this.userAuthenticated = userAuthenticated;
    }
}