org/sonews/feed/PullFeeder.java
author cli
Fri Dec 25 15:42:46 2009 +0100 (2009-12-25)
changeset 25 dd05c3f2fa24
parent 15 f2293e8566f5
permissions -rw-r--r--
Fix for too early disconnects on slow client connections. (#563)
chris@1
     1
/*
chris@1
     2
 *   SONEWS News Server
chris@1
     3
 *   see AUTHORS for the list of contributors
chris@1
     4
 *
chris@1
     5
 *   This program is free software: you can redistribute it and/or modify
chris@1
     6
 *   it under the terms of the GNU General Public License as published by
chris@1
     7
 *   the Free Software Foundation, either version 3 of the License, or
chris@1
     8
 *   (at your option) any later version.
chris@1
     9
 *
chris@1
    10
 *   This program is distributed in the hope that it will be useful,
chris@1
    11
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1
    12
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1
    13
 *   GNU General Public License for more details.
chris@1
    14
 *
chris@1
    15
 *   You should have received a copy of the GNU General Public License
chris@1
    16
 *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1
    17
 */
chris@1
    18
chris@1
    19
package org.sonews.feed;
chris@1
    20
chris@1
    21
import java.io.BufferedReader;
chris@1
    22
import java.io.IOException;
chris@1
    23
import java.io.InputStreamReader;
chris@1
    24
import java.io.PrintWriter;
chris@1
    25
import java.net.Socket;
chris@1
    26
import java.net.SocketException;
chris@1
    27
import java.net.UnknownHostException;
chris@1
    28
import java.util.ArrayList;
chris@1
    29
import java.util.HashMap;
cli@22
    30
import java.util.HashSet;
chris@1
    31
import java.util.List;
chris@1
    32
import java.util.Map;
cli@22
    33
import java.util.Set;
cli@22
    34
import java.util.logging.Level;
chris@3
    35
import org.sonews.config.Config;
cli@22
    36
import org.sonews.daemon.AbstractDaemon;
chris@1
    37
import org.sonews.util.Log;
chris@3
    38
import org.sonews.storage.StorageBackendException;
chris@3
    39
import org.sonews.storage.StorageManager;
chris@1
    40
import org.sonews.util.Stats;
chris@1
    41
import org.sonews.util.io.ArticleReader;
chris@1
    42
import org.sonews.util.io.ArticleWriter;
chris@1
    43
chris@1
    44
/**
chris@1
    45
 * The PullFeeder class regularily checks another Newsserver for new
chris@1
    46
 * messages.
chris@1
    47
 * @author Christian Lins
chris@1
    48
 * @since sonews/0.5.0
chris@1
    49
 */
cli@22
    50
class PullFeeder extends AbstractDaemon
chris@1
    51
{
chris@1
    52
  
chris@1
    53
  private Map<Subscription, Integer> highMarks = new HashMap<Subscription, Integer>();
chris@1
    54
  private BufferedReader             in;
chris@1
    55
  private PrintWriter                out;
cli@22
    56
  private Set<Subscription>          subscriptions = new HashSet<Subscription>();
chris@1
    57
  
cli@22
    58
  private void addSubscription(final Subscription sub)
chris@1
    59
  {
cli@22
    60
    subscriptions.add(sub);
cli@22
    61
cli@22
    62
    if(!highMarks.containsKey(sub))
cli@22
    63
    {
cli@22
    64
      // Set a initial highMark
cli@22
    65
      this.highMarks.put(sub, 0);
cli@22
    66
    }
chris@1
    67
  }
chris@1
    68
  
chris@1
    69
  /**
chris@1
    70
   * Changes to the given group and returns its high mark.
chris@1
    71
   * @param groupName
chris@1
    72
   * @return
chris@1
    73
   */
chris@1
    74
  private int changeGroup(String groupName)
chris@1
    75
    throws IOException
chris@1
    76
  {
chris@1
    77
    this.out.print("GROUP " + groupName + "\r\n");
chris@1
    78
    this.out.flush();
chris@1
    79
    
chris@1
    80
    String line = this.in.readLine();
chris@1
    81
    if(line.startsWith("211 "))
chris@1
    82
    {
chris@1
    83
      int highmark = Integer.parseInt(line.split(" ")[3]);
chris@1
    84
      return highmark;
chris@1
    85
    }
chris@1
    86
    else
chris@1
    87
    {
chris@1
    88
      throw new IOException("GROUP " + groupName + " returned: " + line);
chris@1
    89
    }
chris@1
    90
  }
chris@1
    91
  
chris@1
    92
  private void connectTo(final String host, final int port)
chris@1
    93
    throws IOException, UnknownHostException
chris@1
    94
  {
chris@1
    95
    Socket socket = new Socket(host, port);
chris@1
    96
    this.out = new PrintWriter(socket.getOutputStream());
chris@1
    97
    this.in  = new BufferedReader(new InputStreamReader(socket.getInputStream()));
chris@1
    98
chris@1
    99
    String line = in.readLine();
chris@1
   100
    if(!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed
chris@1
   101
    {
chris@1
   102
      throw new IOException(line);
chris@1
   103
    }
cli@7
   104
cli@7
   105
    // Send MODE READER to peer, some newsservers are friendlier then
cli@7
   106
    this.out.println("MODE READER\r\n");
cli@7
   107
    this.out.flush();
cli@7
   108
    line = this.in.readLine();
chris@1
   109
  }
chris@1
   110
  
chris@1
   111
  private void disconnect()
chris@1
   112
    throws IOException
chris@1
   113
  {
chris@1
   114
    this.out.print("QUIT\r\n");
chris@1
   115
    this.out.flush();
chris@1
   116
    this.out.close();
chris@1
   117
    this.in.close();
chris@1
   118
    
chris@1
   119
    this.out = null;
chris@1
   120
    this.in  = null;
chris@1
   121
  }
chris@1
   122
  
chris@1
   123
  /**
chris@1
   124
   * Uses the OVER or XOVER command to get a list of message overviews that
chris@1
   125
   * may be unknown to this feeder and are about to be peered.
chris@1
   126
   * @param start
chris@1
   127
   * @param end
chris@1
   128
   * @return A list of message ids with potentially interesting messages.
chris@1
   129
   */
chris@1
   130
  private List<String> over(int start, int end)
chris@1
   131
    throws IOException
chris@1
   132
  {
chris@1
   133
    this.out.print("OVER " + start + "-" + end + "\r\n");
chris@1
   134
    this.out.flush();
chris@1
   135
    
chris@1
   136
    String line = this.in.readLine();
chris@1
   137
    if(line.startsWith("500 ")) // OVER not supported
chris@1
   138
    {
chris@1
   139
      this.out.print("XOVER " + start + "-" + end + "\r\n");
chris@1
   140
      this.out.flush();
chris@1
   141
      
chris@1
   142
      line = this.in.readLine();
chris@1
   143
    }
chris@1
   144
    
chris@1
   145
    if(line.startsWith("224 "))
chris@1
   146
    {
chris@1
   147
      List<String> messages = new ArrayList<String>();
chris@1
   148
      line = this.in.readLine();
chris@1
   149
      while(!".".equals(line))
chris@1
   150
      {
chris@1
   151
        String mid = line.split("\t")[4]; // 5th should be the Message-ID
chris@1
   152
        messages.add(mid);
chris@1
   153
        line = this.in.readLine();
chris@1
   154
      }
chris@1
   155
      return messages;
chris@1
   156
    }
chris@1
   157
    else
chris@1
   158
    {
chris@1
   159
      throw new IOException("Server return for OVER/XOVER: " + line);
chris@1
   160
    }
chris@1
   161
  }
chris@1
   162
  
chris@1
   163
  @Override
chris@1
   164
  public void run()
chris@1
   165
  {
chris@1
   166
    while(isRunning())
chris@1
   167
    {
chris@1
   168
      int pullInterval = 1000 * 
chris@3
   169
        Config.inst().get(Config.FEED_PULLINTERVAL, 3600);
chris@1
   170
      String host = "localhost";
chris@1
   171
      int    port = 119;
chris@1
   172
      
cli@15
   173
      Log.get().info("Start PullFeeder run...");
chris@1
   174
chris@1
   175
      try
chris@1
   176
      {
cli@22
   177
        this.subscriptions.clear();
cli@22
   178
        List<Subscription> subsPull = StorageManager.current()
cli@22
   179
          .getSubscriptions(FeedManager.TYPE_PULL);
cli@22
   180
        for(Subscription sub : subsPull)
cli@22
   181
        {
cli@22
   182
          addSubscription(sub);
cli@22
   183
        }
cli@22
   184
      }
cli@22
   185
      catch(StorageBackendException ex)
cli@22
   186
      {
cli@22
   187
        Log.get().log(Level.SEVERE, host, ex);
cli@22
   188
      }
cli@22
   189
cli@22
   190
      try
cli@22
   191
      {
chris@1
   192
        for(Subscription sub : this.subscriptions)
chris@1
   193
        {
chris@1
   194
          host = sub.getHost();
chris@1
   195
          port = sub.getPort();
chris@1
   196
chris@1
   197
          try
chris@1
   198
          {
cli@15
   199
            Log.get().info("Feeding " + sub.getGroup() + " from " + sub.getHost());
chris@1
   200
            try
chris@1
   201
            {
chris@1
   202
              connectTo(host, port);
chris@1
   203
            }
chris@1
   204
            catch(SocketException ex)
chris@1
   205
            {
cli@15
   206
              Log.get().info("Skipping " + sub.getHost() + ": " + ex);
chris@1
   207
              continue;
chris@1
   208
            }
chris@1
   209
            
chris@1
   210
            int oldMark = this.highMarks.get(sub);
chris@1
   211
            int newMark = changeGroup(sub.getGroup());
chris@1
   212
            
chris@1
   213
            if(oldMark != newMark)
chris@1
   214
            {
chris@1
   215
              List<String> messageIDs = over(oldMark, newMark);
chris@1
   216
chris@1
   217
              for(String messageID : messageIDs)
chris@1
   218
              {
chris@3
   219
                if(!StorageManager.current().isArticleExisting(messageID))
chris@1
   220
                {
chris@3
   221
                  try
chris@1
   222
                  {
chris@3
   223
                    // Post the message via common socket connection
chris@3
   224
                    ArticleReader aread =
chris@3
   225
                      new ArticleReader(sub.getHost(), sub.getPort(), messageID);
chris@3
   226
                    byte[] abuf = aread.getArticleData();
cli@15
   227
                    if(abuf == null)
chris@3
   228
                    {
cli@15
   229
                      Log.get().warning("Could not feed " + messageID
cli@15
   230
                        + " from " + sub.getHost());
chris@3
   231
                    }
chris@3
   232
                    else
chris@3
   233
                    {
cli@15
   234
                      Log.get().info("Feeding " + messageID);
chris@3
   235
                      ArticleWriter awrite = new ArticleWriter(
chris@3
   236
                        "localhost", Config.inst().get(Config.PORT, 119));
chris@3
   237
                      awrite.writeArticle(abuf);
chris@3
   238
                      awrite.close();
chris@3
   239
                    }
chris@3
   240
                    Stats.getInstance().mailFeeded(sub.getGroup());
chris@1
   241
                  }
chris@3
   242
                  catch(IOException ex)
chris@1
   243
                  {
chris@3
   244
                    // There may be a temporary network failure
chris@3
   245
                    ex.printStackTrace();
cli@15
   246
                    Log.get().warning("Skipping mail " + messageID + " due to exception.");
chris@1
   247
                  }
chris@1
   248
                }
chris@1
   249
              } // for(;;)
chris@1
   250
              this.highMarks.put(sub, newMark);
chris@1
   251
            }
chris@1
   252
            
chris@1
   253
            disconnect();
chris@1
   254
          }
chris@3
   255
          catch(StorageBackendException ex)
chris@1
   256
          {
chris@1
   257
            ex.printStackTrace();
chris@1
   258
          }
chris@1
   259
          catch(IOException ex)
chris@1
   260
          {
chris@1
   261
            ex.printStackTrace();
cli@15
   262
            Log.get().severe("PullFeeder run stopped due to exception.");
chris@1
   263
          }
chris@1
   264
        } // for(Subscription sub : subscriptions)
chris@1
   265
        
cli@15
   266
        Log.get().info("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s");
chris@1
   267
        Thread.sleep(pullInterval);
chris@1
   268
      }
chris@1
   269
      catch(InterruptedException ex)
chris@1
   270
      {
cli@15
   271
        Log.get().warning(ex.getMessage());
chris@1
   272
      }
chris@1
   273
    }
chris@1
   274
  }
chris@1
   275
  
chris@1
   276
}