Large numpy matrix as a Dataflow side input
I'm trying to write a Dataflow pipeline in Python that requires a large numpy matrix as a side input. The matrix is saved in cloud storage. Ideally, each Dataflow worker would load the matrix directly from cloud storage.
My understanding is that if I say matrix = np.load(LOCAL_PATH_TO_MATRIX) and then
p | "computation" >> beam.Map(computation, matrix)
the matrix gets shipped from my laptop to each Dataflow worker.
How could I instead direct each worker to load the matrix directly from cloud storage? Is there a beam source for "binary blob"?
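On the "binary blob" point: a matrix saved with np.save is just a self-describing sequence of bytes (the .npy format), and np.load can read it from any seekable file-like object, not only a local path. The minimal sketch below uses an in-memory io.BytesIO buffer as a stand-in for an object in cloud storage; the matrix contents are made up for illustration.

```python
import io

import numpy as np

# Hypothetical matrix standing in for the real, large one.
matrix = np.arange(12, dtype=np.float32).reshape(3, 4)

# Serialize to .npy bytes, exactly what np.save would write to a file.
buffer = io.BytesIO()
np.save(buffer, matrix)
blob = buffer.getvalue()  # raw bytes that could be stored as a single object

# Reading back only needs a file-like object positioned at the start of the blob.
restored = np.load(io.BytesIO(blob))
print(restored.shape, restored.dtype)  # (3, 4) float32
```

Unlike a text dump, the .npy header records shape and dtype, so nothing has to be re-parsed or guessed on the reading side.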
Your approach is correct.
What Dataflow does, in this case, is handle the NumPy matrix as a side input. This means that it's uploaded once from your machine to the service, and the Dataflow service will send it to each worker.
Because the matrix is large, each worker will spend I/O receiving it from the service and will have to keep the whole matrix in memory, but it should work.
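To get a feel for "large", the in-memory footprint of a dense matrix is simply rows × cols × bytes per element. The sketch below computes that without allocating anything; the 20,000 × 20,000 shape is a hypothetical example, not a size from the question.

```python
import numpy as np


def matrix_nbytes(rows, cols, dtype=np.float64):
    """Bytes needed to hold a dense rows x cols matrix of the given dtype."""
    return rows * cols * np.dtype(dtype).itemsize


# Hypothetical shape: a 20,000 x 20,000 float64 matrix.
size = matrix_nbytes(20_000, 20_000)
print(size / 1e9, "GB")  # 3.2 GB
```

Every worker holding a copy pays this cost, so switching to float32 (half the bytes) can matter if the computation tolerates the reduced precision.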
If you want to avoid computing or loading the matrix on your machine, you can upload your matrix to GCS as a text file, then have each worker read that file in and obtain the matrix. You can do something like so:
matrix_file = 'gs://mybucket/my/matrix'
p | beam.ParDo(ComputationDoFn(matrix_file))
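The text-file route relies on a writer and a reader that agree on the format. A minimal sketch, assuming the file was produced with np.savetxt and is parsed with np.loadtxt; io.StringIO stands in for the file in GCS here, and the matrix values are made up.

```python
import io

import numpy as np

matrix = np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])

# Write the matrix as whitespace-separated text, one row per line.
text_file = io.StringIO()
np.savetxt(text_file, matrix)
contents = text_file.getvalue()

# Parse it back; np.loadtxt accepts any file-like object that yields lines.
restored = np.loadtxt(io.StringIO(contents))
print(restored)
```

Text is easy to inspect, but with np.savetxt's default '%.18e' format each value takes about 25 characters instead of 8 bytes, so for a large matrix the binary .npy format is noticeably smaller and faster to parse.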
And your DoFn could be something like:
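import io

import apache_beam as beam
import numpy as np
from apache_beam.io.filesystems import FileSystems

class ComputationDoFn(beam.DoFn):
    def __init__(self, matrix_file):
        self._matrix_file = matrix_file
        self._matrix = None

    def start_bundle(self):
        # We check because one DoFn instance may be reused
        # for different bundles.
        if self._matrix is None:
            self._matrix = self.load_matrix(self._matrix_file)

    def process(self, element):
        # Now process the element using self._matrix.
        yield element  # placeholder for the real computation

    def load_matrix(self, matrix_file):
        # Read the text file from GCS through Beam's FileSystems API.
        with FileSystems.open(matrix_file) as f:
            return np.loadtxt(io.TextIOWrapper(f))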
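The important part of the DoFn is the "load once, reuse across bundles" check in start_bundle. The sketch below mimics that lifecycle in plain Python, without Beam, so the behaviour can be seen in isolation: LazyMatrixHolder and fake_loader are hypothetical stand-ins for ComputationDoFn and the real GCS read.

```python
class LazyMatrixHolder:
    """Hypothetical stand-in for ComputationDoFn's load-once logic."""

    def __init__(self, matrix_file, loader):
        self._matrix_file = matrix_file
        self._loader = loader
        self._matrix = None

    def start_bundle(self):
        # Only the first bundle on this instance pays the loading cost.
        if self._matrix is None:
            self._matrix = self._loader(self._matrix_file)

    def process(self, element):
        # Example computation: each row sum scaled by the element.
        return [element * sum(row) for row in self._matrix]


load_calls = []


def fake_loader(path):
    # Stand-in for reading from GCS; records every call it receives.
    load_calls.append(path)
    return [[1, 2], [3, 4]]


holder = LazyMatrixHolder("gs://mybucket/my/matrix", fake_loader)
results = []
for bundle in ([1, 2], [3]):  # two bundles handled by the same instance
    holder.start_bundle()
    results.extend(holder.process(e) for e in bundle)
print(len(load_calls))  # 1
```

Without the `is None` check, the loader would run once per bundle, re-downloading the matrix each time.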
I hope this makes sense. I can flesh out the functions if you need more help.