src/org/sonews/daemon/ConnectionWorker.java
changeset 62 be4e87479855
parent 35 ed84c8bdd87b
     1.1 --- a/src/org/sonews/daemon/ConnectionWorker.java	Sun Aug 29 17:28:58 2010 +0200
     1.2 +++ b/src/org/sonews/daemon/ConnectionWorker.java	Wed Sep 14 23:25:00 2011 +0200
     1.3 @@ -31,72 +31,60 @@
     1.4  class ConnectionWorker extends AbstractDaemon
     1.5  {
     1.6  
     1.7 -  // 256 pending events should be enough
     1.8 -  private static ArrayBlockingQueue<SocketChannel> pendingChannels
     1.9 -    = new ArrayBlockingQueue<SocketChannel>(256, true);
    1.10 -  
    1.11 -  /**
    1.12 -   * Registers the given channel for further event processing.
    1.13 -   * @param channel
    1.14 -   */
    1.15 -  public static void addChannel(SocketChannel channel)
    1.16 -    throws InterruptedException
    1.17 -  {
    1.18 -    pendingChannels.put(channel);
    1.19 -  }
    1.20 -  
    1.21 -  /**
    1.22 -   * Processing loop.
    1.23 -   */
    1.24 -  @Override
    1.25 -  public void run()
    1.26 -  {
    1.27 -    while(isRunning())
    1.28 -    {
    1.29 -      try
    1.30 -      {
    1.31 -        // Retrieve and remove if available, otherwise wait.
    1.32 -        SocketChannel channel = pendingChannels.take();
    1.33 +	// 256 pending events should be enough
    1.34 +	private static ArrayBlockingQueue<SocketChannel> pendingChannels = new ArrayBlockingQueue<SocketChannel>(256, true);
    1.35  
    1.36 -        if(channel != null)
    1.37 -        {
    1.38 -          // Connections.getInstance().get() MAY return null
    1.39 -          NNTPConnection conn = Connections.getInstance().get(channel);
    1.40 -          
    1.41 -          // Try to lock the connection object
    1.42 -          if(conn != null && conn.tryReadLock())
    1.43 -          {
    1.44 -            ByteBuffer buf = conn.getBuffers().nextInputLine();
    1.45 -            while(buf != null) // Complete line was received
    1.46 -            {
    1.47 -              final byte[] line = new byte[buf.limit()];
    1.48 -              buf.get(line);
    1.49 -              ChannelLineBuffers.recycleBuffer(buf);
    1.50 -              
    1.51 -              // Here is the actual work done
    1.52 -              conn.lineReceived(line);
    1.53 +	/**
    1.54 +	 * Registers the given channel for further event processing.
    1.55 +	 * @param channel
    1.56 +	 */
    1.57 +	public static void addChannel(SocketChannel channel)
    1.58 +		throws InterruptedException
    1.59 +	{
    1.60 +		pendingChannels.put(channel);
    1.61 +	}
    1.62  
    1.63 -              // Read next line as we could have already received the next line
    1.64 -              buf = conn.getBuffers().nextInputLine();
    1.65 -            }
    1.66 -            conn.unlockReadLock();
    1.67 -          }
    1.68 -          else
    1.69 -          {
    1.70 -            addChannel(channel);
    1.71 -          }
    1.72 -        }
    1.73 -      }
    1.74 -      catch(InterruptedException ex)
    1.75 -      {
    1.76 -        Log.get().info("ConnectionWorker interrupted: " + ex);
    1.77 -      }
    1.78 -      catch(Exception ex)
    1.79 -      {
    1.80 -        Log.get().severe("Exception in ConnectionWorker: " + ex);
    1.81 -        ex.printStackTrace();
    1.82 -      }
    1.83 -    } // end while(isRunning())
    1.84 -  }
    1.85 -  
    1.86 +	/**
    1.87 +	 * Processing loop.
    1.88 +	 */
    1.89 +	@Override
    1.90 +	public void run()
    1.91 +	{
    1.92 +		while (isRunning()) {
    1.93 +			try {
    1.94 +				// Retrieve and remove if available, otherwise wait.
    1.95 +				SocketChannel channel = pendingChannels.take();
    1.96 +
    1.97 +				if (channel != null) {
    1.98 +					// Connections.getInstance().get() MAY return null
    1.99 +					NNTPConnection conn = Connections.getInstance().get(channel);
   1.100 +
   1.101 +					// Try to lock the connection object
   1.102 +					if (conn != null && conn.tryReadLock()) {
   1.103 +						ByteBuffer buf = conn.getBuffers().nextInputLine();
   1.104 +						while (buf != null) // Complete line was received
   1.105 +						{
   1.106 +							final byte[] line = new byte[buf.limit()];
   1.107 +							buf.get(line);
   1.108 +							ChannelLineBuffers.recycleBuffer(buf);
   1.109 +
   1.110 +							// Here is the actual work done
   1.111 +							conn.lineReceived(line);
   1.112 +
   1.113 +							// Read next line as we could have already received the next line
   1.114 +							buf = conn.getBuffers().nextInputLine();
   1.115 +						}
   1.116 +						conn.unlockReadLock();
   1.117 +					} else {
   1.118 +						addChannel(channel);
   1.119 +					}
   1.120 +				}
   1.121 +			} catch (InterruptedException ex) {
   1.122 +				Log.get().info("ConnectionWorker interrupted: " + ex);
   1.123 +			} catch (Exception ex) {
   1.124 +				Log.get().severe("Exception in ConnectionWorker: " + ex);
   1.125 +				ex.printStackTrace();
   1.126 +			}
   1.127 +		} // end while(isRunning())
   1.128 +	}
   1.129  }