iceberg通过实现spark的datasource v2中的DataSourceReader接口来实现读取数据,这里使用DataSourceReader的子接口SupportsScanColumnarBatch中实现enableBatchRead和planInputPartitions方法读取
/**
* A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
* interface to output {@link ColumnarBatch} and make the scan faster.
*/
@InterfaceStability.Evolving
public interface SupportsScanColumnarBatch extends DataSourceReader {
@Override
default List<InputPartition<InternalRow>> planInputPartitions() {
throw new IllegalStateException(
"planInputPartitions not supported by default within SupportsScanColumnarBatch.");
}
/**
* Similar to {@link DataSourceReader#planInputPartitions()}, but returns columnar data
* in batches.
*/
List<InputPartition<ColumnarBatch>> planBatchInputPartitions();
/**
* Returns true if the concrete data source reader can read data in batch according to the scan
* properties like required columns, pushes filters, etc. It's possible that the implementation
* can only support some certain columns with certain types. Users can overwrite this method and
* {@link #planInputPartitions()} to fallback to normal read path under some conditions.
*/
default boolean enableBatchRead() {
return true;
}
}
复制代码
如上所示enableBatchRead是一个方法去判断是否支持一些如required columns, pushes filters的特性,返回true则可以调用planInputPartitions()方法去读取数据。
public boolean enableBatchRead() {
if (readUsingBatch == null) {
boolean allParquetFileScanTasks =
tasks().stream()
.allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files()
.stream()
.allMatch(fileScanTask -> fileScanTask.file().format().equals(
FileFormat.PARQUET)));
boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes);
this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
(allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
}
return readUsingBatch;
}
复制代码
enableBatchRead中调用tasks()去返回读取数据的任务列表
private List<CombinedScanTask> tasks() {
if (tasks == null) {
TableScan scan = table
.newScan()
.caseSensitive(caseSensitive)
.project(lazySchema());
try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
this.tasks = Lists.newArrayList(tasksIterable);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
}
}
return tasks;
}
复制代码
调用TableScan 的planTasks方法返回一个CombinedScanTask的迭代器,实现在BaseTableScan中
public CloseableIterable<CombinedScanTask> planTasks() {
Map<String, String> options = context.options();
..........
CloseableIterable<FileScanTask> fileScanTasks = planFiles();
CloseableIterable<FileScanTask> splitFiles = TableScanUtil.splitFiles(fileScanTasks, splitSize);
return TableScanUtil.planTasks(splitFiles, splitSize, lookback, openFileCost);
}
复制代码
这里调用planFiles()方法返回一个FileScanTask的迭代器
public CloseableIterable<FileScanTask> planFiles() {
return planFiles(ops, snapshot,
context.rowFilter(), context.ignoreResiduals(), context.caseSensitive(), context.returnColumnStats());
}
复制代码
这里调用重载的planFiles()方法,这里采用了模板设计模式,重载的planFiles方法在BaseTableScan的子类DataTableScan中实现
public CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot snapshot,
Expression rowFilter, boolean ignoreResiduals,
boolean caseSensitive, boolean colStats) {
ManifestGroup manifestGroup = new ManifestGroup(ops.io(), snapshot.dataManifests(), snapshot.deleteManifests())
.caseSensitive(caseSensitive)
.select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
.filterData(rowFilter)
.specsById(ops.current().specsById())
.ignoreDeleted();
if (ignoreResiduals) {
manifestGroup = manifestGroup.ignoreResiduals();
}
if (PLAN_SCANS_WITH_WORKER_POOL && snapshot.dataManifests().size() > 1) {
manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool());
}
return manifestGroup.planFiles();
}
复制代码
这里snapshot.dataManifests()中调用cacheManifests()方法
public List<ManifestFile> dataManifests() {
if (dataManifests == null) {
cacheManifests();
}
return dataManifests;
}
复制代码
private void cacheManifests() {
if (allManifests == null) {
// if manifests isn't set, then the snapshotFile is set and should be read to get the list
this.allManifests = ManifestLists.read(io.newInputFile(manifestListLocation));
}
if (dataManifests == null || deleteManifests == null) {
this.dataManifests = ImmutableList.copyOf(Iterables.filter(allManifests,
manifest -> manifest.content() == ManifestContent.DATA));
this.deleteManifests = ImmutableList.copyOf(Iterables.filter(allManifests,
manifest -> manifest.content() == ManifestContent.DELETES));
}
}
复制代码
如果allManifests为空从文件中读取,然后分为dataManifests和deleteManifests
这里可以看到这边入了12个snapshot的数据,所以有12个metadata.json文件和12个snapshot*.avro文件即 清单列表(Manifest list)文件,而清单文件(Manifest file)有24个,从文件名可以看到清单列表(Manifest list)文件和 清单文件(Manifest file)是有对应关系的
从运行时的数据也可看出对应于一个清单列表(Manifest list)文件有两个清单文件,m1结尾的是delete文件清单,m0结尾的是data文件清单
回到上面的planFiles方法中,最后调用manifestGroup.planFiles()方法
/**
* Returns a iterable of scan tasks. It is safe to add entries of this iterable
* to a collection as {@link DataFile} in each {@link FileScanTask} is defensively
* copied.
* @return a {@link CloseableIterable} of {@link FileScanTask}
*/
public CloseableIterable<FileScanTask> planFiles() {
LoadingCache<Integer, ResidualEvaluator> residualCache = Caffeine.newBuilder().build(specId -> {
PartitionSpec spec = specsById.get(specId);
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : dataFilter;
return ResidualEvaluator.of(spec, filter, caseSensitive);
});
DeleteFileIndex deleteFiles = deleteIndexBuilder.build();
boolean dropStats = ManifestReader.dropStats(dataFilter, columns);
if (!deleteFiles.isEmpty()) {
select(Streams.concat(columns.stream(), ManifestReader.STATS_COLUMNS.stream()).collect(Collectors.toList()));
}
Iterable<CloseableIterable<FileScanTask>> tasks = entries((manifest, entries) -> {
int specId = manifest.partitionSpecId();
PartitionSpec spec = specsById.get(specId);
String schemaString = SchemaParser.toJson(spec.schema());
String specString = PartitionSpecParser.toJson(spec);
ResidualEvaluator residuals = residualCache.get(specId);
if (dropStats) {
return CloseableIterable.transform(entries, e -> new BaseFileScanTask(
e.file().copyWithoutStats(), deleteFiles.forEntry(e), schemaString, specString, residuals));
} else {
return CloseableIterable.transform(entries, e -> new BaseFileScanTask(
e.file().copy(), deleteFiles.forEntry(e), schemaString, specString, residuals));
}
});
if (executorService != null) {
return new ParallelIterable<>(tasks, executorService);
} else {
return CloseableIterable.concat(tasks);
}
}
复制代码
这里返回的是CloseableIterable即文件级别的任务
中间DeleteFileIndex deleteFiles = deleteIndexBuilder.build();是读取delete文件索引
DeleteFileIndex build() {
// read all of the matching delete manifests in parallel and accumulate the matching files in a queue
Queue<ManifestEntry<DeleteFile>> deleteEntries = new ConcurrentLinkedQueue<>();
Tasks.foreach(deleteManifestReaders())
.stopOnFailure().throwFailureWhenFinished()
.executeWith(executorService)
.run(deleteFile -> {
try (CloseableIterable<ManifestEntry<DeleteFile>> reader = deleteFile) {
for (ManifestEntry<DeleteFile> entry : reader) {
// copy with stats for better filtering against data file stats
deleteEntries.add(entry.copy());
}
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close");
}
});
// build a map from (specId, partition) to delete file entries
ListMultimap<Pair<Integer, StructLikeWrapper>, ManifestEntry<DeleteFile>> deleteFilesByPartition =
Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
for (ManifestEntry<DeleteFile> entry : deleteEntries) {
int specId = entry.file().specId();
StructLikeWrapper wrapper = StructLikeWrapper.forType(specsById.get(specId).partitionType())
.set(entry.file().partition());
deleteFilesByPartition.put(Pair.of(specId, wrapper), entry);
}
// sort the entries in each map value by sequence number and split into sequence numbers and delete files lists
Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>> sortedDeletesByPartition = Maps.newHashMap();
// also, separate out equality deletes in an unpartitioned spec that should be applied globally
long[] globalApplySeqs = null;
DeleteFile[] globalDeletes = null;
for (Pair<Integer, StructLikeWrapper> partition : deleteFilesByPartition.keySet()) {
if (specsById.get(partition.first()).isUnpartitioned()) {
Preconditions.checkState(globalDeletes == null, "Detected multiple partition specs with no partitions");
List<Pair<Long, DeleteFile>> eqFilesSortedBySeq = deleteFilesByPartition.get(partition).stream()
.filter(entry -> entry.file().content() == FileContent.EQUALITY_DELETES)
.map(entry ->
// a delete file is indexed by the sequence number it should be applied to
Pair.of(entry.sequenceNumber() - 1, entry.file()))
.sorted(Comparator.comparingLong(Pair::first))
.collect(Collectors.toList());
globalApplySeqs = eqFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
globalDeletes = eqFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
List<Pair<Long, DeleteFile>> posFilesSortedBySeq = deleteFilesByPartition.get(partition).stream()
.filter(entry -> entry.file().content() == FileContent.POSITION_DELETES)
.map(entry -> Pair.of(entry.sequenceNumber(), entry.file()))
.sorted(Comparator.comparingLong(Pair::first))
.collect(Collectors.toList());
long[] seqs = posFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
DeleteFile[] files = posFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
sortedDeletesByPartition.put(partition, Pair.of(seqs, files));
} else {
List<Pair<Long, DeleteFile>> filesSortedBySeq = deleteFilesByPartition.get(partition).stream()
.map(entry -> {
// a delete file is indexed by the sequence number it should be applied to
long applySeq = entry.sequenceNumber() -
(entry.file().content() == FileContent.EQUALITY_DELETES ? 1 : 0);
return Pair.of(applySeq, entry.file());
})
.sorted(Comparator.comparingLong(Pair::first))
.collect(Collectors.toList());
long[] seqs = filesSortedBySeq.stream().mapToLong(Pair::first).toArray();
DeleteFile[] files = filesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
sortedDeletesByPartition.put(partition, Pair.of(seqs, files));
}
}
return new DeleteFileIndex(specsById, globalApplySeqs, globalDeletes, sortedDeletesByPartition);
}
复制代码
这里首先调用deleteManifestReaders()读取deleteManifest文件
private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestReaders() {
LoadingCache<Integer, ManifestEvaluator> evalCache = specsById == null ? null :
Caffeine.newBuilder().build(specId -> {
PartitionSpec spec = specsById.get(specId);
return ManifestEvaluator.forPartitionFilter(
Expressions.and(partitionFilter, Projections.inclusive(spec, caseSensitive).project(dataFilter)),
spec, caseSensitive);
});
Iterable<ManifestFile> matchingManifests = evalCache == null ? deleteManifests :
Iterables.filter(deleteManifests, manifest ->
manifest.content() == ManifestContent.DELETES &&
(manifest.hasAddedFiles() || manifest.hasDeletedFiles()) &&
evalCache.get(manifest.partitionSpecId()).eval(manifest));
return Iterables.transform(
matchingManifests,
manifest ->
ManifestFiles.readDeleteManifest(manifest, io, specsById)
.filterRows(dataFilter)
.filterPartitions(partitionFilter)
.caseSensitive(caseSensitive)
.liveEntries()
);
}
复制代码
deleteManifests中即读取的deleteManifests的文件map
最后调用ManifestFiles.readDeleteManifest 读取manifest文件返回ManifestReader
public static ManifestReader<DeleteFile> readDeleteManifest(ManifestFile manifest, FileIO io,
Map<Integer, PartitionSpec> specsById) {
Preconditions.checkArgument(manifest.content() == ManifestContent.DELETES,
"Cannot read a data manifest with a DeleteManifestReader: %s", manifest);
InputFile file = io.newInputFile(manifest.path());
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DELETE_FILES);
}
复制代码
protected ManifestReader(InputFile file, Map<Integer, PartitionSpec> specsById,
InheritableMetadata inheritableMetadata, FileType content) {
this.file = file;
this.inheritableMetadata = inheritableMetadata;
this.content = content;
try {
try (AvroIterable<ManifestEntry<F>> headerReader = Avro.read(file)
.project(ManifestEntry.getSchema(Types.StructType.of()).select("status"))
.build()) {
this.metadata = headerReader.getMetadata();
}
} catch (IOException e) {
throw new RuntimeIOException(e);
}
int specId = TableMetadata.INITIAL_SPEC_ID;
String specProperty = metadata.get("partition-spec-id");
if (specProperty != null) {
specId = Integer.parseInt(specProperty);
}
if (specsById != null) {
this.spec = specsById.get(specId);
} else {
Schema schema = SchemaParser.fromJson(metadata.get("schema"));
this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
}
this.fileSchema = new Schema(DataFile.getType(spec.partitionType()).fields());
}
复制代码
这里逻辑较为复杂 总之最后返回一个DeleteFileIndex对象deleteFiles,其中globalDeletes字段为equalitydelete文件列表,sortedDeletesByPartition字段为positiondelete文件列表
如图所示,这里sortedDeletesByPartition中Pair中的第一个元素为delete文件对应的snapshot,第二个元素为delete文件
继续planFiles方法中下面的步骤
这里调用entries方法返回FileScanTask的迭代器的迭代器
entries的参数是一个有两个参数的方法
private <T> Iterable<CloseableIterable<T>> entries(
BiFunction<ManifestFile, CloseableIterable<ManifestEntry<DataFile>>, CloseableIterable<T>> entryFn) {
Iterable<ManifestFile> matchingManifests = evalCache == null ? dataManifests :
Iterables.filter(dataManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
matchingManifests = Iterables.filter(matchingManifests, manifestPredicate::test);
return Iterables.transform(
matchingManifests,
manifest -> {
ManifestReader<DataFile> reader = ManifestFiles.read(manifest, io, specsById)
.filterRows(dataFilter)
.filterPartitions(partitionFilter)
.caseSensitive(caseSensitive)
.select(columns);
CloseableIterable<ManifestEntry<DataFile>> entries = reader.entries();
entries = CloseableIterable.filter(entries, manifestEntryPredicate);
return entryFn.apply(manifest, entries);
});
}
复制代码
这里调用ManifestFiles.read方法读取Manifest文件,然后获取ManifestEntry迭代器,最后应用entries传入的参数方法去生成BaseFileScanTask
BaseFileScanTask(DataFile file, DeleteFile[] deletes, String schemaString, String specString,
ResidualEvaluator residuals) {
this.file = file;
this.deletes = deletes != null ? deletes : new DeleteFile[0];
this.schemaString = schemaString;
this.specString = specString;
this.residuals = residuals;
}
复制代码
file 为data文件,deletes 为delete文件列表,包含了所有序号比data文件大的delete文件
这里data文件为87,总文件有140个,比87大的delete文件有45个(还有一些比87大的data文件)
BaseFileScanTask的第二个参数delete文件是通过deleteFiles.forEntry(e)方法得到的
DeleteFile[] forEntry(ManifestEntry<DataFile> entry) {
return this.forDataFile(entry.sequenceNumber(), (DataFile)entry.file());
}
DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
Pair<Integer, StructLikeWrapper> partition = this.partition(file.specId(), file.partition());
Pair<long[], DeleteFile[]> partitionDeletes = (Pair)this.sortedDeletesByPartition.get(partition);
Stream matchingDeletes;
if (partitionDeletes == null) {
matchingDeletes = limitBySequenceNumber(sequenceNumber, this.globalSeqs, this.globalDeletes);
} else if (this.globalDeletes == null) {
matchingDeletes = limitBySequenceNumber(sequenceNumber, (long[])partitionDeletes.first(), (DeleteFile[])partitionDeletes.second());
} else {
matchingDeletes = Stream.concat(limitBySequenceNumber(sequenceNumber, this.globalSeqs, this.globalDeletes), limitBySequenceNumber(sequenceNumber, (long[])partitionDeletes.first(), (DeleteFile[])partitionDeletes.second()));
}
return (DeleteFile[])matchingDeletes.filter((deleteFile) -> {
return canContainDeletesForFile(file, deleteFile, ((PartitionSpec)this.specsById.get(file.specId())).schema());
}).toArray((x$0) -> {
return new DeleteFile[x$0];
});
}
复制代码
最后运行完enableBatchRead后发现返回false.....
当然还是会通过planInputPartitions方法读取数据
/**
* This is called in the Spark Driver when data is to be materialized into {@link InternalRow}
*/
@Override
public List<InputPartition<InternalRow>> planInputPartitions() {
String tableSchemaString = SchemaParser.toJson(table.schema());
String expectedSchemaString = SchemaParser.toJson(lazySchema());
String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING);
List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
for (CombinedScanTask task : tasks()) {
readTasks.add(new ReadTask<>(
task, tableSchemaString, expectedSchemaString, nameMappingString, io, encryptionManager, caseSensitive,
localityPreferred, InternalRowReaderFactory.INSTANCE));
}
return readTasks;
}
复制代码
这个方法是在driver端执行的,同样是调用tasks()去生成CombinedScanTask任务,然后生成ReadTask,ReadTask是Reader的一个内部类,继承了spark中的InputPartition接口
/**
* An input partition returned by {@link DataSourceReader#planInputPartitions()} and is
* responsible for creating the actual data reader of one RDD partition.
* The relationship between {@link InputPartition} and {@link InputPartitionReader}
* is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
*
* Note that {@link InputPartition}s will be serialized and sent to executors, then
* {@link InputPartitionReader}s will be created on executors to do the actual reading. So
* {@link InputPartition} must be serializable while {@link InputPartitionReader} doesn't need to
* be.
*/
@InterfaceStability.Evolving
public interface InputPartition<T> extends Serializable {
/**
* The preferred locations where the input partition reader returned by this partition can run
* faster, but Spark does not guarantee to run the input partition reader on these locations.
* The implementations should make sure that it can be run on any location.
* The location is a string representing the host name.
*
* Note that if a host name cannot be recognized by Spark, it will be ignored as it was not in
* the returned locations. The default return value is empty string array, which means this
* input partition's reader has no location preference.
*
* If this method fails (by throwing an exception), the action will fail and no Spark job will be
* submitted.
*/
default String[] preferredLocations() {
return new String[0];
}
/**
* Returns an input partition reader to do the actual reading work.
*
* If this method fails (by throwing an exception), the corresponding Spark task would fail and
* get retried until hitting the maximum retry times.
*/
InputPartitionReader<T> createPartitionReader();
}
复制代码
该接口即实际负责RDD partition的数据读取
InputPartition和InputPartitionReader的关系有点像Iterable和Iterator
Iterator接口中包含next等迭代器的需要的方法
Iterable接口中包含iterator方法返回一个Iterator接口
当集合如ArrayList实现Iterable时,会在内部通过内部类实现Iterator接口
然后实现iterator方法返回Iterator接口的实现类
这里也一样在Reader内部通过ReadTask实现InputPartition,同时实现InputPartition的方法createPartitionReader返回InputPartitionReader接口的实现类
这里是返回的RowReader类对象实现的InputPartitionReader接口,其中包含了读取数据的具体迭代器和方法,在其父类BaseDataReader中实现了InputPartitionReader接口中的next和get方法
在读取数据时就调用BaseDataReader中的next方法
public boolean next() throws IOException {
while (true) {
if (currentIterator.hasNext()) {
this.current = currentIterator.next();
return true;
} else if (tasks.hasNext()) {
this.currentIterator.close();
this.currentTask = tasks.next();
this.currentIterator = open(currentTask);
} else {
this.currentIterator.close();
return false;
}
}
}
}
复制代码
这里调用open方法生成currentIterator 迭代器
实现在RowDataReader中,返回一个CloseableIterator迭代器对象
CloseableIterator<InternalRow> open(FileScanTask task) {
SparkDeleteFilter deletes = new SparkDeleteFilter(task, tableSchema, expectedSchema);
// schema or rows returned by readers
Schema requiredSchema = deletes.requiredSchema();
Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant);
DataFile file = task.file();
// update the current file for Spark's filename() function
InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
return deletes.filter(open(task, requiredSchema, idToConstant)).iterator();
}
复制代码
这里调用deletes.filter方法对open(task, requiredSchema, idToConstant)返回的具体文件的迭代器如newParquetIterable进行过滤
public CloseableIterable<T> filter(CloseableIterable<T> records) {
return applyEqDeletes(applyPosDeletes(records));
}
复制代码
filter方法先使用applyPosDeletes进行同position-delete,再调用applyEqDeletes进行equality-delete
private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {
if (posDeletes.isEmpty()) {
return records;
}
List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
// if there are fewer deletes than a reasonable number to keep in memory, use a set
if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
return Deletes.filter(
records, this::pos,
Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes)));
}
return Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(dataFile.path(), deletes));
}
复制代码
这里调用Deletes.deletePositions返回deletePositions的迭代器
public static <T extends StructLike> CloseableIterable<Long> deletePositions(CharSequence dataLocation,
List<CloseableIterable<T>> deleteFiles) {
DataFileFilter<T> locationFilter = new DataFileFilter<>(dataLocation);
List<CloseableIterable<Long>> positions = Lists.transform(deleteFiles, deletes ->
CloseableIterable.transform(locationFilter.filter(deletes), row -> (Long) POSITION_ACCESSOR.get(row)));
return new SortedMerge<>(Long::compare, positions);
}
复制代码
SortedMerge中创建了一个deletePositions堆,然后通过poll方法实现迭代
而在 返回实现了CloseableIterable接口的PositionStreamDeleteFilter对象,其中实现了FilterIterator接口的PositionFilterIterator类,
protected boolean shouldKeep(T row) {
long currentPos = extractPos.apply(row);
if (currentPos < nextDeletePos) {
return true;
}
// consume delete positions until the next is past the current position
boolean keep = currentPos != nextDeletePos;
while (deletePosIterator.hasNext() && nextDeletePos <= currentPos) {
this.nextDeletePos = deletePosIterator.next();
if (keep && currentPos == nextDeletePos) {
// if any delete position matches the current position, discard
keep = false;
}
}
return keep;
}
复制代码
其中主要通过shouldKeep实现data迭代器对排序过的delete迭代器的过滤操作
PosDeleteFile中的数据仅为文件名和需要删除的数据的行号
近期评论