chris@1: /*
chris@1:  *   SONEWS News Server
chris@1:  *   see AUTHORS for the list of contributors
chris@1:  *
chris@1:  *   This program is free software: you can redistribute it and/or modify
chris@1:  *   it under the terms of the GNU General Public License as published by
chris@1:  *   the Free Software Foundation, either version 3 of the License, or
chris@1:  *   (at your option) any later version.
chris@1:  *
chris@1:  *   This program is distributed in the hope that it will be useful,
chris@1:  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1:  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1:  *   GNU General Public License for more details.
chris@1:  *
chris@1:  *   You should have received a copy of the GNU General Public License
chris@1:  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1:  */
chris@1: 
chris@1: package org.sonews.feed;
chris@1: 
chris@1: import java.io.BufferedReader;
chris@1: import java.io.IOException;
chris@1: import java.io.InputStreamReader;
chris@1: import java.io.PrintWriter;
chris@1: import java.net.Socket;
chris@1: import java.net.SocketException;
chris@1: import java.net.UnknownHostException;
chris@1: import java.util.ArrayList;
chris@1: import java.util.HashMap;
chris@1: import java.util.List;
chris@1: import java.util.Map;
chris@3: import org.sonews.config.Config;
chris@1: import org.sonews.util.Log;
chris@3: import org.sonews.storage.StorageBackendException;
chris@3: import org.sonews.storage.StorageManager;
chris@1: import org.sonews.util.Stats;
chris@1: import org.sonews.util.io.ArticleReader;
chris@1: import org.sonews.util.io.ArticleWriter;
chris@1: 
chris@1: /**
chris@1:  * The PullFeeder class regularily checks another Newsserver for new
chris@1:  * messages.
chris@1:  * @author Christian Lins
chris@1:  * @since sonews/0.5.0
chris@1:  */
chris@1: class PullFeeder extends AbstractFeeder
chris@1: {
chris@1:   
chris@1:   private Map<Subscription, Integer> highMarks = new HashMap<Subscription, Integer>();
chris@1:   private BufferedReader             in;
chris@1:   private PrintWriter                out;
chris@1:   
chris@1:   @Override
chris@1:   public void addSubscription(final Subscription sub)
chris@1:   {
chris@1:     super.addSubscription(sub);
chris@1:     
chris@1:     // Set a initial highMark
chris@1:     this.highMarks.put(sub, 0);
chris@1:   }
chris@1:   
chris@1:   /**
chris@1:    * Changes to the given group and returns its high mark.
chris@1:    * @param groupName
chris@1:    * @return
chris@1:    */
chris@1:   private int changeGroup(String groupName)
chris@1:     throws IOException
chris@1:   {
chris@1:     this.out.print("GROUP " + groupName + "\r\n");
chris@1:     this.out.flush();
chris@1:     
chris@1:     String line = this.in.readLine();
chris@1:     if(line.startsWith("211 "))
chris@1:     {
chris@1:       int highmark = Integer.parseInt(line.split(" ")[3]);
chris@1:       return highmark;
chris@1:     }
chris@1:     else
chris@1:     {
chris@1:       throw new IOException("GROUP " + groupName + " returned: " + line);
chris@1:     }
chris@1:   }
chris@1:   
chris@1:   private void connectTo(final String host, final int port)
chris@1:     throws IOException, UnknownHostException
chris@1:   {
chris@1:     Socket socket = new Socket(host, port);
chris@1:     this.out = new PrintWriter(socket.getOutputStream());
chris@1:     this.in  = new BufferedReader(new InputStreamReader(socket.getInputStream()));
chris@1: 
chris@1:     String line = in.readLine();
chris@1:     if(!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed
chris@1:     {
chris@1:       throw new IOException(line);
chris@1:     }
chris@1:   }
chris@1:   
chris@1:   private void disconnect()
chris@1:     throws IOException
chris@1:   {
chris@1:     this.out.print("QUIT\r\n");
chris@1:     this.out.flush();
chris@1:     this.out.close();
chris@1:     this.in.close();
chris@1:     
chris@1:     this.out = null;
chris@1:     this.in  = null;
chris@1:   }
chris@1:   
chris@1:   /**
chris@1:    * Uses the OVER or XOVER command to get a list of message overviews that
chris@1:    * may be unknown to this feeder and are about to be peered.
chris@1:    * @param start
chris@1:    * @param end
chris@1:    * @return A list of message ids with potentially interesting messages.
chris@1:    */
chris@1:   private List<String> over(int start, int end)
chris@1:     throws IOException
chris@1:   {
chris@1:     this.out.print("OVER " + start + "-" + end + "\r\n");
chris@1:     this.out.flush();
chris@1:     
chris@1:     String line = this.in.readLine();
chris@1:     if(line.startsWith("500 ")) // OVER not supported
chris@1:     {
chris@1:       this.out.print("XOVER " + start + "-" + end + "\r\n");
chris@1:       this.out.flush();
chris@1:       
chris@1:       line = this.in.readLine();
chris@1:     }
chris@1:     
chris@1:     if(line.startsWith("224 "))
chris@1:     {
chris@1:       List<String> messages = new ArrayList<String>();
chris@1:       line = this.in.readLine();
chris@1:       while(!".".equals(line))
chris@1:       {
chris@1:         String mid = line.split("\t")[4]; // 5th should be the Message-ID
chris@1:         messages.add(mid);
chris@1:         line = this.in.readLine();
chris@1:       }
chris@1:       return messages;
chris@1:     }
chris@1:     else
chris@1:     {
chris@1:       throw new IOException("Server return for OVER/XOVER: " + line);
chris@1:     }
chris@1:   }
chris@1:   
chris@1:   @Override
chris@1:   public void run()
chris@1:   {
chris@1:     while(isRunning())
chris@1:     {
chris@1:       int pullInterval = 1000 * 
chris@3:         Config.inst().get(Config.FEED_PULLINTERVAL, 3600);
chris@1:       String host = "localhost";
chris@1:       int    port = 119;
chris@1:       
chris@1:       Log.msg("Start PullFeeder run...", true);
chris@1: 
chris@1:       try
chris@1:       {
chris@1:         for(Subscription sub : this.subscriptions)
chris@1:         {
chris@1:           host = sub.getHost();
chris@1:           port = sub.getPort();
chris@1: 
chris@1:           try
chris@1:           {
chris@1:             Log.msg("Feeding " + sub.getGroup() + " from " + sub.getHost(), true);
chris@1:             try
chris@1:             {
chris@1:               connectTo(host, port);
chris@1:             }
chris@1:             catch(SocketException ex)
chris@1:             {
chris@1:               Log.msg("Skipping " + sub.getHost() + ": " + ex, false);
chris@1:               continue;
chris@1:             }
chris@1:             
chris@1:             int oldMark = this.highMarks.get(sub);
chris@1:             int newMark = changeGroup(sub.getGroup());
chris@1:             
chris@1:             if(oldMark != newMark)
chris@1:             {
chris@1:               List<String> messageIDs = over(oldMark, newMark);
chris@1: 
chris@1:               for(String messageID : messageIDs)
chris@1:               {
chris@3:                 if(!StorageManager.current().isArticleExisting(messageID))
chris@1:                 {
chris@3:                   try
chris@1:                   {
chris@3:                     // Post the message via common socket connection
chris@3:                     ArticleReader aread =
chris@3:                       new ArticleReader(sub.getHost(), sub.getPort(), messageID);
chris@3:                     byte[] abuf = aread.getArticleData();
chris@3:                     if (abuf == null)
chris@3:                     {
chris@3:                       Log.msg("Could not feed " + messageID + " from " + sub.getHost(), true);
chris@3:                     }
chris@3:                     else
chris@3:                     {
chris@3:                       Log.msg("Feeding " + messageID, true);
chris@3:                       ArticleWriter awrite = new ArticleWriter(
chris@3:                         "localhost", Config.inst().get(Config.PORT, 119));
chris@3:                       awrite.writeArticle(abuf);
chris@3:                       awrite.close();
chris@3:                     }
chris@3:                     Stats.getInstance().mailFeeded(sub.getGroup());
chris@1:                   }
chris@3:                   catch(IOException ex)
chris@1:                   {
chris@3:                     // There may be a temporary network failure
chris@3:                     ex.printStackTrace();
chris@3:                     Log.msg("Skipping mail " + messageID + " due to exception.", false);
chris@1:                   }
chris@1:                 }
chris@1:               } // for(;;)
chris@1:               this.highMarks.put(sub, newMark);
chris@1:             }
chris@1:             
chris@1:             disconnect();
chris@1:           }
chris@3:           catch(StorageBackendException ex)
chris@1:           {
chris@1:             ex.printStackTrace();
chris@1:           }
chris@1:           catch(IOException ex)
chris@1:           {
chris@1:             ex.printStackTrace();
chris@1:             Log.msg("PullFeeder run stopped due to exception.", false);
chris@1:           }
chris@1:         } // for(Subscription sub : subscriptions)
chris@1:         
chris@1:         Log.msg("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s", true);
chris@1:         Thread.sleep(pullInterval);
chris@1:       }
chris@1:       catch(InterruptedException ex)
chris@1:       {
chris@1:         Log.msg(ex.getMessage(), false);
chris@1:       }
chris@1:     }
chris@1:   }
chris@1:   
chris@1: }