1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.index.updater;
20
21 import java.io.BufferedInputStream;
22 import java.io.DataInput;
23 import java.io.DataInputStream;
24 import java.io.EOFException;
25 import java.io.File;
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.io.UTFDataFormatException;
29 import java.nio.file.Files;
30 import java.nio.file.Path;
31 import java.time.Duration;
32 import java.time.Instant;
33 import java.util.ArrayList;
34 import java.util.Collections;
35 import java.util.Date;
36 import java.util.HashSet;
37 import java.util.List;
38 import java.util.Objects;
39 import java.util.Set;
40 import java.util.concurrent.ArrayBlockingQueue;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ExecutorService;
43 import java.util.concurrent.Executors;
44 import java.util.concurrent.TimeUnit;
45 import java.util.concurrent.atomic.AtomicBoolean;
46 import java.util.zip.GZIPInputStream;
47
48 import org.apache.lucene.document.Document;
49 import org.apache.lucene.document.Field;
50 import org.apache.lucene.document.FieldType;
51 import org.apache.lucene.index.IndexOptions;
52 import org.apache.lucene.index.IndexWriter;
53 import org.apache.lucene.index.IndexWriterConfig;
54 import org.apache.lucene.store.Directory;
55 import org.apache.lucene.store.FSDirectory;
56 import org.apache.maven.index.ArtifactInfo;
57 import org.apache.maven.index.context.DocumentFilter;
58 import org.apache.maven.index.context.IndexUtils;
59 import org.apache.maven.index.context.IndexingContext;
60 import org.apache.maven.index.context.NexusAnalyzer;
61 import org.apache.maven.index.context.NexusIndexWriter;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65
66
67
68
69
70 public class IndexDataReader {
71 private static final Logger LOGGER = LoggerFactory.getLogger(IndexDataReader.class);
72
73 private final DataInputStream dis;
74 private final Path tempStorage;
75 private final DocumentFilter filter;
76 private final FSDirectoryFactory factory;
77 private final int threads;
78
/**
 * Creates a reader that uses a single thread.
 *
 * @param is the stream holding the index data
 * @throws IOException if the stream cannot be prepared for reading
 */
public IndexDataReader(final InputStream is) throws IOException {
    this(is, 1);
}

/**
 * Creates a reader with the given number of threads and no explicit temporary storage,
 * filter or directory factory.
 *
 * @param is the stream holding the index data
 * @param threads the number of reader threads, at least one
 * @throws IOException if the stream cannot be prepared for reading
 */
public IndexDataReader(final InputStream is, final int threads) throws IOException {
    this(is, null, null, null, threads);
}

/**
 * Creates a reader configured from an update request: its temporary directory (if any),
 * extraction filter, directory factory and thread count.
 *
 * @param is the stream holding the index data
 * @param request the request supplying the reader configuration
 * @throws IOException if the stream cannot be prepared for reading
 */
public IndexDataReader(final InputStream is, final IndexUpdateRequest request) throws IOException {
    this(
            is,
            request.getIndexTempDir() == null ? null : request.getIndexTempDir().toPath(),
            request.getExtractionFilter(),
            request.getFSDirectoryFactory(),
            request.getThreads());
}
95
96 public IndexDataReader(
97 final InputStream is,
98 final Path tempStorage,
99 final DocumentFilter filter,
100 final FSDirectoryFactory factory,
101 final int threads)
102 throws IOException {
103 if (threads < 1) {
104 throw new IllegalArgumentException("Reader threads must be greater than zero: " + threads);
105 }
106 this.tempStorage = Objects.requireNonNullElse(tempStorage, Path.of(System.getProperty("java.io.tmpdir")));
107 this.factory = Objects.requireNonNullElse(factory, FSDirectoryFactory.DEFAULT);
108 this.filter = filter;
109 this.threads = threads;
110
111
112
113
114 is.mark(2);
115 InputStream data;
116 if (is.read() == 0x1f && is.read() == 0x8b)
117 {
118 is.reset();
119 data = new BufferedInputStream(new GZIPInputStream(is, 1024 * 8), 1024 * 8);
120 } else {
121 is.reset();
122 data = new BufferedInputStream(is, 1024 * 8);
123 }
124
125 this.dis = new DataInputStream(data);
126 }
127
128 public IndexDataReadResult readIndex(IndexWriter w, IndexingContext context) throws IOException {
129 if (threads == 1) {
130 return readIndexST(w, context);
131 } else {
132 return readIndexMT(w, context);
133 }
134 }
135
136 private IndexDataReadResult readIndexST(IndexWriter w, IndexingContext context) throws IOException {
137 LOGGER.debug("Reading ST index...");
138 Instant start = Instant.now();
139 long timestamp = readHeader();
140
141 Date date = null;
142
143 if (timestamp != -1) {
144 date = new Date(timestamp);
145
146 IndexUtils.updateTimestamp(w.getDirectory(), date);
147 }
148
149 int n = 0;
150
151 Document doc;
152 Set<String> rootGroups = new HashSet<>();
153 Set<String> allGroups = new HashSet<>();
154
155 while ((doc = readDocument()) != null) {
156 addToIndex(doc, context, w, rootGroups, allGroups);
157 n++;
158 }
159
160 w.commit();
161
162 IndexDataReadResult result = new IndexDataReadResult();
163 result.setDocumentCount(n);
164 result.setTimestamp(date);
165 result.setRootGroups(rootGroups);
166 result.setAllGroups(allGroups);
167
168 LOGGER.debug(
169 "Reading ST index done in {} sec",
170 Duration.between(start, Instant.now()).getSeconds());
171 return result;
172 }
173
174 private IndexDataReadResult readIndexMT(IndexWriter w, IndexingContext context) throws IOException {
175 LOGGER.debug("Reading MT index...");
176 Instant start = Instant.now();
177 long timestamp = readHeader();
178
179 int n = 0;
180
181 final Document theEnd = new Document();
182
183 Set<String> rootGroups = ConcurrentHashMap.newKeySet();
184 Set<String> allGroups = ConcurrentHashMap.newKeySet();
185 ArrayBlockingQueue<Document> queue = new ArrayBlockingQueue<>(10000);
186
187 ExecutorService executorService = Executors.newFixedThreadPool(threads);
188 List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());
189 List<FSDirectory> siloDirectories = new ArrayList<>(threads);
190 List<IndexWriter> siloWriters = new ArrayList<>(threads);
191 AtomicBoolean stopEarly = new AtomicBoolean(false);
192 LOGGER.debug("Creating {} silo writer threads...", threads);
193 for (int i = 0; i < threads; i++) {
194 final int silo = i;
195 FSDirectory siloDirectory = tempDirectory("silo" + i);
196 siloDirectories.add(siloDirectory);
197 siloWriters.add(tempWriter(siloDirectory));
198 executorService.execute(() -> {
199 LOGGER.debug("Starting thread {}", Thread.currentThread().getName());
200 try {
201 while (true) {
202 try {
203 Document doc = queue.take();
204 if (doc == theEnd) {
205 break;
206 }
207 addToIndex(doc, context, siloWriters.get(silo), rootGroups, allGroups);
208 } catch (Throwable e) {
209 errors.add(e);
210 if (stopEarly.compareAndSet(false, true)) {
211 queue.clear();
212 executorService.shutdownNow();
213 }
214 break;
215 }
216 }
217 } finally {
218 LOGGER.debug("Done thread {}", Thread.currentThread().getName());
219 }
220 });
221 }
222
223 LOGGER.debug("Loading up documents into silos");
224 try {
225 Document doc;
226 while (!stopEarly.get() && (doc = readDocument()) != null) {
227 queue.put(doc);
228 n++;
229 }
230 LOGGER.debug("Signalling END");
231 for (int i = 0; i < threads; i++) {
232 queue.put(theEnd);
233 }
234
235 LOGGER.debug("Shutting down threads");
236 executorService.shutdown();
237 executorService.awaitTermination(5L, TimeUnit.MINUTES);
238 } catch (InterruptedException e) {
239 throw new IOException("Interrupted", e);
240 }
241
242 if (!errors.isEmpty()) {
243 if (errors.stream().allMatch(ex -> ex instanceof IOException || ex instanceof InterruptedException)) {
244 IOException exception = new IOException("Error during load of index");
245 errors.forEach(exception::addSuppressed);
246 throw exception;
247 } else {
248 RuntimeException exception = new RuntimeException("Error during load of index");
249 errors.forEach(exception::addSuppressed);
250 throw exception;
251 }
252 }
253
254 LOGGER.debug("Silos loaded...");
255 Date date = null;
256 if (timestamp != -1) {
257 date = new Date(timestamp);
258 IndexUtils.updateTimestamp(w.getDirectory(), date);
259 }
260
261 LOGGER.debug("Closing silo writers...");
262 for (IndexWriter siloWriter : siloWriters) {
263 siloWriter.commit();
264 siloWriter.close();
265 }
266
267 LOGGER.debug("Merging silo directories...");
268 w.addIndexes(siloDirectories.toArray(new Directory[0]));
269
270 LOGGER.debug("Cleanup of silo directories...");
271 for (FSDirectory siloDirectory : siloDirectories) {
272 File dir = siloDirectory.getDirectory().toFile();
273 siloDirectory.close();
274 IndexUtils.delete(dir);
275 }
276
277 LOGGER.debug("Finalizing...");
278 w.commit();
279
280 IndexDataReadResult result = new IndexDataReadResult();
281 result.setDocumentCount(n);
282 result.setTimestamp(date);
283 result.setRootGroups(rootGroups);
284 result.setAllGroups(allGroups);
285
286 LOGGER.debug(
287 "Reading MT index done in {} sec",
288 Duration.between(start, Instant.now()).getSeconds());
289 return result;
290 }
291
292 private FSDirectory tempDirectory(final String name) throws IOException {
293 return factory.open(
294 Files.createTempDirectory(tempStorage, name + ".dir").toFile());
295 }
296
297 private IndexWriter tempWriter(final FSDirectory directory) throws IOException {
298 IndexWriterConfig config = new IndexWriterConfig(new NexusAnalyzer());
299 config.setUseCompoundFile(false);
300 return new NexusIndexWriter(directory, config);
301 }
302
303 private void addToIndex(
304 final Document doc,
305 final IndexingContext context,
306 final IndexWriter indexWriter,
307 final Set<String> rootGroups,
308 final Set<String> allGroups)
309 throws IOException {
310 ArtifactInfo ai = IndexUtils.constructArtifactInfo(doc, context);
311 if (ai != null) {
312 if (filter == null || filter.accept(doc)) {
313 indexWriter.addDocument(IndexUtils.updateDocument(doc, context, false, ai));
314 rootGroups.add(ai.getRootGroup());
315 allGroups.add(ai.getGroupId());
316 }
317 } else {
318
319 if (doc.getField(ArtifactInfo.ALL_GROUPS) == null && doc.getField(ArtifactInfo.ROOT_GROUPS) == null) {
320 indexWriter.addDocument(doc);
321 }
322 }
323 }
324
325 public long readHeader() throws IOException {
326 final byte hdrbyte = (byte) ((IndexDataWriter.VERSION << 24) >> 24);
327
328 if (hdrbyte != dis.readByte()) {
329
330 throw new IOException("Provided input contains unexpected data (0x01 expected as 1st byte)!");
331 }
332
333 return dis.readLong();
334 }
335
336 public Document readDocument() throws IOException {
337 int fieldCount;
338 try {
339 fieldCount = dis.readInt();
340 } catch (EOFException ex) {
341 return null;
342 }
343
344 Document doc = new Document();
345
346 for (int i = 0; i < fieldCount; i++) {
347 doc.add(readField());
348 }
349
350
351 final Field uinfoField = (Field) doc.getField(ArtifactInfo.UINFO);
352 final String info = doc.get(ArtifactInfo.INFO);
353 if (uinfoField != null && info != null && !info.isEmpty()) {
354 String uinfoString = uinfoField.stringValue();
355 if (uinfoString.endsWith(ArtifactInfo.FS + ArtifactInfo.NA)) {
356 int elem = 0;
357 for (int i = -1; (i = info.indexOf(ArtifactInfo.FS, i + 1)) != -1; ) {
358 if (++elem == 6) {
359 String extension = info.substring(i + 1);
360 uinfoField.setStringValue(uinfoString + ArtifactInfo.FS + ArtifactInfo.nvl(extension));
361 break;
362 }
363 }
364 }
365 }
366
367 return doc;
368 }
369
370 private Field readField() throws IOException {
371 int flags = dis.read();
372
373 FieldType fieldType = new FieldType();
374 if ((flags & IndexDataWriter.F_INDEXED) > 0) {
375 boolean tokenized = (flags & IndexDataWriter.F_TOKENIZED) > 0;
376 fieldType.setTokenized(tokenized);
377 }
378 fieldType.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS);
379 fieldType.setStored((flags & IndexDataWriter.F_STORED) > 0);
380
381 String name = dis.readUTF();
382 String value = readUTF(dis);
383
384 return new Field(name, value, fieldType);
385 }
386
387 private static String readUTF(DataInput in) throws IOException {
388 int utflen = in.readInt();
389
390 byte[] bytearr;
391 char[] chararr;
392
393 try {
394 bytearr = new byte[utflen];
395 chararr = new char[utflen];
396 } catch (OutOfMemoryError e) {
397 throw new IOException(
398 "Index data content is inappropriate (is junk?), leads to OutOfMemoryError!"
399 + " See MINDEXER-28 for more information!",
400 e);
401 }
402
403 int c, char2, char3;
404 int count = 0;
405 int chararrCount = 0;
406
407 in.readFully(bytearr, 0, utflen);
408
409 while (count < utflen) {
410 c = bytearr[count] & 0xff;
411 if (c > 127) {
412 break;
413 }
414 count++;
415 chararr[chararrCount++] = (char) c;
416 }
417
418 while (count < utflen) {
419 c = bytearr[count] & 0xff;
420 switch (c >> 4) {
421 case 0:
422 case 1:
423 case 2:
424 case 3:
425 case 4:
426 case 5:
427 case 6:
428 case 7:
429
430 count++;
431 chararr[chararrCount++] = (char) c;
432 break;
433
434 case 12:
435 case 13:
436
437 count += 2;
438 if (count > utflen) {
439 throw new UTFDataFormatException("malformed input: partial character at end");
440 }
441 char2 = bytearr[count - 1];
442 if ((char2 & 0xC0) != 0x80) {
443 throw new UTFDataFormatException("malformed input around byte " + count);
444 }
445 chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
446 break;
447
448 case 14:
449
450 count += 3;
451 if (count > utflen) {
452 throw new UTFDataFormatException("malformed input: partial character at end");
453 }
454 char2 = bytearr[count - 2];
455 char3 = bytearr[count - 1];
456 if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
457 throw new UTFDataFormatException("malformed input around byte " + (count - 1));
458 }
459 chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F)));
460 break;
461
462 default:
463
464 throw new UTFDataFormatException("malformed input around byte " + count);
465 }
466 }
467
468
469 return new String(chararr, 0, chararrCount);
470 }
471
472
473
474
475 public static class IndexDataReadResult {
476 private Date timestamp;
477
478 private int documentCount;
479
480 private Set<String> rootGroups;
481
482 private Set<String> allGroups;
483
484 public void setDocumentCount(int documentCount) {
485 this.documentCount = documentCount;
486 }
487
488 public int getDocumentCount() {
489 return documentCount;
490 }
491
492 public void setTimestamp(Date timestamp) {
493 this.timestamp = timestamp;
494 }
495
496 public Date getTimestamp() {
497 return timestamp;
498 }
499
500 public void setRootGroups(Set<String> rootGroups) {
501 this.rootGroups = rootGroups;
502 }
503
504 public Set<String> getRootGroups() {
505 return rootGroups;
506 }
507
508 public void setAllGroups(Set<String> allGroups) {
509 this.allGroups = allGroups;
510 }
511
512 public Set<String> getAllGroups() {
513 return allGroups;
514 }
515 }
516
517
518
519
520
521
522
523
524
525
526 public IndexDataReadResult readIndex(final IndexDataReadVisitor visitor, final IndexingContext context)
527 throws IOException {
528 dis.readByte();
529
530 long timestamp = dis.readLong();
531
532 Date date = null;
533
534 if (timestamp != -1) {
535 date = new Date(timestamp);
536 }
537
538 int n = 0;
539
540 Document doc;
541 while ((doc = readDocument()) != null) {
542 visitor.visitDocument(IndexUtils.updateDocument(doc, context, false));
543
544 n++;
545 }
546
547 IndexDataReadResult result = new IndexDataReadResult();
548 result.setDocumentCount(n);
549 result.setTimestamp(date);
550 return result;
551 }
552
553
554
555
556 public interface IndexDataReadVisitor {
557
558
559
560
561
562
563 void visitDocument(Document document);
564 }
565 }