chris@1: /* chris@1: * SONEWS News Server chris@1: * see AUTHORS for the list of contributors chris@1: * chris@1: * This program is free software: you can redistribute it and/or modify chris@1: * it under the terms of the GNU General Public License as published by chris@1: * the Free Software Foundation, either version 3 of the License, or chris@1: * (at your option) any later version. chris@1: * chris@1: * This program is distributed in the hope that it will be useful, chris@1: * but WITHOUT ANY WARRANTY; without even the implied warranty of chris@1: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the chris@1: * GNU General Public License for more details. chris@1: * chris@1: * You should have received a copy of the GNU General Public License chris@1: * along with this program. If not, see . chris@1: */ chris@1: chris@1: package org.sonews.daemon; chris@1: chris@3: import org.sonews.config.Config; chris@3: import org.sonews.Main; chris@1: import org.sonews.util.Log; chris@1: import java.io.IOException; chris@1: import java.net.BindException; chris@1: import java.net.InetSocketAddress; chris@1: import java.net.ServerSocket; chris@1: import java.nio.channels.CancelledKeyException; chris@1: import java.nio.channels.ClosedChannelException; chris@1: import java.nio.channels.SelectionKey; chris@1: import java.nio.channels.Selector; chris@1: import java.nio.channels.ServerSocketChannel; chris@1: import java.nio.channels.SocketChannel; chris@1: chris@1: /** chris@1: * NNTP daemon using SelectableChannels. chris@1: * @author Christian Lins chris@1: * @since sonews/0.5.0 chris@1: */ chris@1: public final class NNTPDaemon extends AbstractDaemon chris@1: { chris@1: cli@37: public static final Object RegisterGate = new Object(); cli@37: private static NNTPDaemon instance = null; chris@1: cli@37: public static synchronized NNTPDaemon createInstance(int port) cli@37: { cli@37: if (instance == null) { cli@37: instance = new NNTPDaemon(port); cli@37: return instance; cli@37: } else { cli@37: throw new RuntimeException("NNTPDaemon.createInstance() called twice"); cli@37: } cli@37: } cli@37: private int port; chris@1: cli@37: private NNTPDaemon(final int port) cli@37: { cli@37: Log.get().info("Server listening on port " + port); cli@37: this.port = port; cli@37: } cli@37: cli@37: @Override cli@37: public void run() cli@37: { cli@37: try { cli@37: // Create a Selector that handles the SocketChannel multiplexing cli@37: final Selector readSelector = Selector.open(); cli@37: final Selector writeSelector = Selector.open(); cli@37: cli@37: // Start working threads cli@37: final int workerThreads = Runtime.getRuntime().availableProcessors() * 4; cli@37: ConnectionWorker[] cworkers = new ConnectionWorker[workerThreads]; cli@37: for (int n = 0; n < workerThreads; n++) { cli@37: cworkers[n] = new ConnectionWorker(); cli@37: cworkers[n].start(); cli@37: } cli@37: cli@37: ChannelWriter.getInstance().setSelector(writeSelector); cli@37: ChannelReader.getInstance().setSelector(readSelector); cli@37: ChannelWriter.getInstance().start(); cli@37: ChannelReader.getInstance().start(); cli@37: cli@37: final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); cli@37: serverSocketChannel.configureBlocking(true); // Set to blocking mode cli@37: cli@37: // Configure ServerSocket; bind to socket... cli@37: final ServerSocket serverSocket = serverSocketChannel.socket(); cli@37: serverSocket.bind(new InetSocketAddress(this.port)); cli@37: cli@37: while (isRunning()) { cli@37: SocketChannel socketChannel; cli@37: cli@37: try { cli@37: // As we set the server socket channel to blocking mode the accept() cli@37: // method will block. cli@37: socketChannel = serverSocketChannel.accept(); cli@37: socketChannel.configureBlocking(false); cli@37: assert socketChannel.isConnected(); cli@37: assert socketChannel.finishConnect(); cli@37: } catch (IOException ex) { cli@37: // Under heavy load an IOException "Too many open files may cli@37: // be thrown. It most cases we should slow down the connection cli@37: // accepting, to give the worker threads some time to process work. cli@37: Log.get().severe("IOException while accepting connection: " + ex.getMessage()); cli@37: Log.get().info("Connection accepting sleeping for seconds..."); cli@37: Thread.sleep(5000); // 5 seconds cli@37: continue; cli@37: } cli@37: cli@37: final NNTPConnection conn; cli@37: try { cli@37: conn = new NNTPConnection(socketChannel); cli@37: Connections.getInstance().add(conn); cli@37: } catch (IOException ex) { cli@37: Log.get().warning(ex.toString()); cli@37: socketChannel.close(); cli@37: continue; cli@37: } cli@37: cli@37: try { cli@37: SelectionKey selKeyWrite = cli@37: registerSelector(writeSelector, socketChannel, SelectionKey.OP_WRITE); cli@37: registerSelector(readSelector, socketChannel, SelectionKey.OP_READ); cli@37: cli@37: Log.get().info("Connected: " + socketChannel.socket().getRemoteSocketAddress()); cli@37: cli@37: // Set write selection key and send hello to client cli@37: conn.setWriteSelectionKey(selKeyWrite); cli@37: conn.println("200 " + Config.inst().get(Config.HOSTNAME, "localhost") cli@37: + " " + Main.VERSION + " news server ready - (posting ok)."); cli@37: } catch (CancelledKeyException cke) { cli@37: Log.get().warning("CancelledKeyException " + cke.getMessage() + " was thrown: " cli@37: + socketChannel.socket()); cli@37: } catch (ClosedChannelException cce) { cli@37: Log.get().warning("ClosedChannelException " + cce.getMessage() + " was thrown: " cli@37: + socketChannel.socket()); cli@37: } cli@37: } cli@37: } catch (BindException ex) { cli@37: // Could not bind to socket; this is a fatal problem; so perform shutdown cli@37: ex.printStackTrace(); cli@37: System.exit(1); cli@37: } catch (IOException ex) { cli@37: ex.printStackTrace(); cli@37: } catch (Exception ex) { cli@37: ex.printStackTrace(); cli@37: } cli@37: } cli@37: cli@37: public static SelectionKey registerSelector(final Selector selector, cli@37: final SocketChannel channel, final int op) cli@37: throws CancelledKeyException, ClosedChannelException cli@37: { cli@37: // Register the selector at the channel, so that it will be notified cli@37: // on the socket's events cli@37: synchronized (RegisterGate) { cli@37: // Wakeup the currently blocking reader/writer thread; we have locked cli@37: // the RegisterGate to prevent the awakened thread to block again cli@37: selector.wakeup(); cli@37: cli@37: // Lock the selector to prevent the waiting worker threads going into cli@37: // selector.select() which would block the selector. cli@37: synchronized (selector) { cli@37: return channel.register(selector, op, null); cli@37: } cli@37: } cli@37: } chris@1: }