3 * see AUTHORS for the list of contributors
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 package org.sonews.daemon;
21 import org.sonews.config.Config;
22 import org.sonews.Main;
23 import org.sonews.util.Log;
24 import java.io.IOException;
25 import java.net.BindException;
26 import java.net.InetSocketAddress;
27 import java.net.ServerSocket;
28 import java.nio.channels.CancelledKeyException;
29 import java.nio.channels.ClosedChannelException;
30 import java.nio.channels.SelectionKey;
31 import java.nio.channels.Selector;
32 import java.nio.channels.ServerSocketChannel;
33 import java.nio.channels.SocketChannel;
36 * NNTP daemon using SelectableChannels.
37 * @author Christian Lins
40 public final class NNTPDaemon extends AbstractDaemon
43 public static final Object RegisterGate = new Object();
44 private static NNTPDaemon instance = null;
46 public static synchronized NNTPDaemon createInstance(int port)
48 if (instance == null) {
49 instance = new NNTPDaemon(port);
52 throw new RuntimeException("NNTPDaemon.createInstance() called twice");
57 private NNTPDaemon(final int port)
59 Log.get().info("Server listening on port " + port);
67 // Create a Selector that handles the SocketChannel multiplexing
68 final Selector readSelector = Selector.open();
69 final Selector writeSelector = Selector.open();
71 // Start working threads
72 final int workerThreads = Runtime.getRuntime().availableProcessors() * 4;
73 ConnectionWorker[] cworkers = new ConnectionWorker[workerThreads];
74 for (int n = 0; n < workerThreads; n++) {
75 cworkers[n] = new ConnectionWorker();
79 ChannelWriter.getInstance().setSelector(writeSelector);
80 ChannelReader.getInstance().setSelector(readSelector);
81 ChannelWriter.getInstance().start();
82 ChannelReader.getInstance().start();
84 final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
85 serverSocketChannel.configureBlocking(true); // Set to blocking mode
87 // Configure ServerSocket; bind to socket...
88 final ServerSocket serverSocket = serverSocketChannel.socket();
89 serverSocket.bind(new InetSocketAddress(this.port));
92 SocketChannel socketChannel;
95 // As we set the server socket channel to blocking mode the accept()
97 socketChannel = serverSocketChannel.accept();
98 socketChannel.configureBlocking(false);
99 assert socketChannel.isConnected();
100 assert socketChannel.finishConnect();
101 } catch (IOException ex) {
102 // Under heavy load an IOException "Too many open files may
103 // be thrown. It most cases we should slow down the connection
104 // accepting, to give the worker threads some time to process work.
105 Log.get().severe("IOException while accepting connection: " + ex.getMessage());
106 Log.get().info("Connection accepting sleeping for seconds...");
107 Thread.sleep(5000); // 5 seconds
111 final NNTPConnection conn;
113 conn = new NNTPConnection(socketChannel);
114 Connections.getInstance().add(conn);
115 } catch (IOException ex) {
116 Log.get().warning(ex.toString());
117 socketChannel.close();
122 SelectionKey selKeyWrite =
123 registerSelector(writeSelector, socketChannel, SelectionKey.OP_WRITE);
124 registerSelector(readSelector, socketChannel, SelectionKey.OP_READ);
126 Log.get().info("Connected: " + socketChannel.socket().getRemoteSocketAddress());
128 // Set write selection key and send hello to client
129 conn.setWriteSelectionKey(selKeyWrite);
130 conn.println("200 " + Config.inst().get(Config.HOSTNAME, "localhost")
131 + " " + Main.VERSION + " news server ready - (posting ok).");
132 } catch (CancelledKeyException cke) {
133 Log.get().warning("CancelledKeyException " + cke.getMessage() + " was thrown: "
134 + socketChannel.socket());
135 } catch (ClosedChannelException cce) {
136 Log.get().warning("ClosedChannelException " + cce.getMessage() + " was thrown: "
137 + socketChannel.socket());
140 } catch (BindException ex) {
141 // Could not bind to socket; this is a fatal problem; so perform shutdown
142 ex.printStackTrace();
144 } catch (IOException ex) {
145 ex.printStackTrace();
146 } catch (Exception ex) {
147 ex.printStackTrace();
151 public static SelectionKey registerSelector(final Selector selector,
152 final SocketChannel channel, final int op)
153 throws CancelledKeyException, ClosedChannelException
155 // Register the selector at the channel, so that it will be notified
156 // on the socket's events
157 synchronized (RegisterGate) {
158 // Wakeup the currently blocking reader/writer thread; we have locked
159 // the RegisterGate to prevent the awakened thread to block again
162 // Lock the selector to prevent the waiting worker threads going into
163 // selector.select() which would block the selector.
164 synchronized (selector) {
165 return channel.register(selector, op, null);