chris@1: /*
chris@1:  *   SONEWS News Server
chris@1:  *   see AUTHORS for the list of contributors
chris@1:  *
chris@1:  *   This program is free software: you can redistribute it and/or modify
chris@1:  *   it under the terms of the GNU General Public License as published by
chris@1:  *   the Free Software Foundation, either version 3 of the License, or
chris@1:  *   (at your option) any later version.
chris@1:  *
chris@1:  *   This program is distributed in the hope that it will be useful,
chris@1:  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1:  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1:  *   GNU General Public License for more details.
chris@1:  *
chris@1:  *   You should have received a copy of the GNU General Public License
chris@1:  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1:  */
chris@1: 
chris@1: package org.sonews.daemon;
chris@1: 
chris@1: import java.io.IOException;
chris@1: import java.nio.ByteBuffer;
chris@1: import java.nio.channels.CancelledKeyException;
chris@1: import java.nio.channels.SelectionKey;
chris@1: import java.nio.channels.Selector;
chris@1: import java.nio.channels.SocketChannel;
chris@1: import java.util.Iterator;
chris@1: import java.util.Set;
cli@15: import java.util.logging.Level;
chris@3: import org.sonews.util.Log;
chris@1: 
chris@1: /**
chris@1:  * A Thread task listening for OP_READ events from SocketChannels.
chris@1:  * @author Christian Lins
chris@1:  * @since sonews/0.5.0
chris@1:  */
chris@1: class ChannelReader extends AbstractDaemon
chris@1: {
chris@1: 
chris@1:   private static ChannelReader instance = new ChannelReader();
chris@1: 
chris@1:   /**
chris@1:    * @return Active ChannelReader instance.
chris@1:    */
chris@1:   public static ChannelReader getInstance()
chris@1:   {
chris@1:     return instance;
chris@1:   }
chris@1:   
chris@1:   private Selector selector = null;
chris@1:   
chris@1:   protected ChannelReader()
chris@1:   {
chris@1:   }
chris@1:   
chris@1:   /**
chris@1:    * Sets the selector which is used by this reader to determine the channel
chris@1:    * to read from.
chris@1:    * @param selector
chris@1:    */
chris@1:   public void setSelector(final Selector selector)
chris@1:   {
chris@1:     this.selector = selector;
chris@1:   }
chris@1:   
chris@1:   /**
chris@1:    * Run loop. Blocks until some data is available in a channel.
chris@1:    */
chris@1:   @Override
chris@1:   public void run()
chris@1:   {
chris@1:     assert selector != null;
chris@1: 
chris@1:     while(isRunning())
chris@1:     {
chris@1:       try
chris@1:       {
chris@1:         // select() blocks until some SelectableChannels are ready for
chris@1:         // processing. There is no need to lock the selector as we have only
chris@1:         // one thread per selector.
chris@1:         selector.select();
chris@1: 
chris@1:         // Get list of selection keys with pending events.
chris@1:         // Note: the selected key set is not thread-safe
chris@1:         SocketChannel channel = null;
chris@1:         NNTPConnection conn = null;
chris@1:         final Set<SelectionKey> selKeys = selector.selectedKeys();
chris@1:         SelectionKey selKey = null;
chris@1: 
chris@1:         synchronized (selKeys)
chris@1:         {
chris@1:           Iterator it = selKeys.iterator();
chris@1: 
chris@1:           // Process the first pending event
chris@1:           while (it.hasNext())
chris@1:           {
chris@1:             selKey = (SelectionKey) it.next();
chris@1:             channel = (SocketChannel) selKey.channel();
chris@1:             conn = Connections.getInstance().get(channel);
chris@1: 
chris@1:             // Because we cannot lock the selKey as that would cause a deadlock
chris@1:             // we lock the connection. To preserve the order of the received
chris@1:             // byte blocks a selection key for a connection that has pending
chris@1:             // read events is skipped.
chris@1:             if (conn == null || conn.tryReadLock())
chris@1:             {
chris@1:               // Remove from set to indicate that it's being processed
chris@1:               it.remove();
chris@1:               if (conn != null)
chris@1:               {
chris@1:                 break; // End while loop
chris@1:               }
chris@1:             }
chris@1:             else
chris@1:             {
chris@1:               selKey = null;
chris@1:               channel = null;
chris@1:               conn = null;
chris@1:             }
chris@1:           }
chris@1:         }
chris@1: 
chris@1:         // Do not lock the selKeys while processing because this causes
chris@1:         // a deadlock in sun.nio.ch.SelectorImpl.lockAndDoSelect()
chris@1:         if (selKey != null && channel != null && conn != null)
chris@1:         {
chris@1:           processSelectionKey(conn, channel, selKey);
chris@1:           conn.unlockReadLock();
chris@1:         }
chris@1: 
chris@1:       }
chris@1:       catch(CancelledKeyException ex)
chris@1:       {
cli@15:         Log.get().warning("ChannelReader.run(): " + ex);
cli@15:         Log.get().log(Level.INFO, "", ex);
chris@1:       }
chris@1:       catch(Exception ex)
chris@1:       {
chris@1:         ex.printStackTrace();
chris@1:       }
chris@1:       
chris@1:       // Eventually wait for a register operation
chris@1:       synchronized (NNTPDaemon.RegisterGate)
chris@1:       {
chris@1:       // Do nothing; FindBugs may warn about an empty synchronized 
chris@1:       // statement, but we cannot use a wait()/notify() mechanism here.
chris@1:       // If we used something like RegisterGate.wait() we block here
chris@1:       // until the NNTPDaemon calls notify(). But the daemon only
chris@1:       // calls notify() if itself is NOT blocked in the listening socket.
chris@1:       }
chris@1:     } // while(isRunning())
chris@1:   }
chris@1:   
chris@1:   private void processSelectionKey(final NNTPConnection connection,
chris@1:     final SocketChannel socketChannel, final SelectionKey selKey)
chris@1:     throws InterruptedException, IOException
chris@1:   {
chris@1:     assert selKey != null;
chris@1:     assert selKey.isReadable();
chris@1:     
chris@1:     // Some bytes are available for reading
chris@1:     if(selKey.isValid())
chris@3:     {   
chris@1:       // Lock the channel
chris@1:       //synchronized(socketChannel)
chris@1:       {
chris@1:         // Read the data into the appropriate buffer
chris@1:         ByteBuffer buf = connection.getInputBuffer();
chris@1:         int read = -1;
chris@1:         try 
chris@1:         {
chris@1:           read = socketChannel.read(buf);
chris@3:         }
chris@3:         catch(IOException ex)
chris@3:         {
chris@3:           // The connection was probably closed by the remote host
chris@3:           // in a non-clean fashion
cli@15:           Log.get().info("ChannelReader.processSelectionKey(): " + ex);
chris@3:         }
chris@1:         catch(Exception ex) 
chris@1:         {
cli@15:           Log.get().warning("ChannelReader.processSelectionKey(): " + ex);
chris@1:         }
chris@1:         
chris@1:         if(read == -1) // End of stream
chris@1:         {
chris@1:           selKey.cancel();
chris@1:         }
chris@1:         else if(read > 0) // If some data was read
chris@1:         {
chris@1:           ConnectionWorker.addChannel(socketChannel);
chris@1:         }
chris@1:       }
chris@1:     }
chris@1:     else
chris@1:     {
chris@1:       // Should not happen
cli@15:       Log.get().severe("Should not happen: " + selKey.toString());
chris@1:     }
chris@1:   }
chris@1:   
chris@1: }