View Javadoc
1   package us.codecraft.webmagic.pipeline;
2   
3   import us.codecraft.webmagic.MultiPageModel;
4   import us.codecraft.webmagic.ResultItems;
5   import us.codecraft.webmagic.Task;
6   import us.codecraft.webmagic.utils.Experimental;
7   import us.codecraft.webmagic.utils.DoubleKeyMap;
8   
9   import java.util.*;
10  import java.util.concurrent.ConcurrentHashMap;
11  
12  /**
13   * A pipeline combines the result in more than one page together.<br>
14   * Used for news and articles containing more than one web page. <br>
15   * MultiPagePipeline will store parts of object and output them when all parts are extracted.<br>
16   *
17   * @author code4crafter@gmail.com <br>
18   * @since 0.2.0
19   */
20  @Experimental
21  public class MultiPagePipeline implements Pipeline {
22  
23      private DoubleKeyMap<String, String, Boolean> pageMap = new DoubleKeyMap<String, String, Boolean>(ConcurrentHashMap.class);
24  
25      private DoubleKeyMap<String, String, MultiPageModel> objectMap = new DoubleKeyMap<String, String, MultiPageModel>(ConcurrentHashMap.class);
26  
27      @Override
28      public void process(ResultItems resultItems, Task task) {
29          Map<String, Object> resultItemsAll = resultItems.getAll();
30          Iterator<Map.Entry<String, Object>> iterator = resultItemsAll.entrySet().iterator();
31          while (iterator.hasNext()) {
32              handleObject(iterator);
33          }
34      }
35  
36      private void handleObject(Iterator<Map.Entry<String, Object>> iterator) {
37          Map.Entry<String, Object> objectEntry = iterator.next();
38          Object o = objectEntry.getValue();
39          //需要拼凑
40          if (o instanceof MultiPageModel) {
41              MultiPageModel multiPageModel = (MultiPageModel) o;
42              //这次处理的部分,设置为完成
43              pageMap.put(multiPageModel.getPageKey(), multiPageModel.getPage(), Boolean.FALSE);
44              //每个key单独加锁
45              synchronized (pageMap.get(multiPageModel.getPageKey())) {
46                  pageMap.put(multiPageModel.getPageKey(), multiPageModel.getPage(), Boolean.TRUE);
47                  //其他需要拼凑的部分
48                  if (multiPageModel.getOtherPages() != null) {
49                      for (String otherPage : multiPageModel.getOtherPages()) {
50                          Boolean aBoolean = pageMap.get(multiPageModel.getPageKey(), otherPage);
51                          if (aBoolean == null) {
52                              pageMap.put(multiPageModel.getPageKey(), otherPage, Boolean.FALSE);
53                          }
54                      }
55                  }
56                  //check if all pages are processed
57                  Map<String, Boolean> booleanMap = pageMap.get(multiPageModel.getPageKey());
58                  objectMap.put(multiPageModel.getPageKey(), multiPageModel.getPage(), multiPageModel);
59                  if (booleanMap == null) {
60                      return;
61                  }
62                  // /过滤,这次完成的page item中,还未拼凑完整的item,不进入下一个pipeline
63                  for (Map.Entry<String, Boolean> stringBooleanEntry : booleanMap.entrySet()) {
64                      if (!stringBooleanEntry.getValue()) {
65                          iterator.remove();
66                          return;
67                      }
68                  }
69                  List<Map.Entry<String, MultiPageModel>> entryList = new ArrayList<Map.Entry<String, MultiPageModel>>();
70                  entryList.addAll(objectMap.get(multiPageModel.getPageKey()).entrySet());
71                  if (entryList.size() != 0) {
72                      Collections.sort(entryList, new Comparator<Map.Entry<String, MultiPageModel>>() {
73                          @Override
74                          public int compare(Map.Entry<String, MultiPageModel> o1, Map.Entry<String, MultiPageModel> o2) {
75                              try {
76                                  int i1 = Integer.parseInt(o1.getKey());
77                                  int i2 = Integer.parseInt(o2.getKey());
78                                  return i1 - i2;
79                              } catch (NumberFormatException e) {
80                                  return o1.getKey().compareTo(o2.getKey());
81                              }
82                          }
83                      });
84                      // 合并
85                      MultiPageModel value = entryList.get(0).getValue();
86                      for (int i = 1; i < entryList.size(); i++) {
87                          value = value.combine(entryList.get(i).getValue());
88                      }
89                      objectEntry.setValue(value);
90                  }
91              }
92          }
93  
94      }
95  
96  }