src/org/sonews/daemon/NNTPDaemon.java
author cli
Sun Sep 11 15:05:04 2011 +0200 (2011-09-11)
changeset 48 b78e77619152
parent 35 ed84c8bdd87b
permissions -rwxr-xr-x
Merge Channel and Group classes.
chris@1
     1
/*
chris@1
     2
 *   SONEWS News Server
chris@1
     3
 *   see AUTHORS for the list of contributors
chris@1
     4
 *
chris@1
     5
 *   This program is free software: you can redistribute it and/or modify
chris@1
     6
 *   it under the terms of the GNU General Public License as published by
chris@1
     7
 *   the Free Software Foundation, either version 3 of the License, or
chris@1
     8
 *   (at your option) any later version.
chris@1
     9
 *
chris@1
    10
 *   This program is distributed in the hope that it will be useful,
chris@1
    11
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1
    12
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1
    13
 *   GNU General Public License for more details.
chris@1
    14
 *
chris@1
    15
 *   You should have received a copy of the GNU General Public License
chris@1
    16
 *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1
    17
 */
chris@1
    18
chris@1
    19
package org.sonews.daemon;
chris@1
    20
chris@3
    21
import org.sonews.config.Config;
chris@3
    22
import org.sonews.Main;
chris@1
    23
import org.sonews.util.Log;
chris@1
    24
import java.io.IOException;
chris@1
    25
import java.net.BindException;
chris@1
    26
import java.net.InetSocketAddress;
chris@1
    27
import java.net.ServerSocket;
chris@1
    28
import java.nio.channels.CancelledKeyException;
chris@1
    29
import java.nio.channels.ClosedChannelException;
chris@1
    30
import java.nio.channels.SelectionKey;
chris@1
    31
import java.nio.channels.Selector;
chris@1
    32
import java.nio.channels.ServerSocketChannel;
chris@1
    33
import java.nio.channels.SocketChannel;
chris@1
    34
chris@1
    35
/**
chris@1
    36
 * NNTP daemon using SelectableChannels.
chris@1
    37
 * @author Christian Lins
chris@1
    38
 * @since sonews/0.5.0
chris@1
    39
 */
chris@1
    40
public final class NNTPDaemon extends AbstractDaemon
chris@1
    41
{
chris@1
    42
cli@37
    43
	public static final Object RegisterGate = new Object();
cli@37
    44
	private static NNTPDaemon instance = null;
chris@1
    45
cli@37
    46
	public static synchronized NNTPDaemon createInstance(int port)
cli@37
    47
	{
cli@37
    48
		if (instance == null) {
cli@37
    49
			instance = new NNTPDaemon(port);
cli@37
    50
			return instance;
cli@37
    51
		} else {
cli@37
    52
			throw new RuntimeException("NNTPDaemon.createInstance() called twice");
cli@37
    53
		}
cli@37
    54
	}
cli@37
    55
	private int port;
chris@1
    56
cli@37
    57
	private NNTPDaemon(final int port)
cli@37
    58
	{
cli@37
    59
		Log.get().info("Server listening on port " + port);
cli@37
    60
		this.port = port;
cli@37
    61
	}
cli@37
    62
cli@37
    63
	@Override
cli@37
    64
	public void run()
cli@37
    65
	{
cli@37
    66
		try {
cli@37
    67
			// Create a Selector that handles the SocketChannel multiplexing
cli@37
    68
			final Selector readSelector = Selector.open();
cli@37
    69
			final Selector writeSelector = Selector.open();
cli@37
    70
cli@37
    71
			// Start working threads
cli@37
    72
			final int workerThreads = Runtime.getRuntime().availableProcessors() * 4;
cli@37
    73
			ConnectionWorker[] cworkers = new ConnectionWorker[workerThreads];
cli@37
    74
			for (int n = 0; n < workerThreads; n++) {
cli@37
    75
				cworkers[n] = new ConnectionWorker();
cli@37
    76
				cworkers[n].start();
cli@37
    77
			}
cli@37
    78
cli@37
    79
			ChannelWriter.getInstance().setSelector(writeSelector);
cli@37
    80
			ChannelReader.getInstance().setSelector(readSelector);
cli@37
    81
			ChannelWriter.getInstance().start();
cli@37
    82
			ChannelReader.getInstance().start();
cli@37
    83
cli@37
    84
			final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
cli@37
    85
			serverSocketChannel.configureBlocking(true);  // Set to blocking mode
cli@37
    86
cli@37
    87
			// Configure ServerSocket; bind to socket...
cli@37
    88
			final ServerSocket serverSocket = serverSocketChannel.socket();
cli@37
    89
			serverSocket.bind(new InetSocketAddress(this.port));
cli@37
    90
cli@37
    91
			while (isRunning()) {
cli@37
    92
				SocketChannel socketChannel;
cli@37
    93
cli@37
    94
				try {
cli@37
    95
					// As we set the server socket channel to blocking mode the accept()
cli@37
    96
					// method will block.
cli@37
    97
					socketChannel = serverSocketChannel.accept();
cli@37
    98
					socketChannel.configureBlocking(false);
cli@37
    99
					assert socketChannel.isConnected();
cli@37
   100
					assert socketChannel.finishConnect();
cli@37
   101
				} catch (IOException ex) {
cli@37
   102
					// Under heavy load an IOException "Too many open files may
cli@37
   103
					// be thrown. It most cases we should slow down the connection
cli@37
   104
					// accepting, to give the worker threads some time to process work.
cli@37
   105
					Log.get().severe("IOException while accepting connection: " + ex.getMessage());
cli@37
   106
					Log.get().info("Connection accepting sleeping for seconds...");
cli@37
   107
					Thread.sleep(5000); // 5 seconds
cli@37
   108
					continue;
cli@37
   109
				}
cli@37
   110
cli@37
   111
				final NNTPConnection conn;
cli@37
   112
				try {
cli@37
   113
					conn = new NNTPConnection(socketChannel);
cli@37
   114
					Connections.getInstance().add(conn);
cli@37
   115
				} catch (IOException ex) {
cli@37
   116
					Log.get().warning(ex.toString());
cli@37
   117
					socketChannel.close();
cli@37
   118
					continue;
cli@37
   119
				}
cli@37
   120
cli@37
   121
				try {
cli@37
   122
					SelectionKey selKeyWrite =
cli@37
   123
						registerSelector(writeSelector, socketChannel, SelectionKey.OP_WRITE);
cli@37
   124
					registerSelector(readSelector, socketChannel, SelectionKey.OP_READ);
cli@37
   125
cli@37
   126
					Log.get().info("Connected: " + socketChannel.socket().getRemoteSocketAddress());
cli@37
   127
cli@37
   128
					// Set write selection key and send hello to client
cli@37
   129
					conn.setWriteSelectionKey(selKeyWrite);
cli@37
   130
					conn.println("200 " + Config.inst().get(Config.HOSTNAME, "localhost")
cli@37
   131
						+ " " + Main.VERSION + " news server ready - (posting ok).");
cli@37
   132
				} catch (CancelledKeyException cke) {
cli@37
   133
					Log.get().warning("CancelledKeyException " + cke.getMessage() + " was thrown: "
cli@37
   134
						+ socketChannel.socket());
cli@37
   135
				} catch (ClosedChannelException cce) {
cli@37
   136
					Log.get().warning("ClosedChannelException " + cce.getMessage() + " was thrown: "
cli@37
   137
						+ socketChannel.socket());
cli@37
   138
				}
cli@37
   139
			}
cli@37
   140
		} catch (BindException ex) {
cli@37
   141
			// Could not bind to socket; this is a fatal problem; so perform shutdown
cli@37
   142
			ex.printStackTrace();
cli@37
   143
			System.exit(1);
cli@37
   144
		} catch (IOException ex) {
cli@37
   145
			ex.printStackTrace();
cli@37
   146
		} catch (Exception ex) {
cli@37
   147
			ex.printStackTrace();
cli@37
   148
		}
cli@37
   149
	}
cli@37
   150
cli@37
   151
	public static SelectionKey registerSelector(final Selector selector,
cli@37
   152
		final SocketChannel channel, final int op)
cli@37
   153
		throws CancelledKeyException, ClosedChannelException
cli@37
   154
	{
cli@37
   155
		// Register the selector at the channel, so that it will be notified
cli@37
   156
		// on the socket's events
cli@37
   157
		synchronized (RegisterGate) {
cli@37
   158
			// Wakeup the currently blocking reader/writer thread; we have locked
cli@37
   159
			// the RegisterGate to prevent the awakened thread to block again
cli@37
   160
			selector.wakeup();
cli@37
   161
cli@37
   162
			// Lock the selector to prevent the waiting worker threads going into
cli@37
   163
			// selector.select() which would block the selector.
cli@37
   164
			synchronized (selector) {
cli@37
   165
				return channel.register(selector, op, null);
cli@37
   166
			}
cli@37
   167
		}
cli@37
   168
	}
chris@1
   169
}