Iceberg Spark DataSource V2 read path

Iceberg implements reading through the DataSourceReader interface of Spark's DataSource V2 API. Specifically, it uses the DataSourceReader sub-interface SupportsScanColumnarBatch and implements its enableBatchRead and planInputPartitions methods to read data.

/**
 * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
 * interface to output {@link ColumnarBatch} and make the scan faster.
 */
@InterfaceStability.Evolving
public interface SupportsScanColumnarBatch extends DataSourceReader {
  @Override
  default List<InputPartition<InternalRow>> planInputPartitions() {
    throw new IllegalStateException(
      "planInputPartitions not supported by default within SupportsScanColumnarBatch.");
  }

  /**
   * Similar to {@link DataSourceReader#planInputPartitions()}, but returns columnar data
   * in batches.
   */
  List<InputPartition<ColumnarBatch>> planBatchInputPartitions();

  /**
   * Returns true if the concrete data source reader can read data in batch according to the scan
   * properties like required columns, pushes filters, etc. It's possible that the implementation
   * can only support some certain columns with certain types. Users can overwrite this method and
   * {@link #planInputPartitions()} to fallback to normal read path under some conditions.
   */
  default boolean enableBatchRead() {
    return true;
  }
}

As shown above, enableBatchRead decides, based on scan properties such as required columns and pushed filters, whether a batch (columnar) read is possible. If it returns true, planBatchInputPartitions() is used to read columnar batches; otherwise Spark falls back to planInputPartitions(). Iceberg's Reader implements it as follows:

public boolean enableBatchRead() {
    if (readUsingBatch == null) {
      boolean allParquetFileScanTasks =
          tasks().stream()
              .allMatch(combinedScanTask -> !combinedScanTask.isDataTask() && combinedScanTask.files()
                  .stream()
                  .allMatch(fileScanTask -> fileScanTask.file().format().equals(
                      FileFormat.PARQUET)));

      boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes);
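      // allOrcFileScanTasks, atLeastOneColumn and onlyPrimitives used below are also computed
      // in this method (from tasks() and the projected schema); omitted in this excerpt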

      this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
          (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
    }
    return readUsingBatch;
  }

enableBatchRead calls tasks() to get the list of scan tasks for the read:

private List<CombinedScanTask> tasks() {
    if (tasks == null) {
      TableScan scan = table
          .newScan()
          .caseSensitive(caseSensitive)
          .project(lazySchema());

      try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
        this.tasks = Lists.newArrayList(tasksIterable);
      } catch (IOException e) {
        throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
      }
    }

    return tasks;
  }

tasks() calls TableScan's planTasks() method, which returns an iterable of CombinedScanTask; the implementation lives in BaseTableScan:

public CloseableIterable<CombinedScanTask> planTasks() {
    Map<String, String> options = context.options();
    
    ..........

    CloseableIterable<FileScanTask> fileScanTasks = planFiles();
    CloseableIterable<FileScanTask> splitFiles = TableScanUtil.splitFiles(fileScanTasks, splitSize);
    return TableScanUtil.planTasks(splitFiles, splitSize, lookback, openFileCost);
  }


planTasks() calls planFiles(), which returns an iterable of FileScanTask:

  public CloseableIterable<FileScanTask> planFiles() {
      return planFiles(ops, snapshot,
          context.rowFilter(), context.ignoreResiduals(), context.caseSensitive(), context.returnColumnStats());
  }

This in turn calls an overloaded planFiles() method. The template method pattern is used here: the overloaded planFiles is implemented in DataTableScan, a subclass of BaseTableScan:

public CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot snapshot,
                                                   Expression rowFilter, boolean ignoreResiduals,
                                                   boolean caseSensitive, boolean colStats) {
    ManifestGroup manifestGroup = new ManifestGroup(ops.io(), snapshot.dataManifests(), snapshot.deleteManifests())
        .caseSensitive(caseSensitive)
        .select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
        .filterData(rowFilter)
        .specsById(ops.current().specsById())
        .ignoreDeleted();

    if (ignoreResiduals) {
      manifestGroup = manifestGroup.ignoreResiduals();
    }

    if (PLAN_SCANS_WITH_WORKER_POOL && snapshot.dataManifests().size() > 1) {
      manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool());
    }

    return manifestGroup.planFiles();
  }

Inside it, snapshot.dataManifests() calls cacheManifests():

public List<ManifestFile> dataManifests() {
    if (dataManifests == null) {
      cacheManifests();
    }
    return dataManifests;
  }
private void cacheManifests() {
    if (allManifests == null) {
      // if manifests isn't set, then the snapshotFile is set and should be read to get the list
      this.allManifests = ManifestLists.read(io.newInputFile(manifestListLocation));
    }

    if (dataManifests == null || deleteManifests == null) {
      this.dataManifests = ImmutableList.copyOf(Iterables.filter(allManifests,
          manifest -> manifest.content() == ManifestContent.DATA));
      this.deleteManifests = ImmutableList.copyOf(Iterables.filter(allManifests,
          manifest -> manifest.content() == ManifestContent.DELETES));
    }
  }

If allManifests is null, the manifest list file is read, and the manifests are then split into dataManifests and deleteManifests.

(Screenshot: listing of the table's metadata directory.)
As can be seen, 12 snapshots have been written to this table, so there are 12 metadata.json files and 12 snap-*.avro files, i.e. manifest list files, while there are 24 manifest files. The file names show the correspondence between manifest list files and manifest files.

(Screenshot: the manifest lists and manifest files observed at runtime.)

The runtime data also shows that each manifest list file corresponds to two manifest files: the one ending in m1 is the delete manifest, and the one ending in m0 is the data manifest.

Back in the planFiles method above, the last step calls manifestGroup.planFiles():

/**
   * Returns a iterable of scan tasks. It is safe to add entries of this iterable
   * to a collection as {@link DataFile} in each {@link FileScanTask} is defensively
   * copied.
   * @return a {@link CloseableIterable} of {@link FileScanTask}
   */
  public CloseableIterable<FileScanTask> planFiles() {
    LoadingCache<Integer, ResidualEvaluator> residualCache = Caffeine.newBuilder().build(specId -> {
      PartitionSpec spec = specsById.get(specId);
      Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : dataFilter;
      return ResidualEvaluator.of(spec, filter, caseSensitive);
    });

    DeleteFileIndex deleteFiles = deleteIndexBuilder.build();

    boolean dropStats = ManifestReader.dropStats(dataFilter, columns);
    if (!deleteFiles.isEmpty()) {
      select(Streams.concat(columns.stream(), ManifestReader.STATS_COLUMNS.stream()).collect(Collectors.toList()));
    }

    Iterable<CloseableIterable<FileScanTask>> tasks = entries((manifest, entries) -> {
      int specId = manifest.partitionSpecId();
      PartitionSpec spec = specsById.get(specId);
      String schemaString = SchemaParser.toJson(spec.schema());
      String specString = PartitionSpecParser.toJson(spec);
      ResidualEvaluator residuals = residualCache.get(specId);
      if (dropStats) {
        return CloseableIterable.transform(entries, e -> new BaseFileScanTask(
            e.file().copyWithoutStats(), deleteFiles.forEntry(e), schemaString, specString, residuals));
      } else {
        return CloseableIterable.transform(entries, e -> new BaseFileScanTask(
            e.file().copy(), deleteFiles.forEntry(e), schemaString, specString, residuals));
      }
    });

    if (executorService != null) {
      return new ParallelIterable<>(tasks, executorService);
    } else {
      return CloseableIterable.concat(tasks);
    }
  }

What is returned here is a CloseableIterable<FileScanTask>, i.e. file-level tasks.
In the middle of the method, DeleteFileIndex deleteFiles = deleteIndexBuilder.build(); builds the index of delete files:

 DeleteFileIndex build() {
      // read all of the matching delete manifests in parallel and accumulate the matching files in a queue
      Queue<ManifestEntry<DeleteFile>> deleteEntries = new ConcurrentLinkedQueue<>();
      Tasks.foreach(deleteManifestReaders())
          .stopOnFailure().throwFailureWhenFinished()
          .executeWith(executorService)
          .run(deleteFile -> {
            try (CloseableIterable<ManifestEntry<DeleteFile>> reader = deleteFile) {
              for (ManifestEntry<DeleteFile> entry : reader) {
                // copy with stats for better filtering against data file stats
                deleteEntries.add(entry.copy());
              }
            } catch (IOException e) {
              throw new RuntimeIOException(e, "Failed to close");
            }
          });

      // build a map from (specId, partition) to delete file entries
      ListMultimap<Pair<Integer, StructLikeWrapper>, ManifestEntry<DeleteFile>> deleteFilesByPartition =
          Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
      for (ManifestEntry<DeleteFile> entry : deleteEntries) {
        int specId = entry.file().specId();
        StructLikeWrapper wrapper = StructLikeWrapper.forType(specsById.get(specId).partitionType())
            .set(entry.file().partition());
        deleteFilesByPartition.put(Pair.of(specId, wrapper), entry);
      }

      // sort the entries in each map value by sequence number and split into sequence numbers and delete files lists
      Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>> sortedDeletesByPartition = Maps.newHashMap();
      // also, separate out equality deletes in an unpartitioned spec that should be applied globally
      long[] globalApplySeqs = null;
      DeleteFile[] globalDeletes = null;
      for (Pair<Integer, StructLikeWrapper> partition : deleteFilesByPartition.keySet()) {
        if (specsById.get(partition.first()).isUnpartitioned()) {
          Preconditions.checkState(globalDeletes == null, "Detected multiple partition specs with no partitions");

          List<Pair<Long, DeleteFile>> eqFilesSortedBySeq = deleteFilesByPartition.get(partition).stream()
              .filter(entry -> entry.file().content() == FileContent.EQUALITY_DELETES)
              .map(entry ->
                  // a delete file is indexed by the sequence number it should be applied to
                  Pair.of(entry.sequenceNumber() - 1, entry.file()))
              .sorted(Comparator.comparingLong(Pair::first))
              .collect(Collectors.toList());

          globalApplySeqs = eqFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
          globalDeletes = eqFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);

          List<Pair<Long, DeleteFile>> posFilesSortedBySeq = deleteFilesByPartition.get(partition).stream()
              .filter(entry -> entry.file().content() == FileContent.POSITION_DELETES)
              .map(entry -> Pair.of(entry.sequenceNumber(), entry.file()))
              .sorted(Comparator.comparingLong(Pair::first))
              .collect(Collectors.toList());

          long[] seqs = posFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
          DeleteFile[] files = posFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);

          sortedDeletesByPartition.put(partition, Pair.of(seqs, files));

        } else {
          List<Pair<Long, DeleteFile>> filesSortedBySeq = deleteFilesByPartition.get(partition).stream()
              .map(entry -> {
                // a delete file is indexed by the sequence number it should be applied to
                long applySeq = entry.sequenceNumber() -
                    (entry.file().content() == FileContent.EQUALITY_DELETES ? 1 : 0);
                return Pair.of(applySeq, entry.file());
              })
              .sorted(Comparator.comparingLong(Pair::first))
              .collect(Collectors.toList());

          long[] seqs = filesSortedBySeq.stream().mapToLong(Pair::first).toArray();
          DeleteFile[] files = filesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);

          sortedDeletesByPartition.put(partition, Pair.of(seqs, files));
        }
      }

      return new DeleteFileIndex(specsById, globalApplySeqs, globalDeletes, sortedDeletesByPartition);
    }

build() first calls deleteManifestReaders() to read the delete manifest files:

private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestReaders() {
      LoadingCache<Integer, ManifestEvaluator> evalCache = specsById == null ? null :
          Caffeine.newBuilder().build(specId -> {
            PartitionSpec spec = specsById.get(specId);
            return ManifestEvaluator.forPartitionFilter(
                Expressions.and(partitionFilter, Projections.inclusive(spec, caseSensitive).project(dataFilter)),
                spec, caseSensitive);
          });

      Iterable<ManifestFile> matchingManifests = evalCache == null ? deleteManifests :
          Iterables.filter(deleteManifests, manifest ->
              manifest.content() == ManifestContent.DELETES &&
                  (manifest.hasAddedFiles() || manifest.hasDeletedFiles()) &&
                  evalCache.get(manifest.partitionSpecId()).eval(manifest));

      return Iterables.transform(
          matchingManifests,
          manifest ->
              ManifestFiles.readDeleteManifest(manifest, io, specsById)
                  .filterRows(dataFilter)
                  .filterPartitions(partitionFilter)
                  .caseSensitive(caseSensitive)
                  .liveEntries()
      );
    }

deleteManifests here is the collection of delete manifest files cached earlier.
Finally, ManifestFiles.readDeleteManifest is called to read each manifest file and return a ManifestReader:

public static ManifestReader<DeleteFile> readDeleteManifest(ManifestFile manifest, FileIO io,
                                                              Map<Integer, PartitionSpec> specsById) {
    Preconditions.checkArgument(manifest.content() == ManifestContent.DELETES,
        "Cannot read a data manifest with a DeleteManifestReader: %s", manifest);
    InputFile file = io.newInputFile(manifest.path());
    InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
    return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DELETE_FILES);
  }
protected ManifestReader(InputFile file, Map<Integer, PartitionSpec> specsById,
                           InheritableMetadata inheritableMetadata, FileType content) {
    this.file = file;
    this.inheritableMetadata = inheritableMetadata;
    this.content = content;

    try {
      try (AvroIterable<ManifestEntry<F>> headerReader = Avro.read(file)
          .project(ManifestEntry.getSchema(Types.StructType.of()).select("status"))
          .build()) {
        this.metadata = headerReader.getMetadata();
      }
    } catch (IOException e) {
      throw new RuntimeIOException(e);
    }

    int specId = TableMetadata.INITIAL_SPEC_ID;
    String specProperty = metadata.get("partition-spec-id");
    if (specProperty != null) {
      specId = Integer.parseInt(specProperty);
    }

    if (specsById != null) {
      this.spec = specsById.get(specId);
    } else {
      Schema schema = SchemaParser.fromJson(metadata.get("schema"));
      this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
    }

    this.fileSchema = new Schema(DataFile.getType(spec.partitionType()).fields());
  }

The logic here is fairly involved. In short, build() returns a DeleteFileIndex object (deleteFiles): its globalDeletes field holds the equality-delete files of an unpartitioned spec, which are applied globally, and its sortedDeletesByPartition field holds the remaining delete files grouped by (specId, partition) and sorted by sequence number.

(Screenshot: the DeleteFileIndex fields at runtime.)

As the figure shows, the first element of each Pair in sortedDeletesByPartition is the array of sequence numbers the delete files apply from, and the second element is the delete files themselves.

Continuing with the rest of planFiles(): it calls the entries() method, which returns an iterable of iterables of FileScanTask. The argument to entries() is a BiFunction taking a manifest and its entries:

private <T> Iterable<CloseableIterable<T>> entries(
            BiFunction<ManifestFile, CloseableIterable<ManifestEntry<DataFile>>, CloseableIterable<T>> entryFn) {
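      // evalCache (a per-partition-spec ManifestEvaluator cache built from dataFilter) is
      // constructed at the start of this method; omitted in this excerpt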

      Iterable<ManifestFile> matchingManifests = evalCache == null ? dataManifests :
              Iterables.filter(dataManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));

      matchingManifests = Iterables.filter(matchingManifests, manifestPredicate::test);

      return Iterables.transform(
              matchingManifests,
              manifest -> {
                ManifestReader<DataFile> reader = ManifestFiles.read(manifest, io, specsById)
                        .filterRows(dataFilter)
                        .filterPartitions(partitionFilter)
                        .caseSensitive(caseSensitive)
                        .select(columns);

                CloseableIterable<ManifestEntry<DataFile>> entries = reader.entries();

                entries = CloseableIterable.filter(entries, manifestEntryPredicate);
                return entryFn.apply(manifest, entries);
              });
  }

Here ManifestFiles.read reads each data manifest file, the ManifestEntry iterable is obtained from it, and finally the function passed to entries() is applied to build BaseFileScanTask objects:

BaseFileScanTask(DataFile file, DeleteFile[] deletes, String schemaString, String specString,
                   ResidualEvaluator residuals) {
    this.file = file;
    this.deletes = deletes != null ? deletes : new DeleteFile[0];
    this.schemaString = schemaString;
    this.specString = specString;
    this.residuals = residuals;
  }

file is the data file, and deletes is the list of delete files: it contains all delete files whose sequence number is newer than the data file's (for position deletes, an equal sequence number also counts).

(Screenshot: the delete files matched for one data file at runtime.)
In this example the data file's sequence number is 87; there are 140 files in total, and 45 delete files have a sequence number greater than 87 (some of the files greater than 87 are data files).
The delete files passed as the second argument of BaseFileScanTask are obtained via deleteFiles.forEntry(e):

DeleteFile[] forEntry(ManifestEntry<DataFile> entry) {
    return forDataFile(entry.sequenceNumber(), entry.file());
}

DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
    Pair<Integer, StructLikeWrapper> partition = partition(file.specId(), file.partition());
    Pair<long[], DeleteFile[]> partitionDeletes = sortedDeletesByPartition.get(partition);

    Stream<DeleteFile> matchingDeletes;
    if (partitionDeletes == null) {
        matchingDeletes = limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes);
    } else if (globalDeletes == null) {
        matchingDeletes = limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second());
    } else {
        matchingDeletes = Stream.concat(
            limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes),
            limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second()));
    }

    return matchingDeletes
        .filter(deleteFile -> canContainDeletesForFile(file, deleteFile, specsById.get(file.specId()).schema()))
        .toArray(DeleteFile[]::new);
}
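
limitBySequenceNumber is not shown above. Conceptually it keeps only the delete files whose apply sequence number is at least the data file's sequence number. Below is a minimal sketch of that idea, not the actual Iceberg implementation:

import java.util.Arrays;
import java.util.stream.Stream;

// Minimal sketch of the idea behind limitBySequenceNumber (not the Iceberg implementation):
// seqs is sorted and parallel to files; keep every delete file whose apply sequence number
// is >= the data file's sequence number, i.e. deletes committed at or after the data file.
class SequenceNumberFilterSketch {
  static <F> Stream<F> limitBySequenceNumber(long dataSequenceNumber, long[] seqs, F[] files) {
    if (files == null) {
      return Stream.empty();
    }
    int start = 0;
    while (start < seqs.length && seqs[start] < dataSequenceNumber) {
      start += 1;  // skip delete files older than the data file
    }
    return Arrays.stream(files, start, files.length);
  }
}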

After stepping through enableBatchRead, it turns out to return false here (this table has delete files, so hasNoDeleteFiles is false), so the data is still read through planInputPartitions:

/**
   * This is called in the Spark Driver when data is to be materialized into {@link InternalRow}
   */
  @Override
  public List<InputPartition<InternalRow>> planInputPartitions() {
    String tableSchemaString = SchemaParser.toJson(table.schema());
    String expectedSchemaString = SchemaParser.toJson(lazySchema());
    String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING);

    List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
    for (CombinedScanTask task : tasks()) {
      readTasks.add(new ReadTask<>(
          task, tableSchemaString, expectedSchemaString, nameMappingString, io, encryptionManager, caseSensitive,
          localityPreferred, InternalRowReaderFactory.INSTANCE));
    }

    return readTasks;
  }

This method runs on the Spark driver. It again calls tasks() to produce the CombinedScanTask list and wraps each task in a ReadTask. ReadTask is an inner class of Reader that implements Spark's InputPartition interface:


/**
 * An input partition returned by {@link DataSourceReader#planInputPartitions()} and is
 * responsible for creating the actual data reader of one RDD partition.
 * The relationship between {@link InputPartition} and {@link InputPartitionReader}
 * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
 *
 * Note that {@link InputPartition}s will be serialized and sent to executors, then
 * {@link InputPartitionReader}s will be created on executors to do the actual reading. So
 * {@link InputPartition} must be serializable while {@link InputPartitionReader} doesn't need to
 * be.
 */
@InterfaceStability.Evolving
public interface InputPartition<T> extends Serializable {

  /**
   * The preferred locations where the input partition reader returned by this partition can run
   * faster, but Spark does not guarantee to run the input partition reader on these locations.
   * The implementations should make sure that it can be run on any location.
   * The location is a string representing the host name.
   *
   * Note that if a host name cannot be recognized by Spark, it will be ignored as it was not in
   * the returned locations. The default return value is empty string array, which means this
   * input partition's reader has no location preference.
   *
   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
   * submitted.
   */
  default String[] preferredLocations() {
    return new String[0];
  }

  /**
   * Returns an input partition reader to do the actual reading work.
   *
   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
   * get retried until hitting the maximum retry times.
   */
  InputPartitionReader<T> createPartitionReader();
}

This interface is responsible for creating the actual data reader of one RDD partition. The relationship between InputPartition and InputPartitionReader is much like that between Iterable and Iterator: Iterator declares the methods an iterator needs, such as next(), while Iterable declares an iterator() method that returns an Iterator. When a collection such as ArrayList implements Iterable, it implements Iterator in an inner class and has iterator() return an instance of that class, as in the sketch below.
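
For illustration, a toy collection (hypothetical, not from Spark or Iceberg) that follows this pattern:

import java.util.Iterator;

// A minimal Iterable: the collection implements Iterable, and an inner class implements
// Iterator; iterator() hands out a fresh instance of that inner class on every call.
public class FixedInts implements Iterable<Integer> {
  private final int[] values;

  public FixedInts(int... values) {
    this.values = values;
  }

  @Override
  public Iterator<Integer> iterator() {
    return new Iter();
  }

  private class Iter implements Iterator<Integer> {
    private int pos = 0;

    @Override
    public boolean hasNext() {
      return pos < values.length;
    }

    @Override
    public Integer next() {
      return values[pos++];
    }
  }
}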

The same pattern is used here: inside Reader, ReadTask implements InputPartition, and its createPartitionReader() method returns an implementation of InputPartitionReader (see the simplified sketch below). What is returned is a RowReader object implementing InputPartitionReader; it holds the concrete iterators and methods for reading data, and its parent class BaseDataReader implements the next() and get() methods of InputPartitionReader.
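
Below is a simplified sketch of that wiring. The field set and the ReaderFactory signature are abridged for illustration and do not match the Iceberg source exactly:

import java.io.Serializable;
import org.apache.iceberg.CombinedScanTask;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;

// Abridged sketch: the ReadTask is serialized to executors, and createPartitionReader()
// builds the reader (on the row path, one backed by RowDataReader) that does the actual I/O.
class ReadTaskSketch<T> implements InputPartition<T> {
  // hypothetical, simplified factory; the real one also receives schemas, FileIO, encryption, etc.
  interface ReaderFactory<T> extends Serializable {
    InputPartitionReader<T> create(CombinedScanTask task);
  }

  private final CombinedScanTask task;
  private final ReaderFactory<T> readerFactory;

  ReadTaskSketch(CombinedScanTask task, ReaderFactory<T> readerFactory) {
    this.task = task;
    this.readerFactory = readerFactory;
  }

  @Override
  public InputPartitionReader<T> createPartitionReader() {
    return readerFactory.create(task);
  }
}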

When data is read, BaseDataReader's next() method is called:

public boolean next() throws IOException {
      while (true) {
        if (currentIterator.hasNext()) {
          this.current = currentIterator.next();
          return true;
        } else if (tasks.hasNext()) {
          this.currentIterator.close();
          this.currentTask = tasks.next();
          this.currentIterator = open(currentTask);
        } else {
          this.currentIterator.close();
          return false;
        }
      }
    }

next() calls the open() method to produce the currentIterator. The implementation is in RowDataReader and returns a CloseableIterator:

CloseableIterator<InternalRow> open(FileScanTask task) {
    SparkDeleteFilter deletes = new SparkDeleteFilter(task, tableSchema, expectedSchema);

    // schema or rows returned by readers
    Schema requiredSchema = deletes.requiredSchema();
    Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant);
    DataFile file = task.file();

    // update the current file for Spark's filename() function
    InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());

    return deletes.filter(open(task, requiredSchema, idToConstant)).iterator();
  }

Here deletes.filter() wraps the file-format-specific iterable returned by open(task, requiredSchema, idToConstant) (e.g. the one built by newParquetIterable) and filters out deleted rows:

public CloseableIterable<T> filter(CloseableIterable<T> records) {
    return applyEqDeletes(applyPosDeletes(records));
  }

filter() first applies position deletes via applyPosDeletes, then equality deletes via applyEqDeletes:

private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {
    if (posDeletes.isEmpty()) {
      return records;
    }

    List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);

    // if there are fewer deletes than a reasonable number to keep in memory, use a set
    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
      return Deletes.filter(
          records, this::pos,
          Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes)));
    }

    return Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(dataFile.path(), deletes));
  }

On the streaming path, Deletes.deletePositions is called to build an iterable over the delete positions:

public static <T extends StructLike> CloseableIterable<Long> deletePositions(CharSequence dataLocation,
                                                                               List<CloseableIterable<T>> deleteFiles) {
    DataFileFilter<T> locationFilter = new DataFileFilter<>(dataLocation);
    List<CloseableIterable<Long>> positions = Lists.transform(deleteFiles, deletes ->
        CloseableIterable.transform(locationFilter.filter(deletes), row -> (Long) POSITION_ACCESSOR.get(row)));

    return new SortedMerge<>(Long::compare, positions);
  }

SortedMerge builds a heap over the per-file delete-position iterators and iterates by polling the smallest element from the heap, as in the sketch below.
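
For illustration, a minimal sketch of this sorted-merge idea (a hypothetical standalone class, not the Iceberg SortedMerge implementation):

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

// Each input iterator is already sorted; a heap keyed by each iterator's current head value
// lets next() always return the globally smallest remaining delete position.
class MergingPositionIterator implements Iterator<Long> {

  private static class Head {
    long value;
    final Iterator<Long> source;

    Head(long value, Iterator<Long> source) {
      this.value = value;
      this.source = source;
    }
  }

  private final PriorityQueue<Head> heap = new PriorityQueue<>(Comparator.comparingLong((Head h) -> h.value));

  MergingPositionIterator(List<Iterator<Long>> sortedInputs) {
    for (Iterator<Long> it : sortedInputs) {
      if (it.hasNext()) {
        heap.add(new Head(it.next(), it));
      }
    }
  }

  @Override
  public boolean hasNext() {
    return !heap.isEmpty();
  }

  @Override
  public Long next() {
    if (heap.isEmpty()) {
      throw new NoSuchElementException();
    }
    Head smallest = heap.poll();         // poll the globally smallest position
    long result = smallest.value;
    if (smallest.source.hasNext()) {     // re-insert the source with its next head value
      smallest.value = smallest.source.next();
      heap.add(smallest);
    }
    return result;
  }
}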
Deletes.streamingFilter then returns a PositionStreamDeleteFilter, which implements CloseableIterable; its PositionFilterIterator class (a FilterIterator) does the actual filtering:

protected boolean shouldKeep(T row) {
  long currentPos = extractPos.apply(row);
  if (currentPos < nextDeletePos) {
    return true;
  }
  // consume delete positions until the next is past the current position
  boolean keep = currentPos != nextDeletePos;
  while (deletePosIterator.hasNext() && nextDeletePos <= currentPos) {
    this.nextDeletePos = deletePosIterator.next();
    if (keep && currentPos == nextDeletePos) {
      // if any delete position matches the current position, discard
      keep = false;
    }
  }
  return keep;
}


The filtering of the data iterator against the sorted delete-position iterator is done mainly in shouldKeep, shown above. The data in a position-delete file is just the data file's path and the row positions to be deleted; for example, with sorted delete positions {2, 5}, exactly the rows at positions 2 and 5 are dropped, as in the toy walk-through below.
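
A toy walk-through of the shouldKeep logic above (hypothetical standalone code, not Iceberg's): with delete positions {2, 5}, only rows 2 and 5 are reported as dropped.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PosDeleteDemo {
  public static void main(String[] args) {
    List<Long> rowPositions = Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L, 6L);
    Iterator<Long> deletePositions = Arrays.asList(2L, 5L).iterator();  // already sorted

    long nextDeletePos = deletePositions.hasNext() ? deletePositions.next() : Long.MAX_VALUE;
    for (long pos : rowPositions) {
      boolean keep;
      if (pos < nextDeletePos) {
        keep = true;
      } else {
        // same structure as shouldKeep: consume delete positions until past the current row
        keep = pos != nextDeletePos;
        while (deletePositions.hasNext() && nextDeletePos <= pos) {
          nextDeletePos = deletePositions.next();
          if (keep && pos == nextDeletePos) {
            keep = false;
          }
        }
      }
      System.out.println("pos=" + pos + " keep=" + keep);  // rows 2 and 5 print keep=false
    }
  }
}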

(Screenshot: the contents of a position-delete file.)