chris@1: /* chris@1: * SONEWS News Server chris@1: * see AUTHORS for the list of contributors chris@1: * chris@1: * This program is free software: you can redistribute it and/or modify chris@1: * it under the terms of the GNU General Public License as published by chris@1: * the Free Software Foundation, either version 3 of the License, or chris@1: * (at your option) any later version. chris@1: * chris@1: * This program is distributed in the hope that it will be useful, chris@1: * but WITHOUT ANY WARRANTY; without even the implied warranty of chris@1: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the chris@1: * GNU General Public License for more details. chris@1: * chris@1: * You should have received a copy of the GNU General Public License chris@1: * along with this program. If not, see . chris@1: */ chris@1: chris@1: package org.sonews.daemon; chris@1: chris@3: import org.sonews.config.Config; chris@3: import org.sonews.Main; chris@1: import org.sonews.util.Log; chris@1: import java.io.IOException; chris@1: import java.net.BindException; chris@1: import java.net.InetSocketAddress; chris@1: import java.net.ServerSocket; chris@1: import java.nio.channels.CancelledKeyException; chris@1: import java.nio.channels.ClosedChannelException; chris@1: import java.nio.channels.SelectionKey; chris@1: import java.nio.channels.Selector; chris@1: import java.nio.channels.ServerSocketChannel; chris@1: import java.nio.channels.SocketChannel; chris@1: chris@1: /** chris@1: * NNTP daemon using SelectableChannels. chris@1: * @author Christian Lins chris@1: * @since sonews/0.5.0 chris@1: */ chris@1: public final class NNTPDaemon extends AbstractDaemon chris@1: { chris@1: chris@1: public static final Object RegisterGate = new Object(); chris@1: chris@1: private static NNTPDaemon instance = null; chris@1: chris@1: public static synchronized NNTPDaemon createInstance(int port) chris@1: { chris@1: if(instance == null) chris@1: { chris@1: instance = new NNTPDaemon(port); chris@1: return instance; chris@1: } chris@1: else chris@1: { chris@1: throw new RuntimeException("NNTPDaemon.createInstance() called twice"); chris@1: } chris@1: } chris@1: chris@1: private int port; chris@1: chris@1: private NNTPDaemon(final int port) chris@1: { cli@15: Log.get().info("Server listening on port " + port); chris@1: this.port = port; chris@1: } chris@1: chris@1: @Override chris@1: public void run() chris@1: { chris@1: try chris@1: { chris@1: // Create a Selector that handles the SocketChannel multiplexing chris@1: final Selector readSelector = Selector.open(); chris@1: final Selector writeSelector = Selector.open(); chris@1: chris@1: // Start working threads chris@1: final int workerThreads = Runtime.getRuntime().availableProcessors() * 4; chris@1: ConnectionWorker[] cworkers = new ConnectionWorker[workerThreads]; chris@1: for(int n = 0; n < workerThreads; n++) chris@1: { chris@1: cworkers[n] = new ConnectionWorker(); chris@1: cworkers[n].start(); chris@1: } chris@1: chris@1: ChannelWriter.getInstance().setSelector(writeSelector); chris@1: ChannelReader.getInstance().setSelector(readSelector); chris@1: ChannelWriter.getInstance().start(); chris@1: ChannelReader.getInstance().start(); chris@1: chris@1: final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); chris@1: serverSocketChannel.configureBlocking(true); // Set to blocking mode chris@1: chris@1: // Configure ServerSocket; bind to socket... chris@1: final ServerSocket serverSocket = serverSocketChannel.socket(); chris@1: serverSocket.bind(new InetSocketAddress(this.port)); chris@1: chris@1: while(isRunning()) chris@1: { chris@1: SocketChannel socketChannel; chris@1: chris@1: try chris@1: { chris@1: // As we set the server socket channel to blocking mode the accept() chris@1: // method will block. chris@1: socketChannel = serverSocketChannel.accept(); chris@1: socketChannel.configureBlocking(false); chris@1: assert socketChannel.isConnected(); chris@1: assert socketChannel.finishConnect(); chris@1: } chris@1: catch(IOException ex) chris@1: { chris@1: // Under heavy load an IOException "Too many open files may chris@1: // be thrown. It most cases we should slow down the connection chris@1: // accepting, to give the worker threads some time to process work. cli@15: Log.get().severe("IOException while accepting connection: " + ex.getMessage()); cli@15: Log.get().info("Connection accepting sleeping for seconds..."); chris@1: Thread.sleep(5000); // 5 seconds chris@1: continue; chris@1: } chris@1: chris@1: final NNTPConnection conn; chris@1: try chris@1: { chris@1: conn = new NNTPConnection(socketChannel); chris@1: Connections.getInstance().add(conn); chris@1: } chris@1: catch(IOException ex) chris@1: { cli@15: Log.get().warning(ex.toString()); chris@1: socketChannel.close(); chris@1: continue; chris@1: } chris@1: chris@1: try chris@1: { chris@1: SelectionKey selKeyWrite = chris@1: registerSelector(writeSelector, socketChannel, SelectionKey.OP_WRITE); chris@1: registerSelector(readSelector, socketChannel, SelectionKey.OP_READ); chris@1: cli@15: Log.get().info("Connected: " + socketChannel.socket().getRemoteSocketAddress()); chris@1: chris@1: // Set write selection key and send hello to client chris@1: conn.setWriteSelectionKey(selKeyWrite); chris@3: conn.println("200 " + Config.inst().get(Config.HOSTNAME, "localhost") chris@1: + " " + Main.VERSION + " news server ready - (posting ok)."); chris@1: } chris@1: catch(CancelledKeyException cke) chris@1: { cli@15: Log.get().warning("CancelledKeyException " + cke.getMessage() + " was thrown: " cli@15: + socketChannel.socket()); chris@1: } chris@1: catch(ClosedChannelException cce) chris@1: { cli@15: Log.get().warning("ClosedChannelException " + cce.getMessage() + " was thrown: " cli@15: + socketChannel.socket()); chris@1: } chris@1: } chris@1: } chris@1: catch(BindException ex) chris@1: { chris@1: // Could not bind to socket; this is a fatal problem; so perform shutdown chris@1: ex.printStackTrace(); chris@1: System.exit(1); chris@1: } chris@1: catch(IOException ex) chris@1: { chris@1: ex.printStackTrace(); chris@1: } chris@1: catch(Exception ex) chris@1: { chris@1: ex.printStackTrace(); chris@1: } chris@1: } chris@1: chris@1: public static SelectionKey registerSelector(final Selector selector, chris@1: final SocketChannel channel, final int op) chris@1: throws CancelledKeyException, ClosedChannelException chris@1: { chris@1: // Register the selector at the channel, so that it will be notified chris@1: // on the socket's events chris@1: synchronized(RegisterGate) chris@1: { chris@1: // Wakeup the currently blocking reader/writer thread; we have locked chris@1: // the RegisterGate to prevent the awakened thread to block again chris@1: selector.wakeup(); chris@1: chris@1: // Lock the selector to prevent the waiting worker threads going into chris@1: // selector.select() which would block the selector. chris@1: synchronized (selector) chris@1: { chris@1: return channel.register(selector, op, null); chris@1: } chris@1: } chris@1: } chris@1: chris@1: }