/*
 *   SONEWS News Server
 *   see AUTHORS for the list of contributors
 *
 *   This program is free software: you can redistribute it and/or modify
 *   it under the terms of the GNU General Public License as published by
 *   the Free Software Foundation, either version 3 of the License, or
 *   (at your option) any later version.
 *
 *   This program is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

package org.sonews.feed;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import org.sonews.config.Config;
import org.sonews.daemon.AbstractDaemon;
import org.sonews.util.Log;
import org.sonews.storage.StorageBackendException;
import org.sonews.storage.StorageManager;
import org.sonews.util.Stats;
import org.sonews.util.io.ArticleReader;
import org.sonews.util.io.ArticleWriter;

/**
 * The PullFeeder class regularly checks other news servers for new
 * messages.
 *
 * <p>Each run reloads the pull subscriptions from the storage backend, asks
 * every subscribed peer for the overview of the articles between the last
 * remembered high mark and the peer's current one, and posts every article
 * that is not yet stored to the local server. The feeder then sleeps for the
 * configured pull interval.</p>
 *
 * @author Christian Lins
 * @since sonews/0.5.0
 */
class PullFeeder extends AbstractDaemon
{

    /** Last high mark seen per subscription; new subscriptions start at 0. */
    private Map<Subscription, Integer> highMarks = new HashMap<Subscription, Integer>();

    /** Reader of the currently open peer connection, or null if not connected. */
    private BufferedReader in;

    /** Writer of the currently open peer connection, or null if not connected. */
    private PrintWriter out;

    /** Subscriptions that are processed in the current run. */
    private Set<Subscription> subscriptions = new HashSet<Subscription>();

    /**
     * Registers a subscription for the current run and makes sure it has a
     * high mark entry.
     * @param sub subscription to register
     */
    private void addSubscription(final Subscription sub)
    {
        subscriptions.add(sub);

        if (!highMarks.containsKey(sub)) {
            // Set an initial high mark
            this.highMarks.put(sub, 0);
        }
    }

    /**
     * Reads one response line from the peer.
     * @return the line, never null
     * @throws IOException if reading fails or the peer closed the connection
     */
    private String readResponseLine()
            throws IOException
    {
        String line = this.in.readLine();
        if (line == null) {
            // readLine() signals end of stream with null; report it instead of
            // letting callers fail with a NullPointerException.
            throw new IOException("Connection closed by peer");
        }
        return line;
    }

    /**
     * Changes to the given group and returns its high mark.
     * @param groupName name of the group to select on the peer
     * @return the high mark taken from the fourth field of the 211 response
     * @throws IOException if the peer does not answer with a well-formed
     *         211 response
     */
    private int changeGroup(String groupName)
            throws IOException
    {
        this.out.print("GROUP " + groupName + "\r\n");
        this.out.flush();

        String line = readResponseLine();
        if (!line.startsWith("211 ")) {
            throw new IOException("GROUP " + groupName + " returned: " + line);
        }

        // Expected shape: "211 <count> <low> <high> <group>"
        String[] fields = line.split(" ");
        if (fields.length < 4) {
            throw new IOException("GROUP " + groupName + " returned: " + line);
        }
        try {
            return Integer.parseInt(fields[3]);
        } catch (NumberFormatException ex) {
            throw new IOException("GROUP " + groupName + " returned: " + line, ex);
        }
    }

    /**
     * Opens a connection to the given peer, checks its greeting and sends
     * MODE READER. On failure the socket is closed again and the stream
     * fields are reset to null.
     * @param host peer host name
     * @param port peer port
     * @throws IOException if the connection cannot be established or the
     *         greeting is not a 2xx response
     * @throws UnknownHostException if the host cannot be resolved
     */
    private void connectTo(final String host, final int port)
            throws IOException, UnknownHostException
    {
        Socket socket = new Socket(host, port);
        boolean established = false;
        try {
            this.out = new PrintWriter(socket.getOutputStream());
            this.in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

            String line = readResponseLine();
            // Could be 200 or another 2xx if posting is not allowed
            if (line.length() == 0 || line.charAt(0) != '2') {
                throw new IOException(line);
            }

            // Send MODE READER to peer, some news servers are friendlier
            // afterwards. print() is used on purpose: println() would append
            // a second line terminator and thus send an empty command.
            this.out.print("MODE READER\r\n");
            this.out.flush();
            readResponseLine(); // the response code itself is not evaluated

            established = true;
        } finally {
            if (!established) {
                this.out = null;
                this.in = null;
                try {
                    socket.close();
                } catch (IOException ex) {
                    Log.get().log(Level.WARNING, "Could not close socket to " + host, ex);
                }
            }
        }
    }

    /**
     * Sends QUIT and closes the streams of the current peer connection.
     * Does nothing if no connection is open.
     * @throws IOException if closing the reader fails
     */
    private void disconnect()
            throws IOException
    {
        PrintWriter writer = this.out;
        BufferedReader reader = this.in;
        this.out = null;
        this.in = null;

        if (writer != null) {
            writer.print("QUIT\r\n");
            writer.flush();
            writer.close();
        }
        if (reader != null) {
            reader.close();
        }
    }

    /**
     * Calls {@link #disconnect()} and logs a failure instead of propagating
     * it, so that it can be used from a finally block.
     */
    private void closeConnection()
    {
        try {
            disconnect();
        } catch (IOException ex) {
            Log.get().log(Level.WARNING, "Error while closing feed connection", ex);
        }
    }

    /**
     * Uses the OVER or XOVER command to get a list of message overviews that
     * may be unknown to this feeder and are about to be peered.
     * @param start first article number of the requested range
     * @param end last article number of the requested range
     * @return A list of message ids with potentially interesting messages.
     * @throws IOException if the peer does not answer with a 224 response or
     *         the connection ends before the terminating "." line
     */
    private List<String> over(int start, int end)
            throws IOException
    {
        this.out.print("OVER " + start + "-" + end + "\r\n");
        this.out.flush();

        String line = readResponseLine();
        if (line.startsWith("500 ")) // OVER not supported
        {
            this.out.print("XOVER " + start + "-" + end + "\r\n");
            this.out.flush();

            line = readResponseLine();
        }

        if (!line.startsWith("224 ")) {
            throw new IOException("Server return for OVER/XOVER: " + line);
        }

        List<String> messages = new ArrayList<String>();
        line = readResponseLine();
        while (!".".equals(line)) {
            String[] fields = line.split("\t");
            if (fields.length > 4) {
                messages.add(fields[4]); // 5th should be the Message-ID
            } else {
                Log.get().warning("Ignoring malformed overview line: " + line);
            }
            line = readResponseLine();
        }
        return messages;
    }

    /**
     * Replaces the current subscription set with the pull subscriptions
     * known to the storage backend. If the backend fails, the previous set
     * is kept and the failure is logged.
     */
    private void loadSubscriptions()
    {
        try {
            // Query first, clear afterwards: a failing query must not wipe
            // the subscriptions of the previous run.
            List<Subscription> subsPull =
                    StorageManager.current().getSubscriptions(FeedManager.TYPE_PULL);
            this.subscriptions.clear();
            for (Subscription sub : subsPull) {
                addSubscription(sub);
            }
        } catch (StorageBackendException ex) {
            Log.get().log(Level.SEVERE, "Could not load pull subscriptions", ex);
        }
    }

    /**
     * Fetches one article from the subscription's peer and posts it to the
     * local server, unless the storage already contains it. I/O failures are
     * logged and the article is skipped.
     * @param sub subscription the article belongs to
     * @param messageID Message-ID of the article
     * @throws StorageBackendException if the existence check fails
     */
    private void feedArticleIfMissing(Subscription sub, String messageID)
            throws StorageBackendException
    {
        if (StorageManager.current().isArticleExisting(messageID)) {
            return;
        }

        try {
            // ArticleReader is given host and port, not the streams held by
            // this class.
            ArticleReader aread =
                    new ArticleReader(sub.getHost(), sub.getPort(), messageID);
            byte[] abuf = aread.getArticleData();
            if (abuf == null) {
                Log.get().warning("Could not feed " + messageID
                        + " from " + sub.getHost());
                return; // nothing was fed, so nothing is counted
            }

            Log.get().info("Feeding " + messageID);
            ArticleWriter awrite = new ArticleWriter(
                    "localhost", Config.inst().get(Config.PORT, 119));
            try {
                awrite.writeArticle(abuf);
            } finally {
                awrite.close();
            }
            Stats.getInstance().mailFeeded(sub.getGroup());
        } catch (IOException ex) {
            // There may be a temporary network failure
            Log.get().log(Level.WARNING,
                    "Skipping mail " + messageID + " due to exception.", ex);
        }
    }

    /**
     * Pulls all new articles of one subscription. The peer connection is
     * closed in every case, including failures.
     * @param sub subscription to process
     */
    private void feedSubscription(Subscription sub)
    {
        String host = sub.getHost();
        int port = sub.getPort();

        Log.get().info("Feeding " + sub.getGroup() + " from " + host);
        try {
            try {
                connectTo(host, port);
            } catch (SocketException ex) {
                Log.get().info("Skipping " + host + ": " + ex);
                return;
            }

            int oldMark = this.highMarks.get(sub);
            int newMark = changeGroup(sub.getGroup());

            if (oldMark != newMark) {
                for (String messageID : over(oldMark, newMark)) {
                    feedArticleIfMissing(sub, messageID);
                }
                this.highMarks.put(sub, newMark);
            }
        } catch (StorageBackendException ex) {
            Log.get().log(Level.SEVERE, "Storage failure while feeding "
                    + sub.getGroup() + " from " + host, ex);
        } catch (IOException ex) {
            Log.get().log(Level.SEVERE, "PullFeeder run stopped due to exception.", ex);
        } finally {
            closeConnection();
        }
    }

    /**
     * Runs the feeder loop until {@code isRunning()} returns false: reload
     * subscriptions, feed every subscription, sleep for the pull interval.
     */
    @Override
    public void run()
    {
        while (isRunning()) {
            // long arithmetic, so large intervals do not overflow
            long pullInterval = 1000L
                    * Config.inst().get(Config.FEED_PULLINTERVAL, 3600);

            Log.get().info("Start PullFeeder run...");

            loadSubscriptions();
            for (Subscription sub : this.subscriptions) {
                feedSubscription(sub);
            }

            Log.get().info("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s");
            try {
                Thread.sleep(pullInterval);
            } catch (InterruptedException ex) {
                // The loop condition decides whether another run follows.
                // The interrupt flag is deliberately not restored: doing so
                // would make every following sleep() return immediately
                // while isRunning() is still true.
                Log.get().warning(ex.getMessage());
            }
        }
    }
}