chris@1: /* chris@1: * SONEWS News Server chris@1: * see AUTHORS for the list of contributors chris@1: * chris@1: * This program is free software: you can redistribute it and/or modify chris@1: * it under the terms of the GNU General Public License as published by chris@1: * the Free Software Foundation, either version 3 of the License, or chris@1: * (at your option) any later version. chris@1: * chris@1: * This program is distributed in the hope that it will be useful, chris@1: * but WITHOUT ANY WARRANTY; without even the implied warranty of chris@1: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the chris@1: * GNU General Public License for more details. chris@1: * chris@1: * You should have received a copy of the GNU General Public License chris@1: * along with this program. If not, see . chris@1: */ chris@1: chris@1: package org.sonews.feed; chris@1: chris@1: import java.io.IOException; cli@22: import java.util.List; chris@1: import java.util.concurrent.ConcurrentLinkedQueue; cli@22: import org.sonews.daemon.AbstractDaemon; chris@3: import org.sonews.storage.Article; chris@3: import org.sonews.storage.Headers; cli@22: import org.sonews.storage.StorageBackendException; cli@22: import org.sonews.storage.StorageManager; chris@1: import org.sonews.util.Log; chris@1: import org.sonews.util.io.ArticleWriter; chris@1: chris@1: /** chris@1: * Pushes new articles to remote newsservers. This feeder sleeps until a new chris@1: * message is posted to the sonews instance. chris@1: * @author Christian Lins chris@1: * @since sonews/0.5.0 chris@1: */ cli@22: class PushFeeder extends AbstractDaemon chris@1: { cli@22: cli@37: private ConcurrentLinkedQueue
articleQueue = cli@37: new ConcurrentLinkedQueue
(); chris@1: cli@37: @Override cli@37: public void run() cli@37: { cli@37: while (isRunning()) { cli@37: try { cli@37: synchronized (this) { cli@37: this.wait(); cli@37: } cli@37: cli@37: List subscriptions = StorageManager.current().getSubscriptions(FeedManager.TYPE_PUSH); cli@37: cli@37: Article article = this.articleQueue.poll(); cli@37: String[] groups = article.getHeader(Headers.NEWSGROUPS)[0].split(","); cli@37: Log.get().info("PushFeed: " + article.getMessageID()); cli@37: for (Subscription sub : subscriptions) { cli@37: // Circle check cli@37: if (article.getHeader(Headers.PATH)[0].contains(sub.getHost())) { cli@37: Log.get().info(article.getMessageID() + " skipped for host " cli@37: + sub.getHost()); cli@37: continue; cli@37: } cli@37: cli@37: try { cli@37: for (String group : groups) { cli@37: if (sub.getGroup().equals(group)) { cli@37: // Delete headers that may cause problems cli@37: article.removeHeader(Headers.NNTP_POSTING_DATE); cli@37: article.removeHeader(Headers.NNTP_POSTING_HOST); cli@37: article.removeHeader(Headers.X_COMPLAINTS_TO); cli@37: article.removeHeader(Headers.X_TRACE); cli@37: article.removeHeader(Headers.XREF); cli@37: cli@37: // POST the message to remote server cli@37: ArticleWriter awriter = new ArticleWriter(sub.getHost(), sub.getPort()); cli@37: awriter.writeArticle(article); cli@37: break; cli@37: } cli@37: } cli@37: } catch (IOException ex) { cli@37: Log.get().warning(ex.toString()); cli@37: } cli@37: } cli@37: } catch (StorageBackendException ex) { cli@37: Log.get().severe(ex.toString()); cli@37: } catch (InterruptedException ex) { cli@37: Log.get().warning("PushFeeder interrupted: " + ex); cli@37: } cli@37: } cli@37: } cli@37: cli@37: public void queueForPush(Article article) cli@37: { cli@37: this.articleQueue.add(article); cli@37: synchronized (this) { cli@37: this.notifyAll(); cli@37: } cli@37: } chris@1: }