org/sonews/daemon/NNTPDaemon.java
author cli
Sat May 01 14:27:30 2010 +0200 (2010-05-01)
changeset 28 15d14b110240
parent 3 2fdc9cc89502
permissions -rw-r--r--
Make mailinglist gateway more reliable, probably.
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.daemon;
    20 
    21 import org.sonews.config.Config;
    22 import org.sonews.Main;
    23 import org.sonews.util.Log;
    24 import java.io.IOException;
    25 import java.net.BindException;
    26 import java.net.InetSocketAddress;
    27 import java.net.ServerSocket;
    28 import java.nio.channels.CancelledKeyException;
    29 import java.nio.channels.ClosedChannelException;
    30 import java.nio.channels.SelectionKey;
    31 import java.nio.channels.Selector;
    32 import java.nio.channels.ServerSocketChannel;
    33 import java.nio.channels.SocketChannel;
    34 
    35 /**
    36  * NNTP daemon using SelectableChannels.
    37  * @author Christian Lins
    38  * @since sonews/0.5.0
    39  */
    40 public final class NNTPDaemon extends AbstractDaemon
    41 {
    42 
    43   public static final Object RegisterGate = new Object();
    44   
    45   private static NNTPDaemon instance = null;
    46   
    47   public static synchronized NNTPDaemon createInstance(int port)
    48   {
    49     if(instance == null)
    50     {
    51       instance = new NNTPDaemon(port);
    52       return instance;
    53     }
    54     else
    55     {
    56       throw new RuntimeException("NNTPDaemon.createInstance() called twice");
    57     }
    58   }
    59   
    60   private int port;
    61   
    62   private NNTPDaemon(final int port)
    63   {
    64     Log.get().info("Server listening on port " + port);
    65     this.port = port;
    66   }
    67 
    68   @Override
    69   public void run()
    70   {
    71     try
    72     {
    73       // Create a Selector that handles the SocketChannel multiplexing
    74       final Selector readSelector  = Selector.open();
    75       final Selector writeSelector = Selector.open();
    76       
    77       // Start working threads
    78       final int workerThreads = Runtime.getRuntime().availableProcessors() * 4;
    79       ConnectionWorker[] cworkers = new ConnectionWorker[workerThreads];
    80       for(int n = 0; n < workerThreads; n++)
    81       {
    82         cworkers[n] = new ConnectionWorker();
    83         cworkers[n].start();
    84       }
    85       
    86       ChannelWriter.getInstance().setSelector(writeSelector);
    87       ChannelReader.getInstance().setSelector(readSelector);
    88       ChannelWriter.getInstance().start();
    89       ChannelReader.getInstance().start();
    90       
    91       final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    92       serverSocketChannel.configureBlocking(true);  // Set to blocking mode
    93       
    94       // Configure ServerSocket; bind to socket...
    95       final ServerSocket serverSocket = serverSocketChannel.socket();
    96       serverSocket.bind(new InetSocketAddress(this.port));
    97       
    98       while(isRunning())
    99       {
   100         SocketChannel socketChannel;
   101         
   102         try
   103         {
   104           // As we set the server socket channel to blocking mode the accept()
   105           // method will block.
   106           socketChannel = serverSocketChannel.accept();
   107           socketChannel.configureBlocking(false);
   108           assert socketChannel.isConnected();
   109           assert socketChannel.finishConnect();
   110         }
   111         catch(IOException ex)
   112         {
   113           // Under heavy load an IOException "Too many open files may
   114           // be thrown. It most cases we should slow down the connection
   115           // accepting, to give the worker threads some time to process work.
   116           Log.get().severe("IOException while accepting connection: " + ex.getMessage());
   117           Log.get().info("Connection accepting sleeping for seconds...");
   118           Thread.sleep(5000); // 5 seconds
   119           continue;
   120         }
   121         
   122         final NNTPConnection conn;
   123         try
   124         {
   125           conn = new NNTPConnection(socketChannel);
   126           Connections.getInstance().add(conn);
   127         }
   128         catch(IOException ex)
   129         {
   130           Log.get().warning(ex.toString());
   131           socketChannel.close();
   132           continue;
   133         }
   134         
   135         try
   136         {
   137           SelectionKey selKeyWrite =
   138             registerSelector(writeSelector, socketChannel, SelectionKey.OP_WRITE);
   139           registerSelector(readSelector, socketChannel, SelectionKey.OP_READ);
   140           
   141           Log.get().info("Connected: " + socketChannel.socket().getRemoteSocketAddress());
   142 
   143           // Set write selection key and send hello to client
   144           conn.setWriteSelectionKey(selKeyWrite);
   145           conn.println("200 " + Config.inst().get(Config.HOSTNAME, "localhost")
   146               + " " + Main.VERSION + " news server ready - (posting ok).");
   147         }
   148         catch(CancelledKeyException cke)
   149         {
   150           Log.get().warning("CancelledKeyException " + cke.getMessage() + " was thrown: "
   151             + socketChannel.socket());
   152         }
   153         catch(ClosedChannelException cce)
   154         {
   155           Log.get().warning("ClosedChannelException " + cce.getMessage() + " was thrown: "
   156             + socketChannel.socket());
   157         }
   158       }
   159     }
   160     catch(BindException ex)
   161     {
   162       // Could not bind to socket; this is a fatal problem; so perform shutdown
   163       ex.printStackTrace();
   164       System.exit(1);
   165     }
   166     catch(IOException ex)
   167     {
   168       ex.printStackTrace();
   169     }
   170     catch(Exception ex)
   171     {
   172       ex.printStackTrace();
   173     }
   174   }
   175   
   176   public static SelectionKey registerSelector(final Selector selector,
   177     final SocketChannel channel, final int op)
   178     throws CancelledKeyException, ClosedChannelException
   179   {
   180     // Register the selector at the channel, so that it will be notified
   181     // on the socket's events
   182     synchronized(RegisterGate)
   183     {
   184       // Wakeup the currently blocking reader/writer thread; we have locked
   185       // the RegisterGate to prevent the awakened thread to block again
   186       selector.wakeup();
   187       
   188       // Lock the selector to prevent the waiting worker threads going into
   189       // selector.select() which would block the selector.
   190       synchronized (selector)
   191       {
   192         return channel.register(selector, op, null);
   193       }
   194     }
   195   }
   196   
   197 }