Loading Cassandra data into a Dask DataFrame
I am trying to load data from a cassandra database into a Dask dataframe. I have tried querying the following with no success:
query="""SELECT * FROM document_table"""
df = man.session.execute(query)
df = dd.DataFrame(list(df))
TypeError Traceback (most recent call last)
<ipython-input-135-021507f6f2ab> in <module>()
----> 1 a = dd.DataFrame(list(df))
TypeError: __init__() missing 3 required positional arguments: 'name', 'meta', and 'divisions'
Does anybody know an easy way to load data directly from Cassandra into Dask? It is too much memory to load into pandas first.
Some problems with your code:
- the line df = man.session.execute(query) presumably loads the whole data-set into memory. Dask is not invoked here; it plays no part in this. Someone with knowledge of the Cassandra driver could confirm this.
- list(df) produces a list of the column names of a dataframe and drops all the data
- dd.DataFrame, if you read the docs, is not constructed like this.
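To see the list(df) point concretely, here is a quick illustration with a small throwaway pandas dataframe (the data here is made up purely for the demo):

```python
import pandas as pd

# a tiny dataframe just to show the iteration behaviour
df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})

# iterating a dataframe yields its column labels, not its rows
print(list(df))  # ['a', 'b']
```

So wrapping list(df) in a dataframe constructor throws away every row and keeps only the column names.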
What you probably want to do is: a) make a function that returns one partition of the data, b) delay this function and call it with the various values of the partitions, c) use dd.from_delayed to make the dask dataframe. E.g., assuming the table has a field partfield which handily has possible values 1..6 and a similar number of rows for each partition:
import dask
import dask.dataframe as dd
import pandas as pd

@dask.delayed
def part(x):
    session = ...  # construct Cassandra session here
    q = "SELECT * FROM document_table WHERE partfield={}".format(x)
    rows = session.execute(q)
    # each delayed call returns an ordinary pandas dataframe
    return pd.DataFrame(list(rows))

parts = [part(x) for x in range(1, 7)]
df = dd.from_delayed(parts)