org/sonews/feed/PullFeeder.java
author chris <chris@marvin>
Fri Jun 26 16:48:50 2009 +0200 (2009-06-26)
changeset 1 6fceb66e1ad7
child 3 2fdc9cc89502
permissions -rw-r--r--
Hooray... sonews/0.5.0 final

HG: Enter commit message. Lines beginning with 'HG:' are removed.
HG: Remove all lines to abort the collapse operation.
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.feed;
    20 
    21 import java.io.BufferedReader;
    22 import java.io.IOException;
    23 import java.io.InputStreamReader;
    24 import java.io.PrintWriter;
    25 import java.net.Socket;
    26 import java.net.SocketException;
    27 import java.net.UnknownHostException;
    28 import java.sql.SQLException;
    29 import java.util.ArrayList;
    30 import java.util.HashMap;
    31 import java.util.List;
    32 import java.util.Map;
    33 import org.sonews.daemon.Config;
    34 import org.sonews.util.Log;
    35 import org.sonews.daemon.storage.Database;
    36 import org.sonews.util.Stats;
    37 import org.sonews.util.io.ArticleReader;
    38 import org.sonews.util.io.ArticleWriter;
    39 
    40 /**
    41  * The PullFeeder class regularily checks another Newsserver for new
    42  * messages.
    43  * @author Christian Lins
    44  * @since sonews/0.5.0
    45  */
    46 class PullFeeder extends AbstractFeeder
    47 {
    48   
    49   private Map<Subscription, Integer> highMarks = new HashMap<Subscription, Integer>();
    50   private BufferedReader             in;
    51   private PrintWriter                out;
    52   
    53   @Override
    54   public void addSubscription(final Subscription sub)
    55   {
    56     super.addSubscription(sub);
    57     
    58     // Set a initial highMark
    59     this.highMarks.put(sub, 0);
    60   }
    61   
    62   /**
    63    * Changes to the given group and returns its high mark.
    64    * @param groupName
    65    * @return
    66    */
    67   private int changeGroup(String groupName)
    68     throws IOException
    69   {
    70     this.out.print("GROUP " + groupName + "\r\n");
    71     this.out.flush();
    72     
    73     String line = this.in.readLine();
    74     if(line.startsWith("211 "))
    75     {
    76       int highmark = Integer.parseInt(line.split(" ")[3]);
    77       return highmark;
    78     }
    79     else
    80     {
    81       throw new IOException("GROUP " + groupName + " returned: " + line);
    82     }
    83   }
    84   
    85   private void connectTo(final String host, final int port)
    86     throws IOException, UnknownHostException
    87   {
    88     Socket socket = new Socket(host, port);
    89     this.out = new PrintWriter(socket.getOutputStream());
    90     this.in  = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    91 
    92     String line = in.readLine();
    93     if(!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed
    94     {
    95       throw new IOException(line);
    96     }
    97   }
    98   
    99   private void disconnect()
   100     throws IOException
   101   {
   102     this.out.print("QUIT\r\n");
   103     this.out.flush();
   104     this.out.close();
   105     this.in.close();
   106     
   107     this.out = null;
   108     this.in  = null;
   109   }
   110   
   111   /**
   112    * Uses the OVER or XOVER command to get a list of message overviews that
   113    * may be unknown to this feeder and are about to be peered.
   114    * @param start
   115    * @param end
   116    * @return A list of message ids with potentially interesting messages.
   117    */
   118   private List<String> over(int start, int end)
   119     throws IOException
   120   {
   121     this.out.print("OVER " + start + "-" + end + "\r\n");
   122     this.out.flush();
   123     
   124     String line = this.in.readLine();
   125     if(line.startsWith("500 ")) // OVER not supported
   126     {
   127       this.out.print("XOVER " + start + "-" + end + "\r\n");
   128       this.out.flush();
   129       
   130       line = this.in.readLine();
   131     }
   132     
   133     if(line.startsWith("224 "))
   134     {
   135       List<String> messages = new ArrayList<String>();
   136       line = this.in.readLine();
   137       while(!".".equals(line))
   138       {
   139         String mid = line.split("\t")[4]; // 5th should be the Message-ID
   140         messages.add(mid);
   141         line = this.in.readLine();
   142       }
   143       return messages;
   144     }
   145     else
   146     {
   147       throw new IOException("Server return for OVER/XOVER: " + line);
   148     }
   149   }
   150   
   151   @Override
   152   public void run()
   153   {
   154     while(isRunning())
   155     {
   156       int pullInterval = 1000 * 
   157         Config.getInstance().get(Config.FEED_PULLINTERVAL, 3600);
   158       String host = "localhost";
   159       int    port = 119;
   160       
   161       Log.msg("Start PullFeeder run...", true);
   162 
   163       try
   164       {
   165         for(Subscription sub : this.subscriptions)
   166         {
   167           host = sub.getHost();
   168           port = sub.getPort();
   169 
   170           try
   171           {
   172             Log.msg("Feeding " + sub.getGroup() + " from " + sub.getHost(), true);
   173             try
   174             {
   175               connectTo(host, port);
   176             }
   177             catch(SocketException ex)
   178             {
   179               Log.msg("Skipping " + sub.getHost() + ": " + ex, false);
   180               continue;
   181             }
   182             
   183             int oldMark = this.highMarks.get(sub);
   184             int newMark = changeGroup(sub.getGroup());
   185             
   186             if(oldMark != newMark)
   187             {
   188               List<String> messageIDs = over(oldMark, newMark);
   189 
   190               for(String messageID : messageIDs)
   191               {
   192                 if(Database.getInstance().isArticleExisting(messageID))
   193                 {
   194                   continue;
   195                 }
   196 
   197                 try
   198                 {
   199                   // Post the message via common socket connection
   200                   ArticleReader aread =
   201                     new ArticleReader(sub.getHost(), sub.getPort(), messageID);
   202                   byte[] abuf = aread.getArticleData();
   203                   if (abuf == null)
   204                   {
   205                     Log.msg("Could not feed " + messageID + " from " + sub.getHost(), true);
   206                   }
   207                   else
   208                   {
   209                     Log.msg("Feeding " + messageID, true);
   210                     ArticleWriter awrite = new ArticleWriter(
   211                       "localhost", Config.getInstance().get(Config.PORT, 119));
   212                     awrite.writeArticle(abuf);
   213                     awrite.close();
   214                   }
   215                   Stats.getInstance().mailFeeded(sub.getGroup());
   216                 }
   217                 catch(IOException ex)
   218                 {
   219                   // There may be a temporary network failure
   220                   ex.printStackTrace();
   221                   Log.msg("Skipping mail " + messageID + " due to exception.", false);
   222                 }
   223               } // for(;;)
   224               this.highMarks.put(sub, newMark);
   225             }
   226             
   227             disconnect();
   228           }
   229           catch(SQLException ex)
   230           {
   231             ex.printStackTrace();
   232           }
   233           catch(IOException ex)
   234           {
   235             ex.printStackTrace();
   236             Log.msg("PullFeeder run stopped due to exception.", false);
   237           }
   238         } // for(Subscription sub : subscriptions)
   239         
   240         Log.msg("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s", true);
   241         Thread.sleep(pullInterval);
   242       }
   243       catch(InterruptedException ex)
   244       {
   245         Log.msg(ex.getMessage(), false);
   246       }
   247     }
   248   }
   249   
   250 }