1.1 --- a/src/org/sonews/feed/PullFeeder.java Sun Aug 29 17:28:58 2010 +0200
1.2 +++ b/src/org/sonews/feed/PullFeeder.java Mon Jun 06 20:12:21 2011 +0200
1.3 @@ -49,228 +49,192 @@
1.4 */
1.5 class PullFeeder extends AbstractDaemon
1.6 {
1.7 -
1.8 - private Map<Subscription, Integer> highMarks = new HashMap<Subscription, Integer>();
1.9 - private BufferedReader in;
1.10 - private PrintWriter out;
1.11 - private Set<Subscription> subscriptions = new HashSet<Subscription>();
1.12 -
1.13 - private void addSubscription(final Subscription sub)
1.14 - {
1.15 - subscriptions.add(sub);
1.16
1.17 - if(!highMarks.containsKey(sub))
1.18 - {
1.19 - // Set a initial highMark
1.20 - this.highMarks.put(sub, 0);
1.21 - }
1.22 - }
1.23 -
1.24 - /**
1.25 - * Changes to the given group and returns its high mark.
1.26 - * @param groupName
1.27 - * @return
1.28 - */
1.29 - private int changeGroup(String groupName)
1.30 - throws IOException
1.31 - {
1.32 - this.out.print("GROUP " + groupName + "\r\n");
1.33 - this.out.flush();
1.34 -
1.35 - String line = this.in.readLine();
1.36 - if(line.startsWith("211 "))
1.37 - {
1.38 - int highmark = Integer.parseInt(line.split(" ")[3]);
1.39 - return highmark;
1.40 - }
1.41 - else
1.42 - {
1.43 - throw new IOException("GROUP " + groupName + " returned: " + line);
1.44 - }
1.45 - }
1.46 -
1.47 - private void connectTo(final String host, final int port)
1.48 - throws IOException, UnknownHostException
1.49 - {
1.50 - Socket socket = new Socket(host, port);
1.51 - this.out = new PrintWriter(socket.getOutputStream());
1.52 - this.in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
1.53 + private Map<Subscription, Integer> highMarks = new HashMap<Subscription, Integer>();
1.54 + private BufferedReader in;
1.55 + private PrintWriter out;
1.56 + private Set<Subscription> subscriptions = new HashSet<Subscription>();
1.57
1.58 - String line = in.readLine();
1.59 - if(!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed
1.60 - {
1.61 - throw new IOException(line);
1.62 - }
1.63 + private void addSubscription(final Subscription sub)
1.64 + {
1.65 + subscriptions.add(sub);
1.66
1.67 - // Send MODE READER to peer, some newsservers are friendlier then
1.68 - this.out.println("MODE READER\r\n");
1.69 - this.out.flush();
1.70 - line = this.in.readLine();
1.71 - }
1.72 -
1.73 - private void disconnect()
1.74 - throws IOException
1.75 - {
1.76 - this.out.print("QUIT\r\n");
1.77 - this.out.flush();
1.78 - this.out.close();
1.79 - this.in.close();
1.80 -
1.81 - this.out = null;
1.82 - this.in = null;
1.83 - }
1.84 -
1.85 - /**
1.86 - * Uses the OVER or XOVER command to get a list of message overviews that
1.87 - * may be unknown to this feeder and are about to be peered.
1.88 - * @param start
1.89 - * @param end
1.90 - * @return A list of message ids with potentially interesting messages.
1.91 - */
1.92 - private List<String> over(int start, int end)
1.93 - throws IOException
1.94 - {
1.95 - this.out.print("OVER " + start + "-" + end + "\r\n");
1.96 - this.out.flush();
1.97 -
1.98 - String line = this.in.readLine();
1.99 - if(line.startsWith("500 ")) // OVER not supported
1.100 - {
1.101 - this.out.print("XOVER " + start + "-" + end + "\r\n");
1.102 - this.out.flush();
1.103 -
1.104 - line = this.in.readLine();
1.105 - }
1.106 -
1.107 - if(line.startsWith("224 "))
1.108 - {
1.109 - List<String> messages = new ArrayList<String>();
1.110 - line = this.in.readLine();
1.111 - while(!".".equals(line))
1.112 - {
1.113 - String mid = line.split("\t")[4]; // 5th should be the Message-ID
1.114 - messages.add(mid);
1.115 - line = this.in.readLine();
1.116 - }
1.117 - return messages;
1.118 - }
1.119 - else
1.120 - {
1.121 - throw new IOException("Server return for OVER/XOVER: " + line);
1.122 - }
1.123 - }
1.124 -
1.125 - @Override
1.126 - public void run()
1.127 - {
1.128 - while(isRunning())
1.129 - {
1.130 - int pullInterval = 1000 *
1.131 - Config.inst().get(Config.FEED_PULLINTERVAL, 3600);
1.132 - String host = "localhost";
1.133 - int port = 119;
1.134 -
1.135 - Log.get().info("Start PullFeeder run...");
1.136 + if (!highMarks.containsKey(sub)) {
1.137 + // Set a initial highMark
1.138 + this.highMarks.put(sub, 0);
1.139 + }
1.140 + }
1.141
1.142 - try
1.143 - {
1.144 - this.subscriptions.clear();
1.145 - List<Subscription> subsPull = StorageManager.current()
1.146 - .getSubscriptions(FeedManager.TYPE_PULL);
1.147 - for(Subscription sub : subsPull)
1.148 - {
1.149 - addSubscription(sub);
1.150 - }
1.151 - }
1.152 - catch(StorageBackendException ex)
1.153 - {
1.154 - Log.get().log(Level.SEVERE, host, ex);
1.155 - }
1.156 + /**
1.157 + * Changes to the given group and returns its high mark.
1.158 + * @param groupName
1.159 + * @return
1.160 + */
1.161 + private int changeGroup(String groupName)
1.162 + throws IOException
1.163 + {
1.164 + this.out.print("GROUP " + groupName + "\r\n");
1.165 + this.out.flush();
1.166
1.167 - try
1.168 - {
1.169 - for(Subscription sub : this.subscriptions)
1.170 - {
1.171 - host = sub.getHost();
1.172 - port = sub.getPort();
1.173 + String line = this.in.readLine();
1.174 + if (line.startsWith("211 ")) {
1.175 + int highmark = Integer.parseInt(line.split(" ")[3]);
1.176 + return highmark;
1.177 + } else {
1.178 + throw new IOException("GROUP " + groupName + " returned: " + line);
1.179 + }
1.180 + }
1.181
1.182 - try
1.183 - {
1.184 - Log.get().info("Feeding " + sub.getGroup() + " from " + sub.getHost());
1.185 - try
1.186 - {
1.187 - connectTo(host, port);
1.188 - }
1.189 - catch(SocketException ex)
1.190 - {
1.191 - Log.get().info("Skipping " + sub.getHost() + ": " + ex);
1.192 - continue;
1.193 - }
1.194 -
1.195 - int oldMark = this.highMarks.get(sub);
1.196 - int newMark = changeGroup(sub.getGroup());
1.197 -
1.198 - if(oldMark != newMark)
1.199 - {
1.200 - List<String> messageIDs = over(oldMark, newMark);
1.201 + private void connectTo(final String host, final int port)
1.202 + throws IOException, UnknownHostException
1.203 + {
1.204 + Socket socket = new Socket(host, port);
1.205 + this.out = new PrintWriter(socket.getOutputStream());
1.206 + this.in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
1.207
1.208 - for(String messageID : messageIDs)
1.209 - {
1.210 - if(!StorageManager.current().isArticleExisting(messageID))
1.211 - {
1.212 - try
1.213 - {
1.214 - // Post the message via common socket connection
1.215 - ArticleReader aread =
1.216 - new ArticleReader(sub.getHost(), sub.getPort(), messageID);
1.217 - byte[] abuf = aread.getArticleData();
1.218 - if(abuf == null)
1.219 - {
1.220 - Log.get().warning("Could not feed " + messageID
1.221 - + " from " + sub.getHost());
1.222 - }
1.223 - else
1.224 - {
1.225 - Log.get().info("Feeding " + messageID);
1.226 - ArticleWriter awrite = new ArticleWriter(
1.227 - "localhost", Config.inst().get(Config.PORT, 119));
1.228 - awrite.writeArticle(abuf);
1.229 - awrite.close();
1.230 - }
1.231 - Stats.getInstance().mailFeeded(sub.getGroup());
1.232 - }
1.233 - catch(IOException ex)
1.234 - {
1.235 - // There may be a temporary network failure
1.236 - ex.printStackTrace();
1.237 - Log.get().warning("Skipping mail " + messageID + " due to exception.");
1.238 - }
1.239 - }
1.240 - } // for(;;)
1.241 - this.highMarks.put(sub, newMark);
1.242 - }
1.243 -
1.244 - disconnect();
1.245 - }
1.246 - catch(StorageBackendException ex)
1.247 - {
1.248 - ex.printStackTrace();
1.249 - }
1.250 - catch(IOException ex)
1.251 - {
1.252 - ex.printStackTrace();
1.253 - Log.get().severe("PullFeeder run stopped due to exception.");
1.254 - }
1.255 - } // for(Subscription sub : subscriptions)
1.256 -
1.257 - Log.get().info("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s");
1.258 - Thread.sleep(pullInterval);
1.259 - }
1.260 - catch(InterruptedException ex)
1.261 - {
1.262 - Log.get().warning(ex.getMessage());
1.263 - }
1.264 - }
1.265 - }
1.266 -
1.267 + String line = in.readLine();
1.268 + if (!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed
1.269 + {
1.270 + throw new IOException(line);
1.271 + }
1.272 +
1.273 + // Send MODE READER to peer, some newsservers are friendlier then
1.274 + this.out.println("MODE READER\r\n");
1.275 + this.out.flush();
1.276 + line = this.in.readLine();
1.277 + }
1.278 +
1.279 + private void disconnect()
1.280 + throws IOException
1.281 + {
1.282 + this.out.print("QUIT\r\n");
1.283 + this.out.flush();
1.284 + this.out.close();
1.285 + this.in.close();
1.286 +
1.287 + this.out = null;
1.288 + this.in = null;
1.289 + }
1.290 +
1.291 + /**
1.292 + * Uses the OVER or XOVER command to get a list of message overviews that
1.293 + * may be unknown to this feeder and are about to be peered.
1.294 + * @param start
1.295 + * @param end
1.296 + * @return A list of message ids with potentially interesting messages.
1.297 + */
1.298 + private List<String> over(int start, int end)
1.299 + throws IOException
1.300 + {
1.301 + this.out.print("OVER " + start + "-" + end + "\r\n");
1.302 + this.out.flush();
1.303 +
1.304 + String line = this.in.readLine();
1.305 + if (line.startsWith("500 ")) // OVER not supported
1.306 + {
1.307 + this.out.print("XOVER " + start + "-" + end + "\r\n");
1.308 + this.out.flush();
1.309 +
1.310 + line = this.in.readLine();
1.311 + }
1.312 +
1.313 + if (line.startsWith("224 ")) {
1.314 + List<String> messages = new ArrayList<String>();
1.315 + line = this.in.readLine();
1.316 + while (!".".equals(line)) {
1.317 + String mid = line.split("\t")[4]; // 5th should be the Message-ID
1.318 + messages.add(mid);
1.319 + line = this.in.readLine();
1.320 + }
1.321 + return messages;
1.322 + } else {
1.323 + throw new IOException("Server return for OVER/XOVER: " + line);
1.324 + }
1.325 + }
1.326 +
1.327 + @Override
1.328 + public void run()
1.329 + {
1.330 + while (isRunning()) {
1.331 + int pullInterval = 1000
1.332 + * Config.inst().get(Config.FEED_PULLINTERVAL, 3600);
1.333 + String host = "localhost";
1.334 + int port = 119;
1.335 +
1.336 + Log.get().info("Start PullFeeder run...");
1.337 +
1.338 + try {
1.339 + this.subscriptions.clear();
1.340 + List<Subscription> subsPull = StorageManager.current().getSubscriptions(FeedManager.TYPE_PULL);
1.341 + for (Subscription sub : subsPull) {
1.342 + addSubscription(sub);
1.343 + }
1.344 + } catch (StorageBackendException ex) {
1.345 + Log.get().log(Level.SEVERE, host, ex);
1.346 + }
1.347 +
1.348 + try {
1.349 + for (Subscription sub : this.subscriptions) {
1.350 + host = sub.getHost();
1.351 + port = sub.getPort();
1.352 +
1.353 + try {
1.354 + Log.get().info("Feeding " + sub.getGroup() + " from " + sub.getHost());
1.355 + try {
1.356 + connectTo(host, port);
1.357 + } catch (SocketException ex) {
1.358 + Log.get().info("Skipping " + sub.getHost() + ": " + ex);
1.359 + continue;
1.360 + }
1.361 +
1.362 + int oldMark = this.highMarks.get(sub);
1.363 + int newMark = changeGroup(sub.getGroup());
1.364 +
1.365 + if (oldMark != newMark) {
1.366 + List<String> messageIDs = over(oldMark, newMark);
1.367 +
1.368 + for (String messageID : messageIDs) {
1.369 + if (!StorageManager.current().isArticleExisting(messageID)) {
1.370 + try {
1.371 + // Post the message via common socket connection
1.372 + ArticleReader aread =
1.373 + new ArticleReader(sub.getHost(), sub.getPort(), messageID);
1.374 + byte[] abuf = aread.getArticleData();
1.375 + if (abuf == null) {
1.376 + Log.get().warning("Could not feed " + messageID
1.377 + + " from " + sub.getHost());
1.378 + } else {
1.379 + Log.get().info("Feeding " + messageID);
1.380 + ArticleWriter awrite = new ArticleWriter(
1.381 + "localhost", Config.inst().get(Config.PORT, 119));
1.382 + awrite.writeArticle(abuf);
1.383 + awrite.close();
1.384 + }
1.385 + Stats.getInstance().mailFeeded(sub.getGroup());
1.386 + } catch (IOException ex) {
1.387 + // There may be a temporary network failure
1.388 + ex.printStackTrace();
1.389 + Log.get().warning("Skipping mail " + messageID + " due to exception.");
1.390 + }
1.391 + }
1.392 + } // for(;;)
1.393 + this.highMarks.put(sub, newMark);
1.394 + }
1.395 +
1.396 + disconnect();
1.397 + } catch (StorageBackendException ex) {
1.398 + ex.printStackTrace();
1.399 + } catch (IOException ex) {
1.400 + ex.printStackTrace();
1.401 + Log.get().severe("PullFeeder run stopped due to exception.");
1.402 + }
1.403 + } // for(Subscription sub : subscriptions)
1.404 +
1.405 + Log.get().info("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s");
1.406 + Thread.sleep(pullInterval);
1.407 + } catch (InterruptedException ex) {
1.408 + Log.get().warning(ex.getMessage());
1.409 + }
1.410 + }
1.411 + }
1.412 }