1.1 --- a/org/sonews/daemon/ChannelWriter.java Sun Aug 29 17:04:25 2010 +0200
1.2 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000
1.3 @@ -1,210 +0,0 @@
1.4 -/*
1.5 - * SONEWS News Server
1.6 - * see AUTHORS for the list of contributors
1.7 - *
1.8 - * This program is free software: you can redistribute it and/or modify
1.9 - * it under the terms of the GNU General Public License as published by
1.10 - * the Free Software Foundation, either version 3 of the License, or
1.11 - * (at your option) any later version.
1.12 - *
1.13 - * This program is distributed in the hope that it will be useful,
1.14 - * but WITHOUT ANY WARRANTY; without even the implied warranty of
1.15 - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1.16 - * GNU General Public License for more details.
1.17 - *
1.18 - * You should have received a copy of the GNU General Public License
1.19 - * along with this program. If not, see <http://www.gnu.org/licenses/>.
1.20 - */
1.21 -
1.22 -package org.sonews.daemon;
1.23 -
1.24 -import org.sonews.util.Log;
1.25 -import java.io.IOException;
1.26 -import java.nio.ByteBuffer;
1.27 -import java.nio.channels.CancelledKeyException;
1.28 -import java.nio.channels.SelectionKey;
1.29 -import java.nio.channels.Selector;
1.30 -import java.nio.channels.SocketChannel;
1.31 -import java.util.Iterator;
1.32 -
1.33 -/**
1.34 - * A Thread task that processes OP_WRITE events for SocketChannels.
1.35 - * @author Christian Lins
1.36 - * @since sonews/0.5.0
1.37 - */
1.38 -class ChannelWriter extends AbstractDaemon
1.39 -{
1.40 -
1.41 - private static ChannelWriter instance = new ChannelWriter();
1.42 -
1.43 - /**
1.44 - * @return Returns the active ChannelWriter instance.
1.45 - */
1.46 - public static ChannelWriter getInstance()
1.47 - {
1.48 - return instance;
1.49 - }
1.50 -
1.51 - private Selector selector = null;
1.52 -
1.53 - protected ChannelWriter()
1.54 - {
1.55 - }
1.56 -
1.57 - /**
1.58 - * @return Selector associated with this instance.
1.59 - */
1.60 - public Selector getSelector()
1.61 - {
1.62 - return this.selector;
1.63 - }
1.64 -
1.65 - /**
1.66 - * Sets the selector that is used by this ChannelWriter.
1.67 - * @param selector
1.68 - */
1.69 - public void setSelector(final Selector selector)
1.70 - {
1.71 - this.selector = selector;
1.72 - }
1.73 -
1.74 - /**
1.75 - * Run loop.
1.76 - */
1.77 - @Override
1.78 - public void run()
1.79 - {
1.80 - assert selector != null;
1.81 -
1.82 - while(isRunning())
1.83 - {
1.84 - try
1.85 - {
1.86 - SelectionKey selKey = null;
1.87 - SocketChannel socketChannel = null;
1.88 - NNTPConnection connection = null;
1.89 -
1.90 - // select() blocks until some SelectableChannels are ready for
1.91 - // processing. There is no need to synchronize the selector as we
1.92 - // have only one thread per selector.
1.93 - selector.select(); // The return value of select can be ignored
1.94 -
1.95 - // Get list of selection keys with pending OP_WRITE events.
1.96 - // The keySET is not thread-safe whereas the keys itself are.
1.97 - Iterator it = selector.selectedKeys().iterator();
1.98 -
1.99 - while (it.hasNext())
1.100 - {
1.101 - // We remove the first event from the set and store it for
1.102 - // later processing.
1.103 - selKey = (SelectionKey) it.next();
1.104 - socketChannel = (SocketChannel) selKey.channel();
1.105 - connection = Connections.getInstance().get(socketChannel);
1.106 -
1.107 - it.remove();
1.108 - if (connection != null)
1.109 - {
1.110 - break;
1.111 - }
1.112 - else
1.113 - {
1.114 - selKey = null;
1.115 - }
1.116 - }
1.117 -
1.118 - if (selKey != null)
1.119 - {
1.120 - try
1.121 - {
1.122 - // Process the selected key.
1.123 - // As there is only one OP_WRITE key for a given channel, we need
1.124 - // not to synchronize this processing to retain the order.
1.125 - processSelectionKey(connection, socketChannel, selKey);
1.126 - }
1.127 - catch (IOException ex)
1.128 - {
1.129 - Log.get().warning("Error writing to channel: " + ex);
1.130 -
1.131 - // Cancel write events for this channel
1.132 - selKey.cancel();
1.133 - connection.shutdownInput();
1.134 - connection.shutdownOutput();
1.135 - }
1.136 - }
1.137 -
1.138 - // Eventually wait for a register operation
1.139 - synchronized(NNTPDaemon.RegisterGate) { /* do nothing */ }
1.140 - }
1.141 - catch(CancelledKeyException ex)
1.142 - {
1.143 - Log.get().info("ChannelWriter.run(): " + ex);
1.144 - }
1.145 - catch(Exception ex)
1.146 - {
1.147 - ex.printStackTrace();
1.148 - }
1.149 - } // while(isRunning())
1.150 - }
1.151 -
1.152 - private void processSelectionKey(final NNTPConnection connection,
1.153 - final SocketChannel socketChannel, final SelectionKey selKey)
1.154 - throws InterruptedException, IOException
1.155 - {
1.156 - assert connection != null;
1.157 - assert socketChannel != null;
1.158 - assert selKey != null;
1.159 - assert selKey.isWritable();
1.160 -
1.161 - // SocketChannel is ready for writing
1.162 - if(selKey.isValid())
1.163 - {
1.164 - // Lock the socket channel
1.165 - synchronized(socketChannel)
1.166 - {
1.167 - // Get next output buffer
1.168 - ByteBuffer buf = connection.getOutputBuffer();
1.169 - if(buf == null)
1.170 - {
1.171 - // Currently we have nothing to write, so we stop the writeable
1.172 - // events until we have something to write to the socket channel
1.173 - //selKey.cancel();
1.174 - selKey.interestOps(0);
1.175 - // Update activity timestamp to prevent too early disconnects
1.176 - // on slow client connections
1.177 - connection.setLastActivity(System.currentTimeMillis());
1.178 - return;
1.179 - }
1.180 -
1.181 - while(buf != null) // There is data to be send
1.182 - {
1.183 - // Write buffer to socket channel; this method does not block
1.184 - if(socketChannel.write(buf) <= 0)
1.185 - {
1.186 - // Perhaps there is data to be written, but the SocketChannel's
1.187 - // buffer is full, so we stop writing to until the next event.
1.188 - break;
1.189 - }
1.190 - else
1.191 - {
1.192 - // Retrieve next buffer if available; method may return the same
1.193 - // buffer instance if it still have some bytes remaining
1.194 - buf = connection.getOutputBuffer();
1.195 - }
1.196 - }
1.197 - }
1.198 - }
1.199 - else
1.200 - {
1.201 - Log.get().warning("Invalid OP_WRITE key: " + selKey);
1.202 -
1.203 - if(socketChannel.socket().isClosed())
1.204 - {
1.205 - connection.shutdownInput();
1.206 - connection.shutdownOutput();
1.207 - socketChannel.close();
1.208 - Log.get().info("Connection closed.");
1.209 - }
1.210 - }
1.211 - }
1.212 -
1.213 -}