Spark Source Code Reading Notes: BlockObjectWriter

In Spark's hash-based shuffle, the outputs of several map tasks can be consolidated into a single file to reduce the number of files produced, and this relies mainly on BlockObjectWriter. BlockObjectWriter is an interface for operating directly on the storage container backing a Block (currently only disk storage is supported, i.e., data can only be appended to the Block's file on disk); by appending to the container it effectively appends data to the corresponding Block. One container (for example, one file) can back several Blocks, so several Blocks are consolidated into one file and the file count drops, with each Block occupying a contiguous segment of that container. BlockObjectWriter supports rollback: if a write fails, the data written so far can be reverted, preserving atomicity. The interface does not support concurrent writes. Its only implementation is DiskBlockObjectWriter. (A sketch of the consolidated layout follows the quoted doc comment below.)

An interface for writing JVM objects to some underlying storage. This interface allows appending data to an existing block, and can guarantee atomicity in the case of faults as it allows the caller to revert partial writes.
This interface does not support concurrent writes. Also, once the writer has been opened, it cannot be reopened again.
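
To make the consolidated layout concrete, here is a minimal standalone sketch (not Spark code; the Segment case class simply mirrors the FileSegment class quoted later in this note, and the file path and sizes are made up):

import java.io.File

// Mirrors Spark's FileSegment: a contiguous [offset, offset + length) range of a file.
case class Segment(file: File, offset: Long, length: Long)

object ConsolidationSketch {
  def main(args: Array[String]): Unit = {
    // One shared container file holding the output of several Blocks.
    val shared = new File("/tmp/merged_shuffle_file")   // hypothetical path

    // Block A was written first (1000 bytes); Block B was appended right after it (500 bytes).
    // For B, initialPosition would have been file.length() == 1000 when its writer was opened.
    val segmentA = Segment(shared, offset = 0L, length = 1000L)
    val segmentB = Segment(shared, offset = 1000L, length = 500L)

    // Each Block can later be read back by reading only its own byte range of the shared file.
    Seq(segmentA, segmentB).foreach { s =>
      println(s"${s.file.getName}: bytes [${s.offset}, ${s.offset + s.length})")
    }
  }
}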

Methods of BlockObjectWriter

  • open(): BlockObjectWriter
    Opens the underlying output streams.

  • close()
    Closes the output streams.

  • isOpen: Boolean
    Whether the writer is currently open.

  • commitAndClose(): Unit
    Flushes and commits the buffered content, associating what was written with the corresponding Block.

    Flush the partial writes and commit them as a single atomic block.

  • revertPartialWritesAndClose()
    Reverts all write operations, restoring the file to its state before this writer started writing.

    Reverts writes that haven’t been flushed yet. Callers should invoke this function when there are runtime exceptions. This method will not throw, though it may be unsuccessful in truncating written data.

  • write(value: Any)
    Writes a single record.

  • fileSegment(): FileSegment
    Returns the FileSegment for the committed data; only valid after commitAndClose() has been called (a usage sketch follows the FileSegment definition below).

    Returns the file segment of committed data that this Writer has written. This is only valid after commitAndClose() has been called.

FileSegment represents a contiguous segment of a file:

References a particular segment of a file (potentially the entire file), based off an offset and a length.

class FileSegment(val file: File, val offset: Long, val length: Long)
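
A sketch of the typical call pattern (hypothetical caller code, not copied from Spark; it assumes a writer: BlockObjectWriter obtained elsewhere, e.g., from the BlockManager, and an iterator of records to write):

// Hypothetical usage of the BlockObjectWriter interface described above.
def writeBlock(writer: BlockObjectWriter, records: Iterator[Any]): FileSegment = {
  try {
    records.foreach(writer.write)            // append each record to the Block
    writer.commitAndClose()                  // flush and commit the writes as one atomic block
    writer.fileSegment()                     // only valid after commitAndClose()
  } catch {
    case t: Throwable =>
      writer.revertPartialWritesAndClose()   // roll the file back to its pre-write length
      throw t
  }
}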

The only implementation of BlockObjectWriter is DiskBlockObjectWriter. DiskBlockObjectWriter operates directly on the file backing the Block and writes content into it, thereby appending content to the Block. It can write several Blocks into a single file, reducing the number of files.

The constructor of DiskBlockObjectWriter

class DiskBlockObjectWriter(
    blockId: BlockId,
    file: File,
    serializer: Serializer,
    bufferSize: Int,
    compressStream: OutputStream => OutputStream,
    syncWrites: Boolean,
    writeMetrics: ShuffleWriteMetrics)
  extends BlockObjectWriter(blockId)

Properties of DiskBlockObjectWriter

  • blockId: BlockId
    The BlockId this DiskBlockObjectWriter writes to

  • file: File
    The file backing the Block

  • serializer: Serializer
    The serializer used for writing records

  • bufferSize: Int
    The buffer size

  • compressStream: OutputStream => OutputStream
    The compression function applied to the output stream

  • syncWrites: Boolean
    Whether writes are synced to disk

  • numRecordsWritten
    The number of records written

  • initialPosition
    The starting position of this Block within the file; an invariant equal to file.length() at open time, i.e., the amount of data already written to the file by other Blocks

  • finalPosition
    The end position of this Block within the file; initialized to -1 and set to the file's current length when commitAndClose is called, after which it no longer changes

  • reportedPosition
    The position up to which writes have been reported to the metrics; initialized to initialPosition and updated to channel.position() after every 32 records written

  • initialized:Boolean
    Whether the DiskBlockObjectWriter has been initialized (its streams have been opened)

  • hasBeenClosed:Boolean
    Whether the DiskBlockObjectWriter has been closed

  • channel: FileChannel
    fos.getChannel()

  • bs: OutputStream
    compressStream(new BufferedOutputStream(ts, bufferSize))

  • fos: FileOutputStream
    new FileOutputStream(file, true)

  • ts: TimeTrackingOutputStream
    new TimeTrackingOutputStream(fos)

  • objOut: SerializationStream
    serializer.newInstance().serializeStream(bs)
    (The layering of fos, ts, bs, and objOut is sketched below.)
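
The last five properties form a chain of wrapped streams. As a standalone illustration (not Spark code), the layering looks roughly like this, with java.util.zip.GZIPOutputStream standing in for compressStream and java.io.ObjectOutputStream standing in for the SerializationStream:

import java.io._
import java.util.zip.GZIPOutputStream

object StreamChainSketch {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("block-writer-sketch", ".bin")
    val fos = new FileOutputStream(file, true)                                // appends raw bytes to the shared file
    // (a TimeTrackingOutputStream would wrap fos here to time each write)
    val bs = new GZIPOutputStream(new BufferedOutputStream(fos, 32 * 1024))   // buffering + compression
    val objOut = new ObjectOutputStream(bs)                                   // serialization on top of bs

    objOut.writeObject(("key", 42))   // writing a record goes through every layer down to the file
    objOut.flush()
    bs.finish()                       // compression codecs may emit bytes only when finished/closed
    objOut.close()
    println(s"file length after close: ${file.length()}")
  }
}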

TimeTrackingOutputStream intercepts write calls and records the time spent writing:

/** Intercepts write calls and tracks total time spent writing. Not thread safe. */
  private class TimeTrackingOutputStream(out: OutputStream) extends OutputStream {
    def write(i: Int): Unit = callWithTiming(out.write(i))
    override def write(b: Array[Byte]) = callWithTiming(out.write(b))
    override def write(b: Array[Byte], off: Int, len: Int) = callWithTiming(out.write(b, off, len))
    override def close() = out.close()
    override def flush() = out.flush()
  }

The callWithTiming method it relies on:

private def callWithTiming(f: => Unit) = {
    val start = System.nanoTime()
    f
    writeMetrics.incShuffleWriteTime(System.nanoTime() - start)
  }
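
callWithTiming takes a by-name parameter, so the write call passed to it is evaluated inside the timed region rather than at the call site. A minimal standalone sketch of the same pattern (not Spark code; it accumulates into a local counter instead of ShuffleWriteMetrics):

object TimingSketch {
  private var totalWriteNanos = 0L

  // The by-name parameter `f` is only evaluated where it is referenced, i.e. between the two timestamps.
  private def callWithTiming(f: => Unit): Unit = {
    val start = System.nanoTime()
    f
    totalWriteNanos += System.nanoTime() - start
  }

  def main(args: Array[String]): Unit = {
    val out = new java.io.ByteArrayOutputStream()
    callWithTiming(out.write(Array.fill[Byte](1024)(1.toByte)))   // the timed write
    println(s"bytes written: ${out.size()}, time spent: $totalWriteNanos ns")
  }
}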

Methods of DiskBlockObjectWriter

  • open(): BlockObjectWriter
    Initializes each of the output streams:
override def open(): BlockObjectWriter = {
    if (hasBeenClosed) {
      throw new IllegalStateException("Writer already closed. Cannot be reopened.")
    }
    fos = new FileOutputStream(file, true)
    ts = new TimeTrackingOutputStream(fos)
    channel = fos.getChannel()
    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
    objOut = serializer.newInstance().serializeStream(bs)
    initialized = true
    this
  }
  • close()
    Closes the output streams:
override def close() {
    if (initialized) {
      if (syncWrites) {
        // Force outstanding writes to disk and track how long it takes
        objOut.flush()
        def sync = fos.getFD.sync()
        callWithTiming(sync)
      }
      objOut.close()

      channel = null
      bs = null
      fos = null
      ts = null
      objOut = null
      initialized = false
      hasBeenClosed = true
    }
  }
  • isOpen: Boolean = objOut != null
    Whether the writer is open

  • commitAndClose(): Unit
    Flushes and commits the buffered content, and updates finalPosition:

override def commitAndClose(): Unit = {
    if (initialized) {
      // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
      //       serializer stream and the lower level stream.
      objOut.flush()
      bs.flush()
      close()
    }
    finalPosition = file.length()
    // In certain compression codecs, more bytes are written after close() is called
    writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition)
  }
  • revertPartialWritesAndClose()
    Rolls back, discarding everything this Block has written (a standalone demonstration of the truncate step follows the code below).
// Discard current writes. We do this by flushing the outstanding writes and then
  // truncating the file to its initial position.
  override def revertPartialWritesAndClose() {
    try {
      writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)
      writeMetrics.decShuffleRecordsWritten(numRecordsWritten)

      if (initialized) {
        objOut.flush()
        bs.flush()
        close()
      }

      val truncateStream = new FileOutputStream(file, true)
      try {
        truncateStream.getChannel.truncate(initialPosition)
      } finally {
        truncateStream.close()
      }
    } catch {
      case e: Exception =>
        logError("Uncaught exception while reverting partial writes to file " + file, e)
    }
  }
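
The rollback itself is just a truncate back to initialPosition, which leaves data written by earlier Blocks in the same file intact. A minimal standalone demonstration (not Spark code; the sizes are made up):

import java.io.FileOutputStream
import java.nio.file.{Files, StandardOpenOption}

object TruncateSketch {
  def main(args: Array[String]): Unit = {
    val path = Files.createTempFile("revert-sketch", ".bin")
    Files.write(path, Array.fill[Byte](100)(1.toByte))                             // data from earlier Blocks
    val initialPosition = Files.size(path)                                          // 100

    Files.write(path, Array.fill[Byte](40)(2.toByte), StandardOpenOption.APPEND)    // this Block's partial writes

    // Revert: truncate the file back to where it was before this Block started writing.
    val truncateStream = new FileOutputStream(path.toFile, true)
    try truncateStream.getChannel.truncate(initialPosition)
    finally truncateStream.close()

    println(s"length after revert: ${Files.size(path)}")   // 100 again; earlier Blocks untouched
  }
}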
  • write(value: Any)
    Writes a single record (a sketch of updateBytesWritten follows the code below):
override def write(value: Any) {
    if (!initialized) {
      open()
    }

    objOut.writeObject(value)
    numRecordsWritten += 1
    writeMetrics.incShuffleRecordsWritten(1)

    if (numRecordsWritten % 32 == 0) {
      updateBytesWritten()
    }
  }
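
updateBytesWritten is not quoted in this note; judging from the description of reportedPosition above, it presumably reads channel.position() and reports the delta to writeMetrics, roughly:

// Sketch reconstructed from the reportedPosition description above; not copied from the source.
private def updateBytesWritten() {
  val pos = channel.position()
  writeMetrics.incShuffleBytesWritten(pos - reportedPosition)
  reportedPosition = pos
}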
  • fileSegment(): FileSegment
    Returns the file data segment this Block occupies; only valid after commitAndClose() has been called:
override def fileSegment(): FileSegment = {
    new FileSegment(file, initialPosition, finalPosition - initialPosition)
  }
