org/sonews/feed/PullFeeder.java
author cli
Tue Apr 27 22:11:30 2010 +0200 (2010-04-27)
changeset 27 d879bab39600
parent 15 f2293e8566f5
permissions -rw-r--r--
Fix for #567 "mailinglist gateway does not recover after database outage".
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.feed;
    20 
    21 import java.io.BufferedReader;
    22 import java.io.IOException;
    23 import java.io.InputStreamReader;
    24 import java.io.PrintWriter;
    25 import java.net.Socket;
    26 import java.net.SocketException;
    27 import java.net.UnknownHostException;
    28 import java.util.ArrayList;
    29 import java.util.HashMap;
    30 import java.util.HashSet;
    31 import java.util.List;
    32 import java.util.Map;
    33 import java.util.Set;
    34 import java.util.logging.Level;
    35 import org.sonews.config.Config;
    36 import org.sonews.daemon.AbstractDaemon;
    37 import org.sonews.util.Log;
    38 import org.sonews.storage.StorageBackendException;
    39 import org.sonews.storage.StorageManager;
    40 import org.sonews.util.Stats;
    41 import org.sonews.util.io.ArticleReader;
    42 import org.sonews.util.io.ArticleWriter;
    43 
    44 /**
    45  * The PullFeeder class regularily checks another Newsserver for new
    46  * messages.
    47  * @author Christian Lins
    48  * @since sonews/0.5.0
    49  */
    50 class PullFeeder extends AbstractDaemon
    51 {
    52   
    53   private Map<Subscription, Integer> highMarks = new HashMap<Subscription, Integer>();
    54   private BufferedReader             in;
    55   private PrintWriter                out;
    56   private Set<Subscription>          subscriptions = new HashSet<Subscription>();
    57   
    58   private void addSubscription(final Subscription sub)
    59   {
    60     subscriptions.add(sub);
    61 
    62     if(!highMarks.containsKey(sub))
    63     {
    64       // Set a initial highMark
    65       this.highMarks.put(sub, 0);
    66     }
    67   }
    68   
    69   /**
    70    * Changes to the given group and returns its high mark.
    71    * @param groupName
    72    * @return
    73    */
    74   private int changeGroup(String groupName)
    75     throws IOException
    76   {
    77     this.out.print("GROUP " + groupName + "\r\n");
    78     this.out.flush();
    79     
    80     String line = this.in.readLine();
    81     if(line.startsWith("211 "))
    82     {
    83       int highmark = Integer.parseInt(line.split(" ")[3]);
    84       return highmark;
    85     }
    86     else
    87     {
    88       throw new IOException("GROUP " + groupName + " returned: " + line);
    89     }
    90   }
    91   
    92   private void connectTo(final String host, final int port)
    93     throws IOException, UnknownHostException
    94   {
    95     Socket socket = new Socket(host, port);
    96     this.out = new PrintWriter(socket.getOutputStream());
    97     this.in  = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    98 
    99     String line = in.readLine();
   100     if(!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed
   101     {
   102       throw new IOException(line);
   103     }
   104 
   105     // Send MODE READER to peer, some newsservers are friendlier then
   106     this.out.println("MODE READER\r\n");
   107     this.out.flush();
   108     line = this.in.readLine();
   109   }
   110   
   111   private void disconnect()
   112     throws IOException
   113   {
   114     this.out.print("QUIT\r\n");
   115     this.out.flush();
   116     this.out.close();
   117     this.in.close();
   118     
   119     this.out = null;
   120     this.in  = null;
   121   }
   122   
   123   /**
   124    * Uses the OVER or XOVER command to get a list of message overviews that
   125    * may be unknown to this feeder and are about to be peered.
   126    * @param start
   127    * @param end
   128    * @return A list of message ids with potentially interesting messages.
   129    */
   130   private List<String> over(int start, int end)
   131     throws IOException
   132   {
   133     this.out.print("OVER " + start + "-" + end + "\r\n");
   134     this.out.flush();
   135     
   136     String line = this.in.readLine();
   137     if(line.startsWith("500 ")) // OVER not supported
   138     {
   139       this.out.print("XOVER " + start + "-" + end + "\r\n");
   140       this.out.flush();
   141       
   142       line = this.in.readLine();
   143     }
   144     
   145     if(line.startsWith("224 "))
   146     {
   147       List<String> messages = new ArrayList<String>();
   148       line = this.in.readLine();
   149       while(!".".equals(line))
   150       {
   151         String mid = line.split("\t")[4]; // 5th should be the Message-ID
   152         messages.add(mid);
   153         line = this.in.readLine();
   154       }
   155       return messages;
   156     }
   157     else
   158     {
   159       throw new IOException("Server return for OVER/XOVER: " + line);
   160     }
   161   }
   162   
   163   @Override
   164   public void run()
   165   {
   166     while(isRunning())
   167     {
   168       int pullInterval = 1000 * 
   169         Config.inst().get(Config.FEED_PULLINTERVAL, 3600);
   170       String host = "localhost";
   171       int    port = 119;
   172       
   173       Log.get().info("Start PullFeeder run...");
   174 
   175       try
   176       {
   177         this.subscriptions.clear();
   178         List<Subscription> subsPull = StorageManager.current()
   179           .getSubscriptions(FeedManager.TYPE_PULL);
   180         for(Subscription sub : subsPull)
   181         {
   182           addSubscription(sub);
   183         }
   184       }
   185       catch(StorageBackendException ex)
   186       {
   187         Log.get().log(Level.SEVERE, host, ex);
   188       }
   189 
   190       try
   191       {
   192         for(Subscription sub : this.subscriptions)
   193         {
   194           host = sub.getHost();
   195           port = sub.getPort();
   196 
   197           try
   198           {
   199             Log.get().info("Feeding " + sub.getGroup() + " from " + sub.getHost());
   200             try
   201             {
   202               connectTo(host, port);
   203             }
   204             catch(SocketException ex)
   205             {
   206               Log.get().info("Skipping " + sub.getHost() + ": " + ex);
   207               continue;
   208             }
   209             
   210             int oldMark = this.highMarks.get(sub);
   211             int newMark = changeGroup(sub.getGroup());
   212             
   213             if(oldMark != newMark)
   214             {
   215               List<String> messageIDs = over(oldMark, newMark);
   216 
   217               for(String messageID : messageIDs)
   218               {
   219                 if(!StorageManager.current().isArticleExisting(messageID))
   220                 {
   221                   try
   222                   {
   223                     // Post the message via common socket connection
   224                     ArticleReader aread =
   225                       new ArticleReader(sub.getHost(), sub.getPort(), messageID);
   226                     byte[] abuf = aread.getArticleData();
   227                     if(abuf == null)
   228                     {
   229                       Log.get().warning("Could not feed " + messageID
   230                         + " from " + sub.getHost());
   231                     }
   232                     else
   233                     {
   234                       Log.get().info("Feeding " + messageID);
   235                       ArticleWriter awrite = new ArticleWriter(
   236                         "localhost", Config.inst().get(Config.PORT, 119));
   237                       awrite.writeArticle(abuf);
   238                       awrite.close();
   239                     }
   240                     Stats.getInstance().mailFeeded(sub.getGroup());
   241                   }
   242                   catch(IOException ex)
   243                   {
   244                     // There may be a temporary network failure
   245                     ex.printStackTrace();
   246                     Log.get().warning("Skipping mail " + messageID + " due to exception.");
   247                   }
   248                 }
   249               } // for(;;)
   250               this.highMarks.put(sub, newMark);
   251             }
   252             
   253             disconnect();
   254           }
   255           catch(StorageBackendException ex)
   256           {
   257             ex.printStackTrace();
   258           }
   259           catch(IOException ex)
   260           {
   261             ex.printStackTrace();
   262             Log.get().severe("PullFeeder run stopped due to exception.");
   263           }
   264         } // for(Subscription sub : subscriptions)
   265         
   266         Log.get().info("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s");
   267         Thread.sleep(pullInterval);
   268       }
   269       catch(InterruptedException ex)
   270       {
   271         Log.get().warning(ex.getMessage());
   272       }
   273     }
   274   }
   275   
   276 }