chris@1: /* chris@1: * SONEWS News Server chris@1: * see AUTHORS for the list of contributors chris@1: * chris@1: * This program is free software: you can redistribute it and/or modify chris@1: * it under the terms of the GNU General Public License as published by chris@1: * the Free Software Foundation, either version 3 of the License, or chris@1: * (at your option) any later version. chris@1: * chris@1: * This program is distributed in the hope that it will be useful, chris@1: * but WITHOUT ANY WARRANTY; without even the implied warranty of chris@1: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the chris@1: * GNU General Public License for more details. chris@1: * chris@1: * You should have received a copy of the GNU General Public License chris@1: * along with this program. If not, see <http://www.gnu.org/licenses/>. chris@1: */ chris@1: chris@1: package org.sonews.feed; chris@1: chris@1: import java.io.BufferedReader; chris@1: import java.io.IOException; chris@1: import java.io.InputStreamReader; chris@1: import java.io.PrintWriter; chris@1: import java.net.Socket; chris@1: import java.net.SocketException; chris@1: import java.net.UnknownHostException; chris@1: import java.util.ArrayList; chris@1: import java.util.HashMap; chris@1: import java.util.List; chris@1: import java.util.Map; chris@3: import org.sonews.config.Config; chris@1: import org.sonews.util.Log; chris@3: import org.sonews.storage.StorageBackendException; chris@3: import org.sonews.storage.StorageManager; chris@1: import org.sonews.util.Stats; chris@1: import org.sonews.util.io.ArticleReader; chris@1: import org.sonews.util.io.ArticleWriter; chris@1: chris@1: /** chris@1: * The PullFeeder class regularily checks another Newsserver for new chris@1: * messages. chris@1: * @author Christian Lins chris@1: * @since sonews/0.5.0 chris@1: */ chris@1: class PullFeeder extends AbstractFeeder chris@1: { chris@1: chris@1: private Map<Subscription, Integer> highMarks = new HashMap<Subscription, Integer>(); chris@1: private BufferedReader in; chris@1: private PrintWriter out; chris@1: chris@1: @Override chris@1: public void addSubscription(final Subscription sub) chris@1: { chris@1: super.addSubscription(sub); chris@1: chris@1: // Set a initial highMark chris@1: this.highMarks.put(sub, 0); chris@1: } chris@1: chris@1: /** chris@1: * Changes to the given group and returns its high mark. chris@1: * @param groupName chris@1: * @return chris@1: */ chris@1: private int changeGroup(String groupName) chris@1: throws IOException chris@1: { chris@1: this.out.print("GROUP " + groupName + "\r\n"); chris@1: this.out.flush(); chris@1: chris@1: String line = this.in.readLine(); chris@1: if(line.startsWith("211 ")) chris@1: { chris@1: int highmark = Integer.parseInt(line.split(" ")[3]); chris@1: return highmark; chris@1: } chris@1: else chris@1: { chris@1: throw new IOException("GROUP " + groupName + " returned: " + line); chris@1: } chris@1: } chris@1: chris@1: private void connectTo(final String host, final int port) chris@1: throws IOException, UnknownHostException chris@1: { chris@1: Socket socket = new Socket(host, port); chris@1: this.out = new PrintWriter(socket.getOutputStream()); chris@1: this.in = new BufferedReader(new InputStreamReader(socket.getInputStream())); chris@1: chris@1: String line = in.readLine(); chris@1: if(!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed chris@1: { chris@1: throw new IOException(line); chris@1: } chris@1: } chris@1: chris@1: private void disconnect() chris@1: throws IOException chris@1: { chris@1: this.out.print("QUIT\r\n"); chris@1: this.out.flush(); chris@1: this.out.close(); chris@1: this.in.close(); chris@1: chris@1: this.out = null; chris@1: this.in = null; chris@1: } chris@1: chris@1: /** chris@1: * Uses the OVER or XOVER command to get a list of message overviews that chris@1: * may be unknown to this feeder and are about to be peered. chris@1: * @param start chris@1: * @param end chris@1: * @return A list of message ids with potentially interesting messages. chris@1: */ chris@1: private List<String> over(int start, int end) chris@1: throws IOException chris@1: { chris@1: this.out.print("OVER " + start + "-" + end + "\r\n"); chris@1: this.out.flush(); chris@1: chris@1: String line = this.in.readLine(); chris@1: if(line.startsWith("500 ")) // OVER not supported chris@1: { chris@1: this.out.print("XOVER " + start + "-" + end + "\r\n"); chris@1: this.out.flush(); chris@1: chris@1: line = this.in.readLine(); chris@1: } chris@1: chris@1: if(line.startsWith("224 ")) chris@1: { chris@1: List<String> messages = new ArrayList<String>(); chris@1: line = this.in.readLine(); chris@1: while(!".".equals(line)) chris@1: { chris@1: String mid = line.split("\t")[4]; // 5th should be the Message-ID chris@1: messages.add(mid); chris@1: line = this.in.readLine(); chris@1: } chris@1: return messages; chris@1: } chris@1: else chris@1: { chris@1: throw new IOException("Server return for OVER/XOVER: " + line); chris@1: } chris@1: } chris@1: chris@1: @Override chris@1: public void run() chris@1: { chris@1: while(isRunning()) chris@1: { chris@1: int pullInterval = 1000 * chris@3: Config.inst().get(Config.FEED_PULLINTERVAL, 3600); chris@1: String host = "localhost"; chris@1: int port = 119; chris@1: chris@1: Log.msg("Start PullFeeder run...", true); chris@1: chris@1: try chris@1: { chris@1: for(Subscription sub : this.subscriptions) chris@1: { chris@1: host = sub.getHost(); chris@1: port = sub.getPort(); chris@1: chris@1: try chris@1: { chris@1: Log.msg("Feeding " + sub.getGroup() + " from " + sub.getHost(), true); chris@1: try chris@1: { chris@1: connectTo(host, port); chris@1: } chris@1: catch(SocketException ex) chris@1: { chris@1: Log.msg("Skipping " + sub.getHost() + ": " + ex, false); chris@1: continue; chris@1: } chris@1: chris@1: int oldMark = this.highMarks.get(sub); chris@1: int newMark = changeGroup(sub.getGroup()); chris@1: chris@1: if(oldMark != newMark) chris@1: { chris@1: List<String> messageIDs = over(oldMark, newMark); chris@1: chris@1: for(String messageID : messageIDs) chris@1: { chris@3: if(!StorageManager.current().isArticleExisting(messageID)) chris@1: { chris@3: try chris@1: { chris@3: // Post the message via common socket connection chris@3: ArticleReader aread = chris@3: new ArticleReader(sub.getHost(), sub.getPort(), messageID); chris@3: byte[] abuf = aread.getArticleData(); chris@3: if (abuf == null) chris@3: { chris@3: Log.msg("Could not feed " + messageID + " from " + sub.getHost(), true); chris@3: } chris@3: else chris@3: { chris@3: Log.msg("Feeding " + messageID, true); chris@3: ArticleWriter awrite = new ArticleWriter( chris@3: "localhost", Config.inst().get(Config.PORT, 119)); chris@3: awrite.writeArticle(abuf); chris@3: awrite.close(); chris@3: } chris@3: Stats.getInstance().mailFeeded(sub.getGroup()); chris@1: } chris@3: catch(IOException ex) chris@1: { chris@3: // There may be a temporary network failure chris@3: ex.printStackTrace(); chris@3: Log.msg("Skipping mail " + messageID + " due to exception.", false); chris@1: } chris@1: } chris@1: } // for(;;) chris@1: this.highMarks.put(sub, newMark); chris@1: } chris@1: chris@1: disconnect(); chris@1: } chris@3: catch(StorageBackendException ex) chris@1: { chris@1: ex.printStackTrace(); chris@1: } chris@1: catch(IOException ex) chris@1: { chris@1: ex.printStackTrace(); chris@1: Log.msg("PullFeeder run stopped due to exception.", false); chris@1: } chris@1: } // for(Subscription sub : subscriptions) chris@1: chris@1: Log.msg("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s", true); chris@1: Thread.sleep(pullInterval); chris@1: } chris@1: catch(InterruptedException ex) chris@1: { chris@1: Log.msg(ex.getMessage(), false); chris@1: } chris@1: } chris@1: } chris@1: chris@1: }