chris@1: /*
chris@1:  *   SONEWS News Server
chris@1:  *   see AUTHORS for the list of contributors
chris@1:  *
chris@1:  *   This program is free software: you can redistribute it and/or modify
chris@1:  *   it under the terms of the GNU General Public License as published by
chris@1:  *   the Free Software Foundation, either version 3 of the License, or
chris@1:  *   (at your option) any later version.
chris@1:  *
chris@1:  *   This program is distributed in the hope that it will be useful,
chris@1:  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1:  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1:  *   GNU General Public License for more details.
chris@1:  *
chris@1:  *   You should have received a copy of the GNU General Public License
chris@1:  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1:  */
chris@1: 
chris@1: package org.sonews.feed;
chris@1: 
chris@1: import java.io.BufferedReader;
chris@1: import java.io.IOException;
chris@1: import java.io.InputStreamReader;
chris@1: import java.io.PrintWriter;
chris@1: import java.net.Socket;
chris@1: import java.net.SocketException;
chris@1: import java.net.UnknownHostException;
chris@1: import java.util.ArrayList;
chris@1: import java.util.HashMap;
cli@22: import java.util.HashSet;
chris@1: import java.util.List;
chris@1: import java.util.Map;
cli@22: import java.util.Set;
cli@22: import java.util.logging.Level;
chris@3: import org.sonews.config.Config;
cli@22: import org.sonews.daemon.AbstractDaemon;
chris@1: import org.sonews.util.Log;
chris@3: import org.sonews.storage.StorageBackendException;
chris@3: import org.sonews.storage.StorageManager;
chris@1: import org.sonews.util.Stats;
chris@1: import org.sonews.util.io.ArticleReader;
chris@1: import org.sonews.util.io.ArticleWriter;
chris@1: 
chris@1: /**
chris@1:  * The PullFeeder class regularly checks another Newsserver for new
chris@1:  * messages.
chris@1:  * @author Christian Lins
chris@1:  * @since sonews/0.5.0
chris@1:  */
cli@22: class PullFeeder extends AbstractDaemon
chris@1: {
cli@22: 
cli@37: 	private Map<Subscription, Integer> highMarks = new HashMap<Subscription, Integer>();
cli@37: 	private BufferedReader in;
cli@37: 	private PrintWriter out;
cli@37: 	private Set<Subscription> subscriptions = new HashSet<Subscription>();
chris@1: 
cli@37: 	private void addSubscription(final Subscription sub)
cli@37: 	{
cli@37: 		subscriptions.add(sub);
cli@7: 
cli@37: 		if (!highMarks.containsKey(sub)) {
cli@37: 			// Set a initial highMark
cli@37: 			this.highMarks.put(sub, 0);
cli@37: 		}
cli@37: 	}
chris@1: 
cli@37: 	/**
cli@37: 	 * Changes to the given group and returns its high mark.
cli@37: 	 * @param groupName
cli@37: 	 * @return
cli@37: 	 */
cli@37: 	private int changeGroup(String groupName)
cli@37: 		throws IOException
cli@37: 	{
cli@37: 		this.out.print("GROUP " + groupName + "\r\n");
cli@37: 		this.out.flush();
cli@22: 
cli@37: 		String line = this.in.readLine();
cli@37: 		if (line.startsWith("211 ")) {
cli@37: 			int highmark = Integer.parseInt(line.split(" ")[3]);
cli@37: 			return highmark;
cli@37: 		} else {
cli@37: 			throw new IOException("GROUP " + groupName + " returned: " + line);
cli@37: 		}
cli@37: 	}
chris@1: 
cli@37: 	private void connectTo(final String host, final int port)
cli@37: 		throws IOException, UnknownHostException
cli@37: 	{
cli@37: 		Socket socket = new Socket(host, port);
cli@37: 		this.out = new PrintWriter(socket.getOutputStream());
cli@37: 		this.in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
chris@1: 
cli@37: 		String line = in.readLine();
cli@37: 		if (!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed
cli@37: 		{
cli@37: 			throw new IOException(line);
cli@37: 		}
cli@37: 
cli@37: 		// Send MODE READER to peer, some newsservers are friendlier then
cli@37: 		this.out.println("MODE READER\r\n");
cli@37: 		this.out.flush();
cli@37: 		line = this.in.readLine();
cli@37: 	}
cli@37: 
cli@37: 	private void disconnect()
cli@37: 		throws IOException
cli@37: 	{
cli@37: 		this.out.print("QUIT\r\n");
cli@37: 		this.out.flush();
cli@37: 		this.out.close();
cli@37: 		this.in.close();
cli@37: 
cli@37: 		this.out = null;
cli@37: 		this.in = null;
cli@37: 	}
cli@37: 
cli@37: 	/**
cli@37: 	 * Uses the OVER or XOVER command to get a list of message overviews that
cli@37: 	 * may be unknown to this feeder and are about to be peered.
cli@37: 	 * @param start
cli@37: 	 * @param end
cli@37: 	 * @return A list of message ids with potentially interesting messages.
cli@37: 	 */
cli@37: 	private List<String> over(int start, int end)
cli@37: 		throws IOException
cli@37: 	{
cli@37: 		this.out.print("OVER " + start + "-" + end + "\r\n");
cli@37: 		this.out.flush();
cli@37: 
cli@37: 		String line = this.in.readLine();
cli@37: 		if (line.startsWith("500 ")) // OVER not supported
cli@37: 		{
cli@37: 			this.out.print("XOVER " + start + "-" + end + "\r\n");
cli@37: 			this.out.flush();
cli@37: 
cli@37: 			line = this.in.readLine();
cli@37: 		}
cli@37: 
cli@37: 		if (line.startsWith("224 ")) {
cli@37: 			List<String> messages = new ArrayList<String>();
cli@37: 			line = this.in.readLine();
cli@37: 			while (!".".equals(line)) {
cli@37: 				String mid = line.split("\t")[4]; // 5th should be the Message-ID
cli@37: 				messages.add(mid);
cli@37: 				line = this.in.readLine();
cli@37: 			}
cli@37: 			return messages;
cli@37: 		} else {
cli@37: 			throw new IOException("Server return for OVER/XOVER: " + line);
cli@37: 		}
cli@37: 	}
cli@37: 
cli@37: 	@Override
cli@37: 	public void run()
cli@37: 	{
cli@37: 		while (isRunning()) {
cli@37: 			int pullInterval = 1000
cli@37: 				* Config.inst().get(Config.FEED_PULLINTERVAL, 3600);
cli@37: 			String host = "localhost";
cli@37: 			int port = 119;
cli@37: 
cli@37: 			Log.get().info("Start PullFeeder run...");
cli@37: 
cli@37: 			try {
cli@37: 				this.subscriptions.clear();
cli@37: 				List<Subscription> subsPull = StorageManager.current().getSubscriptions(FeedManager.TYPE_PULL);
cli@37: 				for (Subscription sub : subsPull) {
cli@37: 					addSubscription(sub);
cli@37: 				}
cli@37: 			} catch (StorageBackendException ex) {
cli@37: 				Log.get().log(Level.SEVERE, host, ex);
cli@37: 			}
cli@37: 
cli@37: 			try {
cli@37: 				for (Subscription sub : this.subscriptions) {
cli@37: 					host = sub.getHost();
cli@37: 					port = sub.getPort();
cli@37: 
cli@37: 					try {
cli@37: 						Log.get().info("Feeding " + sub.getGroup() + " from " + sub.getHost());
cli@37: 						try {
cli@37: 							connectTo(host, port);
cli@37: 						} catch (SocketException ex) {
cli@37: 							Log.get().info("Skipping " + sub.getHost() + ": " + ex);
cli@37: 							continue;
cli@37: 						}
cli@37: 
cli@37: 						int oldMark = this.highMarks.get(sub);
cli@37: 						int newMark = changeGroup(sub.getGroup());
cli@37: 
cli@37: 						if (oldMark != newMark) {
cli@37: 							List<String> messageIDs = over(oldMark, newMark);
cli@37: 
cli@37: 							for (String messageID : messageIDs) {
cli@37: 								if (!StorageManager.current().isArticleExisting(messageID)) {
cli@37: 									try {
cli@37: 										// Post the message via common socket connection
cli@37: 										ArticleReader aread =
cli@37: 											new ArticleReader(sub.getHost(), sub.getPort(), messageID);
cli@37: 										byte[] abuf = aread.getArticleData();
cli@37: 										if (abuf == null) {
cli@37: 											Log.get().warning("Could not feed " + messageID
cli@37: 												+ " from " + sub.getHost());
cli@37: 										} else {
cli@37: 											Log.get().info("Feeding " + messageID);
cli@37: 											ArticleWriter awrite = new ArticleWriter(
cli@37: 												"localhost", Config.inst().get(Config.PORT, 119));
cli@37: 											awrite.writeArticle(abuf);
cli@37: 											awrite.close();
cli@37: 										}
cli@37: 										Stats.getInstance().mailFeeded(sub.getGroup());
cli@37: 									} catch (IOException ex) {
cli@37: 										// There may be a temporary network failure
cli@37: 										ex.printStackTrace();
cli@37: 										Log.get().warning("Skipping mail " + messageID + " due to exception.");
cli@37: 									}
cli@37: 								}
cli@37: 							} // for(;;)
cli@37: 							this.highMarks.put(sub, newMark);
cli@37: 						}
cli@37: 
cli@37: 						disconnect();
cli@37: 					} catch (StorageBackendException ex) {
cli@37: 						ex.printStackTrace();
cli@37: 					} catch (IOException ex) {
cli@37: 						ex.printStackTrace();
cli@37: 						Log.get().severe("PullFeeder run stopped due to exception.");
cli@37: 					}
cli@37: 				} // for(Subscription sub : subscriptions)
cli@37: 
cli@37: 				Log.get().info("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s");
cli@37: 				Thread.sleep(pullInterval);
cli@37: 			} catch (InterruptedException ex) {
cli@37: 				Log.get().warning(ex.getMessage());
cli@37: 			}
cli@37: 		}
cli@37: 	}
chris@1: }