Executor 端进程间通信和序列化

对于 Spark 内置的算子,在 Python 中调用 RDD、DataFrame 的接口后,从上文可以看出会通过 JVM 去调用到 Scala 的接口,最后执行和直接使用 Scala 并无区别。而 对于需要使用 UDF 的情形,在 Executor 端就需要启动一个 Python worker 子进程,然后执行 UDF 的逻辑。那么 Spark 是怎样判断需要启动子进程的呢?

在 Spark 编译用户的 DAG 的时候,Catalyst Optimizer 会创建 BatchEvalPython 或者 ArrowEvalPython 这样的 Logical Operator,随后会被转换成 PythonEvals 这个 Physical Operator。在 PythonEvals(https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala)中:

object PythonEvals extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case ArrowEvalPython(udfs, output, child, evalType) =>
      ArrowEvalPythonExec(udfs, output, planLater(child), evalType) :: Nil
    case BatchEvalPython(udfs, output, child) =>
      BatchEvalPythonExec(udfs, output, planLater(child)) :: Nil
    case _ =>

创建了 ArrowEvalPythonExec 或者 BatchEvalPythonExec,而这二者内部会创建 ArrowPythonRunner、PythonUDFRunner 等类的对象实例,并调用了它们的 compute 方法。由于它们都继承了 BasePythonRunner,基类的 compute 方法中会去启动 Python 子进程:

def compute(
      inputIterator: Iterator[IN],
      partitionIndex: Int,
      context: TaskContext): Iterator[OUT] = {
  // ......
  val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
  // Start a thread to feed the process input from our parent's iterator
  val writerThread = newWriterThread(env, worker, inputIterator, partitionIndex, context)
  val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
  val stdoutIterator = newReaderIterator(
    stream, writerThread, startTime, env, worker, releasedOrClosed, context)
  new InterruptibleIterator(context, stdoutIterator)

这里 env.createPythonWorker 会通过 PythonWorkerFactory

(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala)去启动 Python 进程。

Executor 端启动 Python 子进程后,会创建一个 socket 与 Python 建立连接。所有 RDD 的数据都要序列化后,通过 socket 发送,而结果数据需要同样的方式序列化传回 JVM。

对于直接使用 RDD 的计算,或者没有开启 spark.sql.execution.arrow.enabled 的 DataFrame,是将输入数据按行发送给 Python,可想而知,这样效率极低。

在 Spark 2.2 后提供了基于 Arrow 的序列化、反序列化的机制(从 3.0 起是默认开启),从 JVM 发送数据到 Python 进程的代码在 sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala。这个类主要是重写了 newWriterThread 这个方法,使用了 ArrowWriter 向 socket 发送数据:

val arrowWriter = ArrowWriter.create(root)
val writer = new ArrowStreamWriter(root, null, dataOut)
while (inputIterator.hasNext) {
val nextBatch = inputIterator.next()
while (nextBatch.hasNext) {

可以看到,每次取出一个 batch,填充给 ArrowWriter,实际数据会保存在 root 对象中,然后由 ArrowStreamWriter 将 root 对象中的整个 batch 的数据写入到 socket 的 DataOutputStream 中去。ArrowStreamWriter 会调用 writeBatch 方法去序列化消息并写数据,代码参考 ArrowWriter.java#L131。

protected ArrowBlock writeRecordBatch(ArrowRecordBatch batch) throws IOException {
  ArrowBlock block = MessageSerializer.serialize(out, batch, option);
  LOGGER.debug("RecordBatch at {}, metadata: {}, body: {}",
      block.getOffset(), block.getMetadataLength(), block.getBodyLength());
  return block;

在 MessageSerializer 中,使用了 flatbuffer 来序列化数据。flatbuffer 是一种比较高效的序列化协议,它的主要优点是反序列化的时候,不需要解码,可以直接通过裸 buffer 来读取字段,可以认为反序列化的开销为零。我们来看看 Python 进程收到消息后是如何反序列化的。

Python 子进程实际上是执行了 worker.py 的 main 函数 (python/pyspark/worker.py):

if __name__ == '__main__':
    # Read information about how to connect back to the JVM from the environment.
    java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
    auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"]
    (sock_file, _) = local_connect_and_auth(java_port, auth_secret)
    main(sock_file, sock_file)

这里会去向 JVM 建立连接,并从 socket 中读取指令和数据。对于如何进行序列化、反序列化,是通过 UDF 的类型来区分:

eval_type = read_int(infile)
if eval_type == PythonEvalType.NON_UDF:
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)

在 read_udfs 中,如果是 PANDAS 类的 UDF,会创建 ArrowStreamPandasUDFSerializer,其余的 UDF 类型创建 BatchedSerializer。我们来看看 ArrowStreamPandasUDFSerializer(python/pyspark/serializers.py):

def dump_stream(self, iterator, stream):
    import pyarrow as pa
    writer = None
        for batch in iterator:
            if writer is None:
                writer = pa.RecordBatchStreamWriter(stream, batch.schema)
        if writer is not None:
def load_stream(self, stream):
    import pyarrow as pa
    reader = pa.ipc.open_stream(stream)
    for batch in reader:
        yield batch

可以看到,这里双向的序列化、反序列化,都是调用了 PyArrow 的 ipc 的方法,和前面看到的 Scala 端是正好对应的,也是按 batch 来读写数据。对于 Pandas 的 UDF,读到一个 batch 后,会将 Arrow 的 batch 转换成 Pandas Series。

def arrow_to_pandas(self, arrow_column):
    from pyspark.sql.types import _check_series_localize_timestamps
    # If the given column is a date type column, creates a series of datetime.date directly
    # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by
    # datetime64[ns] type handling.
    s = arrow_column.to_pandas(date_as_object=True)
    s = _check_series_localize_timestamps(s, self._timezone)
    return s
def load_stream(self, stream):
    Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
    batches = super(ArrowStreamPandasSerializer, self).load_stream(stream)
    import pyarrow as pa
    for batch in batches:
        yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]

Pandas UDF

前面我们已经看到,PySpark 提供了基于 Arrow 的进程间通信来提高效率,那么对于用户在 Python 层的 UDF,是不是也能直接使用到这种高效的内存格式呢?答案是肯定的,这就是 PySpark 推出的 Pandas UDF。区别于以往以行为单位的 UDF,Pandas UDF 是以一个 Pandas Series 为单位,batch 的大小可以由 spark.sql.execution.arrow.maxRecordsPerBatch 这个参数来控制。这是一个来自官方文档的示例:

def multiply_func(a, b):
    return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
df.select(multiply(col("x"), col("x"))).show()

上文已经解析过,PySpark 会将 DataFrame 以 Arrow 的方式传递给 Python 进程,Python 中会转换为 Pandas Series,传递给用户的 UDF。在 Pandas UDF 中,可以使用 Pandas 的 API 来完成计算,在易用性和性能上都得到了很大的提升。



