1 package us.codecraft.webmagic.scheduler;
2
3 import java.io.BufferedReader;
4 import java.io.Closeable;
5 import java.io.File;
6 import java.io.FileNotFoundException;
7 import java.io.FileReader;
8 import java.io.FileWriter;
9 import java.io.IOException;
10 import java.io.PrintWriter;
11 import java.util.LinkedHashSet;
12 import java.util.Set;
13 import java.util.concurrent.BlockingQueue;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.LinkedBlockingQueue;
16 import java.util.concurrent.ScheduledExecutorService;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.atomic.AtomicBoolean;
19 import java.util.concurrent.atomic.AtomicInteger;
20
21 import org.apache.commons.io.IOUtils;
22 import org.apache.commons.lang3.math.NumberUtils;
23
24 import us.codecraft.webmagic.Request;
25 import us.codecraft.webmagic.Task;
26 import us.codecraft.webmagic.scheduler.component.DuplicateRemover;
27
28
29
30
31
32
33
34
35 public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler,Closeable {
36
37 private String filePath = System.getProperty("java.io.tmpdir");
38
39 private String fileUrlAllName = ".urls.txt";
40
41 private Task task;
42
43 private String fileCursor = ".cursor.txt";
44
45 private PrintWriter fileUrlWriter;
46
47 private PrintWriter fileCursorWriter;
48
49 private AtomicInteger cursor = new AtomicInteger();
50
51 private AtomicBoolean inited = new AtomicBoolean(false);
52
53 private BlockingQueue<Request> queue;
54
55 private Set<String> urls;
56
57 private ScheduledExecutorService flushThreadPool;
58
59 public FileCacheQueueScheduler(String filePath) {
60 if (!filePath.endsWith("/") && !filePath.endsWith("\\")) {
61 filePath += "/";
62 }
63 this.filePath = filePath;
64 initDuplicateRemover();
65 }
66
67 private void flush() {
68 fileUrlWriter.flush();
69 fileCursorWriter.flush();
70 }
71
72 private void init(Task task) {
73 this.task = task;
74 File file = new File(filePath);
75 if (!file.exists()) {
76 file.mkdirs();
77 }
78 readFile();
79 initWriter();
80 initFlushThread();
81 inited.set(true);
82 logger.info("init cache scheduler success");
83 }
84
85 private void initDuplicateRemover() {
86 setDuplicateRemover(
87 new DuplicateRemover() {
88 @Override
89 public boolean isDuplicate(Request request, Task task) {
90 if (!inited.get()) {
91 init(task);
92 }
93 return !urls.add(request.getUrl());
94 }
95
96 @Override
97 public void resetDuplicateCheck(Task task) {
98 urls.clear();
99 }
100
101 @Override
102 public int getTotalRequestsCount(Task task) {
103 return urls.size();
104 }
105 });
106 }
107
108 private void initFlushThread() {
109 flushThreadPool = Executors.newScheduledThreadPool(1);
110 flushThreadPool.scheduleAtFixedRate(new Runnable() {
111 @Override
112 public void run() {
113 flush();
114 }
115 }, 10, 10, TimeUnit.SECONDS);
116 }
117
118 private void initWriter() {
119 try {
120 fileUrlWriter = new PrintWriter(new FileWriter(getFileName(fileUrlAllName), true));
121 fileCursorWriter = new PrintWriter(new FileWriter(getFileName(fileCursor), false));
122 } catch (IOException e) {
123 throw new RuntimeException("init cache scheduler error", e);
124 }
125 }
126
127 private void readFile() {
128 try {
129 queue = new LinkedBlockingQueue<Request>();
130 urls = new LinkedHashSet<String>();
131 readCursorFile();
132 readUrlFile();
133
134 } catch (FileNotFoundException e) {
135
136 logger.info("init cache file " + getFileName(fileUrlAllName));
137 } catch (IOException e) {
138 logger.error("init file error", e);
139 }
140 }
141
142 private void readUrlFile() throws IOException {
143 String line;
144 BufferedReader fileUrlReader = null;
145 try {
146 fileUrlReader = new BufferedReader(new FileReader(getFileName(fileUrlAllName)));
147 int lineReaded = 0;
148 while ((line = fileUrlReader.readLine()) != null) {
149 urls.add(line.trim());
150 lineReaded++;
151 if (lineReaded > cursor.get()) {
152 queue.add(deserializeRequest(line));
153 }
154 }
155 } finally {
156 if (fileUrlReader != null) {
157 IOUtils.closeQuietly(fileUrlReader);
158 }
159 }
160 }
161
162 private void readCursorFile() throws IOException {
163 BufferedReader fileCursorReader = null;
164 try {
165 fileCursorReader = new BufferedReader(new FileReader(getFileName(fileCursor)));
166 String line;
167
168 while ((line = fileCursorReader.readLine()) != null) {
169 cursor = new AtomicInteger(NumberUtils.toInt(line));
170 }
171 } finally {
172 if (fileCursorReader != null) {
173 IOUtils.closeQuietly(fileCursorReader);
174 }
175 }
176 }
177
178 public void close() throws IOException {
179 flushThreadPool.shutdown();
180 fileUrlWriter.close();
181 fileCursorWriter.close();
182 }
183
184 private String getFileName(String filename) {
185 return filePath + task.getUUID() + filename;
186 }
187
188 @Override
189 protected void pushWhenNoDuplicate(Request request, Task task) {
190 if (!inited.get()) {
191 init(task);
192 }
193 queue.add(request);
194 fileUrlWriter.println(serializeRequest(request));
195 }
196
197 @Override
198 public synchronized Request poll(Task task) {
199 if (!inited.get()) {
200 init(task);
201 }
202 fileCursorWriter.println(cursor.incrementAndGet());
203 return queue.poll();
204 }
205
206 @Override
207 public int getLeftRequestsCount(Task task) {
208 return queue.size();
209 }
210
211 @Override
212 public int getTotalRequestsCount(Task task) {
213 return getDuplicateRemover().getTotalRequestsCount(task);
214 }
215
216 protected String serializeRequest(Request request) {
217 return request.getUrl();
218 }
219
220 protected Request deserializeRequest(String line) {
221 return new Request(line);
222 }
223
224 }