chris@1: /*
chris@1:  *   SONEWS News Server
chris@1:  *   see AUTHORS for the list of contributors
chris@1:  *
chris@1:  *   This program is free software: you can redistribute it and/or modify
chris@1:  *   it under the terms of the GNU General Public License as published by
chris@1:  *   the Free Software Foundation, either version 3 of the License, or
chris@1:  *   (at your option) any later version.
chris@1:  *
chris@1:  *   This program is distributed in the hope that it will be useful,
chris@1:  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1:  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1:  *   GNU General Public License for more details.
chris@1:  *
chris@1:  *   You should have received a copy of the GNU General Public License
chris@1:  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1:  */
chris@1: 
chris@1: package org.sonews.feed;
chris@1: 
chris@1: import java.io.IOException;
cli@22: import java.util.List;
chris@1: import java.util.concurrent.ConcurrentLinkedQueue;
cli@22: import org.sonews.daemon.AbstractDaemon;
chris@3: import org.sonews.storage.Article;
chris@3: import org.sonews.storage.Headers;
cli@22: import org.sonews.storage.StorageBackendException;
cli@22: import org.sonews.storage.StorageManager;
chris@1: import org.sonews.util.Log;
chris@1: import org.sonews.util.io.ArticleWriter;
chris@1: 
chris@1: /**
chris@1:  * Pushes new articles to remote newsservers. This feeder sleeps until a new
chris@1:  * message is posted to the sonews instance.
chris@1:  * @author Christian Lins
chris@1:  * @since sonews/0.5.0
chris@1:  */
cli@22: class PushFeeder extends AbstractDaemon
chris@1: {
cli@22: 
cli@37: 	private ConcurrentLinkedQueue<Article> articleQueue =
cli@37: 		new ConcurrentLinkedQueue<Article>();
chris@1: 
cli@37: 	@Override
cli@37: 	public void run()
cli@37: 	{
cli@37: 		while (isRunning()) {
cli@37: 			try {
cli@37: 				synchronized (this) {
cli@37: 					this.wait();
cli@37: 				}
cli@37: 
cli@37: 				List<Subscription> subscriptions = StorageManager.current().getSubscriptions(FeedManager.TYPE_PUSH);
cli@37: 
cli@37: 				Article article = this.articleQueue.poll();
cli@37: 				String[] groups = article.getHeader(Headers.NEWSGROUPS)[0].split(",");
cli@37: 				Log.get().info("PushFeed: " + article.getMessageID());
cli@37: 				for (Subscription sub : subscriptions) {
cli@37: 					// Circle check
cli@37: 					if (article.getHeader(Headers.PATH)[0].contains(sub.getHost())) {
cli@37: 						Log.get().info(article.getMessageID() + " skipped for host "
cli@37: 							+ sub.getHost());
cli@37: 						continue;
cli@37: 					}
cli@37: 
cli@37: 					try {
cli@37: 						for (String group : groups) {
cli@37: 							if (sub.getGroup().equals(group)) {
cli@37: 								// Delete headers that may cause problems
cli@37: 								article.removeHeader(Headers.NNTP_POSTING_DATE);
cli@37: 								article.removeHeader(Headers.NNTP_POSTING_HOST);
cli@37: 								article.removeHeader(Headers.X_COMPLAINTS_TO);
cli@37: 								article.removeHeader(Headers.X_TRACE);
cli@37: 								article.removeHeader(Headers.XREF);
cli@37: 
cli@37: 								// POST the message to remote server
cli@37: 								ArticleWriter awriter = new ArticleWriter(sub.getHost(), sub.getPort());
cli@37: 								awriter.writeArticle(article);
cli@37: 								break;
cli@37: 							}
cli@37: 						}
cli@37: 					} catch (IOException ex) {
cli@37: 						Log.get().warning(ex.toString());
cli@37: 					}
cli@37: 				}
cli@37: 			} catch (StorageBackendException ex) {
cli@37: 				Log.get().severe(ex.toString());
cli@37: 			} catch (InterruptedException ex) {
cli@37: 				Log.get().warning("PushFeeder interrupted: " + ex);
cli@37: 			}
cli@37: 		}
cli@37: 	}
cli@37: 
cli@37: 	public void queueForPush(Article article)
cli@37: 	{
cli@37: 		this.articleQueue.add(article);
cli@37: 		synchronized (this) {
cli@37: 			this.notifyAll();
cli@37: 		}
cli@37: 	}
chris@1: }