1.1 --- a/org/sonews/feed/PullFeeder.java Sun Aug 29 17:04:25 2010 +0200
1.2 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000
1.3 @@ -1,276 +0,0 @@
1.4 -/*
1.5 - * SONEWS News Server
1.6 - * see AUTHORS for the list of contributors
1.7 - *
1.8 - * This program is free software: you can redistribute it and/or modify
1.9 - * it under the terms of the GNU General Public License as published by
1.10 - * the Free Software Foundation, either version 3 of the License, or
1.11 - * (at your option) any later version.
1.12 - *
1.13 - * This program is distributed in the hope that it will be useful,
1.14 - * but WITHOUT ANY WARRANTY; without even the implied warranty of
1.15 - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1.16 - * GNU General Public License for more details.
1.17 - *
1.18 - * You should have received a copy of the GNU General Public License
1.19 - * along with this program. If not, see <http://www.gnu.org/licenses/>.
1.20 - */
1.21 -
1.22 -package org.sonews.feed;
1.23 -
1.24 -import java.io.BufferedReader;
1.25 -import java.io.IOException;
1.26 -import java.io.InputStreamReader;
1.27 -import java.io.PrintWriter;
1.28 -import java.net.Socket;
1.29 -import java.net.SocketException;
1.30 -import java.net.UnknownHostException;
1.31 -import java.util.ArrayList;
1.32 -import java.util.HashMap;
1.33 -import java.util.HashSet;
1.34 -import java.util.List;
1.35 -import java.util.Map;
1.36 -import java.util.Set;
1.37 -import java.util.logging.Level;
1.38 -import org.sonews.config.Config;
1.39 -import org.sonews.daemon.AbstractDaemon;
1.40 -import org.sonews.util.Log;
1.41 -import org.sonews.storage.StorageBackendException;
1.42 -import org.sonews.storage.StorageManager;
1.43 -import org.sonews.util.Stats;
1.44 -import org.sonews.util.io.ArticleReader;
1.45 -import org.sonews.util.io.ArticleWriter;
1.46 -
1.47 -/**
1.48 - * The PullFeeder class regularily checks another Newsserver for new
1.49 - * messages.
1.50 - * @author Christian Lins
1.51 - * @since sonews/0.5.0
1.52 - */
1.53 -class PullFeeder extends AbstractDaemon
1.54 -{
1.55 -
1.56 - private Map<Subscription, Integer> highMarks = new HashMap<Subscription, Integer>();
1.57 - private BufferedReader in;
1.58 - private PrintWriter out;
1.59 - private Set<Subscription> subscriptions = new HashSet<Subscription>();
1.60 -
1.61 - private void addSubscription(final Subscription sub)
1.62 - {
1.63 - subscriptions.add(sub);
1.64 -
1.65 - if(!highMarks.containsKey(sub))
1.66 - {
1.67 - // Set a initial highMark
1.68 - this.highMarks.put(sub, 0);
1.69 - }
1.70 - }
1.71 -
1.72 - /**
1.73 - * Changes to the given group and returns its high mark.
1.74 - * @param groupName
1.75 - * @return
1.76 - */
1.77 - private int changeGroup(String groupName)
1.78 - throws IOException
1.79 - {
1.80 - this.out.print("GROUP " + groupName + "\r\n");
1.81 - this.out.flush();
1.82 -
1.83 - String line = this.in.readLine();
1.84 - if(line.startsWith("211 "))
1.85 - {
1.86 - int highmark = Integer.parseInt(line.split(" ")[3]);
1.87 - return highmark;
1.88 - }
1.89 - else
1.90 - {
1.91 - throw new IOException("GROUP " + groupName + " returned: " + line);
1.92 - }
1.93 - }
1.94 -
1.95 - private void connectTo(final String host, final int port)
1.96 - throws IOException, UnknownHostException
1.97 - {
1.98 - Socket socket = new Socket(host, port);
1.99 - this.out = new PrintWriter(socket.getOutputStream());
1.100 - this.in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
1.101 -
1.102 - String line = in.readLine();
1.103 - if(!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed
1.104 - {
1.105 - throw new IOException(line);
1.106 - }
1.107 -
1.108 - // Send MODE READER to peer, some newsservers are friendlier then
1.109 - this.out.println("MODE READER\r\n");
1.110 - this.out.flush();
1.111 - line = this.in.readLine();
1.112 - }
1.113 -
1.114 - private void disconnect()
1.115 - throws IOException
1.116 - {
1.117 - this.out.print("QUIT\r\n");
1.118 - this.out.flush();
1.119 - this.out.close();
1.120 - this.in.close();
1.121 -
1.122 - this.out = null;
1.123 - this.in = null;
1.124 - }
1.125 -
1.126 - /**
1.127 - * Uses the OVER or XOVER command to get a list of message overviews that
1.128 - * may be unknown to this feeder and are about to be peered.
1.129 - * @param start
1.130 - * @param end
1.131 - * @return A list of message ids with potentially interesting messages.
1.132 - */
1.133 - private List<String> over(int start, int end)
1.134 - throws IOException
1.135 - {
1.136 - this.out.print("OVER " + start + "-" + end + "\r\n");
1.137 - this.out.flush();
1.138 -
1.139 - String line = this.in.readLine();
1.140 - if(line.startsWith("500 ")) // OVER not supported
1.141 - {
1.142 - this.out.print("XOVER " + start + "-" + end + "\r\n");
1.143 - this.out.flush();
1.144 -
1.145 - line = this.in.readLine();
1.146 - }
1.147 -
1.148 - if(line.startsWith("224 "))
1.149 - {
1.150 - List<String> messages = new ArrayList<String>();
1.151 - line = this.in.readLine();
1.152 - while(!".".equals(line))
1.153 - {
1.154 - String mid = line.split("\t")[4]; // 5th should be the Message-ID
1.155 - messages.add(mid);
1.156 - line = this.in.readLine();
1.157 - }
1.158 - return messages;
1.159 - }
1.160 - else
1.161 - {
1.162 - throw new IOException("Server return for OVER/XOVER: " + line);
1.163 - }
1.164 - }
1.165 -
1.166 - @Override
1.167 - public void run()
1.168 - {
1.169 - while(isRunning())
1.170 - {
1.171 - int pullInterval = 1000 *
1.172 - Config.inst().get(Config.FEED_PULLINTERVAL, 3600);
1.173 - String host = "localhost";
1.174 - int port = 119;
1.175 -
1.176 - Log.get().info("Start PullFeeder run...");
1.177 -
1.178 - try
1.179 - {
1.180 - this.subscriptions.clear();
1.181 - List<Subscription> subsPull = StorageManager.current()
1.182 - .getSubscriptions(FeedManager.TYPE_PULL);
1.183 - for(Subscription sub : subsPull)
1.184 - {
1.185 - addSubscription(sub);
1.186 - }
1.187 - }
1.188 - catch(StorageBackendException ex)
1.189 - {
1.190 - Log.get().log(Level.SEVERE, host, ex);
1.191 - }
1.192 -
1.193 - try
1.194 - {
1.195 - for(Subscription sub : this.subscriptions)
1.196 - {
1.197 - host = sub.getHost();
1.198 - port = sub.getPort();
1.199 -
1.200 - try
1.201 - {
1.202 - Log.get().info("Feeding " + sub.getGroup() + " from " + sub.getHost());
1.203 - try
1.204 - {
1.205 - connectTo(host, port);
1.206 - }
1.207 - catch(SocketException ex)
1.208 - {
1.209 - Log.get().info("Skipping " + sub.getHost() + ": " + ex);
1.210 - continue;
1.211 - }
1.212 -
1.213 - int oldMark = this.highMarks.get(sub);
1.214 - int newMark = changeGroup(sub.getGroup());
1.215 -
1.216 - if(oldMark != newMark)
1.217 - {
1.218 - List<String> messageIDs = over(oldMark, newMark);
1.219 -
1.220 - for(String messageID : messageIDs)
1.221 - {
1.222 - if(!StorageManager.current().isArticleExisting(messageID))
1.223 - {
1.224 - try
1.225 - {
1.226 - // Post the message via common socket connection
1.227 - ArticleReader aread =
1.228 - new ArticleReader(sub.getHost(), sub.getPort(), messageID);
1.229 - byte[] abuf = aread.getArticleData();
1.230 - if(abuf == null)
1.231 - {
1.232 - Log.get().warning("Could not feed " + messageID
1.233 - + " from " + sub.getHost());
1.234 - }
1.235 - else
1.236 - {
1.237 - Log.get().info("Feeding " + messageID);
1.238 - ArticleWriter awrite = new ArticleWriter(
1.239 - "localhost", Config.inst().get(Config.PORT, 119));
1.240 - awrite.writeArticle(abuf);
1.241 - awrite.close();
1.242 - }
1.243 - Stats.getInstance().mailFeeded(sub.getGroup());
1.244 - }
1.245 - catch(IOException ex)
1.246 - {
1.247 - // There may be a temporary network failure
1.248 - ex.printStackTrace();
1.249 - Log.get().warning("Skipping mail " + messageID + " due to exception.");
1.250 - }
1.251 - }
1.252 - } // for(;;)
1.253 - this.highMarks.put(sub, newMark);
1.254 - }
1.255 -
1.256 - disconnect();
1.257 - }
1.258 - catch(StorageBackendException ex)
1.259 - {
1.260 - ex.printStackTrace();
1.261 - }
1.262 - catch(IOException ex)
1.263 - {
1.264 - ex.printStackTrace();
1.265 - Log.get().severe("PullFeeder run stopped due to exception.");
1.266 - }
1.267 - } // for(Subscription sub : subscriptions)
1.268 -
1.269 - Log.get().info("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s");
1.270 - Thread.sleep(pullInterval);
1.271 - }
1.272 - catch(InterruptedException ex)
1.273 - {
1.274 - Log.get().warning(ex.getMessage());
1.275 - }
1.276 - }
1.277 - }
1.278 -
1.279 -}