src/org/sonews/daemon/ChannelReader.java
changeset 39 73b21e9f3958
parent 35 ed84c8bdd87b
     1.1 --- a/src/org/sonews/daemon/ChannelReader.java	Sun Aug 29 17:28:58 2010 +0200
     1.2 +++ b/src/org/sonews/daemon/ChannelReader.java	Mon Aug 30 00:20:06 2010 +0200
     1.3 @@ -37,166 +37,141 @@
     1.4  class ChannelReader extends AbstractDaemon
     1.5  {
     1.6  
     1.7 -  private static ChannelReader instance = new ChannelReader();
     1.8 +	private static ChannelReader instance = new ChannelReader();
     1.9  
    1.10 -  /**
    1.11 -   * @return Active ChannelReader instance.
    1.12 -   */
    1.13 -  public static ChannelReader getInstance()
    1.14 -  {
    1.15 -    return instance;
    1.16 -  }
    1.17 -  
    1.18 -  private Selector selector = null;
    1.19 -  
    1.20 -  protected ChannelReader()
    1.21 -  {
    1.22 -  }
    1.23 -  
    1.24 -  /**
    1.25 -   * Sets the selector which is used by this reader to determine the channel
    1.26 -   * to read from.
    1.27 -   * @param selector
    1.28 -   */
    1.29 -  public void setSelector(final Selector selector)
    1.30 -  {
    1.31 -    this.selector = selector;
    1.32 -  }
    1.33 -  
    1.34 -  /**
    1.35 -   * Run loop. Blocks until some data is available in a channel.
    1.36 -   */
    1.37 -  @Override
    1.38 -  public void run()
    1.39 -  {
    1.40 -    assert selector != null;
    1.41 +	/**
    1.42 +	 * @return Active ChannelReader instance.
    1.43 +	 */
    1.44 +	public static ChannelReader getInstance()
    1.45 +	{
    1.46 +		return instance;
    1.47 +	}
    1.48 +	private Selector selector = null;
    1.49  
    1.50 -    while(isRunning())
    1.51 -    {
    1.52 -      try
    1.53 -      {
    1.54 -        // select() blocks until some SelectableChannels are ready for
    1.55 -        // processing. There is no need to lock the selector as we have only
    1.56 -        // one thread per selector.
    1.57 -        selector.select();
    1.58 +	protected ChannelReader()
    1.59 +	{
    1.60 +	}
    1.61  
    1.62 -        // Get list of selection keys with pending events.
    1.63 -        // Note: the selected key set is not thread-safe
    1.64 -        SocketChannel channel = null;
    1.65 -        NNTPConnection conn = null;
    1.66 -        final Set<SelectionKey> selKeys = selector.selectedKeys();
    1.67 -        SelectionKey selKey = null;
    1.68 +	/**
    1.69 +	 * Sets the selector which is used by this reader to determine the channel
    1.70 +	 * to read from.
    1.71 +	 * @param selector
    1.72 +	 */
    1.73 +	public void setSelector(final Selector selector)
    1.74 +	{
    1.75 +		this.selector = selector;
    1.76 +	}
    1.77  
    1.78 -        synchronized (selKeys)
    1.79 -        {
    1.80 -          Iterator it = selKeys.iterator();
    1.81 +	/**
    1.82 +	 * Run loop. Blocks until some data is available in a channel.
    1.83 +	 */
    1.84 +	@Override
    1.85 +	public void run()
    1.86 +	{
    1.87 +		assert selector != null;
    1.88  
    1.89 -          // Process the first pending event
    1.90 -          while (it.hasNext())
    1.91 -          {
    1.92 -            selKey = (SelectionKey) it.next();
    1.93 -            channel = (SocketChannel) selKey.channel();
    1.94 -            conn = Connections.getInstance().get(channel);
    1.95 +		while (isRunning()) {
    1.96 +			try {
    1.97 +				// select() blocks until some SelectableChannels are ready for
    1.98 +				// processing. There is no need to lock the selector as we have only
    1.99 +				// one thread per selector.
   1.100 +				selector.select();
   1.101  
   1.102 -            // Because we cannot lock the selKey as that would cause a deadlock
   1.103 -            // we lock the connection. To preserve the order of the received
   1.104 -            // byte blocks a selection key for a connection that has pending
   1.105 -            // read events is skipped.
   1.106 -            if (conn == null || conn.tryReadLock())
   1.107 -            {
   1.108 -              // Remove from set to indicate that it's being processed
   1.109 -              it.remove();
   1.110 -              if (conn != null)
   1.111 -              {
   1.112 -                break; // End while loop
   1.113 -              }
   1.114 -            }
   1.115 -            else
   1.116 -            {
   1.117 -              selKey = null;
   1.118 -              channel = null;
   1.119 -              conn = null;
   1.120 -            }
   1.121 -          }
   1.122 -        }
   1.123 +				// Get list of selection keys with pending events.
   1.124 +				// Note: the selected key set is not thread-safe
   1.125 +				SocketChannel channel = null;
   1.126 +				NNTPConnection conn = null;
   1.127 +				final Set<SelectionKey> selKeys = selector.selectedKeys();
   1.128 +				SelectionKey selKey = null;
   1.129  
   1.130 -        // Do not lock the selKeys while processing because this causes
   1.131 -        // a deadlock in sun.nio.ch.SelectorImpl.lockAndDoSelect()
   1.132 -        if (selKey != null && channel != null && conn != null)
   1.133 -        {
   1.134 -          processSelectionKey(conn, channel, selKey);
   1.135 -          conn.unlockReadLock();
   1.136 -        }
   1.137 +				synchronized (selKeys) {
   1.138 +					Iterator it = selKeys.iterator();
   1.139  
   1.140 -      }
   1.141 -      catch(CancelledKeyException ex)
   1.142 -      {
   1.143 -        Log.get().warning("ChannelReader.run(): " + ex);
   1.144 -        Log.get().log(Level.INFO, "", ex);
   1.145 -      }
   1.146 -      catch(Exception ex)
   1.147 -      {
   1.148 -        ex.printStackTrace();
   1.149 -      }
   1.150 -      
   1.151 -      // Eventually wait for a register operation
   1.152 -      synchronized (NNTPDaemon.RegisterGate)
   1.153 -      {
   1.154 -      // Do nothing; FindBugs may warn about an empty synchronized 
   1.155 -      // statement, but we cannot use a wait()/notify() mechanism here.
   1.156 -      // If we used something like RegisterGate.wait() we block here
   1.157 -      // until the NNTPDaemon calls notify(). But the daemon only
   1.158 -      // calls notify() if itself is NOT blocked in the listening socket.
   1.159 -      }
   1.160 -    } // while(isRunning())
   1.161 -  }
   1.162 -  
   1.163 -  private void processSelectionKey(final NNTPConnection connection,
   1.164 -    final SocketChannel socketChannel, final SelectionKey selKey)
   1.165 -    throws InterruptedException, IOException
   1.166 -  {
   1.167 -    assert selKey != null;
   1.168 -    assert selKey.isReadable();
   1.169 -    
   1.170 -    // Some bytes are available for reading
   1.171 -    if(selKey.isValid())
   1.172 -    {   
   1.173 -      // Lock the channel
   1.174 -      //synchronized(socketChannel)
   1.175 -      {
   1.176 -        // Read the data into the appropriate buffer
   1.177 -        ByteBuffer buf = connection.getInputBuffer();
   1.178 -        int read = -1;
   1.179 -        try 
   1.180 -        {
   1.181 -          read = socketChannel.read(buf);
   1.182 -        }
   1.183 -        catch(IOException ex)
   1.184 -        {
   1.185 -          // The connection was probably closed by the remote host
   1.186 -          // in a non-clean fashion
   1.187 -          Log.get().info("ChannelReader.processSelectionKey(): " + ex);
   1.188 -        }
   1.189 -        catch(Exception ex) 
   1.190 -        {
   1.191 -          Log.get().warning("ChannelReader.processSelectionKey(): " + ex);
   1.192 -        }
   1.193 -        
   1.194 -        if(read == -1) // End of stream
   1.195 -        {
   1.196 -          selKey.cancel();
   1.197 -        }
   1.198 -        else if(read > 0) // If some data was read
   1.199 -        {
   1.200 -          ConnectionWorker.addChannel(socketChannel);
   1.201 -        }
   1.202 -      }
   1.203 -    }
   1.204 -    else
   1.205 -    {
   1.206 -      // Should not happen
   1.207 -      Log.get().severe("Should not happen: " + selKey.toString());
   1.208 -    }
   1.209 -  }
   1.210 -  
   1.211 +					// Process the first pending event
   1.212 +					while (it.hasNext()) {
   1.213 +						selKey = (SelectionKey) it.next();
   1.214 +						channel = (SocketChannel) selKey.channel();
   1.215 +						conn = Connections.getInstance().get(channel);
   1.216 +
   1.217 +						// Because we cannot lock the selKey as that would cause a deadlock
   1.218 +						// we lock the connection. To preserve the order of the received
   1.219 +						// byte blocks a selection key for a connection that has pending
   1.220 +						// read events is skipped.
   1.221 +						if (conn == null || conn.tryReadLock()) {
   1.222 +							// Remove from set to indicate that it's being processed
   1.223 +							it.remove();
   1.224 +							if (conn != null) {
   1.225 +								break; // End while loop
   1.226 +							}
   1.227 +						} else {
   1.228 +							selKey = null;
   1.229 +							channel = null;
   1.230 +							conn = null;
   1.231 +						}
   1.232 +					}
   1.233 +				}
   1.234 +
   1.235 +				// Do not lock the selKeys while processing because this causes
   1.236 +				// a deadlock in sun.nio.ch.SelectorImpl.lockAndDoSelect()
   1.237 +				if (selKey != null && channel != null && conn != null) {
   1.238 +					processSelectionKey(conn, channel, selKey);
   1.239 +					conn.unlockReadLock();
   1.240 +				}
   1.241 +
   1.242 +			} catch (CancelledKeyException ex) {
   1.243 +				Log.get().warning("ChannelReader.run(): " + ex);
   1.244 +				Log.get().log(Level.INFO, "", ex);
   1.245 +			} catch (Exception ex) {
   1.246 +				ex.printStackTrace();
   1.247 +			}
   1.248 +
   1.249 +			// Eventually wait for a register operation
   1.250 +			synchronized (NNTPDaemon.RegisterGate) {
   1.251 +				// Do nothing; FindBugs may warn about an empty synchronized
   1.252 +				// statement, but we cannot use a wait()/notify() mechanism here.
   1.253 +				// If we used something like RegisterGate.wait() we block here
   1.254 +				// until the NNTPDaemon calls notify(). But the daemon only
   1.255 +				// calls notify() if itself is NOT blocked in the listening socket.
   1.256 +			}
   1.257 +		} // while(isRunning())
   1.258 +	}
   1.259 +
   1.260 +	private void processSelectionKey(final NNTPConnection connection,
   1.261 +		final SocketChannel socketChannel, final SelectionKey selKey)
   1.262 +		throws InterruptedException, IOException
   1.263 +	{
   1.264 +		assert selKey != null;
   1.265 +		assert selKey.isReadable();
   1.266 +
   1.267 +		// Some bytes are available for reading
   1.268 +		if (selKey.isValid()) {
   1.269 +			// Lock the channel
   1.270 +			//synchronized(socketChannel)
   1.271 +			{
   1.272 +				// Read the data into the appropriate buffer
   1.273 +				ByteBuffer buf = connection.getInputBuffer();
   1.274 +				int read = -1;
   1.275 +				try {
   1.276 +					read = socketChannel.read(buf);
   1.277 +				} catch (IOException ex) {
   1.278 +					// The connection was probably closed by the remote host
   1.279 +					// in a non-clean fashion
   1.280 +					Log.get().info("ChannelReader.processSelectionKey(): " + ex);
   1.281 +				} catch (Exception ex) {
   1.282 +					Log.get().warning("ChannelReader.processSelectionKey(): " + ex);
   1.283 +				}
   1.284 +
   1.285 +				if (read == -1) // End of stream
   1.286 +				{
   1.287 +					selKey.cancel();
   1.288 +				} else if (read > 0) // If some data was read
   1.289 +				{
   1.290 +					ConnectionWorker.addChannel(socketChannel);
   1.291 +				}
   1.292 +			}
   1.293 +		} else {
   1.294 +			// Should not happen
   1.295 +			Log.get().severe("Should not happen: " + selKey.toString());
   1.296 +		}
   1.297 +	}
   1.298  }