src/org/sonews/daemon/NNTPDaemon.java
author cli
Mon Aug 30 00:20:06 2010 +0200 (2010-08-30)
changeset 39 73b21e9f3958
parent 35 ed84c8bdd87b
permissions -rw-r--r--
Some work on XDAEMON command.
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.daemon;
    20 
    21 import org.sonews.config.Config;
    22 import org.sonews.Main;
    23 import org.sonews.util.Log;
    24 import java.io.IOException;
    25 import java.net.BindException;
    26 import java.net.InetSocketAddress;
    27 import java.net.ServerSocket;
    28 import java.nio.channels.CancelledKeyException;
    29 import java.nio.channels.ClosedChannelException;
    30 import java.nio.channels.SelectionKey;
    31 import java.nio.channels.Selector;
    32 import java.nio.channels.ServerSocketChannel;
    33 import java.nio.channels.SocketChannel;
    34 
    35 /**
    36  * NNTP daemon using SelectableChannels.
    37  * @author Christian Lins
    38  * @since sonews/0.5.0
    39  */
    40 public final class NNTPDaemon extends AbstractDaemon
    41 {
    42 
    43 	public static final Object RegisterGate = new Object();
    44 	private static NNTPDaemon instance = null;
    45 
    46 	public static synchronized NNTPDaemon createInstance(int port)
    47 	{
    48 		if (instance == null) {
    49 			instance = new NNTPDaemon(port);
    50 			return instance;
    51 		} else {
    52 			throw new RuntimeException("NNTPDaemon.createInstance() called twice");
    53 		}
    54 	}
    55 	private int port;
    56 
    57 	private NNTPDaemon(final int port)
    58 	{
    59 		Log.get().info("Server listening on port " + port);
    60 		this.port = port;
    61 	}
    62 
    63 	@Override
    64 	public void run()
    65 	{
    66 		try {
    67 			// Create a Selector that handles the SocketChannel multiplexing
    68 			final Selector readSelector = Selector.open();
    69 			final Selector writeSelector = Selector.open();
    70 
    71 			// Start working threads
    72 			final int workerThreads = Runtime.getRuntime().availableProcessors() * 4;
    73 			ConnectionWorker[] cworkers = new ConnectionWorker[workerThreads];
    74 			for (int n = 0; n < workerThreads; n++) {
    75 				cworkers[n] = new ConnectionWorker();
    76 				cworkers[n].start();
    77 			}
    78 
    79 			ChannelWriter.getInstance().setSelector(writeSelector);
    80 			ChannelReader.getInstance().setSelector(readSelector);
    81 			ChannelWriter.getInstance().start();
    82 			ChannelReader.getInstance().start();
    83 
    84 			final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    85 			serverSocketChannel.configureBlocking(true);  // Set to blocking mode
    86 
    87 			// Configure ServerSocket; bind to socket...
    88 			final ServerSocket serverSocket = serverSocketChannel.socket();
    89 			serverSocket.bind(new InetSocketAddress(this.port));
    90 
    91 			while (isRunning()) {
    92 				SocketChannel socketChannel;
    93 
    94 				try {
    95 					// As we set the server socket channel to blocking mode the accept()
    96 					// method will block.
    97 					socketChannel = serverSocketChannel.accept();
    98 					socketChannel.configureBlocking(false);
    99 					assert socketChannel.isConnected();
   100 					assert socketChannel.finishConnect();
   101 				} catch (IOException ex) {
   102 					// Under heavy load an IOException "Too many open files may
   103 					// be thrown. It most cases we should slow down the connection
   104 					// accepting, to give the worker threads some time to process work.
   105 					Log.get().severe("IOException while accepting connection: " + ex.getMessage());
   106 					Log.get().info("Connection accepting sleeping for seconds...");
   107 					Thread.sleep(5000); // 5 seconds
   108 					continue;
   109 				}
   110 
   111 				final NNTPConnection conn;
   112 				try {
   113 					conn = new NNTPConnection(socketChannel);
   114 					Connections.getInstance().add(conn);
   115 				} catch (IOException ex) {
   116 					Log.get().warning(ex.toString());
   117 					socketChannel.close();
   118 					continue;
   119 				}
   120 
   121 				try {
   122 					SelectionKey selKeyWrite =
   123 						registerSelector(writeSelector, socketChannel, SelectionKey.OP_WRITE);
   124 					registerSelector(readSelector, socketChannel, SelectionKey.OP_READ);
   125 
   126 					Log.get().info("Connected: " + socketChannel.socket().getRemoteSocketAddress());
   127 
   128 					// Set write selection key and send hello to client
   129 					conn.setWriteSelectionKey(selKeyWrite);
   130 					conn.println("200 " + Config.inst().get(Config.HOSTNAME, "localhost")
   131 						+ " " + Main.VERSION + " news server ready - (posting ok).");
   132 				} catch (CancelledKeyException cke) {
   133 					Log.get().warning("CancelledKeyException " + cke.getMessage() + " was thrown: "
   134 						+ socketChannel.socket());
   135 				} catch (ClosedChannelException cce) {
   136 					Log.get().warning("ClosedChannelException " + cce.getMessage() + " was thrown: "
   137 						+ socketChannel.socket());
   138 				}
   139 			}
   140 		} catch (BindException ex) {
   141 			// Could not bind to socket; this is a fatal problem; so perform shutdown
   142 			ex.printStackTrace();
   143 			System.exit(1);
   144 		} catch (IOException ex) {
   145 			ex.printStackTrace();
   146 		} catch (Exception ex) {
   147 			ex.printStackTrace();
   148 		}
   149 	}
   150 
   151 	public static SelectionKey registerSelector(final Selector selector,
   152 		final SocketChannel channel, final int op)
   153 		throws CancelledKeyException, ClosedChannelException
   154 	{
   155 		// Register the selector at the channel, so that it will be notified
   156 		// on the socket's events
   157 		synchronized (RegisterGate) {
   158 			// Wakeup the currently blocking reader/writer thread; we have locked
   159 			// the RegisterGate to prevent the awakened thread to block again
   160 			selector.wakeup();
   161 
   162 			// Lock the selector to prevent the waiting worker threads going into
   163 			// selector.select() which would block the selector.
   164 			synchronized (selector) {
   165 				return channel.register(selector, op, null);
   166 			}
   167 		}
   168 	}
   169 }