src/org/sonews/feed/PushFeeder.java
author Christian Lins <christian@lins.me>
Wed Sep 14 23:25:00 2011 +0200 (2011-09-14)
changeset 62 be4e87479855
parent 35 ed84c8bdd87b
permissions -rwxr-xr-x
Reformatting XDaemonCommand
chris@1
     1
/*
chris@1
     2
 *   SONEWS News Server
chris@1
     3
 *   see AUTHORS for the list of contributors
chris@1
     4
 *
chris@1
     5
 *   This program is free software: you can redistribute it and/or modify
chris@1
     6
 *   it under the terms of the GNU General Public License as published by
chris@1
     7
 *   the Free Software Foundation, either version 3 of the License, or
chris@1
     8
 *   (at your option) any later version.
chris@1
     9
 *
chris@1
    10
 *   This program is distributed in the hope that it will be useful,
chris@1
    11
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1
    12
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1
    13
 *   GNU General Public License for more details.
chris@1
    14
 *
chris@1
    15
 *   You should have received a copy of the GNU General Public License
chris@1
    16
 *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1
    17
 */
chris@1
    18
chris@1
    19
package org.sonews.feed;
chris@1
    20
chris@1
    21
import java.io.IOException;
cli@22
    22
import java.util.List;
chris@1
    23
import java.util.concurrent.ConcurrentLinkedQueue;
cli@22
    24
import org.sonews.daemon.AbstractDaemon;
chris@3
    25
import org.sonews.storage.Article;
chris@3
    26
import org.sonews.storage.Headers;
cli@22
    27
import org.sonews.storage.StorageBackendException;
cli@22
    28
import org.sonews.storage.StorageManager;
chris@1
    29
import org.sonews.util.Log;
chris@1
    30
import org.sonews.util.io.ArticleWriter;
chris@1
    31
chris@1
    32
/**
chris@1
    33
 * Pushes new articles to remote newsservers. This feeder sleeps until a new
chris@1
    34
 * message is posted to the sonews instance.
chris@1
    35
 * @author Christian Lins
chris@1
    36
 * @since sonews/0.5.0
chris@1
    37
 */
cli@22
    38
class PushFeeder extends AbstractDaemon
chris@1
    39
{
cli@22
    40
cli@37
    41
	private ConcurrentLinkedQueue<Article> articleQueue =
cli@37
    42
		new ConcurrentLinkedQueue<Article>();
chris@1
    43
cli@37
    44
	@Override
cli@37
    45
	public void run()
cli@37
    46
	{
cli@37
    47
		while (isRunning()) {
cli@37
    48
			try {
cli@37
    49
				synchronized (this) {
cli@37
    50
					this.wait();
cli@37
    51
				}
cli@37
    52
cli@37
    53
				List<Subscription> subscriptions = StorageManager.current().getSubscriptions(FeedManager.TYPE_PUSH);
cli@37
    54
cli@37
    55
				Article article = this.articleQueue.poll();
cli@37
    56
				String[] groups = article.getHeader(Headers.NEWSGROUPS)[0].split(",");
cli@37
    57
				Log.get().info("PushFeed: " + article.getMessageID());
cli@37
    58
				for (Subscription sub : subscriptions) {
cli@37
    59
					// Circle check
cli@37
    60
					if (article.getHeader(Headers.PATH)[0].contains(sub.getHost())) {
cli@37
    61
						Log.get().info(article.getMessageID() + " skipped for host "
cli@37
    62
							+ sub.getHost());
cli@37
    63
						continue;
cli@37
    64
					}
cli@37
    65
cli@37
    66
					try {
cli@37
    67
						for (String group : groups) {
cli@37
    68
							if (sub.getGroup().equals(group)) {
cli@37
    69
								// Delete headers that may cause problems
cli@37
    70
								article.removeHeader(Headers.NNTP_POSTING_DATE);
cli@37
    71
								article.removeHeader(Headers.NNTP_POSTING_HOST);
cli@37
    72
								article.removeHeader(Headers.X_COMPLAINTS_TO);
cli@37
    73
								article.removeHeader(Headers.X_TRACE);
cli@37
    74
								article.removeHeader(Headers.XREF);
cli@37
    75
cli@37
    76
								// POST the message to remote server
cli@37
    77
								ArticleWriter awriter = new ArticleWriter(sub.getHost(), sub.getPort());
cli@37
    78
								awriter.writeArticle(article);
cli@37
    79
								break;
cli@37
    80
							}
cli@37
    81
						}
cli@37
    82
					} catch (IOException ex) {
cli@37
    83
						Log.get().warning(ex.toString());
cli@37
    84
					}
cli@37
    85
				}
cli@37
    86
			} catch (StorageBackendException ex) {
cli@37
    87
				Log.get().severe(ex.toString());
cli@37
    88
			} catch (InterruptedException ex) {
cli@37
    89
				Log.get().warning("PushFeeder interrupted: " + ex);
cli@37
    90
			}
cli@37
    91
		}
cli@37
    92
	}
cli@37
    93
cli@37
    94
	public void queueForPush(Article article)
cli@37
    95
	{
cli@37
    96
		this.articleQueue.add(article);
cli@37
    97
		synchronized (this) {
cli@37
    98
			this.notifyAll();
cli@37
    99
		}
cli@37
   100
	}
chris@1
   101
}