PullFeeder sends an addition "MODE READER" to peers.
3 * see AUTHORS for the list of contributors
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 package org.sonews.daemon;
21 import org.sonews.config.Config;
22 import org.sonews.Main;
23 import org.sonews.util.Log;
24 import java.io.IOException;
25 import java.net.BindException;
26 import java.net.InetSocketAddress;
27 import java.net.ServerSocket;
28 import java.nio.channels.CancelledKeyException;
29 import java.nio.channels.ClosedChannelException;
30 import java.nio.channels.SelectionKey;
31 import java.nio.channels.Selector;
32 import java.nio.channels.ServerSocketChannel;
33 import java.nio.channels.SocketChannel;
36 * NNTP daemon using SelectableChannels.
37 * @author Christian Lins
40 public final class NNTPDaemon extends AbstractDaemon
43 public static final Object RegisterGate = new Object();
45 private static NNTPDaemon instance = null;
47 public static synchronized NNTPDaemon createInstance(int port)
51 instance = new NNTPDaemon(port);
56 throw new RuntimeException("NNTPDaemon.createInstance() called twice");
62 private NNTPDaemon(final int port)
64 Log.msg("Server listening on port " + port, false);
73 // Create a Selector that handles the SocketChannel multiplexing
74 final Selector readSelector = Selector.open();
75 final Selector writeSelector = Selector.open();
77 // Start working threads
78 final int workerThreads = Runtime.getRuntime().availableProcessors() * 4;
79 ConnectionWorker[] cworkers = new ConnectionWorker[workerThreads];
80 for(int n = 0; n < workerThreads; n++)
82 cworkers[n] = new ConnectionWorker();
86 ChannelWriter.getInstance().setSelector(writeSelector);
87 ChannelReader.getInstance().setSelector(readSelector);
88 ChannelWriter.getInstance().start();
89 ChannelReader.getInstance().start();
91 final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
92 serverSocketChannel.configureBlocking(true); // Set to blocking mode
94 // Configure ServerSocket; bind to socket...
95 final ServerSocket serverSocket = serverSocketChannel.socket();
96 serverSocket.bind(new InetSocketAddress(this.port));
100 SocketChannel socketChannel;
104 // As we set the server socket channel to blocking mode the accept()
105 // method will block.
106 socketChannel = serverSocketChannel.accept();
107 socketChannel.configureBlocking(false);
108 assert socketChannel.isConnected();
109 assert socketChannel.finishConnect();
111 catch(IOException ex)
113 // Under heavy load an IOException "Too many open files may
114 // be thrown. It most cases we should slow down the connection
115 // accepting, to give the worker threads some time to process work.
116 Log.msg("IOException while accepting connection: " + ex.getMessage(), false);
117 Log.msg("Connection accepting sleeping for seconds...", true);
118 Thread.sleep(5000); // 5 seconds
122 final NNTPConnection conn;
125 conn = new NNTPConnection(socketChannel);
126 Connections.getInstance().add(conn);
128 catch(IOException ex)
130 Log.msg(ex.getLocalizedMessage(), false);
131 socketChannel.close();
137 SelectionKey selKeyWrite =
138 registerSelector(writeSelector, socketChannel, SelectionKey.OP_WRITE);
139 registerSelector(readSelector, socketChannel, SelectionKey.OP_READ);
141 Log.msg("Connected: " + socketChannel.socket().getRemoteSocketAddress(), true);
143 // Set write selection key and send hello to client
144 conn.setWriteSelectionKey(selKeyWrite);
145 conn.println("200 " + Config.inst().get(Config.HOSTNAME, "localhost")
146 + " " + Main.VERSION + " news server ready - (posting ok).");
148 catch(CancelledKeyException cke)
150 Log.msg("CancelledKeyException " + cke.getMessage() + " was thrown: "
151 + socketChannel.socket(), false);
153 catch(ClosedChannelException cce)
155 Log.msg("ClosedChannelException " + cce.getMessage() + " was thrown: "
156 + socketChannel.socket(), false);
160 catch(BindException ex)
162 // Could not bind to socket; this is a fatal problem; so perform shutdown
163 ex.printStackTrace();
166 catch(IOException ex)
168 ex.printStackTrace();
172 ex.printStackTrace();
176 public static SelectionKey registerSelector(final Selector selector,
177 final SocketChannel channel, final int op)
178 throws CancelledKeyException, ClosedChannelException
180 // Register the selector at the channel, so that it will be notified
181 // on the socket's events
182 synchronized(RegisterGate)
184 // Wakeup the currently blocking reader/writer thread; we have locked
185 // the RegisterGate to prevent the awakened thread to block again
188 // Lock the selector to prevent the waiting worker threads going into
189 // selector.select() which would block the selector.
190 synchronized (selector)
192 return channel.register(selector, op, null);