View Javadoc
1   package us.codecraft.webmagic.scheduler;
2   
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils;

import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.component.DuplicateRemover;
27  
28  
29  /**
30   * Store urls and cursor in files so that a Spider can resume the status when shutdown.<br>
31   *
32   * @author code4crafter@gmail.com <br>
33   * @since 0.2.0
34   */
35  public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler,Closeable {
36  
37      private String filePath = System.getProperty("java.io.tmpdir");
38  
39      private String fileUrlAllName = ".urls.txt";
40  
41      private Task task;
42  
43      private String fileCursor = ".cursor.txt";
44  
45      private PrintWriter fileUrlWriter;
46  
47      private PrintWriter fileCursorWriter;
48  
49      private AtomicInteger cursor = new AtomicInteger();
50  
51      private AtomicBoolean inited = new AtomicBoolean(false);
52  
53      private BlockingQueue<Request> queue;
54  
55      private Set<String> urls;
56      
57      private ScheduledExecutorService flushThreadPool;
58  
59      public FileCacheQueueScheduler(String filePath) {
60          if (!filePath.endsWith("/") && !filePath.endsWith("\\")) {
61              filePath += "/";
62          }
63          this.filePath = filePath;
64          initDuplicateRemover();
65      }
66  
67      private void flush() {
68          fileUrlWriter.flush();
69          fileCursorWriter.flush();
70      }
71  
72      private void init(Task task) {
73          this.task = task;
74          File file = new File(filePath);
75          if (!file.exists()) {
76              file.mkdirs();
77          }
78          readFile();
79          initWriter();
80          initFlushThread();
81          inited.set(true);
82          logger.info("init cache scheduler success");
83      }
84  
85      private void initDuplicateRemover() {
86          setDuplicateRemover(
87                  new DuplicateRemover() {
88                      @Override
89                      public boolean isDuplicate(Request request, Task task) {
90                          if (!inited.get()) {
91                              init(task);
92                          }
93                          return !urls.add(request.getUrl());
94                      }
95  
96                      @Override
97                      public void resetDuplicateCheck(Task task) {
98                          urls.clear();
99                      }
100 
101                     @Override
102                     public int getTotalRequestsCount(Task task) {
103                         return urls.size();
104                     }
105                 });
106     }
107 
108     private void initFlushThread() {
109     	flushThreadPool = Executors.newScheduledThreadPool(1);
110     	flushThreadPool.scheduleAtFixedRate(new Runnable() {
111             @Override
112             public void run() {
113                 flush();
114             }
115         }, 10, 10, TimeUnit.SECONDS);
116     }
117 
118     private void initWriter() {
119         try {
120             fileUrlWriter = new PrintWriter(new FileWriter(getFileName(fileUrlAllName), true));
121             fileCursorWriter = new PrintWriter(new FileWriter(getFileName(fileCursor), false));
122         } catch (IOException e) {
123             throw new RuntimeException("init cache scheduler error", e);
124         }
125     }
126 
127     private void readFile() {
128         try {
129             queue = new LinkedBlockingQueue<Request>();
130             urls = new LinkedHashSet<String>();
131             readCursorFile();
132             readUrlFile();
133             // initDuplicateRemover();
134         } catch (FileNotFoundException e) {
135             //init
136             logger.info("init cache file " + getFileName(fileUrlAllName));
137         } catch (IOException e) {
138             logger.error("init file error", e);
139         }
140     }
141 
142     private void readUrlFile() throws IOException {
143         String line;
144         BufferedReader fileUrlReader = null;
145         try {
146             fileUrlReader = new BufferedReader(new FileReader(getFileName(fileUrlAllName)));
147             int lineReaded = 0;
148             while ((line = fileUrlReader.readLine()) != null) {
149                 urls.add(line.trim());
150                 lineReaded++;
151                 if (lineReaded > cursor.get()) {
152                     queue.add(deserializeRequest(line));
153                 }
154             }
155         } finally {
156             if (fileUrlReader != null) {
157                 IOUtils.closeQuietly(fileUrlReader);
158             }
159         }
160     }
161 
162     private void readCursorFile() throws IOException {
163         BufferedReader fileCursorReader = null;
164         try {
165         	fileCursorReader = new BufferedReader(new FileReader(getFileName(fileCursor)));
166             String line;
167             //read the last number
168             while ((line = fileCursorReader.readLine()) != null) {
169                 cursor = new AtomicInteger(NumberUtils.toInt(line));
170             }
171         } finally {
172             if (fileCursorReader != null) {
173                 IOUtils.closeQuietly(fileCursorReader);
174             }
175         }
176     }
177     
178     public void close() throws IOException {
179 		flushThreadPool.shutdown();	
180 		fileUrlWriter.close();
181 		fileCursorWriter.close();
182 	}
183 
184     private String getFileName(String filename) {
185         return filePath + task.getUUID() + filename;
186     }
187 
188     @Override
189     protected void pushWhenNoDuplicate(Request request, Task task) {
190         if (!inited.get()) {
191             init(task);
192         }
193         queue.add(request);
194         fileUrlWriter.println(serializeRequest(request));
195     }
196 
197     @Override
198     public synchronized Request poll(Task task) {
199         if (!inited.get()) {
200             init(task);
201         }
202         fileCursorWriter.println(cursor.incrementAndGet());
203         return queue.poll();
204     }
205 
206     @Override
207     public int getLeftRequestsCount(Task task) {
208         return queue.size();
209     }
210 
211     @Override
212     public int getTotalRequestsCount(Task task) {
213         return getDuplicateRemover().getTotalRequestsCount(task);
214     }
215 
216     protected String serializeRequest(Request request) {
217         return request.getUrl();
218     }
219 
220     protected Request deserializeRequest(String line) {
221         return new Request(line);
222     }
223 
224 }