Java:使用缓冲器迭代流

「这是我参与11月更文挑战的第10天,活动详情查看:2021最后一次更文挑战

有时,在迭代字节流时,仅单独查看每个字节是不够的。您可能需要随时从缓冲区中的流中提供至少N个下一个字节(例如1024字节)。例如,如果您正在解析编程语言,您可能需要在从流中获得的数据中来回移动——提前阅读,换句话说,回溯。为了能够在流数据中来回移动,您可能需要在缓冲区中保留下一个例如1024或2048(或更多)字节。

为了解决这个问题,我将开发一个RollingBufferInputStream类,它在一个缓冲区中保持至少N个字节可用。从另一个InputStream读取字节。在这篇文章的末尾有一个完整的RollingBufferInputStream的代码列表,你可以复制,所以你不必自己开发这个。

以下是上述问题的简单示例:

image.png

当您在流中前进时,您基本上将数据读取到缓冲区的末尾,并将数据从缓冲区顶部取出。事实上,您不会从缓冲区顶部提取数据。您只需稍后沿着缓冲区向上移动数据。然后,您将新数据从流读取到缓冲区的末尾。

基本上,您执行两项操作:

  1. 将数据从缓冲区的底部复制到顶部。这也被称为压缩缓冲区。
  2. 在已经读取到缓冲区中的任何现有数据后,将新数据读取到缓冲区中。我把这个操作称为在缓冲区中填充数据。如果缓冲区中还没有数据,则从顶部填充。

缓冲区大小

要解决的第一个问题是用于保存数据的缓冲区的大小。如果您需要访问至少1024字节,您还需要至少1024字节的缓冲区。换句话说,您有两个属性需要考虑:

  1. 块大小:您需要访问的字节数。
  2. 缓冲区大小:缓冲区的大小。必须至少等于块大小。

缓冲区大小=块大小

如果缓冲区的block size正好是字节,那么每次您从字节0检查缓冲区时,对于您要查找的内容,您需要压缩缓冲区。要么是因为您从索引0找到了您要找的东西,要么是因为您没有找到您要找的东西,并且需要从索引1和前面的block size字节中查找。

如果我们假设你只能跳过1字节之前每次你不找到你寻找的东西,那么你需要复制块大小- 1字节缓冲区,其余的缓冲区的顶端,再看,然后读1个字节缓冲区的最后一个单元。

例如,如果块大小和缓冲区大小均为1024字节,那么每次您没有从索引0中找到您要查找的内容时,您需要将缓冲区的1023最后一个字节复制到索引0,并将字节读取到缓冲区底部。这意味着为缓冲区中处理的每个字节复制1023字节。这很贵。

缓冲区大小>块大小

您可以使缓冲区大小大于块大小,而不是压缩从顶部读取的每个字节的缓冲区。如果您使缓冲区大小等于block size + 1,那么您只需要每读取两个字节压缩缓冲区。如果您使缓冲区大小等于block size * 2,那么您只需压缩每个block size字节的缓冲区。

例如,如果块大小为1024,缓冲区大小为2048,那么您可以沿着缓冲区处理1024字节,直到缓冲区中剩余的少于1024字节。这意味着每处理1024个字节只压缩1个。这比每处理的1个字节1个压缩性能要好得多。

由于缓冲区的压缩可能很昂贵,因此所需的缓冲区压缩越少越好。因此,缓冲区越大越好。当然,您必须权衡缓冲区的大小和应用程序中其他内存的需求。一个因为消耗太多内存而在硬盘上开始交换的应用程序也会损失很多性能。通常,缓冲区大小为2到4倍的block size就可以了。

以下是块大小、缓冲区大小和流迭代原理的说明:

image.png

RollingBufferInputStream

我开发的RollingBufferInputStream类可用于迭代InputStream,同时确保缓冲区中始终有block size字节。当然,除非达到流的尽头。

注意:即使该类称为RollingBufferInputStream,它也不是InputStream子类。这是有道理的,因为在使用RollingBufferInputStream时,您不会使用InputStream的任何传统方法。您只会使用其特殊方法,并且您将通过其内部缓冲区访问流中的数据。

使用RollingBufferInputStream

在向您展示RollingBufferInputStream是如何实现之前,我将首先向您展示如何使用它。然后,您可能会更好地了解其实施。

image.png

首先,您通过调用hasAvailableBytes()询问缓冲区是否有N个可用字节。如果缓冲区不包含N字节,此方法将在内部尝试填充缓冲区。尝试此操作后,该方法将检查缓冲区现在是否包含N字节。如果是这样,它就会返回true。否则它会返回false。

其次,您在缓冲区中寻找匹配项。RollingBufferInputStream包含访问缓冲区的方法,并了解缓冲区中有多少字节可用,以及当前开始索引的位置。

第三,您将缓冲区中的开始索引增加适当的值。如果找到匹配项,此示例将跳过整个块。如果您正在解析语言,您只需跳到找到语句的末尾。如果没有找到匹配项,您将开始索引移动1或任何适当的值,以跳过前面,开始从下一个索引中搜索。

RollingBufferInputStream实施大纲

这是RollingBufferInputStream类的第一个大纲。为了清晰起见,一些代码已经在这里删除了。完整的代码列表可以在本文末尾找到。此大纲主要用于向您展示RollingBufferInputStream类的界面。

image.png

startend变量标记缓冲区中的开始和结束索引。当您通过缓冲区向下移动开始索引时,开始索引会继续进行。结束索引总是标记读取到缓冲区中的字节数。

填充缓冲区

如果对hasAvailableBytes()的调用检测到缓冲区中没有足够的可用字节,它将尝试填充缓冲区。以下是完整的hasAvailableBytes()方法实现,因此您可以查看它的工作原理:

image.png

方法的作用如下:

  1. 检查缓冲区是否有所需的可用字节数。
  2. 如果没有,请检查流是否有更多的数据。
  3. 如果流有更多的数据,请检查缓冲区底部是否有足够数据的空间。
  4. 如果缓冲区没有足够的空间,则缓冲区将压缩。
  5. 缓冲区中装满了尽可能多的数据。

将数据填充到缓冲区中的方法由fillDataFromStreamIntoBuffer()完成,如下所示:

image.png

如您所见,此方法首先将尽可能多的数据读取到缓冲区中,并在内部存储字节数。然后,它使用读取的字节数来增加结束索引。

实际上,这种方法也许应该将数据读取到while-loop中的缓冲区中,这样它就会继续将数据读取到缓冲区中,直到缓冲区中有足够的字节。现在,它只是假设单个读取将产生足够的字节。如果你愿意,你可以解决这个问题。

压缩缓冲区

每当缓冲区中没有足够的空间读取新数据时,都会压缩缓冲区。然后,缓冲区中剩余的任何数据(在开始索引和结束索引之间)都会移动到缓冲区的顶部。然后可以在底部再次填充缓冲区。以下是compact()方法的实现方式:

image.png

完整代码列表

以下是RollingBufferInputStream的完整代码列表:

package com.jenkov.rsync;

import java.io.InputStream;
import java.io.IOException;

public class RollingBufferInputStream {

  InputStream source    = null;

  protected byte[]      buffer    = null;
  protected int         start     = 0; //current location in buffer.
  protected int         end       = 0; //current limit of data read
                                       //into the buffer
                                       //= next element to read into.

  protected int         bytesRead = 0;

  public RollingBufferInputStream(InputStream source, byte[] buffer) {
    this.source = source;
    this.buffer = buffer;
  }

  public byte[] getBuffer() {
    return buffer;
  }

  public int getStart() {
    return start;
  }

  public int getEnd() {
    return end;
  }

  public void moveStart(int noOfBytesToMove){
    if(this.start + noOfBytesToMove > this.end){
      throw new RuntimeException(
        "Attempt to move buffer 'start' beyond 'end'. start= "
        + this.start + ", end: " + this.end + ", bytesToMove: "
        + noOfBytesToMove);
      }
    this.start += noOfBytesToMove;
  }

  public int availableBytes() {
    return this.end - this.start;
  }

  public boolean hasAvailableBytes(int numberOfBytes) throws IOException {
    if(! hasAvailableBytesInBuffer(numberOfBytes)){
      if(streamHasMoreData()){
        if(!bufferHasSpaceFor(numberOfBytes)){
          compact();
         }
         fillDataFromStreamIntoBuffer();
      }
    }

    return hasAvailableBytesInBuffer(numberOfBytes);
  }

  private void fillDataFromStreamIntoBuffer() throws IOException {
    this.bytesRead  = this.source.read(this.buffer, this.end,
                      this.buffer.length - this.end);
    this.end       += this.bytesRead;
  }

  private void compact() {
    int bytesToCopy = end - start;

      for(int i=0; i<bytesToCopy; i++){
        this.buffer[i] = this.buffer[start + i];
      }

      this.start = 0;
      this.end   = bytesToCopy;
  }

  private boolean bufferHasSpaceFor(int numberOfBytes) {
    return (this.buffer.length - this.start) >= numberOfBytes;
  }

  public boolean streamHasMoreData() {
    return this.bytesRead > -1;
  }

  private boolean hasAvailableBytesInBuffer(int numberOfBytes) {
    return   (this.end - this.start) >= numberOfBytes;
  }

}
复制代码