src/org/sonews/daemon/ChannelWriter.java
changeset 105 d566d08c20d6
parent 35 ed84c8bdd87b
     1.1 --- a/src/org/sonews/daemon/ChannelWriter.java	Sun Aug 29 17:28:58 2010 +0200
     1.2 +++ b/src/org/sonews/daemon/ChannelWriter.java	Fri Oct 21 18:21:30 2011 +0200
     1.3 @@ -35,176 +35,151 @@
     1.4  class ChannelWriter extends AbstractDaemon
     1.5  {
     1.6  
     1.7 -  private static ChannelWriter instance = new ChannelWriter();
     1.8 +	private static ChannelWriter instance = new ChannelWriter();
     1.9  
    1.10 -  /**
    1.11 -   * @return Returns the active ChannelWriter instance.
    1.12 -   */
    1.13 -  public static ChannelWriter getInstance()
    1.14 -  {
    1.15 -    return instance;
    1.16 -  }
    1.17 -  
    1.18 -  private Selector selector = null;
    1.19 -  
    1.20 -  protected ChannelWriter()
    1.21 -  {
    1.22 -  }
    1.23 -  
    1.24 -  /**
    1.25 -   * @return Selector associated with this instance.
    1.26 -   */
    1.27 -  public Selector getSelector()
    1.28 -  {
    1.29 -    return this.selector;
    1.30 -  }
    1.31 -  
    1.32 -  /**
    1.33 -   * Sets the selector that is used by this ChannelWriter.
    1.34 -   * @param selector
    1.35 -   */
    1.36 -  public void setSelector(final Selector selector)
    1.37 -  {
    1.38 -    this.selector = selector;
    1.39 -  }
    1.40 -  
    1.41 -  /**
    1.42 -   * Run loop.
    1.43 -   */
    1.44 -  @Override
    1.45 -  public void run()
    1.46 -  {
    1.47 -    assert selector != null;
    1.48 +	/**
    1.49 +	 * @return Returns the active ChannelWriter instance.
    1.50 +	 */
    1.51 +	public static ChannelWriter getInstance()
    1.52 +	{
    1.53 +		return instance;
    1.54 +	}
    1.55 +	private Selector selector = null;
    1.56  
    1.57 -    while(isRunning())
    1.58 -    {
    1.59 -      try
    1.60 -      {
    1.61 -        SelectionKey   selKey        = null;
    1.62 -        SocketChannel  socketChannel = null;
    1.63 -        NNTPConnection connection    = null;
    1.64 +	protected ChannelWriter()
    1.65 +	{
    1.66 +	}
    1.67  
    1.68 -        // select() blocks until some SelectableChannels are ready for
    1.69 -        // processing. There is no need to synchronize the selector as we
    1.70 -        // have only one thread per selector.
    1.71 -        selector.select(); // The return value of select can be ignored
    1.72 +	/**
    1.73 +	 * @return Selector associated with this instance.
    1.74 +	 */
    1.75 +	public Selector getSelector()
    1.76 +	{
    1.77 +		return this.selector;
    1.78 +	}
    1.79  
    1.80 -        // Get list of selection keys with pending OP_WRITE events.
    1.81 -        // The keySET is not thread-safe whereas the keys itself are.
    1.82 -        Iterator it = selector.selectedKeys().iterator();
    1.83 +	/**
    1.84 +	 * Sets the selector that is used by this ChannelWriter.
    1.85 +	 * @param selector
    1.86 +	 */
    1.87 +	public void setSelector(final Selector selector)
    1.88 +	{
    1.89 +		this.selector = selector;
    1.90 +	}
    1.91  
    1.92 -        while (it.hasNext())
    1.93 -        {
    1.94 -          // We remove the first event from the set and store it for
    1.95 -          // later processing.
    1.96 -          selKey = (SelectionKey) it.next();
    1.97 -          socketChannel = (SocketChannel) selKey.channel();
    1.98 -          connection = Connections.getInstance().get(socketChannel);
    1.99 +	/**
   1.100 +	 * Run loop.
   1.101 +	 */
   1.102 +	@Override
   1.103 +	public void run()
   1.104 +	{
   1.105 +		assert selector != null;
   1.106  
   1.107 -          it.remove();
   1.108 -          if (connection != null)
   1.109 -          {
   1.110 -            break;
   1.111 -          }
   1.112 -          else
   1.113 -          {
   1.114 -            selKey = null;
   1.115 -          }
   1.116 -        }
   1.117 -        
   1.118 -        if (selKey != null)
   1.119 -        {
   1.120 -          try
   1.121 -          {
   1.122 -            // Process the selected key.
   1.123 -            // As there is only one OP_WRITE key for a given channel, we need
   1.124 -            // not to synchronize this processing to retain the order.
   1.125 -            processSelectionKey(connection, socketChannel, selKey);
   1.126 -          }
   1.127 -          catch (IOException ex)
   1.128 -          {
   1.129 -            Log.get().warning("Error writing to channel: " + ex);
   1.130 +		while (isRunning()) {
   1.131 +			try {
   1.132 +				SelectionKey selKey = null;
   1.133 +				SocketChannel socketChannel = null;
   1.134 +				NNTPConnection connection = null;
   1.135  
   1.136 -            // Cancel write events for this channel
   1.137 -            selKey.cancel();
   1.138 -            connection.shutdownInput();
   1.139 -            connection.shutdownOutput();
   1.140 -          }
   1.141 -        }
   1.142 -        
   1.143 -        // Eventually wait for a register operation
   1.144 -        synchronized(NNTPDaemon.RegisterGate) { /* do nothing */ }
   1.145 -      }
   1.146 -      catch(CancelledKeyException ex)
   1.147 -      {
   1.148 -        Log.get().info("ChannelWriter.run(): " + ex);
   1.149 -      }
   1.150 -      catch(Exception ex)
   1.151 -      {
   1.152 -        ex.printStackTrace();
   1.153 -      }
   1.154 -    } // while(isRunning())
   1.155 -  }
   1.156 -  
   1.157 -  private void processSelectionKey(final NNTPConnection connection,
   1.158 -    final SocketChannel socketChannel, final SelectionKey selKey)
   1.159 -    throws InterruptedException, IOException
   1.160 -  {
   1.161 -    assert connection != null;
   1.162 -    assert socketChannel != null;
   1.163 -    assert selKey != null;
   1.164 -    assert selKey.isWritable();
   1.165 +				// select() blocks until some SelectableChannels are ready for
   1.166 +				// processing. There is no need to synchronize the selector as we
   1.167 +				// have only one thread per selector.
   1.168 +				selector.select(); // The return value of select can be ignored
   1.169  
   1.170 -    // SocketChannel is ready for writing
   1.171 -    if(selKey.isValid())
   1.172 -    {
   1.173 -      // Lock the socket channel
   1.174 -      synchronized(socketChannel)
   1.175 -      {
   1.176 -        // Get next output buffer
   1.177 -        ByteBuffer buf = connection.getOutputBuffer();
   1.178 -        if(buf == null)
   1.179 -        {
   1.180 -          // Currently we have nothing to write, so we stop the writeable
   1.181 -          // events until we have something to write to the socket channel
   1.182 -          //selKey.cancel();
   1.183 -          selKey.interestOps(0);
   1.184 -          // Update activity timestamp to prevent too early disconnects
   1.185 -          // on slow client connections
   1.186 -          connection.setLastActivity(System.currentTimeMillis());
   1.187 -          return;
   1.188 -        }
   1.189 - 
   1.190 -        while(buf != null) // There is data to be send
   1.191 -        {
   1.192 -          // Write buffer to socket channel; this method does not block
   1.193 -          if(socketChannel.write(buf) <= 0)
   1.194 -          {
   1.195 -            // Perhaps there is data to be written, but the SocketChannel's
   1.196 -            // buffer is full, so we stop writing to until the next event.
   1.197 -            break;
   1.198 -          }
   1.199 -          else
   1.200 -          {
   1.201 -            // Retrieve next buffer if available; method may return the same
   1.202 -            // buffer instance if it still have some bytes remaining
   1.203 -            buf = connection.getOutputBuffer();
   1.204 -          }
   1.205 -        }
   1.206 -      }
   1.207 -    }
   1.208 -    else
   1.209 -    {
   1.210 -      Log.get().warning("Invalid OP_WRITE key: " + selKey);
   1.211 +				// Get list of selection keys with pending OP_WRITE events.
   1.212 +				// The keySET is not thread-safe whereas the keys itself are.
   1.213 +				Iterator it = selector.selectedKeys().iterator();
   1.214  
   1.215 -      if(socketChannel.socket().isClosed())
   1.216 -      {
   1.217 -        connection.shutdownInput();
   1.218 -        connection.shutdownOutput();
   1.219 -        socketChannel.close();
   1.220 -        Log.get().info("Connection closed.");
   1.221 -      }
   1.222 -    }
   1.223 -  }
   1.224 -  
   1.225 +				while (it.hasNext()) {
   1.226 +					// We remove the first event from the set and store it for
   1.227 +					// later processing.
   1.228 +					selKey = (SelectionKey) it.next();
   1.229 +					socketChannel = (SocketChannel) selKey.channel();
   1.230 +					connection = Connections.getInstance().get(socketChannel);
   1.231 +
   1.232 +					it.remove();
   1.233 +					if (connection != null) {
   1.234 +						break;
   1.235 +					} else {
   1.236 +						selKey = null;
   1.237 +					}
   1.238 +				}
   1.239 +
   1.240 +				if (selKey != null) {
   1.241 +					try {
   1.242 +						// Process the selected key.
   1.243 +						// As there is only one OP_WRITE key for a given channel, we need
   1.244 +						// not to synchronize this processing to retain the order.
   1.245 +						processSelectionKey(connection, socketChannel, selKey);
   1.246 +					} catch (IOException ex) {
   1.247 +						Log.get().warning("Error writing to channel: " + ex);
   1.248 +
   1.249 +						// Cancel write events for this channel
   1.250 +						selKey.cancel();
   1.251 +						connection.shutdownInput();
   1.252 +						connection.shutdownOutput();
   1.253 +					}
   1.254 +				}
   1.255 +
   1.256 +				// Eventually wait for a register operation
   1.257 +				synchronized (NNTPDaemon.RegisterGate) { /* do nothing */ }
   1.258 +			} catch (CancelledKeyException ex) {
   1.259 +				Log.get().info("ChannelWriter.run(): " + ex);
   1.260 +			} catch (Exception ex) {
   1.261 +				ex.printStackTrace();
   1.262 +			}
   1.263 +		} // while(isRunning())
   1.264 +	}
   1.265 +
   1.266 +	private void processSelectionKey(final NNTPConnection connection,
   1.267 +		final SocketChannel socketChannel, final SelectionKey selKey)
   1.268 +		throws InterruptedException, IOException
   1.269 +	{
   1.270 +		assert connection != null;
   1.271 +		assert socketChannel != null;
   1.272 +		assert selKey != null;
   1.273 +		assert selKey.isWritable();
   1.274 +
   1.275 +		// SocketChannel is ready for writing
   1.276 +		if (selKey.isValid()) {
   1.277 +			// Lock the socket channel
   1.278 +			synchronized (socketChannel) {
   1.279 +				// Get next output buffer
   1.280 +				ByteBuffer buf = connection.getOutputBuffer();
   1.281 +				if (buf == null) {
   1.282 +					// Currently we have nothing to write, so we stop the writeable
   1.283 +					// events until we have something to write to the socket channel
   1.284 +					//selKey.cancel();
   1.285 +					selKey.interestOps(0);
   1.286 +					// Update activity timestamp to prevent too early disconnects
   1.287 +					// on slow client connections
   1.288 +					connection.setLastActivity(System.currentTimeMillis());
   1.289 +					return;
   1.290 +				}
   1.291 +
   1.292 +				while (buf != null) // There is data to be send
   1.293 +				{
   1.294 +					// Write buffer to socket channel; this method does not block
   1.295 +					if (socketChannel.write(buf) <= 0) {
   1.296 +						// Perhaps there is data to be written, but the SocketChannel's
   1.297 +						// buffer is full, so we stop writing to until the next event.
   1.298 +						break;
   1.299 +					} else {
   1.300 +						// Retrieve next buffer if available; method may return the same
   1.301 +						// buffer instance if it still have some bytes remaining
   1.302 +						buf = connection.getOutputBuffer();
   1.303 +					}
   1.304 +				}
   1.305 +			}
   1.306 +		} else {
   1.307 +			Log.get().warning("Invalid OP_WRITE key: " + selKey);
   1.308 +
   1.309 +			if (socketChannel.socket().isClosed()) {
   1.310 +				connection.shutdownInput();
   1.311 +				connection.shutdownOutput();
   1.312 +				socketChannel.close();
   1.313 +				Log.get().info("Connection closed.");
   1.314 +			}
   1.315 +		}
   1.316 +	}
   1.317  }