PullFeeder sends an addition "MODE READER" to peers.
3 * see AUTHORS for the list of contributors
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 package org.sonews.daemon;
21 import org.sonews.util.Log;
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketException;
25 import java.nio.ByteBuffer;
26 import java.nio.CharBuffer;
27 import java.nio.channels.ClosedChannelException;
28 import java.nio.channels.SelectionKey;
29 import java.nio.channels.SocketChannel;
30 import java.nio.charset.Charset;
31 import java.util.Arrays;
32 import java.util.Timer;
33 import java.util.TimerTask;
34 import org.sonews.daemon.command.Command;
35 import org.sonews.storage.Article;
36 import org.sonews.storage.Channel;
37 import org.sonews.util.Stats;
40 * For every SocketChannel (so TCP/IP connection) there is an instance of
42 * @author Christian Lins
45 public final class NNTPConnection
48 public static final String NEWLINE = "\r\n"; // RFC defines this as newline
49 public static final String MESSAGE_ID_PATTERN = "<[^>]+>";
51 private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon
53 /** SocketChannel is generally thread-safe */
54 private SocketChannel channel = null;
55 private Charset charset = Charset.forName("UTF-8");
56 private Command command = null;
57 private Article currentArticle = null;
58 private Channel currentGroup = null;
59 private volatile long lastActivity = System.currentTimeMillis();
60 private ChannelLineBuffers lineBuffers = new ChannelLineBuffers();
61 private int readLock = 0;
62 private final Object readLockGate = new Object();
63 private SelectionKey writeSelKey = null;
65 public NNTPConnection(final SocketChannel channel)
70 throw new IllegalArgumentException("channel is null");
73 this.channel = channel;
74 Stats.getInstance().clientConnect();
78 * Tries to get the read lock for this NNTPConnection. This method is Thread-
79 * safe and returns true of the read lock was successfully set. If the lock
80 * is still hold by another Thread the method returns false.
84 // As synchronizing simple types may cause deadlocks,
85 // we use a gate object.
86 synchronized(readLockGate)
94 readLock = Thread.currentThread().hashCode();
101 * Releases the read lock in a Thread-safe way.
102 * @throws IllegalMonitorStateException if a Thread not holding the lock
103 * tries to release it.
105 void unlockReadLock()
107 synchronized(readLockGate)
109 if(readLock == Thread.currentThread().hashCode())
115 throw new IllegalMonitorStateException();
121 * @return Current input buffer of this NNTPConnection instance.
123 public ByteBuffer getInputBuffer()
125 return this.lineBuffers.getInputBuffer();
129 * @return Output buffer of this NNTPConnection which has at least one byte
132 public ByteBuffer getOutputBuffer()
134 return this.lineBuffers.getOutputBuffer();
138 * @return ChannelLineBuffers instance associated with this NNTPConnection.
140 public ChannelLineBuffers getBuffers()
142 return this.lineBuffers;
146 * @return true if this connection comes from a local remote address.
148 public boolean isLocalConnection()
150 return ((InetSocketAddress)this.channel.socket().getRemoteSocketAddress())
151 .getHostName().equalsIgnoreCase("localhost");
154 void setWriteSelectionKey(SelectionKey selKey)
156 this.writeSelKey = selKey;
159 public void shutdownInput()
163 // Closes the input line of the channel's socket, so no new data
164 // will be received and a timeout can be triggered.
165 this.channel.socket().shutdownInput();
167 catch(IOException ex)
169 Log.msg("Exception in NNTPConnection.shutdownInput(): " + ex, false);
172 ex.printStackTrace();
177 public void shutdownOutput()
179 cancelTimer.schedule(new TimerTask()
186 // Closes the output line of the channel's socket.
187 channel.socket().shutdownOutput();
190 catch(SocketException ex)
192 // Socket was already disconnected
193 Log.msg("NNTPConnection.shutdownOutput(): " + ex, true);
197 Log.msg("NNTPConnection.shutdownOutput(): " + ex, false);
200 ex.printStackTrace();
207 public SocketChannel getSocketChannel()
212 public Article getCurrentArticle()
214 return this.currentArticle;
217 public Charset getCurrentCharset()
223 * @return The currently selected communication channel (not SocketChannel)
225 public Channel getCurrentChannel()
227 return this.currentGroup;
230 public void setCurrentArticle(final Article article)
232 this.currentArticle = article;
235 public void setCurrentGroup(final Channel group)
237 this.currentGroup = group;
240 public long getLastActivity()
242 return this.lastActivity;
246 * Due to the readLockGate there is no need to synchronize this method.
248 * @throws IllegalArgumentException if raw is null.
249 * @throws IllegalStateException if calling thread does not own the readLock.
251 void lineReceived(byte[] raw)
255 throw new IllegalArgumentException("raw is null");
258 if(readLock == 0 || readLock != Thread.currentThread().hashCode())
260 throw new IllegalStateException("readLock not properly set");
263 this.lastActivity = System.currentTimeMillis();
265 String line = new String(raw, this.charset);
267 // There might be a trailing \r, but trim() is a bad idea
268 // as it removes also leading spaces from long header lines.
269 if(line.endsWith("\r"))
271 line = line.substring(0, line.length() - 1);
272 raw = Arrays.copyOf(raw, raw.length - 1);
275 Log.msg("<< " + line, true);
279 command = parseCommandLine(line);
280 assert command != null;
285 // The command object will process the line we just received
286 command.processLine(this, line, raw);
288 catch(ClosedChannelException ex0)
292 Log.msg("Connection to " + channel.socket().getRemoteSocketAddress()
293 + " closed: " + ex0, true);
295 catch(Exception ex0a)
297 ex0a.printStackTrace();
305 ex1.printStackTrace();
306 println("500 Internal server error");
310 ex2.printStackTrace();
314 if(command == null || command.hasFinished())
317 charset = Charset.forName("UTF-8"); // Reset to default
322 * This method determines the fitting command processing class.
326 private Command parseCommandLine(String line)
328 String cmdStr = line.split(" ")[0];
329 return CommandSelector.getInstance().get(cmdStr);
333 * Puts the given line into the output buffer, adds a newline character
334 * and returns. The method returns immediately and does not block until
335 * the line was sent. If line is longer than 510 octets it is split up in
336 * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE).
339 public void println(final CharSequence line, final Charset charset)
342 writeToChannel(CharBuffer.wrap(line), charset, line);
343 writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
347 * Writes the given raw lines to the output buffers and finishes with
348 * a newline character (\r\n).
351 public void println(final byte[] rawLines)
354 this.lineBuffers.addOutputBuffer(ByteBuffer.wrap(rawLines));
355 writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
359 * Encodes the given CharBuffer using the given Charset to a bunch of
360 * ByteBuffers (each 512 bytes large) and enqueues them for writing at the
361 * connected SocketChannel.
362 * @throws java.io.IOException
364 private void writeToChannel(CharBuffer characters, final Charset charset,
365 CharSequence debugLine)
368 if(!charset.canEncode())
370 Log.msg("FATAL: Charset " + charset + " cannot encode!", false);
374 // Write characters to output buffers
375 LineEncoder lenc = new LineEncoder(characters, charset);
376 lenc.encode(lineBuffers);
378 enableWriteEvents(debugLine);
381 private void enableWriteEvents(CharSequence debugLine)
383 // Enable OP_WRITE events so that the buffers are processed
386 this.writeSelKey.interestOps(SelectionKey.OP_WRITE);
387 ChannelWriter.getInstance().getSelector().wakeup();
389 catch (Exception ex) // CancelledKeyException and ChannelCloseException
391 Log.msg("NNTPConnection.writeToChannel(): " + ex, false);
395 // Update last activity timestamp
396 this.lastActivity = System.currentTimeMillis();
397 if(debugLine != null)
399 Log.msg(">> " + debugLine, true);
403 public void println(final CharSequence line)
406 println(line, charset);
409 public void print(final String line)
412 writeToChannel(CharBuffer.wrap(line), charset, line);
415 public void setCurrentCharset(final Charset charset)
417 this.charset = charset;