Python 数学应用(四)
原文:zh.annas-archive.org/md5/123a7612a4e578f6816d36f968cfec22
译者:飞龙
协议:CC BY-NC-SA 4.0
第十一章:其他主题
在本章中,我们将讨论一些在本书前几章中没有涉及的主题。这些主题大多涉及不同的计算方式以及优化代码执行的其他方式。其他主题涉及处理特定类型的数据或文件格式。
在前两个内容中,我们将介绍帮助跟踪计算中的单位和不确定性的软件包。这些对于涉及具有直接物理应用的数据的计算非常重要。在下一个内容中,我们将讨论如何从 NetCDF 文件加载和存储数据。NetCDF 通常用于存储天气和气候数据的文件格式。(NetCDF 代表网络通用数据格式。)在第四个内容中,我们将讨论处理地理数据,例如可能与天气或气候数据相关的数据。之后,我们将讨论如何可以在不必启动交互式会话的情况下从终端运行 Jupyter 笔记本。接下来的两个内容涉及验证数据和处理从 Kafka 服务器流式传输的数据。我们最后两个内容涉及两种不同的方式,即使用诸如 Cython 和 Dask 等工具来加速我们的代码。
在本章中,我们将涵盖以下内容:
-
使用 Pint 跟踪单位
-
在计算中考虑不确定性
-
从 NetCDF 文件加载和存储数据
-
处理地理数据
-
将 Jupyter 笔记本作为脚本执行
-
验证数据
-
处理数据流
-
使用 Cython 加速代码
-
使用 Dask 进行分布式计算
让我们开始吧!
技术要求
由于本章包含的内容的性质,需要许多不同的软件包。我们需要的软件包列表如下:
-
Pint
-
不确定性
-
NetCDF4
-
xarray
-
GeoPandas
-
Geoplot
-
Papermill
-
Cerberus
-
Faust
-
Cython
-
Dask
所有这些软件包都可以使用您喜欢的软件包管理器(如pip
)进行安装:
python3.8 -m pip install pint uncertainties netCDF4 xarray geopandas
geoplot papermill cerberus faust cython
安装 Dask 软件包,我们需要安装与软件包相关的各种额外功能。我们可以在终端中使用以下pip
命令来执行此操作:
python3.8 -m pip install dask[complete]
除了这些 Python 软件包,我们还需要安装一些支持软件。对于处理地理数据的内容,GeoPandas 和 Geoplot 库可能需要单独安装许多低级依赖项。详细说明在 GeoPandas 软件包文档中给出,网址为geopandas.org/install.html。
对于处理数据流的内容,我们需要安装 Kafka 服务器。如何安装和运行 Kafka 服务器的详细说明可以在 Apache Kafka 文档页面上找到,网址为kafka.apache.org/quickstart。
对于Cython 加速代码的内容,我们需要安装 C 编译器。如何获取GNU C 编译器(GCC)的说明在 Cython 文档中给出,网址为cython.readthedocs.io/en/latest/src/quickstart/install.html。
本章的代码可以在 GitHub 存储库的Chapter 10
文件夹中找到,网址为github.com/PacktPublishing/Applying-Math-with-Python/tree/master/Chapter%2010。
查看以下视频以查看代码的实际操作:bit.ly/2ZMjQVw。
使用 Pint 跟踪单位
在计算中正确跟踪单位可能非常困难,特别是在可以使用不同单位的地方。例如,很容易忘记在不同单位之间进行转换 – 英尺/英寸转换成米 – 或者公制前缀 – 比如将 1 千米转换成 1,000 米。
在这个内容中,我们将学习如何使用 Pint 软件包来跟踪计算中的测量单位。
准备工作
对于这个示例,我们需要 Pint 包,可以按如下方式导入:
import pint
如何做…
以下步骤向您展示了如何使用 Pint 包在计算中跟踪单位:
- 首先,我们需要创建一个
UnitRegistry
对象:
ureg = pint.UnitRegistry(system="mks")
- 要创建带有单位的数量,我们将数字乘以注册对象的适当属性:
distance = 5280 * ureg.feet
- 我们可以使用其中一种可用的转换方法更改数量的单位:
print(distance.to("miles"))
print(distance.to_base_units())
print(distance.to_base_units().to_compact())
这些print
语句的输出如下:
0.9999999999999999 mile
1609.3439999999998 meter
1.6093439999999999 kilometer
- 我们包装一个例程,使其期望以秒为参数并输出以米为结果:
@ureg.wraps(ureg.meter, ureg.second)
def calc_depth(dropping_time):
# s = u*t + 0.5*a*t*t
# u = 0, a = 9.81
return 0.5*9.81*dropping_time*dropping_time
- 现在,当我们使用分钟单位调用
calc_depth
例程时,它会自动转换为秒进行计算:
depth = calc_depth(0.05 * ureg.minute)
print("Depth", depth)
# Depth 44.144999999999996 meter
它是如何工作的…
Pint 包为数字类型提供了一个包装类,为类型添加了单位元数据。这个包装类型实现了所有标准的算术运算,并在这些计算过程中跟踪单位。例如,当我们将长度单位除以时间单位时,我们将得到速度单位。这意味着您可以使用 Pint 来确保在复杂计算后单位是正确的。
UnitRegistry
对象跟踪会话中存在的所有单位,并处理不同单位类型之间的转换等问题。它还维护一个度量参考系统,在这个示例中是标准国际系统,以米、千克和秒作为基本单位,表示为mks
。
wrap
功能允许我们声明例程的输入和输出单位,这允许 Pint 对输入函数进行自动单位转换-在这个示例中,我们将分钟转换为秒。尝试使用没有关联单位或不兼容单位的数量调用包装函数将引发异常。这允许对参数进行运行时验证,并自动转换为例程的正确单位。
还有更多…
Pint 包带有一个大型的预设测量单位列表,涵盖了大多数全球使用的系统。单位可以在运行时定义或从文件加载。这意味着您可以定义特定于您正在使用的应用程序的自定义单位或单位系统。
单位也可以在不同的上下文中使用,这允许在不同单位类型之间轻松转换,这些单位类型通常是不相关的。这可以在需要在计算的多个点之间流畅地移动单位的情况下节省大量时间。
在计算中考虑不确定性
大多数测量设备并不是 100%准确的,通常只能准确到一定程度,通常在 0 到 10%之间。例如,温度计可能准确到 1%,而一对数字卡尺可能准确到 0.1%。在这两种情况下,报告的数值不太可能是真实值,尽管它会非常接近。跟踪数值的不确定性是困难的,特别是当您有多种不同的不确定性以不同的方式组合在一起时。与其手动跟踪这些,最好使用一个一致的库来为您完成。这就是uncertainties
包的作用。
在这个示例中,我们将学习如何量化变量的不确定性,并看到这些不确定性如何通过计算传播。
准备工作
对于这个示例,我们将需要uncertainties
包,我们将从中导入ufloat
类和umath
模块:
from uncertainties import ufloat, umath
如何做…
以下步骤向您展示了如何在计算中对数值的不确定性进行量化:
- 首先,我们创建一个不确定的浮点值为
3.0
加减0.4
:
seconds = ufloat(3.0, 0.4)
print(seconds) # 3.0+/-0.4
- 接下来,我们进行涉及这个不确定值的计算,以获得一个新的不确定值:
depth = 0.5*9.81*seconds*seconds
print(depth) # 44+/-12
- 接下来,我们创建一个新的不确定浮点值,并在与之前计算相反的方向上应用
umath
模块的sqrt
例程:
other_depth = ufloat(44, 12)
time = umath.sqrt(2.0*other_depth/9.81)
print("Estimated time", time)
# Estimated time 3.0+/-0.4
它是如何工作的…
ufloat
类包装了float
对象,并在整个计算过程中跟踪不确定性。该库利用线性误差传播理论,使用非线性函数的导数来估计计算过程中传播的误差。该库还正确处理相关性,因此从自身减去一个值会得到 0,没有误差。
要跟踪标准数学函数中的不确定性,您需要使用umath
模块中提供的版本,而不是 Python 标准库或第三方包(如 NumPy)中定义的版本。
还有更多…
uncertainties
包支持 NumPy,并且前面示例中提到的 Pint 包可以与不确定性结合使用,以确保正确地将单位和误差边界归因于计算的最终值。例如,我们可以从本示例的步骤 2中计算出计算的单位,如下所示:
import pint
from uncertainties import ufloat
g = 9.81*ureg.meters / ureg.seconds ** 2
seconds = ufloat(3.0, 0.4) * ureg.seconds
depth = 0.5*g*seconds**2
print(depth)
如预期的那样,最后一行的print
语句给出了我们预期的44+/-12 米
。
从 NetCDF 文件加载和存储数据
许多科学应用程序要求我们以稳健的格式开始大量的多维数据。NetCDF 是天气和气候行业用于开发数据的格式的一个例子。不幸的是,数据的复杂性意味着我们不能简单地使用 Pandas 包的实用程序来加载这些数据进行分析。我们需要netcdf4
包来能够读取和导入数据到 Python 中,但我们还需要使用xarray
。与 Pandas 库不同,xarray
可以处理更高维度的数据,同时仍提供类似于 Pandas 的接口。
在这个示例中,我们将学习如何从 NetCDF 文件中加载数据并存储数据。
准备就绪
对于这个示例,我们需要导入 NumPy 包作为np
,Pandas 包作为pd
,Matplotlib pyplot
模块作为plt
,以及从 NumPy 导入默认随机数生成器的实例:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from numpy.random import default_rng
rng = default_rng(12345)
我们还需要导入xarray
包并使用别名xr
。您还需要安装 Dask 包,如“技术要求”部分所述,以及 NetCDF4 包:
import xarray as xr
我们不需要直接导入这两个包。
操作方法…
按照以下步骤加载和存储样本数据到 NetCDF 文件中:
- 首先,我们需要创建一些随机数据。这些数据包括一系列日期、位置代码列表和随机生成的数字:
dates = pd.date_range("2020-01-01", periods=365, name="date")
locations = list(range(25))
steps = rng.normal(0, 1, size=(365,25))
accumulated = np.add.accumulate(steps)
- 接下来,我们创建一个包含数据的 xarray
Dataset
对象。日期和位置是索引,而steps
和accumulated
变量是数据:
data_array = xr.Dataset({
"steps": (("date", "location"), steps),
"accumulated": (("date", "location"), accumulated)
},
{"location": locations, "date": dates}
)
print(data_array)
print
语句的输出如下所示:
<xarray.Dataset>
Dimensions: (date: 365, location: 25)
Coordinates:
* location (location) int64 0 1 2 3 4 5 6 7 8 ... 17 18 19 20 21 22 23 24
* date (date) datetime64[ns] 2020-01-01 2020-01-02 ... 2020-12-30
Data variables:
steps (date, location) float64 geoplot.pointplot(cities, ax=ax, fc="r", marker="2")
ax.axis((-180, 180, -90, 90))-1.424 1.264 ... -0.4547 -0.4873
accumulated (date, location) float64 -1.424 1.264 -0.8707 ... 8.935 -3.525
- 接下来,我们计算每个时间索引处所有位置的平均值:
means = data_array.mean(dim="location")
- 现在,我们在新的坐标轴上绘制平均累积值:
fig, ax = plt.subplots()
means["accumulated"].to_dataframe().plot(ax=ax)
ax.set(title="Mean accumulated values", xlabel="date", ylabel="value")
生成的绘图如下所示:
图 10.1:随时间累积平均值的绘图
- 使用
to_netcdf
方法将此数据集保存到新的 NetCDF 文件中:
data_array.to_netcdf("data.nc")
- 现在,我们可以使用
xarray
的load_dataset
例程加载新创建的 NetCDF 文件:
new_data = xr.load_dataset("data.nc")
print(new_data)
前面代码的输出如下所示:
<xarray.Dataset>
Dimensions: (date: 365, location: 25)
Coordinates:
* location (location) int64 0 1 2 3 4 5 6 7 8 ... 17 18 19 20 21 22 23 24
* date (date) datetime64[ns] 2020-01-01 2020-01-02 ... 2020-12-30
Data variables:
steps (date, location) float64 -1.424 1.264 ... -0.4547 -0.4873
accumulated (date, location) float64 -1.424 1.264 -0.8707 ... 8.935 -3.525
工作原理…
xarray
包提供了DataArray
和DataSet
类,它们(粗略地说)是 PandasSeries
和DataFrame
对象的多维等价物。在本例中,我们使用数据集,因为每个索引(日期和位置的元组)都与两个数据相关联。这两个对象都暴露了与它们的 Pandas 等价物类似的接口。例如,我们可以使用mean
方法沿着其中一个轴计算平均值。DataArray
和DataSet
对象还有一个方便的方法,可以将其转换为 PandasDataFrame
,称为to_dataframe
。我们在这个示例中使用它将其转换为DataFrame
进行绘图,这并不是真正必要的,因为xarray
内置了绘图功能。
这个配方的真正重点是to_netcdf
方法和load_dataset
例程。前者将DataSet
存储在 NetCDF 格式文件中。这需要安装 NetCDF4 包,因为它允许我们访问相关的 C 库来解码 NetCDF 格式的文件。load_dataset
例程是一个通用的例程,用于从各种文件格式(包括 NetCDF,这同样需要安装 NetCDF4 包)将数据加载到DataSet
对象中。
还有更多…
xarray
包支持除 NetCDF 之外的许多数据格式,如 OPeNDAP、Pickle、GRIB 和 Pandas 支持的其他格式。
处理地理数据
许多应用涉及处理地理数据。例如,当跟踪全球天气时,我们可能希望在地图上以各种传感器在世界各地的位置测量的温度为例进行绘图。为此,我们可以使用 GeoPandas 包和 Geoplot 包,这两个包都允许我们操纵、分析和可视化地理数据。
在这个配方中,我们将使用 GeoPandas 和 Geoplot 包来加载和可视化一些样本地理数据。
准备工作
对于这个配方,我们需要 GeoPandas 包,Geoplot 包和 Matplotlib 的pyplot
包作为plt
导入:
import geopandas
import geoplot
import matplotlib.pyplot as plt
如何做…
按照以下步骤,使用样本数据在世界地图上创建首都城市的简单绘图:
- 首先,我们需要从 GeoPandas 包中加载样本数据,其中包含世界地理信息:
world = geopandas.read_file(
geopandas.datasets.get_path("naturalearth_lowres")
)
- 接下来,我们需要加载包含世界各个首都城市名称和位置的数据:
cities = geopandas.read_file(
geopandas.datasets.get_path("naturalearth_cities")
)
- 现在,我们可以创建一个新的图形,并使用
polyplot
例程绘制世界地理的轮廓:
fig, ax = plt.subplots()
geoplot.polyplot(world, ax=ax)
- 最后,我们使用
pointplot
例程在世界地图上添加首都城市的位置。我们还设置轴限制,以使整个世界可见:
geoplot.pointplot(cities, ax=ax, fc="r", marker="2")
ax.axis((-180, 180, -90, 90))
结果绘制的世界各国首都城市的位置如下:
图 10.2:世界首都城市在地图上的绘图
工作原理…
GeoPandas 包是 Pandas 的扩展,用于处理地理数据,而 Geoplot 包是 Matplotlib 的扩展,用于绘制地理数据。GeoPandas 包带有一些我们在这个配方中使用的样本数据集。naturalearth_lowres
包含描述世界各国边界的几何图形。这些数据不是非常高分辨率,正如其名称所示,这意味着地理特征的一些细节可能在地图上不存在(一些小岛根本没有显示)。naturalearth_cities
包含世界各国首都城市的名称和位置。我们使用datasets.get_path
例程来检索包数据目录中这些数据集的路径。read_file
例程将数据导入 Python 会话。
Geoplot 包提供了一些专门用于绘制地理数据的附加绘图例程。polyplot
例程从 GeoPandas DataFrame 绘制多边形数据,该数据可能描述一个国家的地理边界。pointplot
例程从 GeoPandas DataFrame 在一组轴上绘制离散点,这种情况下描述了首都城市的位置。
将 Jupyter 笔记本作为脚本执行
Jupyter 笔记本是用于编写科学和数据应用的 Python 代码的流行媒介。 Jupyter 笔记本实际上是一个以JavaScript 对象表示(JSON)格式存储在带有ipynb
扩展名的文件中的块序列。每个块可以是多种不同类型之一,例如代码或标记。这些笔记本通常通过解释块并在后台内核中执行代码然后将结果返回给 Web 应用程序的 Web 应用程序访问。如果您在个人 PC 上工作,这很棒,但是如果您想在服务器上远程运行笔记本中包含的代码怎么办?在这种情况下,甚至可能无法访问 Jupyter 笔记本软件提供的 Web 界面。papermill 软件包允许我们从命令行参数化和执行笔记本。
在本教程中,我们将学习如何使用 papermill 从命令行执行 Jupyter 笔记本。
准备工作
对于本教程,我们需要安装 papermill 软件包,并且当前目录中需要有一个示例 Jupyter 笔记本。我们将使用本章的代码存储库中存储的sample.ipynb
笔记本文件。
如何做…
按照以下步骤使用 papermill 命令行界面远程执行 Jupyter 笔记本:
- 首先,我们从本章的代码存储库中打开样本笔记本
sample.ipynb
。笔记本包含三个代码单元格,其中包含以下代码:
import matplotlib.pyplot as plt
from numpy.random import default_rng
rng = default_rng(12345)
uniform_data = rng.uniform(-5, 5, size=(2, 100))
fig, ax = plt.subplots(tight_layout=True)
ax.scatter(uniform_data[0, :], uniform_data[1, :])
ax.set(title="Scatter plot", xlabel="x", ylabel="y")
- 接下来,我们在终端中打开包含 Jupyter 笔记本的文件夹并使用以下命令:
papermill --kernel python3 sample.ipynb output.ipynb
- 现在,我们打开输出文件
output.ipynb
,该文件现在应该包含已更新为执行代码结果的笔记本。在最终块中生成的散点图如下所示:
图 10.3:在远程使用 papermill 执行的 Jupyter 笔记本中生成的随机数据的散点图
它是如何工作的…
papermill 软件包提供了一个简单的命令行界面,用于解释和执行 Jupyter 笔记本,然后将结果存储在新的笔记本文件中。在本教程中,我们提供了第一个参数 - 输入笔记本文件 - sample.ipynb
和第二个参数 - 输出笔记本文件 - output.ipynb
。然后工具执行笔记本中包含的代码并生成输出。笔记本文件格式跟踪上次运行的结果,因此这些结果将添加到输出笔记本并存储在所需的位置。在本教程中,这是一个简单的本地文件,但是 papermill 也可以存储到云位置,例如Amazon Web Services(AWS)S3 存储或 Azure 数据存储。
在步骤 2中,我们在使用 papermill 命令行界面时添加了--kernel python3
选项。此选项允许我们指定用于执行 Jupyter 笔记本的内核。如果 papermill 尝试使用与用于编写笔记本的内核不同的内核执行笔记本,则可能需要这样做以防止错误。可以使用以下命令在终端中找到可用内核的列表:
jupyter kernelspec list
如果在执行笔记本时出现错误,您可以尝试切换到不同的内核。
还有更多…
Papermill 还具有 Python 接口,因此您可以从 Python 应用程序内执行笔记本。这对于构建需要能够在外部硬件上执行长时间计算并且结果需要存储在云中的 Web 应用程序可能很有用。它还具有向笔记本提供参数的能力。为此,我们需要在笔记本中创建一个标有默认值的参数标记的块。然后可以通过命令行界面使用-p
标志提供更新的参数,后跟参数的名称和值。
验证数据
数据通常以原始形式呈现,可能包含异常或不正确或格式不正确的数据,这显然会给后续处理和分析带来问题。通常最好在处理管道中构建验证步骤。幸运的是,Cerberus 包为 Python 提供了一个轻量级且易于使用的验证工具。
对于验证,我们必须定义一个模式,这是关于数据应该如何以及应该对数据执行哪些检查的技术描述。例如,我们可以检查类型并设置最大和最小值的边界。Cerberus 验证器还可以在验证步骤中执行类型转换,这使我们可以将直接从 CSV 文件加载的数据插入验证器中。
在这个示例中,我们将学习如何使用 Cerberus 验证从 CSV 文件加载的数据。
准备工作
对于这个示例,我们需要从 Python 标准库中导入csv
模块,以及 Cerberus 包:
import csv
import cerberus
我们还需要这一章的代码库中的sample.csv
文件。
如何做…
在接下来的步骤中,我们将使用 Cerberus 包从 CSV 中加载的一组数据进行验证:
- 首先,我们需要构建描述我们期望的数据的模式。为此,我们必须为浮点数定义一个简单的模式:
float_schema = {"type": "float", "coerce": float, "min": -1.0,
"max": 1.0}
- 接下来,我们为单个项目构建模式。这些将是我们数据的行:
item_schema = {
"type": "dict",
"schema": {
"id": {"type": "string"},
"number": {"type": "integer", "coerce": int},
"lower": float_schema,
"upper": float_schema,
}
}
- 现在,我们可以定义整个文档的模式,其中将包含一系列项目:
schema = {
"rows": {
"type": "list",
"schema": item_schema
}
}
- 接下来,我们使用刚刚定义的模式创建一个
Validator
对象:
validator = cerberus.Validator(schema)
- 然后,我们使用
csv
模块中的DictReader
加载数据:
with open("sample.csv") as f:
dr = csv.DictReader(f)
document = {"rows": list(dr)}
- 接下来,我们使用
Validator
上的validate
方法来验证文档:
validator.validate(document)
- 然后,我们从
Validator
对象中检索验证过程中的错误:
errors = validator.errors["rows"][0]
- 最后,我们可以打印出任何出现的错误消息:
for row_n, errs in errors.items():
print(f"row {row_n}: {errs}")
错误消息的输出如下:
row 11: [{'lower': ['min value is -1.0']}]
row 18: [{'number': ['must be of integer type', "field 'number' cannot be coerced: invalid literal for int() with base 10: 'None'"]}]
row 32: [{'upper': ['min value is -1.0']}]
row 63: [{'lower': ['max value is 1.0']}]
它是如何工作的…
我们创建的模式是对我们需要根据数据检查的所有标准的技术描述。这通常被定义为一个字典,其中项目的名称作为键,属性字典作为值,例如字典中的值的类型或值的边界。例如,在步骤 1中,我们为浮点数定义了一个模式,限制了数字的范围,使其在-1 和 1 之间。请注意,我们包括coerce
键,该键指定在验证期间应将值转换为的类型。这允许我们传入从 CSV 文档中加载的数据,其中只包含字符串,而不必担心其类型。
Validator
对象负责解析文档,以便对其进行验证,并根据模式描述的所有标准检查它们包含的数据。在这个示例中,我们在创建Validator
对象时向其提供了模式。但是,我们也可以将模式作为第二个参数传递给validate
方法。错误存储在一个嵌套字典中,其结构与文档的结构相似。
处理数据流
一些数据是从各种来源以恒定流的形式接收的。例如,我们可能会遇到多个温度探头通过 Kafka 服务器定期报告值的情况。Kafka 是一个流数据消息代理,根据主题将消息传递给不同的处理代理。
处理流数据是异步 Python 的完美应用。这使我们能够同时处理更大量的数据,这在应用程序中可能非常重要。当然,在异步上下文中我们不能直接对这些数据进行长时间的分析,因为这会干扰事件循环的执行。
使用 Python 的异步编程功能处理 Kafka 流时,我们可以使用 Faust 包。该包允许我们定义异步函数,这些函数将充当处理代理或服务,可以处理或以其他方式与来自 Kafka 服务器的数据流进行交互。
在这个食谱中,我们将学习如何使用 Faust 包来处理来自 Kafka 服务器的数据流。
准备工作
与本书中大多数食谱不同,由于我们将从命令行运行生成的应用程序,因此无法在 Jupyter 笔记本中运行此食谱。
对于这个食谱,我们需要导入 Faust 包:
import faust
我们还需要从 NumPy 包中运行默认随机数生成器的实例:
from numpy.random import default_rng
rng = default_rng(12345)
我们还需要在本地机器上运行 Kafka 服务的实例,以便我们的 Faust 应用程序可以与消息代理进行交互。
一旦您下载了 Kafka 并解压了下载的源代码,就导航到 Kafka 应用程序所在的文件夹。在终端中打开此文件夹。使用以下命令启动 ZooKeeper 服务器(适用于 Linux 或 Mac):
bin/zookeeper-server-start.sh config/zookeeper.properties
如果您使用 Windows,改用以下命令:
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
然后,在一个新的终端中,使用以下命令启动 Kafka 服务器(适用于 Linux 或 Mac):
bin/kafka-server-start.sh config/server.properties
如果您使用 Windows,改用以下命令:
bin\windows\kafka-server-start.bat config\server.properties
在每个终端中,您应该看到一些日志信息,指示服务器正在运行。
操作步骤…
按照以下步骤创建一个 Faust 应用程序,该应用程序将读取(和写入)数据到 Kafka 服务器并进行一些简单的处理:
- 首先,我们需要创建一个 Faust
App
实例,它将充当 Python 和 Kafka 服务器之间的接口:
app = faust.App("sample", broker="kafka://localhost")
- 接下来,我们将创建一个记录类型,模拟我们从服务器期望的数据:
class Record(faust.Record):
id_string: str
value: float
- 现在,我们将向 Faust
App
对象添加一个主题,将值类型设置为我们刚刚定义的Record
类:
topic = app.topic("sample-topic", value_type=Record)
- 现在,我们定义一个代理,这是一个包装在
App
对象上的agent
装饰器的异步函数:
@app.agent(topic)
async def process_record(records):
async for record in records:
print(f"Got {record
上一篇:
[备注] Dcoker 常用命令
下一篇:
大型语言模型摘要组织器(不定期更新)
推荐阅读
-
使用 Sphinx-4 语音识别引擎构建 Python Sphinx 文档:技术比较与应用实践
-
Python- 物联网编程实践(四)
-
快速排序的四种 Python 实现
-
数学和英语不好,学习 Python 难吗?看完这篇文章,相信你会坚定自己的选择!
-
贪婪算法在 Python、JavaScript、Java、C++ 和 C# 中的多种实现及其在硬币变化、分数骑士、活动选择和使用哈夫曼编码的最小生成树问题中的应用实例
-
python警告:工作簿不包含默认样式,应用openpyxl的默认警告
-
如何使用 Python 对图像应用扭曲效果
-
python 数据分析在电力行业中的应用 python 电影数据分析报告
-
图像取证 python 图像取证原理与应用
-
四种常见的应用程序弹出窗口设计:吐司、对话框、操作栏和小工具栏。